Spark DataFrame (PySpark 1.5.1 사용)이 있고 새 열을 추가하고 싶습니다.
나는 성공하지 않고 다음을 시도했습니다.
type(randomed_hours) # => list
# Create in Python and transform to RDD
new_col = pd.DataFrame(randomed_hours, columns=['new_col'])
spark_new_col = sqlContext.createDataFrame(new_col)
my_df_spark.withColumn("hours", spark_new_col["new_col"])
또한 이것을 사용하여 오류가 발생했습니다.
my_df_spark.withColumn("hours", sc.parallelize(randomed_hours))
그렇다면 PySpark를 사용하여 기존 DataFrame에 새 열 (Python 벡터 기반)을 어떻게 추가합니까?
답변
DataFrame
Spark에 임의의 열을 추가 할 수 없습니다 . 새 열은 리터럴을 사용해서 만 만들 수 있습니다 (다른 리터럴 유형은 Spark DataFrame에서 상수 열을 추가하는 방법에 설명되어 있음 ).
from pyspark.sql.functions import lit
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
df_with_x4 = df.withColumn("x4", lit(0))
df_with_x4.show()
## +---+---+-----+---+
## | x1| x2| x3| x4|
## +---+---+-----+---+
## | 1| a| 23.0| 0|
## | 3| B|-23.0| 0|
## +---+---+-----+---+
기존 열 변환 :
from pyspark.sql.functions import exp
df_with_x5 = df_with_x4.withColumn("x5", exp("x3"))
df_with_x5.show()
## +---+---+-----+---+--------------------+
## | x1| x2| x3| x4| x5|
## +---+---+-----+---+--------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9|
## | 3| B|-23.0| 0|1.026187963170189...|
## +---+---+-----+---+--------------------+
다음을 사용하여 포함 join
:
from pyspark.sql.functions import exp
lookup = sqlContext.createDataFrame([(1, "foo"), (2, "bar")], ("k", "v"))
df_with_x6 = (df_with_x5
.join(lookup, col("x1") == col("k"), "leftouter")
.drop("k")
.withColumnRenamed("v", "x6"))
## +---+---+-----+---+--------------------+----+
## | x1| x2| x3| x4| x5| x6|
## +---+---+-----+---+--------------------+----+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|
## | 3| B|-23.0| 0|1.026187963170189...|null|
## +---+---+-----+---+--------------------+----+
또는 함수 / udf로 생성 :
from pyspark.sql.functions import rand
df_with_x7 = df_with_x6.withColumn("x7", rand())
df_with_x7.show()
## +---+---+-----+---+--------------------+----+-------------------+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+-----+---+--------------------+----+-------------------+
## | 1| a| 23.0| 0| 9.744803446248903E9| foo|0.41930610446846617|
## | 3| B|-23.0| 0|1.026187963170189...|null|0.37801881545497873|
## +---+---+-----+---+--------------------+----+-------------------+
pyspark.sql.functions
Catalyst 표현식에 매핑되는 성능 측면의 내장 함수 ( )는 일반적으로 Python 사용자 정의 함수보다 선호됩니다.
임의의 RDD의 내용을 열로 추가하려면 다음을 수행 할 수 있습니다.
- 기존 데이터 프레임 에 행 번호 추가
zipWithIndex
RDD를 호출 하고 데이터 프레임으로 변환- 인덱스를 조인 키로 사용하여 둘 다 조인
답변
UDF를 사용하여 열을 추가하려면 다음을 수행하십시오.
df = sqlContext.createDataFrame(
[(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))
from pyspark.sql.functions import udf
from pyspark.sql.types import *
def valueToCategory(value):
if value == 1: return 'cat1'
elif value == 2: return 'cat2'
...
else: return 'n/a'
# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("x1"))
df_with_cat.show()
## +---+---+-----+---------+
## | x1| x2| x3| category|
## +---+---+-----+---------+
## | 1| a| 23.0| cat1|
## | 3| B|-23.0| n/a|
## +---+---+-----+---------+
답변
대한 스파크 2.0
# assumes schema has 'age' column
df.select('*', (df.age + 10).alias('agePlusTen'))
답변
pySpark에 새 열을 추가하는 방법에는 여러 가지가 있습니다.
먼저 간단한 DataFrame을 만들어 보겠습니다.
date = [27, 28, 29, None, 30, 31]
df = spark.createDataFrame(date, IntegerType())
이제 열 값을 두 배로 늘리고 새 열에 저장해 보겠습니다. PFB는 동일한 것을 달성하기 위해 몇 가지 다른 접근 방식을 사용합니다.
# Approach - 1 : using withColumn function
df.withColumn("double", df.value * 2).show()
# Approach - 2 : using select with alias function.
df.select("*", (df.value * 2).alias("double")).show()
# Approach - 3 : using selectExpr function with as clause.
df.selectExpr("*", "value * 2 as double").show()
# Approach - 4 : Using as clause in SQL statement.
df.createTempView("temp")
spark.sql("select *, value * 2 as double from temp").show()
Spark DataFrame 함수에 대한 더 많은 예제와 설명을 보려면 내 블로그를 방문하십시오 .
이게 도움이 되길 바란다.
답변
다음을 udf
추가 할 때 새로 정의 할 수 있습니다 column_name
.
u_f = F.udf(lambda :yourstring,StringType())
a.select(u_f().alias('column_name')
답변
from pyspark.sql.functions import udf
from pyspark.sql.types import *
func_name = udf(
lambda val: val, # do sth to val
StringType()
)
df.withColumn('new_col', func_name(df.old_col))
답변
매우 유사한 사용 사례에 대한 일반화 된 예제를 제공하고 싶습니다.
사용 사례 : 다음으로 구성된 csv가 있습니다.
First|Third|Fifth
data|data|data
data|data|data
...billion more lines
몇 가지 변환을 수행해야하고 최종 CSV는 다음과 같아야합니다.
First|Second|Third|Fourth|Fifth
data|null|data|null|data
data|null|data|null|data
...billion more lines
이것은 일부 모델에 의해 정의 된 스키마이고 최종 데이터가 SQL 대량 삽입 및 이와 같은 것들과 상호 운용 될 수 있어야하기 때문에이 작업을 수행해야합니다.
그래서:
1) spark.read를 사용하여 원본 csv를 읽고 “df”라고합니다.
2) 데이터에 뭔가를합니다.
3)이 스크립트를 사용하여 null 열을 추가합니다.
outcols = []
for column in MY_COLUMN_LIST:
if column in df.columns:
outcols.append(column)
else:
outcols.append(lit(None).cast(StringType()).alias('{0}'.format(column)))
df = df.select(outcols)
이러한 방식으로 csv를로드 한 후 스키마를 구조화 할 수 있습니다 (많은 테이블에 대해이 작업을 수행해야하는 경우 열 재정렬에도 사용할 수 있음).
