[java] Java 8 병렬 스트림의 사용자 정의 스레드 풀

Java 8 병렬 스트림에 대한 사용자 정의 스레드 풀을 지정할 수 있습니까? 어디서나 찾을 수 없습니다.

서버 응용 프로그램이 있고 병렬 스트림을 사용하고 싶다고 가정하십시오. 그러나 응용 프로그램은 크고 멀티 스레드이므로 구획화하고 싶습니다. 한 모듈의 응용 프로그램에서 다른 모듈의 작업을 느리게 실행하고 싶지 않습니다.

다른 모듈에 대해 다른 스레드 풀을 사용할 수 없으면 대부분의 실제 상황에서 병렬 스트림을 안전하게 사용할 수 없습니다.

다음 예제를 시도하십시오. 별도의 스레드에서 실행되는 CPU 집약적 작업이 있습니다. 작업은 병렬 스트림을 활용합니다. 첫 번째 작업이 중단되었으므로 각 단계는 1 초가 걸립니다 (스레드 절전으로 시뮬레이션 됨). 문제는 다른 스레드가 멈추고 깨진 작업이 끝날 때까지 기다리는 것입니다. 이것은 예를 들어 설명되었지만 서블릿 앱과 누군가가 장기 실행 작업을 공유 포크 조인 풀에 제출한다고 가정합니다.

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}



답변

실제로 특정 포크 조인 풀에서 병렬 작업을 실행하는 방법이 있습니다. 포크 조인 풀에서 작업으로 실행하면 그대로 유지되며 일반적인 것을 사용하지 않습니다.

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List<Integer> primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

이 트릭은 ForkJoinTask.fork 를 기반으로 합니다. “해당되는 경우 현재 작업이 실행중인 풀에서 또는 해당 ForkJoinPool ()이 아닌 경우 ForkJoinPool.commonPool ()을 사용하여 풀에서이 작업을 비동기식으로 실행하도록 구성합니다.”


답변

병렬 스트림은 기본 사용 하면 프로세서가 기본적으로 하나 개의 적은 스레드가을 에 의해 반환, (그들은 또한 메인 스레드를 사용하기 때문에 병렬 스트림의 모든 프로세서를 사용하는 것이이 방법을) :ForkJoinPool.commonPoolRuntime.getRuntime().availableProcessors()

별도 또는 사용자 지정 풀이 필요한 응용 프로그램의 경우 지정된 대상 병렬 수준으로 ForkJoinPool을 구성 할 수 있습니다. 기본적으로 사용 가능한 프로세서 수와 같습니다.

또한 중첩 된 병렬 스트림 또는 여러 병렬 스트림이 동시에 시작된 경우 모두 동일한 풀을 공유 합니다. 장점 : 기본값 (사용 가능한 프로세서 수) 이상을 사용하지 마십시오. 단점 : 시작한 각 병렬 스트림에 “모든 프로세서”가 할당되지 않을 수 있습니다 (하나 이상이있는 경우). 분명히 ManagedBlocker 를 사용하여 이를 피할 수 있습니다.

병렬 스트림이 실행되는 방식을 변경하려면 다음 중 하나를 수행하십시오.

  • 병렬 스트림 실행을 자신의 ForkJoinPool에 제출하십시오. yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();또는
  • System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")20 개 스레드의 대상 병렬 처리에 대해 시스템 특성을 사용하여 공통 풀의 크기를 변경할 수 있습니다 . 그러나 백 포트 패치 https://bugs.openjdk.java.net/browse/JDK-8190974 후에는 더 이상 작동하지 않습니다 .

프로세서가 8 개인 내 컴퓨터의 예입니다. 다음 프로그램을 실행하면

long start = System.currentTimeMillis();
IntStream s = IntStream.range(0, 20);
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
s.parallel().forEach(i -> {
    try { Thread.sleep(100); } catch (Exception ignore) {}
    System.out.print((System.currentTimeMillis() - start) + " ");
});

출력은 다음과 같습니다.

215216216216216216216216216315316316316316316316316415416416416416

따라서 병렬 스트림이 한 번에 8 개의 항목을 처리한다는 것을 알 수 있습니다. 즉, 8 개의 스레드를 사용합니다. 그러나 주석 처리 된 행의 주석을 해제하면 출력은 다음과 같습니다.

215215215215215215216216216216216216216216216216216216216216216216216

이번에는 병렬 스트림이 20 개의 스레드를 사용했으며 스트림의 20 개 요소가 모두 동시에 처리되었습니다.


답변

자신의 forkJoinPool 내에서 병렬 계산을 트리거하는 트릭 대신 다음과 같이 해당 풀을 CompletableFuture.supplyAsync 메서드로 전달할 수도 있습니다.

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CompletableFuture<List<Integer>> primes = CompletableFuture.supplyAsync(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList()),
    forkJoinPool
);


답변

원래 솔루션 (ForkJoinPool 공통 병렬 처리 속성 설정)이 더 이상 작동하지 않습니다. 원래 답변의 링크를 보면이 문제를 해결하는 업데이트가 Java 8로 다시 포팅되었습니다. 링크 된 스레드에서 언급 했듯이이 솔루션은 영원히 작동하지 않을 수 있습니다. 이를 바탕으로 솔루션은 forkjoinpool.submit with .get 솔루션으로 허용되는 답변에 설명되어 있습니다. 백 포트가이 솔루션의 신뢰성도 해결한다고 생각합니다.

ForkJoinPool fjpool = new ForkJoinPool(10);
System.out.println("stream.parallel");
IntStream range = IntStream.range(0, 20);
fjpool.submit(() -> range.parallel()
        .forEach((int theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();
System.out.println("list.parallelStream");
int [] array = IntStream.range(0, 20).toArray();
List<Integer> list = new ArrayList<>();
for (int theInt: array)
{
    list.add(theInt);
}
fjpool.submit(() -> list.parallelStream()
        .forEach((theInt) ->
        {
            try { Thread.sleep(100); } catch (Exception ignore) {}
            System.out.println(Thread.currentThread().getName() + " -- " + theInt);
        })).get();


답변

다음 속성을 사용하여 기본 병렬 처리를 변경할 수 있습니다.

-Djava.util.concurrent.ForkJoinPool.common.parallelism=16

더 많은 병렬 처리를 사용하도록 설정할 수 있습니다.


답변

사용 된 스레드의 실제 수를 측정하려면 다음을 확인하십시오 Thread.activeCount().

    Runnable r = () -> IntStream
            .range(-42, +42)
            .parallel()
            .map(i -> Thread.activeCount())
            .max()
            .ifPresent(System.out::println);

    ForkJoinPool.commonPool().submit(r).join();
    new ForkJoinPool(42).submit(r).join();

이는 4 코어 CPU에서 다음과 같은 출력을 생성 할 수 있습니다.

5 // common pool
23 // custom pool

.parallel()그것 없이는 :

3 // common pool
4 // custom pool


답변

지금까지 나는이 질문에 대한 답변에 설명 된 솔루션을 사용했습니다. 이제 병렬 스트림 지원 이라는 작은 라이브러리를 생각해 냈습니다 .

ForkJoinPool pool = new ForkJoinPool(NR_OF_THREADS);
ParallelIntStreamSupport.range(1, 1_000_000, pool)
    .filter(PrimesPrint::isPrime)
    .collect(toList())

그러나 @PabloMatiasGomez가 주석에서 지적했듯이 공통 스트림의 크기에 크게 의존하는 병렬 스트림의 분할 메커니즘에 대한 단점이 있습니다. HashSet의 병렬 스트림이 병렬로 실행되지 않음을 참조하십시오 .

이 솔루션을 다른 유형의 작업에 대해 별도의 풀을 갖기 위해 사용하고 있지만 사용하지 않더라도 공통 풀의 크기를 1로 설정할 수 없습니다.