[python] Spark DataFrame에 새 열을 추가하려면 어떻게해야합니까 (PySpark 사용)?

Spark DataFrame (PySpark 1.5.1 사용)이 있고 새 열을 추가하고 싶습니다.

나는 성공하지 않고 다음을 시도했습니다.

type(randomed_hours) # => list

# Create in Python and transform to RDD

new_col = pd.DataFrame(randomed_hours, columns=['new_col'])

spark_new_col = sqlContext.createDataFrame(new_col)

my_df_spark.withColumn("hours", spark_new_col["new_col"])

또한 이것을 사용하여 오류가 발생했습니다.

my_df_spark.withColumn("hours",  sc.parallelize(randomed_hours))

그렇다면 PySpark를 사용하여 기존 DataFrame에 새 열 (Python 벡터 기반)을 어떻게 추가합니까?



답변

DataFrameSpark에 임의의 열을 추가 할 수 없습니다 . 새 열은 리터럴을 사용해서 만 만들 수 있습니다 (다른 리터럴 유형은 Spark DataFrame에서 상수 열을 추가하는 방법에 설명되어 있음 ).

from pyspark.sql.functions import lit

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()

## +---+---+-----+---+
## | x1| x2|   x3| x4|
## +---+---+-----+---+
## |  1|  a| 23.0|  0|
## |  3|  B|-23.0|  0|
## +---+---+-----+---+

기존 열 변환 :

from pyspark.sql.functions import exp

df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()

## +---+---+-----+---+--------------------+
## | x1| x2|   x3| x4|                  x5|
## +---+---+-----+---+--------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9|
## |  3|  B|-23.0|  0|1.026187963170189...|
## +---+---+-----+---+--------------------+

다음을 사용하여 포함 join:

from pyspark.sql.functions import exp

lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
    .join(lookup, col("x1") == col("k"), "leftouter")
    .drop("k")
    .withColumnRenamed("v", "x6"))

## +---+---+-----+---+--------------------+----+
## | x1| x2|   x3| x4|                  x5|  x6|
## +---+---+-----+---+--------------------+----+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|
## |  3|  B|-23.0|  0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+

또는 함수 / udf로 생성 :

from pyspark.sql.functions import rand

df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()

## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2|   x3| x4|                  x5|  x6|                 x7|
## +---+---+-----+---+--------------------+----+-------------------+
## |  1|  a| 23.0|  0| 9.744803446248903E9| foo|0.41930610446846617|
## |  3|  B|-23.0|  0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+

pyspark.sql.functionsCatalyst 표현식에 매핑되는 성능 측면의 내장 함수 ( )는 일반적으로 Python 사용자 정의 함수보다 선호됩니다.

임의의 RDD의 내용을 열로 추가하려면 다음을 수행 할 수 있습니다.


답변

UDF를 사용하여 열을 추가하려면 다음을 수행하십시오.

df = sqlContext.createDataFrame(
    [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import *

def valueToCategory(value):
   if   value == 1: return 'cat1'
   elif value == 2: return 'cat2'
   ...
   else: return 'n/a'

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()

## +---+---+-----+---------+
## | x1| x2|   x3| category|
## +---+---+-----+---------+
## |  1|  a| 23.0|     cat1|
## |  3|  B|-23.0|      n/a|
## +---+---+-----+---------+


답변

대한 스파크 2.0

# assumes schema has 'age' column 
df.select('*', (df.age + 10).alias('agePlusTen'))


답변

pySpark에 새 열을 추가하는 방법에는 여러 가지가 있습니다.

먼저 간단한 DataFrame을 만들어 보겠습니다.

date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())

이제 열 값을 두 배로 늘리고 새 열에 저장해 보겠습니다. PFB는 동일한 것을 달성하기 위해 몇 가지 다른 접근 방식을 사용합니다.

# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()

# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()

# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()

# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()

Spark DataFrame 함수에 대한 더 많은 예제와 설명을 보려면 내 블로그를 방문하십시오 .

이게 도움이 되길 바란다.


답변

다음을 udf추가 할 때 새로 정의 할 수 있습니다 column_name.

u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')


답변

from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
    lambda val: val, # do sth to val
    StringType()
)
df.withColumn('new_col', func_name(df.old_col))


답변

매우 유사한 사용 사례에 대한 일반화 된 예제를 제공하고 싶습니다.

사용 사례 : 다음으로 구성된 csv가 있습니다.

First|Third|Fifth
data|data|data
data|data|data
...billion more lines

몇 가지 변환을 수행해야하고 최종 CSV는 다음과 같아야합니다.

First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines

이것은 일부 모델에 의해 정의 된 스키마이고 최종 데이터가 SQL 대량 삽입 및 이와 같은 것들과 상호 운용 될 수 있어야하기 때문에이 작업을 수행해야합니다.

그래서:

1) spark.read를 사용하여 원본 csv를 읽고 “df”라고합니다.

2) 데이터에 뭔가를합니다.

3)이 스크립트를 사용하여 null 열을 추가합니다.

outcols = []
for column in MY_COLUMN_LIST:
    if column in df.columns:
        outcols.append(column)
    else:
        outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))

df = df.select(outcols)

이러한 방식으로 csv를로드 한 후 스키마를 구조화 할 수 있습니다 (많은 테이블에 대해이 작업을 수행해야하는 경우 열 재정렬에도 사용할 수 있음).