[java] 병렬 무한 Java 스트림에 메모리 부족

다음 Java 프로그램이 왜을 제공하는지 OutOfMemoryError, 해당 프로그램이없는 이유를 이해하려고합니다 .parallel().

System.out.println(Stream
    .iterate(1, i -> i+1)
    .parallel()
    .flatMap(n -> Stream.iterate(n, i -> i+n))
    .mapToInt(Integer::intValue)
    .limit(100_000_000)
    .sum()
);

두 가지 질문이 있습니다.

  1. 이 프로그램의 의도 된 결과는 무엇입니까?

    .parallel()이것이 없으면 단순히 출력 sum(1+2+3+...)되는 것 같습니다 . 이는 flatMap의 첫 번째 스트림에서 단순히 “고착”되는 것을 의미합니다.

    병렬로 예상되는 동작이 있는지는 알지 못하지만 병렬 작업자 수는 n어디에서나 첫 번째 스트림에 인터리브 된 것으로 추측 n됩니다. 청킹 / 버퍼링 동작에 따라 약간 다를 수도 있습니다.

  2. 메모리 부족을 일으키는 원인은 무엇입니까? 특히 이러한 스트림이 어떻게 구현되는지 이해하려고합니다.

    나는 무언가가 스트림을 차단한다고 추측하고 있기 때문에 결코 끝나지 않고 생성 된 값을 제거 할 수는 있지만 어떤 순서로 평가되고 버퍼링이 발생하는지는 알 수 없습니다.

편집 : 관련이있는 경우 Java 11을 사용하고 있습니다.

Editt 2 : 간단한 프로그램에서도 같은 일이 발생하기 IntStream.iterate(1,i->i+1).limit(1000_000_000).parallel().sum()때문에 limit오히려 게으름과 관련이있을 수 있습니다 flatMap.



답변

하지만 어떤 순서로 평가되고 버퍼링이 발생하는지는 잘 모르겠습니다. ”라고 말합니다. 이는 정확히 병렬 스트림에 관한 것입니다. 평가 순서는 지정되어 있지 않습니다.

귀하의 예에서 중요한 부분은 .limit(100_000_000)입니다. 이것은 구현이 임의의 값을 합할 수 는 없지만 처음 100,000,000 개의 숫자를 합산해야 함을 의미합니다 . 참조 구현 .unordered().limit(100_000_000)에서 결과를 변경하지는 않습니다. 즉, 정렬되지 않은 경우에 대한 특별한 구현은 없지만 구현 세부 사항입니다.

이제 작업자 스레드가 요소를 처리 할 때 특정 요소를 처리 할 수있는 요소 수에 따라 사용할 수있는 요소를 알아야하기 때문에 요소를 요약 할 수 없습니다. 이 스트림은 크기를 알지 못하므로 접두사 요소가 처리 된 경우에만 알 수 있으며 무한 스트림에서는 발생하지 않습니다. 따라서 작업자 스레드는 현재 버퍼링을 유지하므로이 정보를 사용할 수있게됩니다.

원칙적으로 작업자 스레드가 가장 왼쪽의 작업 척을 처리한다는 것을 알면 즉시 요소를 요약하고 계산하여 한계에 도달하면 끝을 알릴 수 있습니다. 따라서 스트림이 종료 될 수 있지만 많은 요인에 따라 다릅니다.

귀하의 경우, 그럴듯한 시나리오는 다른 작업자 스레드가 가장 왼쪽 작업이 계산하는 것보다 버퍼 할당 속도가 빠르다는 것입니다. 이 시나리오에서는 타이밍을 약간 변경하면 스트림이 때때로 값과 함께 리턴 될 수 있습니다.

가장 왼쪽의 청크를 처리하는 스레드를 제외한 모든 작업자 스레드 속도를 늦추면 스트림을 종료 할 수 있습니다 (적어도 대부분의 실행에서).

System.out.println(IntStream
    .iterate(1, i -> i+1)
    .parallel()
    .peek(i -> { if(i != 1) LockSupport.parkNanos(1_000_000_000); })
    .flatMap(n -> IntStream.iterate(n, i -> i+n))
    .limit(100_000_000)
    .sum()
);

¹ 처리 순서가 아닌 만남 순서에 대해 말할 때 왼쪽에서 오른쪽 순서를 사용 하라는 Stuart Marks의 제안을 따릅니다 .


답변

내 생각 엔 추가하는 parallel()것이 내부 행동 flatMap()이미 이전에 게으르게 평가되는 데 문제가 있었다는 것이다.

OutOfMemoryError당신이보고되었다지고 있다는 오류 [JDK-8202307]을 java.lang.OutOfMemoryError와 방법 :. Stream.iterator를 호출 할 때 Java 힙 공간을 () flatMap 무한 / 매우 큰 스트림을 사용하는 스트림 () 다음에 . 티켓을 보면 거의 동일한 스택 추적을 얻을 수 있습니다. 다음과 같은 이유로 티켓이 수정되지 않음으로 마감되었습니다.

iterator()spliterator()는 다른 작업을 사용할 수 없습니다 때 방법이 사용되는 “탈출 해치”입니다. 스트림 구현의 푸시 모델을 풀 모델로 변환하기 때문에 몇 가지 제한 사항이 있습니다. 이러한 전이 는 요소가 둘 이상의 요소에 (평평하게) 매핑 될 때와 같은 특정 경우 버퍼링을 필요합니다 . 중첩 된 요소 생성 레이어를 통해 얼마나 많은 요소를 가져와야하는지에 대한 배압의 개념을 지원하기 위해 일반적인 경우를 희생하면서 스트림 구현을 상당히 복잡하게 만듭니다.


답변

OOME은 스트림이 무한 하지 않기 때문에 발생 하지 않지만 사실 이 아닙니다 .

즉,를 주석 처리하면 .limit(...)메모리가 부족하지 않지만 물론 끝나지 않습니다.

일단 분할되면 스트림은 각 스레드 내에 누적 된 요소 수만 추적 할 수 있습니다 (실제 누산기가있는 것처럼 보입니다 Spliterators$ArraySpliterator#array).

당신이없이 재현 할 수있는 것 같은데 flatMap, 단지와 함께 다음을 실행합니다 -Xmx128m:

    System.out.println(Stream
            .iterate(1, i -> i + 1)
            .parallel()
      //    .flatMap(n -> Stream.iterate(n, i -> i+n))
            .mapToInt(Integer::intValue)
            .limit(100_000_000)
            .sum()
    );

그러나을 주석 처리 한 후 limit()랩톱을 교체하기로 결정할 때까지 제대로 실행됩니다.

실제 구현 세부 사항 외에도 다음과 같이 생각합니다.

를 사용 limit하면 sum감속기는 첫 번째 X 요소를 합산하기를 원하므로 스레드가 부분 합을 방출 할 수 없습니다. 각 “슬라이스”(스레드)는 요소를 누적하고 통과시켜야합니다. 제한이 없으면 그러한 제약이 없으므로 각 “슬라이스”는 결과를 최종적으로 방출한다고 가정 할 때 (영원히) 얻는 요소 중 부분 합계를 계산합니다.


답변