hdfs 위치에서 많은 텍스트 파일을 읽고 spark를 사용하여 반복적으로 매핑하고 싶습니다.
JavaRDD<String> records = ctx.textFile(args[1], 1);
한 번에 하나의 파일 만 읽을 수 있습니다.
둘 이상의 파일을 읽고 단일 RDD로 처리하고 싶습니다. 어떻게?
답변
전체 디렉토리를 지정하고 와일드 카드를 사용하고 디렉토리 및 와일드 카드의 CSV도 사용할 수 있습니다. 예 :
sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")
Nick Chammas가 지적한 것처럼 이것은 Hadoop의 노출 FileInputFormat
이므로 Hadoop (및 Scalding)에서도 작동합니다.
답변
다음 union
과 같이 사용하십시오 :
val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)
그런 다음 bigRdd
모든 파일이 포함 된 RDD입니다.
답변
단일 textFile 호출을 사용하여 여러 파일을 읽을 수 있습니다. 스칼라 :
sc.textFile(','.join(files))
답변
이것을 사용할 수 있습니다
먼저 S3 경로의 버퍼 / 목록을 얻을 수 있습니다.
import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest
def listFiles(s3_bucket:String, base_prefix : String) = {
var files = new ArrayList[String]
//S3 Client and List Object Request
var s3Client = new AmazonS3Client();
var objectListing: ObjectListing = null;
var listObjectsRequest = new ListObjectsRequest();
//Your S3 Bucket
listObjectsRequest.setBucketName(s3_bucket)
//Your Folder path or Prefix
listObjectsRequest.setPrefix(base_prefix)
//Adding s3:// to the paths and adding to a list
do {
objectListing = s3Client.listObjects(listObjectsRequest);
for (objectSummary <- objectListing.getObjectSummaries().asScala) {
files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
}
listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());
//Removing Base Directory Name
files.remove(0)
//Creating a Scala List for same
files.asScala
}
이제이 List 객체를 다음 코드 조각에 전달하십시오. 참고 : sc는 SQLContext의 객체입니다.
var df: DataFrame = null;
for (file <- files) {
val fileDf= sc.textFile(file)
if (df!= null) {
df= df.unionAll(fileDf)
} else {
df= fileDf
}
}
이제 최종 Unified RDD 즉 df를 얻었습니다.
선택 사항이며 단일 BigRDD로 다시 파티션을 나눌 수도 있습니다
val files = sc.textFile(filename, 1).repartition(1)
재 파티셔닝은 항상 작동합니다 : D
답변
PySpark에서 파일을 구문 분석하는 추가 유용한 방법을 찾았습니다. 아마도 스칼라에는 동등한 것이 있지만, 나는 일하는 번역을 내기에 충분히 편안하지 않습니다. 실제로 레이블이 추가 된 textFile 호출입니다 (아래 예에서는 키 = 파일 이름, 값 = 파일에서 한 줄).
“레이블이있는”textFile
입력:
import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)
for filename in glob.glob(Data_File + "/*"):
Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)
출력 : filename-as-key를 사용하여 튜플을 포함하는 각 항목과 값 = 파일의 각 줄을 가진 배열. 기술적 으로이 방법을 사용하면 실제 파일 경로 이름 외에도 다른 키를 사용하여 메모리에 저장하는 해시 표현을 사용할 수 있습니다. 즉.
[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
...]
행 목록으로 다시 결합 할 수도 있습니다.
Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]
또는 전체 파일을 다시 단일 문자열로 다시 결합하십시오 (이 예제에서는 결과가 wholeTextFiles에서 얻은 것과 동일하지만 파일 경로에서 문자열 “file :”이 제거됨).
Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()
답변
당신이 사용할 수있는
JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")
여기에서 파일의 경로와 해당 파일의 내용을 얻을 수 있습니다. 오버 헤드를 줄여주는 전체 파일 작업을 한 번에 수행 할 수 있습니다.
답변
모든 답변은 sc.textFile
wholeTextFiles
예를 들어이 경우에 왜 그렇지 않은지 궁금했습니다 .
val minPartitions = 2
val path = "/pathtohdfs"
sc.wholeTextFiles(path,minPartitions)
.flatMap{case (path, text)
...
한 가지 제한 사항은 작은 파일을로드해야합니다. 그렇지 않으면 성능이 저하되어 OOM이 발생할 수 있습니다.
노트 :
- 전체 파일은 메모리에 맞아야합니다
- XML 파일과 같이 줄 단위로 분할 할 수없는 파일 형식에 적합
방문에 대한 추가 참조