Stream
알 수없는 번호의 이기종 원격 저장된 JSON 파일 세트를 병렬 처리 하는 데 사용하고 싶습니다 (파일 수는 미리 알려지지 않았습니다). 파일의 크기는 파일 당 1 개의 JSON 레코드부터 다른 파일의 100,000 개까지 다양 할 수 있습니다. JSON 기록 이 경우 파일에서 한 줄로 표시되는 독립적 인 JSON 객체를 의미한다.
실제로 이것을 위해 Streams를 사용하고 싶기 때문에 이것을 구현했습니다 Spliterator
.
public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {
abstract protected JsonStreamSupport<METADATA> openInputStream(String path);
abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);
private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
private static final int MAX_BUFFER = 100;
private final Iterator<String> paths;
private JsonStreamSupport<METADATA> reader = null;
public JsonStreamSpliterator(Iterator<String> paths) {
this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
super(est, additionalCharacteristics);
this.paths = paths;
}
private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
this(est, additionalCharacteristics, paths);
open(nextPath);
}
@Override
public boolean tryAdvance(Consumer<? super RECORD> action) {
if(reader == null) {
String path = takeNextPath();
if(path != null) {
open(path);
}
else {
return false;
}
}
Map<String, Object> json = reader.readJsonLine();
if(json != null) {
RECORD item = parse(reader.getMetadata(), json);
action.accept(item);
return true;
}
else {
reader.close();
reader = null;
return tryAdvance(action);
}
}
private void open(String path) {
reader = openInputStream(path);
}
private String takeNextPath() {
synchronized(paths) {
if(paths.hasNext()) {
return paths.next();
}
}
return null;
}
@Override
public Spliterator<RECORD> trySplit() {
String nextPath = takeNextPath();
if(nextPath != null) {
return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
@Override
protected JsonStreamSupport<METADATA> openInputStream(String path) {
return JsonStreamSpliterator.this.openInputStream(path);
}
@Override
protected RECORD parse(METADATA metaData, Map<String,Object> json) {
return JsonStreamSpliterator.this.parse(metaData, json);
}
};
}
else {
List<RECORD> records = new ArrayList<RECORD>();
while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
// loop
}
if(records.size() != 0) {
return records.spliterator();
}
else {
return null;
}
}
}
}
내가 겪고있는 문제는 Stream이 처음에 아름답게 병렬화되는 동안 결국 가장 큰 파일이 단일 스레드에서 처리되는 것입니다. 근위 원인은 잘 문서화되어 있다고 생각합니다. 스플리터는 “불균형”입니다.
보다 구체적으로, 수명주기 trySplit
의 특정 시점 이후에이 메소드가 호출되지 않는 것으로 보이 Stream.forEach
므로 마지막에 작은 배치를 분배하는 추가 로직 trySplit
이 거의 실행되지 않습니다.
trySplit에서 반환 된 모든 스플리터가 어떻게 동일한 paths
반복자를 공유하는지 확인하십시오 . 나는 이것이 모든 스플리터에서 작업의 균형을 잡는 정말 영리한 방법이라고 생각했지만 완전한 병렬 처리를 달성하기에는 충분하지 않았습니다.
병렬 처리가 파일 전체에서 먼저 진행된 다음 큰 파일이 여전히 분할되지 않은 경우 나머지 파일의 청크로 병렬 처리하고 싶습니다. 그것 else
의 끝에서 블록 의 의도 였다 trySplit
.
이 문제를 해결하기 쉬운 / 단순 / 정식 방법이 있습니까?
답변
귀하는 trySplit
관계없이 기본 파일의 크기의 동일한 크기의 출력 분할,해야한다. 모든 파일을 단일 단위로 취급하고 ArrayList
매번 같은 수의 JSON 객체로 백업 된 스플리터를 채워야합니다 . 하나의 스플릿을 처리하는 데 1 ~ 10 밀리 초 (1ms 미만)가 걸리고 배치를 작업자 스레드에 전달하는 비용에 접근하기 시작하는 것보다 높은 객체에 도달해야합니다. 너무 거친 작업.
스플리터는 크기 추정을보고 할 의무가 없으며 이미이 작업을 올바르게 수행하고 있습니다. 추정값은 Long.MAX_VALUE
“제한 없음”을 의미하는 특수한 값입니다. 그러나 단일 JSON 객체가있는 파일이 많은 경우 크기가 1 인 배치가 발생하면 두 가지 방식으로 성능이 저하됩니다. 파일을 열고 닫는 오버 헤드로 인해 병목 현상이 발생할 수 있으며 즉, 스레드 핸드 오프 비용은 하나의 항목을 처리하는 비용과 비교하여 중요 할 수 있으며 병목 현상이 다시 발생합니다.
5 년 전 비슷한 문제를 해결하고 있었고 내 솔루션을 살펴볼 수 있습니다 .
답변
많은 실험을 거친 후에도 여전히 크기 추정을 통해 병렬 처리를 추가 할 수 없었습니다. 기본적으로, 이외의 값은 Long.MAX_VALUE
다른 손에있는 동안 (및 분할없이)는 spliterator 너무 일찍 종료하게하는 경향이 Long.MAX_VALUE
추정 원인이됩니다 trySplit
그것을 반환 할 때까지 끊임없이 호출 할 null
.
내가 찾은 해결책은 스플리터간에 리소스를 내부적으로 공유하고 서로간에 균형을 조정하는 것입니다.
작업 코드 :
public class AwsS3LineSpliterator<LINE> extends AbstractSpliterator<AwsS3LineInput<LINE>> {
public final static class AwsS3LineInput<LINE> {
final public S3ObjectSummary s3ObjectSummary;
final public LINE lineItem;
public AwsS3LineInput(S3ObjectSummary s3ObjectSummary, LINE lineItem) {
this.s3ObjectSummary = s3ObjectSummary;
this.lineItem = lineItem;
}
}
private final class InputStreamHandler {
final S3ObjectSummary file;
final InputStream inputStream;
InputStreamHandler(S3ObjectSummary file, InputStream is) {
this.file = file;
this.inputStream = is;
}
}
private final Iterator<S3ObjectSummary> incomingFiles;
private final Function<S3ObjectSummary, InputStream> fileOpener;
private final Function<InputStream, LINE> lineReader;
private final Deque<S3ObjectSummary> unopenedFiles;
private final Deque<InputStreamHandler> openedFiles;
private final Deque<AwsS3LineInput<LINE>> sharedBuffer;
private final int maxBuffer;
private AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener,
Function<InputStream, LINE> lineReader,
Deque<S3ObjectSummary> unopenedFiles, Deque<InputStreamHandler> openedFiles, Deque<AwsS3LineInput<LINE>> sharedBuffer,
int maxBuffer) {
super(Long.MAX_VALUE, 0);
this.incomingFiles = incomingFiles;
this.fileOpener = fileOpener;
this.lineReader = lineReader;
this.unopenedFiles = unopenedFiles;
this.openedFiles = openedFiles;
this.sharedBuffer = sharedBuffer;
this.maxBuffer = maxBuffer;
}
public AwsS3LineSpliterator(Iterator<S3ObjectSummary> incomingFiles, Function<S3ObjectSummary, InputStream> fileOpener, Function<InputStream, LINE> lineReader, int maxBuffer) {
this(incomingFiles, fileOpener, lineReader, new ConcurrentLinkedDeque<>(), new ConcurrentLinkedDeque<>(), new ArrayDeque<>(maxBuffer), maxBuffer);
}
@Override
public boolean tryAdvance(Consumer<? super AwsS3LineInput<LINE>> action) {
AwsS3LineInput<LINE> lineInput;
synchronized(sharedBuffer) {
lineInput=sharedBuffer.poll();
}
if(lineInput != null) {
action.accept(lineInput);
return true;
}
InputStreamHandler handle = openedFiles.poll();
if(handle == null) {
S3ObjectSummary unopenedFile = unopenedFiles.poll();
if(unopenedFile == null) {
return false;
}
handle = new InputStreamHandler(unopenedFile, fileOpener.apply(unopenedFile));
}
for(int i=0; i < maxBuffer; ++i) {
LINE line = lineReader.apply(handle.inputStream);
if(line != null) {
synchronized(sharedBuffer) {
sharedBuffer.add(new AwsS3LineInput<LINE>(handle.file, line));
}
}
else {
return tryAdvance(action);
}
}
openedFiles.addFirst(handle);
return tryAdvance(action);
}
@Override
public Spliterator<AwsS3LineInput<LINE>> trySplit() {
synchronized(incomingFiles) {
if (incomingFiles.hasNext()) {
unopenedFiles.add(incomingFiles.next());
return new AwsS3LineSpliterator<LINE>(incomingFiles, fileOpener, lineReader, unopenedFiles, openedFiles, sharedBuffer, maxBuffer);
} else {
return null;
}
}
}
}