[apache-spark] 여러 개의 텍스트 파일을 단일 RDD로 읽는 방법은 무엇입니까?

hdfs 위치에서 많은 텍스트 파일을 읽고 spark를 사용하여 반복적으로 매핑하고 싶습니다.

JavaRDD<String> records = ctx.textFile(args[1], 1); 한 번에 하나의 파일 만 읽을 수 있습니다.

둘 이상의 파일을 읽고 단일 RDD로 처리하고 싶습니다. 어떻게?



답변

전체 디렉토리를 지정하고 와일드 카드를 사용하고 디렉토리 및 와일드 카드의 CSV도 사용할 수 있습니다. 예 :

sc.textFile("/my/dir1,/my/paths/part-00[0-5]*,/another/dir,/a/specific/file")

Nick Chammas가 지적한 것처럼 이것은 Hadoop의 노출 FileInputFormat이므로 Hadoop (및 Scalding)에서도 작동합니다.


답변

다음 union과 같이 사용하십시오 :

val sc = new SparkContext(...)
val r1 = sc.textFile("xxx1")
val r2 = sc.textFile("xxx2")
...
val rdds = Seq(r1, r2, ...)
val bigRdd = sc.union(rdds)

그런 다음 bigRdd모든 파일이 포함 된 RDD입니다.


답변

단일 textFile 호출을 사용하여 여러 파일을 읽을 수 있습니다. 스칼라 :

sc.textFile(','.join(files))


답변

이것을 사용할 수 있습니다

먼저 S3 경로의 버퍼 / 목록을 얻을 수 있습니다.

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

이제이 List 객체를 다음 코드 조각에 전달하십시오. 참고 : sc는 SQLContext의 객체입니다.

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

이제 최종 Unified RDD 즉 df를 얻었습니다.

선택 사항이며 단일 BigRDD로 다시 파티션을 나눌 수도 있습니다

val files = sc.textFile(filename, 1).repartition(1)

재 파티셔닝은 항상 작동합니다 : D


답변

PySpark에서 파일을 구문 분석하는 추가 유용한 방법을 찾았습니다. 아마도 스칼라에는 동등한 것이 있지만, 나는 일하는 번역을 내기에 충분히 편안하지 않습니다. 실제로 레이블이 추가 된 textFile 호출입니다 (아래 예에서는 키 = 파일 이름, 값 = 파일에서 한 줄).

“레이블이있는”textFile

입력:

import glob
from pyspark import SparkContext
SparkContext.stop(sc)
sc = SparkContext("local","example") # if running locally
sqlContext = SQLContext(sc)

for filename in glob.glob(Data_File + "/*"):
    Spark_Full += sc.textFile(filename).keyBy(lambda x: filename)

출력 : filename-as-key를 사용하여 튜플을 포함하는 각 항목과 값 = 파일의 각 줄을 가진 배열. 기술적 으로이 방법을 사용하면 실제 파일 경로 이름 외에도 다른 키를 사용하여 메모리에 저장하는 해시 표현을 사용할 수 있습니다. 즉.

[('/home/folder_with_text_files/file1.txt', 'file1_contents_line1'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line2'),
 ('/home/folder_with_text_files/file1.txt', 'file1_contents_line3'),
 ('/home/folder_with_text_files/file2.txt', 'file2_contents_line1'),
  ...]

행 목록으로 다시 결합 할 수도 있습니다.

Spark_Full.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

[('/home/folder_with_text_files/file1.txt', ['file1_contents_line1', 'file1_contents_line2','file1_contents_line3']),
 ('/home/folder_with_text_files/file2.txt', ['file2_contents_line1'])]

또는 전체 파일을 다시 단일 문자열로 다시 결합하십시오 (이 예제에서는 결과가 wholeTextFiles에서 얻은 것과 동일하지만 파일 경로에서 문자열 “file :”이 제거됨).

Spark_Full.groupByKey().map(lambda x: (x[0], ' '.join(list(x[1])))).collect()


답변

당신이 사용할 수있는

JavaRDD<String , String> records = sc.wholeTextFiles("path of your directory")

여기에서 파일의 경로와 해당 파일의 내용을 얻을 수 있습니다. 오버 헤드를 줄여주는 전체 파일 작업을 한 번에 수행 할 수 있습니다.


답변

모든 답변은 sc.textFile

wholeTextFiles예를 들어이 경우에 왜 그렇지 않은지 궁금했습니다 .

val minPartitions = 2
val path = "/pathtohdfs"
    sc.wholeTextFiles(path,minPartitions)
      .flatMap{case (path, text)
    ...

한 가지 제한 사항은 작은 파일을로드해야합니다. 그렇지 않으면 성능이 저하되어 OOM이 발생할 수 있습니다.

노트 :

  • 전체 파일은 메모리에 맞아야합니다
  • XML 파일과 같이 줄 단위로 분할 할 수없는 파일 형식에 적합

방문에 대한 추가 참조