spark FIFO 스케줄러와 함께 YARN 클러스터 모드에서 실행중인 spark 2.4.4 사용.
가변 개수의 스레드가있는 스레드 풀 실행기를 사용하여 여러 개의 스파크 데이터 프레임 작업 (즉, S3에 데이터 쓰기)을 제출하고 있습니다. 스레드가 ~ 10 개이면 잘 작동하지만 수백 개의 스레드를 사용하면 Spark UI에 따라 작업이 예약되지 않은 교착 상태가있는 것으로 보입니다.
동시에 예약 할 수있는 작업 수를 제어하는 요인은 무엇입니까? 드라이버 리소스 (예 : 메모리 / 코어)? 다른 스파크 구성 설정?
편집하다:
다음은 내 코드에 대한 간략한 개요입니다.
ExecutorService pool = Executors.newFixedThreadPool(nThreads);
ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(pool);
Dataset<Row> aHugeDf = spark.read.json(hundredsOfPaths);
List<Future<Void>> futures = listOfSeveralHundredThings
.stream()
.map(aThing -> ecs.submit(() -> {
df
.filter(col("some_column").equalTo(aThing))
.write()
.format("org.apache.hudi")
.options(writeOptions)
.save(outputPathFor(aThing));
return null;
}))
.collect(Collectors.toList());
IntStream.range(0, futures.size()).forEach(i -> ecs.poll(30, TimeUnit.MINUTES));
exec.shutdownNow();
어느 시점에서, nThreads
증가함에 따라 spark는 더 이상 다음과 같은 증거로 작업을 예약하지 않는 것 같습니다.
ecs.poll(...)
결국 시간 초과- 활성 작업이없는 Spark UI 작업 탭
- 실행기에 대한 활성 작업이없는 Spark UI 실행기 탭
nThreads
실행중인 작업 ID없이 실행중인 쿼리를 표시 하는 Spark UI SQL 탭
내 처형 환경은
- AWS EMR 5.28.1
- 스파크 2.4.4
- 마스터 노드 =
m5.4xlarge
- 핵심 노드 = 3x
rd5.24xlarge
spark.driver.cores=24
spark.driver.memory=32g
spark.executor.memory=21g
spark.scheduler.mode=FIFO
답변
가능한 경우 작업의 출력을 AWS Elastic MapReduce hdfs에 작성하고 (거의 즉각적인 이름 변경 및 로컬 hdfs의 파일 IO 향상을 활용하기 위해) dstcp 단계를 추가하여 파일을 S3로 이동하여 처리하는 모든 문제를 해결하십시오. 파일 시스템이 되고자하는 객체 저장소의 내부. 또한 로컬 hdfs에 쓰면 DirectOutputCommiter와 관련된 교착 상태 함정에 빠지지 않고 런 어웨이 작업을 제어 할 수 있습니다.
S3를 출력 디렉토리로 사용해야하는 경우 다음 Spark 구성이 설정되어 있는지 확인하십시오.
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
spark.speculation false
참고 : DirectParquetOutputCommitter는 데이터 손실 가능성으로 인해 Spark 2.0에서 제거되었습니다. 불행히도 S3a와의 일관성이 향상 될 때까지 해결 방법을 사용해야합니다. Hadoop 2.8로 상황이 개선되고 있습니다
사전 식 순서로 키 이름을 사용하지 마십시오. 해싱 / 랜덤 접두사 또는 날짜 / 시간 역순을 사용하여 키를 계층 적으로 지정하여 가장 일반적인 항목을 키 왼쪽에 배치하는 것이 요령입니다. DNS 문제로 인해 버킷 이름에 밑줄이 없어도됩니다.
fs.s3a.fast.upload upload
단일 파일의 일부를 Amazon S3에 병렬로 활성화
자세한 내용은이 기사를 참조하십시오.
s3에 쓰는 동안 Spark 2.1.0에서 spark.speculation 설정
답변
IMO 당신은이 문제에 잘못 접근했을 것입니다. 작업 당 작업 수가 매우 적다는 것을 보장 할 수 없으면 한 번에 100 개의 작업을 병렬화하여 성능을 크게 향상 시키지는 못할 것입니다. 기본 병렬 처리를 200으로하는 1.5 개의 작업 만 사용한다고 가정하면 클러스터는 한 번에 300 개의 작업 만 지원할 수 있습니다. 최대 동시 쿼리 수를 10으로 제한하기 위해 코드를 다시 작성하는 것이 좋습니다. 실제로 수백 건의 단일 작업으로 300 개의 쿼리가 있다고 생각합니다. 이러한 이유로 대부분의 OLTP 데이터 처리 시스템은 기존 RDS 시스템과 비교하여 의도적으로 상당히 낮은 수준의 동시 쿼리를 갖습니다.
또한
- Apache Hudi의 기본 병렬 처리는 수백 FYI입니다.
- 왜 필터 열을 기준으로 파티셔닝하지 않습니까?