1. Create DataFrame from an RDD
val colors = List("white","green","yellow","red","brown","pink")
val color_df = sc.parallelize(colors).map(x => (x,x.length)).toDF("color","length")
color_df
res0: org.apache.spark.sql.DataFrame = [color: string, length: int]
color_df.dtypes //Note the implicit type inference
res1: Array[(String, String)] = Array((color,StringType), (length,IntegerType))
color_df.show()
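If the column names and types need to be fixed up front instead of relying on inference, the same RDD can be converted with an explicit schema. A minimal sketch using the standard StructType API (the nullability flags here are only an assumption):
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val schema = StructType(Seq(
  StructField("color", StringType, nullable = false),
  StructField("length", IntegerType, nullable = false)))
val rowRDD = sc.parallelize(colors).map(x => Row(x, x.length))
val color_df2 = sqlContext.createDataFrame(rowRDD, schema)
color_df2.printSchema()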
2. Create DataFrame from JSON
val df = sqlContext.read.json("./authors.json")
df.show()
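read.json expects one JSON object per line (JSON Lines format). It also accepts an RDD of JSON strings, which is handy for quick tests; the author fields below are made up, since the contents of authors.json are not shown here:
val jsonRDD = sc.parallelize(Seq(
  """{"first_name":"Mark","last_name":"Twain"}""",
  """{"first_name":"Charles","last_name":"Dickens"}"""))
val authorsDF = sqlContext.read.json(jsonRDD) // schema is inferred from the JSON keys
authorsDF.printSchema()
authorsDF.show()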
3. Create DataFrame from JDBC
spark-shell --driver-class-path /user/share/mysql-connector.jar
val peopleDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost",
"dbtable" -> "test.people",
"user" -> "root",
"password" -> "mysql")).load()
peopleDF.show()
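For larger tables the JDBC source can read in parallel when told how to split the table. A sketch assuming test.people has a numeric id column (the column name and bounds are assumptions):
val peoplePartitionedDF = sqlContext.read.format("jdbc").options(
Map("url" -> "jdbc:mysql://localhost",
"dbtable" -> "test.people",
"user" -> "root",
"password" -> "mysql",
"partitionColumn" -> "id", // assumed numeric column to split on
"lowerBound" -> "1",
"upperBound" -> "10000",
"numPartitions" -> "4")).load()
peoplePartitionedDF.rdd.partitions.length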
4. Create DataFrame from Apache Parquet
peopleDF.write.parquet("writers.parquet")
val writersDF = sqlContext.read.parquet("writers.parquet")
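Parquet stores the schema with the data, so the round trip keeps column names and types. Parquet files can also be queried with SQL directly, without registering a table first (a small sketch, assuming the write above succeeded):
writersDF.printSchema()
val parquetSqlDF = sqlContext.sql("SELECT * FROM parquet.`writers.parquet`")
parquetSqlDF.show()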
5. Other operations
val colors = List("white","green","yellow","red","brown","pink")
val color_df = sc.parallelize(colors)
.map(x => (x,x.length)).toDF("color","length")
color_df.dtypes
color_df.count()
color_df.columns
color_df.drop("length").show()
color_df.toJSON.first()
color_df.filter(color_df("length").between(4,5))
.select(color_df("color").alias("mid_length")).show()
color_df.filter(color_df("length") > 4).filter(color_df( "color")!=="white").show()
color_df.sort("color").show()
color_df.orderBy("length","color").take(4)
color_df.sort(color_df("length").desc, color_df("color").asc).show()
color_df.groupBy("length").count().show()
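groupBy can also carry several aggregates at once through agg and the functions package; a minimal sketch over the same color_df:
import org.apache.spark.sql.functions.{count, min, max}
color_df.groupBy("length")
.agg(count("color").alias("n"), min("color"), max("color"))
.show()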
val df1 = sqlContext.read.json("./authors_missing.json")
df1.show()
val df2 = df1.na.drop()
df2.show()
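Instead of dropping rows with nulls, na.fill substitutes defaults. The column name below is only an assumption, since the contents of authors_missing.json are not shown here:
val df3 = df1.na.fill("unknown") // fill every null string column with a placeholder
df3.show()
val df4 = df1.na.fill(Map("last_name" -> "unknown")) // or fill a specific (assumed) column
df4.show()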
// assumes an existing users DataFrame with an age column
val young = users.filter(users("age") < 21)
young.registerTempTable("young")
sqlContext.sql("SELECT count(*) FROM young").show()
df.createOrReplaceTempView("people")
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
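Once the view is registered, any SQL can be run against it. A small sketch that assumes the people data has name and age columns (mirroring the young example above):
spark.sql("SELECT count(*) AS n FROM people").show()
spark.sql("SELECT name FROM people WHERE age < 21").show() // name/age columns are assumed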
import spark.implicits._
df.printSchema()
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
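Typed transformations on a Dataset keep the case class, and a Dataset can always be turned back into an untyped DataFrame. A short sketch continuing the caseClassDS example above:
val older = caseClassDS.map(p => p.copy(age = p.age + 1)) // still a Dataset[Person]
older.show()
val backToDF = older.toDF() // back to an untyped DataFrame
backToDF.printSchema()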