1. Reading files
(sc.textFile returns the lines of a file; sc.wholeTextFiles returns (filename, contents) pairs)
md = sc.textFile("test.txt") <-- textFile(file, minPartitions (default 2))
md.count() <-- the job only runs when an action is called (lazy evaluation)
md.toDebugString() <-- shows the RDD lineage for debugging
URI
- file : file:/tmp/test1.txt
- hdfs : hdfs://nnhost/tmp/test1.txt
sc.textFile("test1.text")
sc.textFile("tmp/")
sc.textFile("tmp/*.log")
sc.textFile("test1.txt, test2.txt")
import json
myrdd1 = sc.wholeTextFiles(mydir)
myrdd2 = myrdd1.map(lambda (fname, s): json.loads(s))   # Python 2 tuple-unpacking lambda
for record in myrdd2.take(2):
    print record.get("firstName", None)
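A quick way to see the (filename, contents) pairs that wholeTextFiles produces (a sketch; mydir is the same directory as above):
for (fname, contents) in myrdd1.take(1):
    print(fname)           # full path of the file
    print(contents[:80])   # first part of the file body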
2. RDD functions
1) Actions
count() : number of elements
take(n) : returns the first n elements
collect() : returns all elements as an array
saveAsTextFile(dir) : saves the RDD as text files
first
foreach
top(n)
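A minimal sketch exercising the actions above on a small in-memory RDD (the sample values and the "nums_out" output directory are made up for illustration):
nums = sc.parallelize([3, 1, 4, 1, 5, 9])
print(nums.count())               # 6
print(nums.take(2))               # [3, 1]
print(nums.collect())             # all elements as a list
print(nums.first())               # 3
print(nums.top(2))                # [9, 5] (largest elements, descending)
nums.foreach(lambda x: x)         # runs a function on each element (on the executors)
nums.saveAsTextFile("nums_out")   # writes part-NNNNN files into the nums_out directory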
2) Transformations
map(function) : applies the function to each record of the RDD
filter(function) : includes/excludes each record of the base RDD based on the function
(flatMap, distinct, sortBy, intersection, union, zip, subtract)
map(lambda line : line.upper())
filter(lambda line : line.startswith('I'))
=> sc.textFile("test.txt").map(lambda line: line.upper()).filter(lambda line: line.startswith('I')).count()
sc.textFile(file).flatMap(lambda line: line.split(' ')).distinct()
sc.textFile(file).map(lambda line: line.split('\t')).map(lambda fields: (fields[0], fields[1])).flatMapValues(lambda line: line.split(':'))
sc.textFile(file).flatMap(lambda line: line.split(' ')).map(lambda word: (word,1)).reduceByKey(lambda v1, v2 : v1 + v2)
* map phase : map, flatMap, filter, keyBy, mapValues, flatMapValues
* reduce phase : reduceByKey, sortByKey, mean, groupByKey
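A sketch of the pair-RDD helpers listed above that are not shown elsewhere in this post (keyBy, mapValues, groupByKey); the sample words are made up:
words = sc.parallelize(["spark", "sql", "scala"])
pairs = words.keyBy(lambda w: w[0])              # ('s', 'spark'), ('s', 'sql'), ('s', 'scala')
lengths = pairs.mapValues(lambda w: len(w))      # ('s', 5), ('s', 3), ('s', 5)
grouped = lengths.groupByKey().mapValues(list)   # ('s', [5, 3, 5])
print(grouped.collect())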
3. Applying anonymous functions (lambda)
def toUpper(s):
    return s.upper()
mydata.map(toUpper).take(2)
python : lambda x: ...
scala : x =>
java : x ->
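The same map written with a Python lambda instead of the named function above:
mydata.map(lambda s: s.upper()).take(2)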
4. Saving files
(sc.hadoopFile, saveAsHadoopFile, saveAsTextFile)
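A minimal sketch, assuming an existing RDD named myrdd and an output directory "myrdd_out" that does not exist yet:
myrdd.saveAsTextFile("myrdd_out")   # writes one part-NNNNN file per partition into the myrdd_out directory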
5. Spark SQL
Supports the Catalyst Optimizer, a SQL engine, and a command-line interface
1) SQLContext : basic implementation
2) HiveContext : reads and writes Hive/HCatalog and supports HiveQL (CDH recommends using HiveContext)
sqlContext data sources : table, json, parquet, jdbc, orc, Avro, HBase, CSV, MySQL
(schema, printSchema, cache/persist, columns, dtype, explain)
from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
* sqlContext read functions : json, parquet, orc, table, jdbc
testDF = sqlContext.read.json("test.json")
custDF = sqlContext.read.table("customers")
testDF.limit(3).show()
testDF.select("age")
test1 = testDF.select(testDF['age'])
testDF.sort(testDF['age'].desc())
testDF.join(joinDF, "code")
testDF.join(joinDF, "code", "left_outer")
testDF.join(joinDF, testDF.code == joinDF.tcode)
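A sketch of the inspection helpers listed earlier (printSchema, columns, dtypes, cache, explain) applied to testDF:
testDF.printSchema()             # prints the nested schema
print(testDF.columns)            # list of column names
print(testDF.dtypes)             # list of (column, type) pairs
testDF.cache()                   # keep the DataFrame in memory across actions
testDF.select("age").explain()   # show the query plan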
sqlContext.read.format("com.databricks.spark.avro").load("/loudacre/account_avro")
sqlContext.read.format("jdbc")
.option("url", "jdbc:mysql://localhost/loudacre")
.option("dbtable", "accounts")
.option("user", "test")
.option("password", "test")
.load()
for item in testDF.dtypes : print item
testDF.registerTempTable("test")
sqlContext.sql("""select * from test where name like 'Test%' """)
sqlContext.sql("""select * from json.`/tmp/test.json` where name like 'Test%' """)
# groupBy, orderBy, agg, join, unionAll, intersect, avg, sampleBy, corr, cov, rollup, cube
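A sketch of a simple aggregation using a few of the operations listed above, assuming testDF has code and age columns as in the earlier examples:
testDF.groupBy("code").agg({"age": "avg"}).orderBy("code").show()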
testDF.write.saveAsTable("test") # jdbc, json, parquet, orc, text, saveAsTable
testDF.write.format("parquet").mode("append").partitionBy("age").saveAsTable("test")
RDD <---> DataFrame
testRDD = testDF.rdd
testRDD = testDF.rdd.map(lambda row: (row.code, row.name))
testByCode = testRDD.groupByKey()
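groupByKey returns an iterable of values per key; a quick way to inspect it (sketch):
print(testByCode.mapValues(list).take(1))   # e.g. [(code, [name1, name2, ...])]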
from pyspark.sql.types import *
schema = StructType([StructField("age", IntegerType(), True),
                     StructField("name", StringType(), True),
                     StructField("code", StringType(), True)])
rdd = sc.parallelize([(40, "test1", "12345"),
(50, "test2", "678910")])
df = sqlContext.createDataFrame(rdd, schema)
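To check the resulting DataFrame (sketch):
df.printSchema()
df.show()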