from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType

# Stop the SparkContext the pyspark shell created so it can be rebuilt with a custom conf.
sc.stop()
conf = SparkConf().setAppName("comstat-test").set("spark.yarn.driver.memoryOverhead", "2048") \
.set("spark.yarn.executor.memoryOverhead", "2048") \
.set("spark.default.parallelism", "116") \
.set("spark.shuffle.compress", "true") \
.set("spark.io.compression.codec", "snappy")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)  # the shell's sqlContext was bound to the stopped context, so recreate it
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
s_file="/tmp/khk/p_t0_prod_cd/*.avro"
df = sqlContext.read.format("com.databricks.spark.avro").load(s_file)
df.show()  # check the loaded data
df_test = df.select("prod_id", "prod_nm", "parent_prod_id", "parent_prod_nm", "chnl_prod_id", "brand_id", "brand_nm")
df_test.show()  # check the selected columns
df_test.registerTempTable("PROD_TB")  # registerTempTable returns None, so don't assign its result
df_test.printSchema()
dftab = sqlContext.sql("select * from PROD_TB")
dftab.show()
dftab.printSchema()
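Once PROD_TB is registered, any Spark SQL statement can be run against it. A minimal sketch, assuming brand_id can be null (the filter and column list here are hypothetical):

# Hypothetical query against the registered temp table.
df_brand = sqlContext.sql("select prod_id, prod_nm, brand_nm from PROD_TB where brand_id is not null")
df_brand.show()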
dfwrite = dftab.withColumn('new_id', lit(10))  # add a constant column new_id = 10
dfwrite.show()
dfwrite.write.format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test")  # default save mode errors out if the path already exists
dfwrite = dfwrite.withColumn('new_id', dfwrite["new_id"].cast(DoubleType()))  # new_id exists on dfwrite, not dftab
dfwrite.write.mode("overwrite").format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test")  # overwrite replaces the previous output
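To confirm the overwrite worked, the output can be read back with the same Avro reader. A minimal sketch (df_check is a name introduced here for illustration):

# Read the written Avro files back and verify new_id is now a double.
df_check = sqlContext.read.format("com.databricks.spark.avro").load("/tmp/khk/new_p_t0_prod_cd/test")
df_check.printSchema()
df_check.show()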