๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
Data Science/Spark

[Spark] pyspark 3.0 dataframe new function test #2 (csv-related)

by ํ™ํ›„์ถ” 2020. 7. 6.

 

2020/07/03 - [Data Science/Spark] - [Spark] spark 3.0 dataframe new features roundup #1


์ง์ „ ํฌ์ŠคํŠธ์—์„œ ์ค€๋น„ํ•œ ๋ฐ์ดํ„ฐ๋กœ ์‹ ๊ทœ๊ธฐ๋Šฅ, ๊ฐœ์„ ๊ธฐ๋Šฅ์„ ํ…Œ์ŠคํŠธ

โ—‹ ํ…Œ์ŠคํŠธ๋ฐ์ดํ„ฐ ํ™•์ธ


โ—‹ csv๊ด€๋ จ ( from_csv, to_csv, schema_of_csv )
from_json์˜ csv(comma seperated value)๋Œ€์‘๊ธฐ๋Šฅ, ์ปด๋งˆ(',')๋กœ ๊ตฌ๋ถ„๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ๋•Œ ์‚ฌ์šฉ

- to_csv(col, options={})
col : a column containing a StructType (e.g. a Row packed into a struct); its fields are joined into a single CSV line
options : the options used by spark.read.csv can be used here as well. See the URL below for details:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv

#to_csv ํ…Œ์ŠคํŠธ
data = df.head(df.count()) 
data_list = []
i = 1
for d in data : 
  tmp = [i, d]
  data_list.append(tmp)
  i += 1
to_csv_test_df_tmp = spark.createDataFrame(data_list, ("key", "value"))
to_csv_test_df = to_csv_test_df_tmp.select(func.to_csv(to_csv_test_df_tmp.value).alias("csv_format"))
to_csv_test_df.show()

- from_csv(col, schema, options={})
col : a string column in CSV format
schema : the schema in DDL format
options : the options used by spark.read.csv can be used here as well. See the URL below for details:
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv

#from_csvํ…Œ์ŠคํŠธ
#schema๋Š” structType์ด ์•„๋‹Œ string์œผ๋กœ ํ•ด์ค˜์•ผํ•จ.
str_schema = """year string, 
                month string, 
                geo int, 
                nongeo int, 
                etc int, 
                fail int,
                sum int
                """
to_csv_test_df.select(func.from_csv("csv_format", str_schema)).show()
#list๋กœ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

 

- schema_of_csv(csv, options={})
Writing out a CSV schema by hand, as above, is only possible when you already know the data completely.
When you just want a rough idea of the CSV schema, or writing out the whole schema is a hassle, use schema_of_csv.

# schema_of_csv ํ…Œ์ŠคํŠธ
csv_list = [row.csv_format for row in to_csv_test_df.head(1)]
csv_str = ','.join(csv_list)
print("csv_str : " + csv_str)

to_csv_test_df.select(func.schema_of_csv(csv_str)).collect()

# dataframe์•ˆ์—์„œ ์‹œ๋„ 
# 1. csv_format ์ปฌ๋Ÿผ
# 2. csv_format ์ปฌ๋Ÿผ์˜ csv schema infer ์‹œ๋„
# 3. csv_format ์ปฌ๋Ÿผ์˜ csv schema infer options๋„ฃ๊ณ  ์‹œ๋„
to_csv_test_df.select("csv_format", func.split("csv_format",','),func.schema_of_csv("csv_format"),
                      func.schema_of_csv(csv="csv_format", options={'sep':','})).show()
                      
#๊ฒฐ๊ณผ๋Š” ์ž˜ ์•ˆ๋œ๋‹ค.

 

 

Reference

http://spark.apache.org/releases/spark-release-3-0-0.html
https://medium.com/javarevisited/spark-3-0-new-functions-in-a-nutshell-a929fca93413
