2020/07/03 - [Data Science/Spark] - [Spark] Summary of new Spark 3.0 DataFrame features #1
Using the data prepared in the previous post, this post tests Spark 3.0's new and improved features.
■ Checking the test data
■ CSV-related functions (from_csv, to_csv, schema_of_csv)
These are the CSV (comma-separated values) counterparts of the JSON functions such as from_json. Use them when working with data whose fields are separated by commas (',').
- to_csv(col, options={})
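col: a StructType column, i.e. a row-like value such as [a,b,c,d,1,2,3] in each row; each struct is converted into one CSV string
options: accepts the same CSV options used by spark.read.csv. For the full list of options, see the URL below.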
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv
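```python
# to_csv test
# Assumes `spark` (SparkSession) and `df` (the test DataFrame from the previous post) already exist
import pyspark.sql.functions as func

data = df.collect()  # all rows, same as df.head(df.count())
# Pair each Row with a 1-based key; the Row itself becomes a struct column named "value"
data_list = [[i, d] for i, d in enumerate(data, start=1)]
to_csv_test_df_tmp = spark.createDataFrame(data_list, ("key", "value"))
# to_csv turns each struct in "value" into a single CSV string
to_csv_test_df = to_csv_test_df_tmp.select(func.to_csv(to_csv_test_df_tmp.value).alias("csv_format"))
to_csv_test_df.show()
```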
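To make concrete what to_csv produces for each row, here is a small plain-Python analogue built on the standard csv module. It is not Spark code: a row is modeled as a dict of field names to values, the helper name row_to_csv is hypothetical, and the sample row is made up (its field names follow the str_schema used in the from_csv test).

```python
import csv
import io


def row_to_csv(row):
    """Plain-Python analogue of to_csv for one row: write the field values
    in order as a single comma-separated line (no header, no newline)."""
    buf = io.StringIO()
    # lineterminator="" keeps the result a single line, like one csv_format value
    csv.writer(buf, lineterminator="").writerow(row.values())
    return buf.getvalue()


# Hypothetical sample row shaped like the test data (values are made up)
sample = {"year": "2020", "month": "01", "geo": 10, "nongeo": 5, "etc": 1, "fail": 0, "sum": 16}
print(row_to_csv(sample))  # 2020,01,10,5,1,0,16
```

Like the csv_format column, the output carries no header; a value that itself contains the separator, such as "x,y", is wrapped in double quotes by the csv module.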
- from_csv(col, schema, options={})
col: a string column containing CSV-formatted text
schema: the schema as a DDL-formatted string
options: accepts the same CSV options used by spark.read.csv. For the full list of options, see the URL below.
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv
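```python
# from_csv test
# In PySpark the schema must be a DDL-formatted string (or a Column), not a StructType.
str_schema = """year string,
month string,
geo int,
nongeo int,
etc int,
fail int,
sum int
"""
to_csv_test_df.select(func.from_csv("csv_format", str_schema)).show()
# The result is a single struct column; show() prints each struct in a list-like form.
```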
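The from_csv call above can be mimicked in plain Python to show how the DDL string drives the conversion: each "name type" pair names a field and decides how its text is cast. This is a simplified sketch, not Spark's parser; the helper names parse_ddl and from_csv_line are hypothetical, and only the string and int types used in str_schema are handled.

```python
import csv


def parse_ddl(ddl):
    """Tiny parser for 'name type, name type, ...' DDL strings.
    Only flat, non-nested types such as string and int are supported."""
    fields = []
    for part in ddl.split(","):
        part = part.strip()
        if part:
            name, dtype = part.split()
            fields.append((name, dtype.lower()))
    return fields


# Converters for the two types used in str_schema (other types are out of scope)
CASTS = {"string": str, "int": int}


def from_csv_line(line, ddl):
    """Plain-Python analogue of from_csv: split one CSV line and cast each
    field according to the DDL schema, returning a dict as the 'struct'."""
    values = next(csv.reader([line]))
    return {name: CASTS[dtype](value) for (name, dtype), value in zip(parse_ddl(ddl), values)}


str_schema = """year string,
month string,
geo int,
nongeo int,
etc int,
fail int,
sum int
"""
print(from_csv_line("2020,01,10,5,1,0,16", str_schema))
```

A non-numeric value in an int field makes this sketch raise ValueError; in Spark, the handling of malformed input is governed by the mode option of the CSV data source.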
- schema_of_csv(csv, options={})
Specifying the CSV schema by hand like this is only possible when you already know the data completely.
So when you only need a rough CSV schema, or writing out every field is tedious, use schema_of_csv, which infers a schema from a sample CSV string.
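As a rough illustration of what schema_of_csv does with a single sample string, the plain-Python sketch below guesses a type per field and names the fields _c0, _c1, ..., since a bare CSV line has no header. The type rules (int, then double, else string) and the helper names are simplifications assumed for this example; Spark's actual inference checks more types.

```python
import csv


def infer_type(value):
    """Simplified type guess for one CSV field: int, then double, else string."""
    for name, cast in (("int", int), ("double", float)):
        try:
            cast(value)
            return name
        except ValueError:
            pass
    return "string"


def schema_of_csv_line(line):
    """Plain-Python analogue of schema_of_csv: infer a type per field and
    name the fields _c0, _c1, ... because there is no header to use."""
    values = next(csv.reader([line]))
    fields = ",".join(f"_c{i}:{infer_type(v)}" for i, v in enumerate(values))
    return f"struct<{fields}>"


print(schema_of_csv_line("2020,01,10,5,1,0,16"))
```

Because "01" parses as an integer, the month field comes out as int here even though str_schema declares it as string, which is why an inferred schema is best treated as a starting point.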
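```python
# schema_of_csv test
# Take the first csv_format value as a sample CSV string
csv_list = [row.csv_format for row in to_csv_test_df.head(1)]
csv_str = ','.join(csv_list)
print("csv_str : " + csv_str)
# A plain Python string is used as a literal sample; one identical row is returned per row of the DataFrame
to_csv_test_df.select(func.schema_of_csv(csv_str)).collect()

# Attempts inside the DataFrame
# 1. the csv_format column itself
# 2. csv_format split on ','
# 3. schema inference on csv_format
# 4. schema inference on csv_format with options
to_csv_test_df.select("csv_format", func.split("csv_format", ','), func.schema_of_csv("csv_format"),
                      func.schema_of_csv(csv="csv_format", options={'sep': ','})).show()
# This does not work as intended: schema_of_csv expects a literal CSV string, so "csv_format"
# is treated as the literal text csv_format rather than as a column reference, and every row
# gets the schema of that text. Passing func.col("csv_format") is rejected as well, because
# the input must be a string literal.
```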
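Since schema_of_csv only looks at one literal sample, a single row may not be representative. One way to picture inference over a whole column is to infer each row and keep the widest type per position (int, then double, then string). The plain-Python sketch below does exactly that; the widening order and the helper names are assumptions for illustration, not Spark internals.

```python
import csv

# Widening order assumed by this sketch: an int fits in a double, anything fits in a string
ORDER = ["int", "double", "string"]


def infer_type(value):
    """Simplified per-field guess: int, then double, else string."""
    for name, cast in (("int", int), ("double", float)):
        try:
            cast(value)
            return name
        except ValueError:
            pass
    return "string"


def merged_schema(lines):
    """Infer each line's field types and keep the widest type seen per position."""
    merged = []
    for row in csv.reader(lines):
        for i, value in enumerate(row):
            t = infer_type(value)
            if i == len(merged):
                merged.append(t)  # first time this position is seen
            else:
                merged[i] = max(merged[i], t, key=ORDER.index)
    return "struct<" + ",".join(f"_c{i}:{t}" for i, t in enumerate(merged)) + ">"


print(merged_schema(["1,a,3", "2.5,b,4"]))  # struct<_c0:double,_c1:string,_c2:int>
```

Within Spark itself, spark.read.csv also accepts an RDD of CSV strings, so something like spark.read.csv(to_csv_test_df.rdd.map(lambda r: r.csv_format), inferSchema=True) is one way to infer a schema over every row (not tested against this data).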