
pyspark example

세모데 2017. 8. 31. 16:30

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
from pyspark.sql.types import DoubleType



sc.stop()    # stop the SparkContext the pyspark shell created, so one with a custom conf can replace it


conf = SparkConf().setAppName("comstat-test") \
         .set("spark.yarn.driver.memoryOverhead", "2048") \
         .set("spark.yarn.executor.memoryOverhead", "2048") \
         .set("spark.default.parallelism", "116") \
         .set("spark.shuffle.compress", "true") \
         .set("spark.io.compression.codec", "snappy")




sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)    # the old sqlContext died with the old context, so create a new one

sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
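
To confirm the settings took effect on the new context, the active configuration can be read back (a quick check using the standard SparkConf getters):

print(sc.getConf().get("spark.default.parallelism"))    # expect "116"
print(sc.getConf().get("spark.io.compression.codec"))   # expect "snappy"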


s_file = "/tmp/khk/p_t0_prod_cd/*.avro"
df = sqlContext.read.format("com.databricks.spark.avro").load(s_file)
df.show()           # check the result
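
Before narrowing the columns, it helps to check the schema Spark inferred from the Avro files (a quick sanity check; these are standard DataFrame methods):

df.printSchema()    # column names and types taken from the Avro schema
print(df.count())   # total row count (triggers a full scan)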


df_test = df.select("prod_id", "prod_nm", "parent_prod_id", "parent_prod_nm", "chnl_prod_id", "brand_id", "brand_nm")
df_test.show()     # check the result
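
The same narrowing can also be done through the DataFrame API instead of SQL; for example, keeping only rows that actually carry a brand (the predicate here is just an illustration, not part of the original flow):

df_branded = df_test.filter(df_test["brand_id"].isNotNull())    # illustrative filter
df_branded.show()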


df_test.registerTempTable("PROD_TB")    # returns None, so don't assign the result
df_test.printSchema()


dftab = sqlContext.sql("select * from PROD_TB")
dftab.show()
dftab.printSchema()
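
Once the temp table is registered, any SQL works against it; a grouped count per brand is a typical check (hypothetical query, same PROD_TB table as above):

df_cnt = sqlContext.sql("select brand_nm, count(*) as cnt from PROD_TB group by brand_nm")
df_cnt.show()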


dfwrite = dftab.withColumn('new_id', lit(10))    # add a constant column new_id
dfwrite.show()
dfwrite.write.format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test")
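
The write honors the spark.sql.avro.compression.codec setting made earlier; switching it to snappy before saving should produce compressed Avro output (a sketch; the test_snappy path is made up for illustration):

sqlContext.setConf("spark.sql.avro.compression.codec", "snappy")
dfwrite.write.format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test_snappy")    # hypothetical path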


dfwrite = dfwrite.withColumn('new_id', dfwrite["new_id"].cast("double"))    # cast on dfwrite (dftab has no new_id); same as cast(DoubleType())
dfwrite.write.mode("overwrite").format("com.databricks.spark.avro").save("/tmp/khk/new_p_t0_prod_cd/test")    # overwrite the path saved above
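
To verify the overwrite, the directory can be read back with the same Avro reader and the column type checked:

df_check = sqlContext.read.format("com.databricks.spark.avro").load("/tmp/khk/new_p_t0_prod_cd/test")
df_check.printSchema()    # new_id should now be double
df_check.show()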