저는 Spark를 처음 사용하고 Spark를 사용하여 파일에서 CSV 데이터를 읽으려고합니다. 내가하는 일은 다음과 같습니다.
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
이 호출이 내 파일의 첫 번째 두 열 목록을 제공 할 것으로 예상하지만이 오류가 발생합니다.
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
내 CSV 파일이 둘 이상의 열이지만.
답변
모든 행에 2 개 이상의 열이있는 것이 확실 합니까? 확인하기 위해 다음과 같은 것을 시도해 볼 수 있습니까? :
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
또는 범인 (있는 경우)을 인쇄 할 수 있습니다.
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
답변
Spark 2.0.0 이상
내장 된 csv 데이터 소스를 직접 사용할 수 있습니다.
spark.read.csv(
"some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)
또는
(spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv"))
외부 종속성을 포함하지 않습니다.
스파크 <2.0.0 :
일반적인 경우에는 사소한 것이 아닌 수동 구문 분석 대신 다음을 권장합니다 spark-csv
.
확인 스파크 CSV가 경로에 포함되어 있는지 확인 ( --packages
, --jars
, --driver-class-path
)
그리고 다음과 같이 데이터를로드합니다.
(df = sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
로드, 스키마 추론, 잘못된 라인 삭제를 처리 할 수 있으며 Python에서 JVM으로 데이터를 전달할 필요가 없습니다.
참고 :
스키마를 알고 있다면 스키마 추론을 피하고에 전달하는 것이 DataFrameReader
좋습니다. 정수, 이중 및 문자열의 세 열이 있다고 가정합니다.
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv"))
답변
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");
print(df.collect())
답변
Pandas를 사용하여 CSV 파일을 읽은 다음 Pandas DataFrame을 Spark로 가져 오는 또 다른 옵션입니다.
예를 들면 :
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
답변
단순히 쉼표로 분할하면 필드 (예 :)에있는 쉼표도 분할 a,b,"1,2,3",c
되므로 권장되지 않습니다. DataFrames API를 사용하려는 경우 zero323의 대답 은 좋지만 기본 Spark를 고수하려면 csv 모듈을 사용하여 기본 Python에서 csv를 구문 분석 할 수 있습니다 .
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
편집 : @muon이 주석에서 언급했듯이 이것은 헤더를 다른 행과 같이 취급하므로 수동으로 추출해야합니다. 예를 들어, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
( header
필터가 평가되기 전에 수정하지 마십시오 ). 그러나이 시점에서 기본 제공 csv 파서를 사용하는 것이 좋습니다.
답변
이것은 PYSPARK에 있습니다.
path="Your file path with file name"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)
그런 다음 확인할 수 있습니다.
df.show(5)
df.count()
답변
csv를 데이터 프레임으로로드하려면 다음을 수행 할 수 있습니다.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
그것은 나를 위해 잘 작동했습니다.