Creating DataFrames





1. Create DataFrame from an RDD

val colors = List("white","green","yellow","red","brown","pink")  

val color_df = sc.parallelize(colors).map(x => (x,x.length)).toDF("color","length")


color_df 

res0: org.apache.spark.sql.DataFrame = [color: string, length: int] 



color_df.dtypes  //Note the implicit type inference   

res1: Array[(String, String)] = Array((color,StringType), (length,IntegerType)) 



color_df.show()
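
In the spark-shell, sc and sqlContext already exist and sqlContext.implicits._ is pre-imported, which is what makes toDF available on the RDD of tuples. In a standalone application you set these up yourself; a minimal sketch assuming the Spark 1.x API and a local master:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("colors").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._   // brings toDF into scope

val color_df = sc.parallelize(List("white","green","yellow","red","brown","pink"))
  .map(x => (x, x.length))
  .toDF("color", "length")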



2. Create DataFrame from JSON


val df = sqlContext.read.json("./authors.json")

df.show()
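
read.json expects JSON Lines input (one complete JSON object per line) and infers the schema from the keys it finds. The contents of authors.json are not shown here, so the sketch below uses made-up records read from an RDD of strings instead of a file; the field names are illustrative:

// Hypothetical records; the real authors.json may have different fields
val jsonRDD = sc.parallelize(Seq(
  """{"first_name":"Mark","last_name":"Twain"}""",
  """{"first_name":"Charles","last_name":"Dickens"}"""
))
val authorsDF = sqlContext.read.json(jsonRDD)
authorsDF.printSchema()   // schema inferred from the JSON keys
authorsDF.show()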


3. Create DataFrame from JDBC

 

spark-shell --driver-class-path /user/share/mysql-connector.jar


val peopleDF = sqlContext.read.format("jdbc").options(
           Map("url" -> "jdbc:mysql://localhost",
               "dbtable" -> "test.people",
               "user" -> "root",
               "password" -> "mysql")).load()


peopleDF.show() 
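
Writing back over JDBC works the same way; a hedged sketch using DataFrameWriter.jdbc with a java.util.Properties object (the target table name test.people_copy is made up):

import java.util.Properties

val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "mysql")

// mode("append") adds rows if the table exists; the default mode fails instead
peopleDF.write.mode("append").jdbc("jdbc:mysql://localhost", "test.people_copy", props)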


4. Create DataFrame from Apache Parquet


peopleDF.write.parquet("writers.parquet")

val writersDF = sqlContext.read.parquet("writers.parquet")  
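
A second write to the same path fails by default; SaveMode changes that. And since Parquet is a columnar format, selecting only some columns avoids scanning the rest. A sketch, where the name column is an assumption about peopleDF's schema:

import org.apache.spark.sql.SaveMode

peopleDF.write.mode(SaveMode.Overwrite).parquet("writers.parquet")   // replace existing output

// Column pruning: only the selected column is read from the Parquet files
sqlContext.read.parquet("writers.parquet").select("name").show()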



5. Other DataFrame operations

val colors = List("white","green","yellow","red","brown","pink") 

val color_df = sc.parallelize(colors)
        .map(x => (x,x.length)).toDF("color","length")

color_df.dtypes 

color_df.count()

color_df.columns

color_df.drop("length").show()

color_df.toJSON.first() 


color_df.filter(color_df("length").between(4,5)) 

       .select(color_df("color").alias("mid_length")).show() 



color_df.filter(color_df("length") > 4).filter(color_df( "color")!=="white").show()

color_df..sort("color").show()

color_df.orderBy("length","color").take(4) 

color_df.sort(color_df("length").desc, color_df("color").asc).show()

color_df.groupBy("length").count().show() 
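
groupBy also accepts named aggregate functions from org.apache.spark.sql.functions; a short sketch on the same color_df:

import org.apache.spark.sql.functions._

color_df.groupBy("length")
        .agg(count("color").alias("n_colors"), max("color").alias("max_color"))
        .show()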



val df1 = sqlContext.read.json("./authors_missing.json") 

df1.show()

val df2 = df1.na.drop() 

df2.show()
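
Instead of dropping incomplete rows, missing values can be replaced with na.fill; the column name in the Map variant below is illustrative, since the schema of authors_missing.json is not shown:

val df3 = df1.na.fill("unknown")             // fill nulls in every string column
// df1.na.fill(Map("age" -> 0))              // or supply per-column defaults
df3.show()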


6. SQL on DataFrames

// Spark 1.x style: register the DataFrame as a temporary table, then query it with SQL
// (users is an example DataFrame with an age column)
val young = users.filter(users("age") < 21)
young.registerTempTable("young")
sqlContext.sql("SELECT count(*) FROM young")

// Spark 2.x style: create a temporary view on the SparkSession
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
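
The result of spark.sql is an ordinary DataFrame, so SQL and the DataFrame API compose freely:

// sqlDF can be inspected and transformed like any other DataFrame
sqlDF.printSchema()
sqlDF.limit(2).show()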

7. Datasets

import spark.implicits._   // encoders and the toDS()/as[T] syntax

df.printSchema()

case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+
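
Transformations on a Dataset are typed: the lambdas below receive Person objects rather than Rows. A small sketch on caseClassDS:

// filter/map on Person; the result is a Dataset[String] containing just "Andy"
caseClassDS.filter(p => p.age > 30).map(p => p.name).show()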

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
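
A Dataset can be turned back into an untyped DataFrame at any point, which is convenient when a column-level operation is all that is needed (name is one of the Person fields):

// Column names of the resulting DataFrame come from the case class fields
peopleDS.toDF().select("name").show()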

