AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
.map(record -> new Record(recordNumber.incrementAndGet(), record))
.parallel()
.filter(record -> doSomeOperation())
.findFirst()
이 글을 쓰면 맵 뒤에 병렬이 배치되므로 스레드가 맵 호출 만 생성된다고 가정했습니다. 그러나 파일의 일부 줄은 매 실행마다 다른 레코드 번호를 얻었습니다.
공식 Java 스트림 설명서 와 몇 가지 웹 사이트를 읽고 스트림에서 스트림이 작동하는 방식을 이해합니다.
몇 가지 질문 :
-
Java 병렬 스트림은 ArrayList, LinkedList 등의 모든 컬렉션에 의해 구현되는 SplitIterator를 기반으로 작동 합니다. 컬렉션에서 병렬 스트림을 구성 할 때 해당 분할 반복자는 컬렉션을 분할하고 반복하는 데 사용됩니다. 이것은 맵 결과 (예 : 레코드 포조)가 아닌 원래 입력 소스 (파일 라인) 레벨에서 병렬 처리가 발생한 이유를 설명합니다. 내 이해가 정확합니까?
-
필자의 경우 입력은 파일 IO 스트림입니다. 어떤 분할 반복기가 사용됩니까?
-
parallel()
파이프 라인에서 어디에 배치하든 문제가되지 않습니다 . 원래 입력 소스는 항상 분리되고 나머지 중간 작업이 적용됩니다.이 경우 Java는 사용자가 원본 소스를 제외하고 파이프 라인의 어느 곳에 나 병렬 작업을 배치 할 수 없도록해야합니다. Java 스트림이 내부에서 어떻게 작동하는지 모르는 사람들에게는 잘못 이해하고 있기 때문입니다.
parallel()
Stream 객체 유형에 대해 작업이 정의되었을 것이므로 이러한 방식으로 작동합니다. 그러나 대체 솔루션을 제공하는 것이 좋습니다. -
위의 코드 스 니펫에서 입력 파일의 모든 레코드에 줄 번호를 추가하려고하므로 순서를 지정해야합니다. 그러나
doSomeOperation()
무거운 논리이기 때문에 병렬로 적용하고 싶습니다 . 달성하는 한 가지 방법은 나만의 맞춤형 분할 반복자를 작성하는 것입니다. 다른 방법이 있습니까?
답변
이것은 맵 결과 (예 : 레코드 포조)가 아닌 원래 입력 소스 (파일 라인) 레벨에서 병렬 처리가 발생한 이유를 설명합니다.
전체 스트림은 병렬 또는 순차적입니다. 순차적으로 또는 병렬로 실행할 작업의 하위 집합을 선택하지 않습니다.
터미널 작업이 시작되면 스트림 파이프 라인은 호출 된 스트림의 방향에 따라 순차적으로 또는 병렬로 실행됩니다. […] 터미널 작업이 시작되면 스트림 파이프 라인은 호출 된 스트림의 모드에 따라 순차적으로 또는 병렬로 실행됩니다. 동일한 소스
언급했듯이 병렬 스트림은 분할 반복자를 사용합니다. 분명히 이것은 작업이 시작되기 전에 데이터를 분할하는 것입니다.
필자의 경우 입력은 파일 IO 스트림입니다. 어떤 분할 반복기가 사용됩니까?
소스를 보면 java.nio.file.FileChannelLinesSpliterator
파이프 라인에서 parallel ()을 어디에 두든 상관 없습니다. 원래 입력 소스는 항상 분리되고 나머지 중간 작업이 적용됩니다.
권리. 전화 parallel()
를 sequential()
여러 번 할 수도 있습니다 . 마지막으로 호출 한 사람이 이길 것입니다. 를 호출하면 parallel()
반환 된 스트림에 대해 설정합니다. 위에서 언급 한 것처럼 모든 작업 은 순차적으로 또는 병렬로 실행됩니다.
이 경우 Java는 사용자가 원래 소스를 제외하고 파이프 라인의 어느 곳에 나 병렬 작업을 배치 할 수 없도록해야합니다 …
이것은 의견의 문제가됩니다. Zabuza가 JDK 디자이너의 선택을 지원해야 할 충분한 이유가 있다고 생각합니다.
달성하는 한 가지 방법은 나만의 맞춤형 분할 반복자를 작성하는 것입니다. 다른 방법이 있습니까?
이것은 운영에 따라 다릅니다
- 경우
findFirst()
실제 터미널 작업입니다 많은 전화가 없기 때문에, 당신도, 병렬 실행에 대해 걱정할 필요가 없습니다doSomething()
어쨌든 (findFirst()
짧은 단락)입니다..parallel()
실제로 하나 이상의 요소가 처리 될 수 있지만findFirst()
순차 스트림에서는이를 방지 할 수 있습니다. -
터미널 작업으로 많은 양의 데이터가 생성되지 않으면
Record
순차적 스트림을 사용 하여 객체를 만든 다음 결과를 병렬로 처리 할 수 있습니다.List<Record> smallData = Files.lines(inputFile.toPath(), StandardCharsets.UTF_8) .map(record -> new Record(recordNumber.incrementAndGet(), record)) .collect(Collectors.toList()) .parallelStream() .filter(record -> doSomeOperation()) .collect(Collectors.toList());
-
파이프 라인이 메모리에 많은 양의 데이터를로드하는 경우 (사용중인 이유 일 수 있음
Files.lines()
) 사용자 지정 분할 반복자가 필요할 수 있습니다. 그러나 거기에 가기 전에 다른 옵션을 살펴볼 것입니다 (ID 열이있는 줄을 저장하는 것은 저의 의견 일뿐입니다).
또한 다음과 같이 작은 배치로 레코드를 처리하려고합니다.AtomicInteger recordNumber = new AtomicInteger(); final int batchSize = 10; try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), StandardCharsets.UTF_8);) { Supplier<List<Record>> batchSupplier = () -> { List<Record> batch = new ArrayList<>(); for (int i = 0; i < batchSize; i++) { String nextLine; try { nextLine = reader.readLine(); } catch (IOException e) { //hanlde exception throw new RuntimeException(e); } if(null == nextLine) return batch; batch.add(new Record(recordNumber.getAndIncrement(), nextLine)); } System.out.println("next batch"); return batch; }; Stream.generate(batchSupplier) .takeWhile(list -> list.size() >= batchSize) .map(list -> list.parallelStream() .filter(record -> doSomeOperation()) .collect(Collectors.toList())) .flatMap(List::stream) .forEach(System.out::println); }
이것은
doSomeOperation()
모든 데이터를 메모리에로드하지 않고 병렬로 실행 됩니다. 그러나batchSize
생각이 필요합니다.
답변
원래 스트림 디자인에는 다른 병렬 실행 설정으로 후속 파이프 라인 단계를 지원한다는 아이디어가 포함되었지만이 아이디어는 포기되었습니다. API는이 시점에서 비롯 될 수 있지만, 반면에 호출자가 병렬 또는 순차적 실행에 대해 명백한 단일 결정을 내 리도록하는 API 설계는 훨씬 더 복잡합니다.
실제 Spliterator
사용 Files.lines(…)
은 구현에 따라 다릅니다. Java 8 (Oracle 또는 OpenJDK)에서는 항상와 동일합니다 BufferedReader.lines()
. 최신 JDK Path
에서 기본 파일 시스템에 속하고 문자 세트가이 기능에서 지원되는 것 중 하나 인 경우 전용 Spliterator
구현을 가진 Stream을 얻 습니다 java.nio.file.FileChannelLinesSpliterator
. 전제 조건이 충족되지 않으면 with와 동일하게 구현 되며 BufferedReader.lines()
,이를 통해 Iterator
구현 BufferedReader
된 및를 통해 래핑됩니다 Spliterators.spliteratorUnknownSize
.
특정 작업은 Spliterator
병렬 처리 전에 소스에서 바로 라인 번호 매기기를 수행하여 제한없이 후속 병렬 처리를 수행 할 수 있는 사용자 지정으로 처리하는 것이 가장 좋습니다 .
public static Stream<Record> records(Path p) throws IOException {
LineNoSpliterator sp = new LineNoSpliterator(p);
return StreamSupport.stream(sp, false).onClose(sp);
}
private static class LineNoSpliterator implements Spliterator<Record>, Runnable {
int chunkSize = 100;
SeekableByteChannel channel;
LineNumberReader reader;
LineNoSpliterator(Path path) throws IOException {
channel = Files.newByteChannel(path, StandardOpenOption.READ);
reader=new LineNumberReader(Channels.newReader(channel,StandardCharsets.UTF_8));
}
@Override
public void run() {
try(Closeable c1 = reader; Closeable c2 = channel) {}
catch(IOException ex) { throw new UncheckedIOException(ex); }
finally { reader = null; channel = null; }
}
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
String line = reader.readLine();
if(line == null) return false;
action.accept(new Record(reader.getLineNumber(), line));
return true;
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
@Override
public Spliterator<Record> trySplit() {
Record[] chunks = new Record[chunkSize];
int read;
for(read = 0; read < chunks.length; read++) {
int pos = read;
if(!tryAdvance(r -> chunks[pos] = r)) break;
}
return Spliterators.spliterator(chunks, 0, read, characteristics());
}
@Override
public long estimateSize() {
try {
return (channel.size() - channel.position()) / 60;
} catch (IOException ex) {
return 0;
}
}
@Override
public int characteristics() {
return ORDERED | NONNULL | DISTINCT;
}
}
답변
다음은 병렬 적용이 적용되는 경우의 간단한 데모입니다. 엿봄의 결과는 두 예제의 차이점을 분명히 보여줍니다. 참고 : map
이전에 다른 방법을 추가하기 위해 호출이 시작되었습니다 parallel
.
IntStream.rangeClosed (1,20).peek(a->System.out.print(a+" "))
.map(a->a + 200).sum();
System.out.println();
IntStream.rangeClosed(1,20).peek(a->System.out.print(a+" "))
.map(a->a + 200).parallel().sum();