from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType

# stop the SparkContext created by the pyspark shell so a new conf can be applied
sc.stop()


conf = SparkConf().setAppName("comstat-test") \
         .set("spark.yarn.driver.memoryOverhead", "2048") \
         .set("spark.yarn.executor.memoryOverhead", "2048") \
         .set("spark.default.parallelism", "116") \
         .set("spark.shuffle.compress", "true") \
         .set("spark.io.compression.codec", "snappy")




sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)    # sqlContext must be recreated after restarting the context
sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")


s_file = "/tmp/khk/p_t0_prod_cd/*.avro"
df = sqlContext.read.format("com.databricks.spark.avro").load(s_file)
df.show()           # check the result
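
If the source schema is unfamiliar, it can help to inspect what the Avro reader inferred before selecting columns; a minimal check:

df.printSchema()    # fields and types inferred from the Avro schema
df.count()          # number of records loaded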


df_test = df.select("prod_id", "prod_nm", "parent_prod_id", "parent_prod_nm", "chnl_prod_id", "brand_id", "brand_nm")
df_test.show()     # check the result


# registerTempTable returns None, so call it without assigning the result
df_test.registerTempTable("PROD_TB")
df_test.printSchema()

dftab = sqlContext.sql("select * from PROD_TB")
dftab.show()
dftab.printSchema()
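
With the temp table registered, any SQL can be run against it; a minimal sketch (the filter below is only an illustrative assumption, not part of the original flow):

df_brand = sqlContext.sql("select prod_id, prod_nm, brand_nm from PROD_TB where brand_id is not null")
df_brand.show()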


dfwrite = dftab.withColumn('new_id', lit(10))   # add a constant column new_id
dfwrite.show()
dfwrite.write.format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test")
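
Note: save() raises an error if the target path already exists; the next write passes mode("overwrite") to replace it.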


# dftab has no new_id column; cast the column on dfwrite, where it was added
dfwrite = dfwrite.withColumn('new_id', dfwrite["new_id"].cast(DoubleType()))
dfwrite.write.mode("overwrite").format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test")
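
To confirm the cast and the overwrite, the output can be read back with the same Avro reader; a quick check, assuming the path used above:

df_check = sqlContext.read.format("com.databricks.spark.avro").load("/tmp/khk/new_p_t0_prod_cd/test")
df_check.printSchema()   # new_id should now be double
df_check.show()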
