[java] Java ThreadPoolExecutor : 코어 풀 크기 업데이트는 들어오는 작업을 간헐적으로 거부합니다.

ThreadPoolExecutor풀을 만든 후의 코어 풀 크기를 다른 숫자 로 크기 조정하려고하면 간헐적으로 여러 작업을 RejectedExecutionException제출하지 않아도 일부 작업이 거부되는 문제가 발생 queueSize + maxPoolSize합니다.

내가 해결하려고하는 문제는 ThreadPoolExecutor스레드 풀의 대기열에 앉아 보류중인 실행을 기반으로 핵심 스레드의 크기를 조정하는 것입니다. 기본적으로 a ThreadPoolExecutorThread대기열이 가득 찬 경우에만 새 항목을 작성 하기 때문에 이것이 필요합니다 .

다음은 문제를 보여주는 작은 독립형 순수 Java 8 프로그램입니다.

import static java.lang.Math.max;
import static java.lang.Math.min;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolResizeTest {

    public static void main(String[] args) throws Exception {
        // increase the number of iterations if unable to reproduce
        // for me 100 iterations have been enough
        int numberOfExecutions = 100;

        for (int i = 1; i <= numberOfExecutions; i++) {
            executeOnce();
        }
    }

    private static void executeOnce() throws Exception {
        int minThreads = 1;
        int maxThreads = 5;
        int queueCapacity = 10;

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                minThreads, maxThreads,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()
        );

        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(() -> resizeThreadPool(pool, minThreads, maxThreads),
                0, 10, TimeUnit.MILLISECONDS);
        CompletableFuture<Void> taskBlocker = new CompletableFuture<>();

        try {
            int totalTasksToSubmit = queueCapacity + maxThreads;

            for (int i = 1; i <= totalTasksToSubmit; i++) {
                // following line sometimes throws a RejectedExecutionException
                pool.submit(() -> {
                    // block the thread and prevent it from completing the task
                    taskBlocker.join();
                });
                // Thread.sleep(10); //enabling even a small sleep makes the problem go away
            }
        } finally {
            taskBlocker.complete(null);
            scheduler.shutdown();
            pool.shutdown();
        }
    }

    /**
     * Resize the thread pool if the number of pending tasks are non-zero.
     */
    private static void resizeThreadPool(ThreadPoolExecutor pool, int minThreads, int maxThreads) {
        int pendingExecutions = pool.getQueue().size();
        int approximateRunningExecutions = pool.getActiveCount();

        /*
         * New core thread count should be the sum of pending and currently executing tasks
         * with an upper bound of maxThreads and a lower bound of minThreads.
         */
        int newThreadCount = min(maxThreads, max(minThreads, pendingExecutions + approximateRunningExecutions));

        pool.setCorePoolSize(newThreadCount);
        pool.prestartAllCoreThreads();
    }
}

queueCapacity + maxThreads 이상을 제출하지 않으면 풀에서 RejectedExecutionException을 발생시켜야하는 이유는 무엇입니까? ThreadPoolExecutor의 정의에 의해 최대 스레드를 변경하지는 않습니다. 스레드 또는 대기열의 작업을 수용해야합니다.

물론 풀의 크기를 조정하지 않으면 스레드 풀이 제출을 거부하지 않습니다. 제출에 지연을 추가하면 문제가 해결되므로 디버깅하기도 어렵습니다.

RejectedExecutionException을 해결하는 방법에 대한 조언이 있습니까?



답변

이것이 일어나는 이유는 다음과 같습니다.

내 예제에서는 minThreads = 0, maxThreads = 2 및 queueCapacity = 2를 사용하여 더 짧게 만듭니다. 첫 번째 명령이 제출되며 이는 execute 메소드에서 수행됩니다.

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

이 명령에 대해 addWorker (null, false)보다 workQueue.offer (command)가 실행됩니다. 워커 스레드는 먼저 스레드 실행 메소드의 큐에서이 명령을 가져 오므로이 큐에는 여전히 하나의 명령이 있습니다.

이번에는 workQueue.offer (command)가 실행되면서 두 번째 명령이 제출됩니다. 이제 대기열이 가득 찼습니다

이제 ScheduledExecutorService는 maxThreads로 setCorePoolSize를 호출하는 resizeThreadPool 메소드를 실행합니다. 다음은 setCorePoolSize 메소드입니다.

 public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

이 메소드는 addWorker (null, true)를 사용하여 한 명의 작업자를 추가합니다. 실행중인 두 개의 작업자 큐가 없습니다. 최대 값과 큐가 가득 찼습니다.

workQueue.offer (command) 및 addWorker (command, false)가 실패하여 세 번째 명령이 제출되고 실패하여 예외가 발생합니다.

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@24c22fe rejected from java.util.concurrent.ThreadPoolExecutor@cd1e646[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
at ThreadPoolResizeTest.executeOnce(ThreadPoolResizeTest.java:60)
at ThreadPoolResizeTest.runTest(ThreadPoolResizeTest.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:69)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:48)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
at org.junit.runners.ParentRunner.run(ParentRunner.java:292)
at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:365)

이 문제를 해결하려면 큐 용량을 실행하려는 최대 명령으로 설정해야한다고 생각합니다.


답변

이것이 버그인지 여부는 확실하지 않습니다. 이것은 대기열이 가득 찬 후에 추가 작업자 스레드가 작성 될 때의 동작이지만 호출자가 거부되는 작업을 처리 해야하는 Java 문서에서 언급되었습니다.

자바 문서

새로운 쓰레드를위한 팩토리. 모든 스레드는이 팩토리를 사용하여 작성됩니다 (addWorker 메소드 사용). 모든 호출자는 addWorker가 실패 할 수 있도록 준비해야합니다. 이는 스레드 수를 제한하는 시스템 또는 사용자 정책을 반영 할 수 있습니다. 오류로 처리되지 않더라도 스레드를 만들지 못하면 새 작업이 거부되거나 기존 작업이 대기열에 남아있을 수 있습니다.

당신은 핵심 풀 크기를 크기를 조정하면, 말 증가 할 수 있습니다, 추가 노동자 (생성 addWorker의 방법 setCorePoolSize) 및 추가 작업을 만들 수있는 호출 ( addWorker에서 방법을 execute때 거부) addWorkerfalse를 반환 ( add Worker충분 한 추가 노동자 마지막 코드는) 이미 setCorePoolSize 큐에 업데이트를 반영하기 위해 아직 작성 하지 않았지만 아직 실행되지 않았습니다 .

관련 부품

비교

public void setCorePoolSize(int corePoolSize) {
    ....
    int k = Math.min(delta, workQueue.size());
    while (k-- > 0 && addWorker(null, true)) {
        if (workQueue.isEmpty())
             break;
    }
}

public void execute(Runnable command) {
    ...
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

private boolean addWorker(Runnable firstTask, boolean core) {
....
   if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
     return false;
}

사용자 정의 재시도 거부 실행 핸들러를 사용하십시오 (최대 풀 크기). 필요에 따라 조정하십시오.

public static class RetryRejectionPolicy implements RejectedExecutionHandler {
    public RetryRejectionPolicy () {}

    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
           while(true)
            if(e.getQueue().offer(r)) break;
        }
    }
}

ThreadPoolExecutor pool = new ThreadPoolExecutor(
      minThreads, maxThreads,
      0, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(queueCapacity),
      new ThreadPoolResizeTest.RetryRejectionPolicy()
 );

또한 제출 된 작업이 완료 될 때까지 기다리지 awaitTermination않고 대신 사용하기 때문에 종료 사용이 올바르지 않습니다 .


답변