1.  파일읽기

    (sc.textFile, sc.wholeTextFiles => file, 내용리턴)

    md = sc.textFile("test.txt")        <-- textFile(file, minPartitions(defult 2)) 

    md.count()                           <-- action시 프로그램 수행

    md.toDebugString()                <-- debug message


    URI 

       - file :    file:/tmp/test1.txt

       - hdfs :   hdfs://nnhost/tmp/test1.txt


    sc.textFile("test1.text")

    sc.textFile("tmp/")

    sc.textFile("tmp/*.log")

    sc.textFile("test1.txt, test2.txt")


    myrdd1 = sc.wholeTextFiles(mydir)

    myrdd2 = myrdd1.map(lambda (fname,s): json.loads(s))

    for record in myrdd2.take(2):

       print record.get("firstName",None)



2. RDD func

  1) Actions

     count() : elements의 수

     take(n) : 처음 n개 return

     collect() : 모든 요소에 array return

     saveAsTextFile(dir)  : text file 저장

     first

     foreach

     top(n)  


  2) Transfrom

     map(function) : RDD 각각 record에 function 적용

     filter(function) : Base RDD에 각각에 include/exclude 적용

     (flatMap, distinct, sortBy,  intersection, union, zip, subtract)  


     map(lambda line : line.upper())

     filter(lambda line : line.startswith('I'))

     => sc.textFile("test.txt").map(lambda line: line.upper()).filter(lambda line: line.startwith('I')).count()


     sc.textFile(file).flatMap(lambda line: line.split(' ')).distinct()

     sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0], fields[1])).flatMapValues(lambda line: line.split(':'))

     sc.textFile(file).flatMap(lambda line: line.split(' ')).map(lambda word: (word,1)).reduceByKey(lambda v1, v2 : v1 + v2)



     * map phase :  map, flatMap, filter, keyBy, mapValues, flatMapValues

     * reduce phase : reduceByKey, sortByKey, mean, groupByKey



3. Anonymous function(lambda) 적용

   def toUpper(s):

return s.upper()


   mydata.map(toUpper).take(2)


   python : lambda x: ...

   scala  :   x =>

   java   :   x ->



4. 파일 저장

   (sc.hadoopFile, saveAsHadoopFile, saveAsTextFile)



5. Spark SQL

   Catalyst Optimizer 지원, SQL 엔진과 command line


   1) SQLContext : basic 구현

   2) HiveContext : Hive/HCatalog 읽고 쓰고, HiveQL 언어 지원 (CDH는 HiveContext 사용 권장)


   sqlContext 데이터 소스 : table, json, parquet, jdbc, orc, Avro, HBase, CSV, MySQL

   (schema, printSchema, cache/persist, columns, dtype, explain)


   from pyspark import HiveContext


   sqlContext = HiveContext(sc)  

   * sqlContext 함수 :  json, parquet, orc, table, jdbc


   testDF = sqlContext.read.json("test.json")

   custDF = sqlContext.read.table("customers")


   testDF.limit(3).show()

   testDF.select("age")

   test1 = testDF.select(testDF['age'])

   testDF.sort(testDF['age'].desc())

   testDF.join(joinDF, "code")

   testDF.join(joinDF, "code", "left_outer")

   testDF.join(joinDF, testDF.code = joinDF.tcode)


   sqlContext.read.format("com.databricks.spark.avro").load("/loudacre/account_avro")

   sqlContext.read.format("jdbc")

.option("url", "jdbc:mysql://localhost/loudacre")

.option("dbtable", "accounts")

.option("user", "test")

.option("password", "test")

.load()


   for item in testDF.dtypes : print item


   testDF.registerTempTable("test")

   sqlContext.sql("""select * from test where name like 'Test%' """)

   sqlContext.sql("""select * from json.`/tmp/test.json` where name like 'Test%' """)


   # groupBy, orderBy, agg, join, unionAll, intersect, avg, sampleBy, corr, cov, rollup, cube


   testDF.write.saveAsTable("test")   # jdbc, json, parquet, orc, text, saveAsTable

   testDF.write.format("parquet").mode("append").partitionBy("age").saveAsTable("test")


   RDD <---> DataFrame

   testRDD = testDF.rdd


   testRDD = testDF.map(lambda row: (row.code, row.name))

   testByCode = testDF.groupByKey()


   from pyspark.sql.types import *


   schema = StructType([StructField("age", IntegerType(), True),

    StructField("name", IntegerType(), True),

    StructField("code", StringType(), True)])

   rdd = sc.parallelize([(40, "test1", "12345"),

 (50, "test2", "678910")])

   df = sqlContext.createDataFrame(rdd, schema)




   




'NoSQL > Spark' 카테고리의 다른 글

pyspark 예제  (0) 2017.08.31
spark ml 기초  (0) 2017.08.17
spark sql  (0) 2016.12.02
spark library 유형  (0) 2016.11.06
spark 파일(데이터) 포맷  (0) 2016.07.05

+ Recent posts