[python] Spark로 CSV 파일로드

저는 Spark를 처음 사용하고 Spark를 사용하여 파일에서 CSV 데이터를 읽으려고합니다. 내가하는 일은 다음과 같습니다.

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

이 호출이 내 파일의 첫 번째 두 열 목록을 제공 할 것으로 예상하지만이 오류가 발생합니다.

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

내 CSV 파일이 둘 이상의 열이지만.



답변

모든 행에 2 개 이상의 열이있는 것이 확실 합니까? 확인하기 위해 다음과 같은 것을 시도해 볼 수 있습니까? :

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

또는 범인 (있는 경우)을 인쇄 할 수 있습니다.

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()


답변

Spark 2.0.0 이상

내장 된 csv 데이터 소스를 직접 사용할 수 있습니다.

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

또는

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

외부 종속성을 포함하지 않습니다.

스파크 <2.0.0 :

일반적인 경우에는 사소한 것이 아닌 수동 구문 분석 대신 다음을 권장합니다 spark-csv.

확인 스파크 CSV가 경로에 포함되어 있는지 확인 ( --packages, --jars, --driver-class-path)

그리고 다음과 같이 데이터를로드합니다.

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

로드, 스키마 추론, 잘못된 라인 삭제를 처리 할 수 ​​있으며 Python에서 JVM으로 데이터를 전달할 필요가 없습니다.

참고 :

스키마를 알고 있다면 스키마 추론을 피하고에 전달하는 것이 DataFrameReader좋습니다. 정수, 이중 및 문자열의 세 열이 있다고 가정합니다.

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))


답변

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())


답변

Pandas를 사용하여 CSV 파일을 읽은 다음 Pandas DataFrame을 Spark로 가져 오는 또 다른 옵션입니다.

예를 들면 :

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)


답변

단순히 쉼표로 분할하면 필드 (예 :)에있는 쉼표도 분할 a,b,"1,2,3",c되므로 권장되지 않습니다. DataFrames API를 사용하려는 경우 zero323의 대답 은 좋지만 기본 Spark를 고수하려면 csv 모듈을 사용하여 기본 Python에서 csv를 구문 분석 할 수 있습니다 .

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

편집 : @muon이 주석에서 언급했듯이 이것은 헤더를 다른 행과 같이 취급하므로 수동으로 추출해야합니다. 예를 들어, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)( header필터가 평가되기 전에 수정하지 마십시오 ). 그러나이 시점에서 기본 제공 csv 파서를 사용하는 것이 좋습니다.


답변

이것은 PYSPARK에 있습니다.

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

그런 다음 확인할 수 있습니다.

df.show(5)
df.count()


답변

csv를 데이터 프레임으로로드하려면 다음을 수행 할 수 있습니다.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

그것은 나를 위해 잘 작동했습니다.