[java] ThreadPoolExecutor가 대기열에 들어가기 전에 스레드를 최대로 늘리는 방법은 무엇입니까?

나는 우리 중 많은 사람들이 사용 ThreadPoolExecutor하는 ExecutorService스레드 풀을 뒷받침하는 기본 동작에 대해 한동안 좌절 했습니다 . Javadocs에서 인용하려면 :

corePoolSize보다 많지만 maximumPoolSize보다 작은 스레드가 실행중인 경우 대기열이 가득 찬 경우에만 새 스레드가 생성 됩니다 .

이것이 의미하는 바는 다음 코드를 사용하여 스레드 풀을 정의 하면는 제한되지 않기 때문에 두 번째 스레드를 시작 하지 않는다는 것입니다 LinkedBlockingQueue.

ExecutorService threadPool =
   new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
      TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

제한된 대기열이 있고 대기열이 가득 찬 경우에만 코어 번호 이상의 스레드가 시작됩니다. 많은 수의 주니어 자바 멀티 스레드 프로그래머가 ThreadPoolExecutor.

이제 이것이 최적이 아닌 특정 사용 사례가 있습니다. 나는 내 자신의 TPE 수업을 작성하지 않고 그것을 해결하는 방법을 찾고 있습니다.

내 요구 사항은 신뢰할 수없는 제 3 자에게 콜백하는 웹 서비스에 대한 것입니다.

  • 웹 요청과 동 기적으로 콜백을 만들고 싶지 않으므로 스레드 풀을 사용하고 싶습니다.
  • 나는 일반적으로이 두 개를 1 분에 얻으므로 newFixedThreadPool(...)대부분 휴면 상태 인 많은 수의 스레드 를 갖고 싶지 않습니다 .
  • 자주 나는이 트래픽의 버스트를 얻고 스레드 수를 최대 값 (50이라고 가정 해 보겠습니다)으로 확장하려고합니다.
  • 모든 콜백을 수행 하기 위해 최선의 시도를해야하므로 50 이상의 추가 콜백을 대기열에 추가하고 싶습니다.을 사용하여 나머지 웹 서버를 압도하고 싶지는 않습니다 newCachedThreadPool().

더 많은 스레드가 시작 되기 전에ThreadPoolExecutor 대기열 이 제한 되고 가득 차야하는 경우이 제한을 어떻게 해결할 수 있습니까? 작업 대기열에 추가 하기 전에 더 많은 스레드를 시작하려면 어떻게 해야합니까?

편집하다:

@Flavio는 ThreadPoolExecutor.allowCoreThreadTimeOut(true)코어 스레드 시간 초과 및 종료 를 위해 를 사용하는 것에 대해 좋은 지적을합니다 . 나는 그것을 고려했지만 여전히 코어 스레드 기능을 원했습니다. 가능한 경우 풀의 스레드 수가 코어 크기 아래로 떨어지기를 원하지 않았습니다.



답변

ThreadPoolExecutor더 많은 스레드가 시작되기 전에 대기열 이 제한 되고 가득 차야하는 경우이 제한을 어떻게 해결할 수 있습니까 ?

나는 마침내이 제한에 대한 다소 우아한 (아마도 약간 해키) 해결책을 찾았다 고 생각 ThreadPoolExecutor합니다. 그것은 LinkedBlockingQueue그것을 반환하도록 확장 하는 것을 포함 false합니다queue.offer(...)이미 일부 작업이 대기열 있을 때 . 현재 스레드가 대기중인 작업을 따라 가지 못하는 경우 TPE는 추가 스레드를 추가합니다. 풀이 이미 최대 스레드에있는 경우이 RejectedExecutionHandler호출됩니다. 그런 다음 put(...)대기열로 작업 을 수행하는 핸들러입니다 .

offer(...)반환 할 수 있는 대기열을 작성하는 것은 확실히 이상합니다.false 있고 put()차단하지 않는 . 그러나 이것은 TPE의 대기열 사용과 잘 작동하므로 이것을 수행하는 데 아무런 문제가 없습니다.

코드는 다음과 같습니다.

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        // Offer it to the queue if there is 0 items already queued, else
        // return false so the TPE will add another thread. If we return false
        // and max threads have been reached then the RejectedExecutionHandler
        // will be called which will do the put into the queue.
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            // This does the actual put into the queue. Once the max threads
            //  have been reached, the tasks will then queue up.
            executor.getQueue().put(r);
            // we do this after the put() to stop race conditions
            if (executor.isShutdown()) {
                throw new RejectedExecutionException(
                    "Task " + r + " rejected from " + e);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

이 메커니즘을 사용하면 작업을 대기열에 제출할 때 ThreadPoolExecutor 됩니다.

  1. 스레드 수를 초기에 코어 크기까지 확장합니다 (여기서는 1).
  2. 대기열에 제공하십시오. 대기열이 비어 있으면 기존 스레드에서 처리하도록 대기열에 추가됩니다.
  3. 큐에 이미 하나 이상의 요소가있는 경우 offer(...)는 false를 반환합니다.
  4. false가 반환되면 최대 수 (여기서는 50)에 도달 할 때까지 풀의 스레드 수를 늘립니다.
  5. 최대에 있으면 RejectedExecutionHandler
  6. 그런 RejectedExecutionHandler다음 FIFO 순서로 사용 가능한 첫 번째 스레드에서 처리 할 작업을 대기열에 넣습니다.

위의 예제 코드에서 큐는 제한되지 않지만 바인딩 된 큐로 정의 할 수도 있습니다. 예를 들어, 1000의 용량을 추가하면 LinkedBlockingQueue다음이 수행됩니다.

  1. 스레드를 최대로 확장
  2. 1000 개의 작업으로 가득 찰 때까지 대기
  3. 그런 다음 대기열에서 공간을 사용할 수있을 때까지 발신자를 차단합니다.

당신이 사용하는 데 필요한 경우에도 offer(...)에서
RejectedExecutionHandler다음은 사용할 수있는 offer(E, long, TimeUnit)대신과 방법을 Long.MAX_VALUE시간 제한 등.

경고:

executor 가 종료 된 작업이 추가 될 것으로 예상한다면 executor-service가 종료되었을 때 RejectedExecutionException사용자 지정을 버리는 것이 더 현명 할 수 있습니다 RejectedExecutionHandler. 이것을 지적한 @RaduToader에게 감사드립니다.

편집하다:

이 답변에 대한 또 다른 조정은 유휴 스레드가 있는지 TPE에 요청하고있는 경우에만 항목을 대기열에 넣는 것입니다. 이것에 대한 진정한 클래스를 만들고 ourQueue.setThreadPoolExecutor(tpe);그것에 메소드를 추가 해야합니다.

그러면 offer(...)방법은 다음과 같습니다.

  1. tpe.getPoolSize() == tpe.getMaximumPoolSize()이 경우 전화 만하는지 확인하십시오 super.offer(...).
  2. 그렇지 않으면 tpe.getPoolSize() > tpe.getActiveCount()전화super.offer(...) 유휴 스레드가있는 것처럼 보이므로 .
  3. 그렇지 않으면 false다른 스레드를 포크로 돌아갑니다 .

아마도 이것은 :

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

TPE의 get 메서드는 volatile필드에 액세스 하거나 (의 경우 getActiveCount()) TPE를 잠그고 스레드 목록을 확인하기 때문에 비용이 많이 듭니다 . 또한 여기에는 작업이 부적절하게 대기열에 추가되거나 유휴 스레드가있을 때 다른 스레드가 분기 될 수있는 경합 조건이 있습니다.


답변

코어 크기와 최대 크기를 동일한 값으로 설정하고 .NET Core를 사용하여 풀에서 코어 스레드를 제거 할 수 있습니다 allowCoreThreadTimeOut(true).


답변

이 질문에 대해 이미 두 가지 다른 답을 얻었지만 이것이 최고라고 생각합니다.

현재 허용되는 답변 의 기술을 기반으로합니다 .

  1. offer()(때때로) false를 반환하도록 큐의 메서드를 재정의합니다 .
  2. 이로 인해에서 ThreadPoolExecutor새 스레드를 생성하거나 작업을 거부합니다.
  3. 거부시 실제로 작업을 대기열에 넣 RejectedExecutionHandler도록 설정합니다 .

문제는 언제 offer()false를 반환해야 하는가입니다. 대기열에 몇 가지 작업이있을 때 현재 허용되는 답변은 false를 반환하지만, 제가 거기에서 지적했듯이 이로 인해 바람직하지 않은 결과가 발생합니다. 또는 항상 false를 반환하면 대기열에서 대기중인 스레드가 있어도 새 스레드가 계속 생성됩니다.

이 솔루션은 자바 7을 사용하는 것입니다 LinkedTransferQueue및이 offer()전화를 tryTransfer(). 대기중인 소비자 스레드가 있으면 작업이 해당 스레드로 전달됩니다. 그렇지 않으면 offer()false를 반환 ThreadPoolExecutor하고 새 스레드를 생성합니다.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });


답변

참고 : 이제 다른 답변을 선호하고 권장 합니다.

여기에 훨씬 더 간단한 버전이 있습니다. 새 작업이 실행될 때마다 corePoolSize (maximumPoolSize 제한까지)를 늘린 다음 작업이 완료됩니다.

다시 말해, 실행 중이거나 대기열에 추가 된 작업의 수를 추적하고 사용자가 지정한 “코어 풀 크기”와 maximumPoolSize 사이에있는 한 corePoolSize가 작업 수와 동일한 지 확인합니다.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

서면으로 사용자 변경을 지원하지 않는 클래스는 건설 후 corePoolSize를 또는 maximumPoolSize를 지정하고, 직접 또는를 통해 작업 큐를 조작 지원하지 않습니다 remove()purge().


답변

ThreadPoolExecutor추가 creationThreshold및 재정의 를 받는의 하위 클래스가 있습니다 execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

그게 도움이 될 수도 있지만 당연히 더 예술적으로 보입니다 …


답변

권장 답변은 JDK 스레드 풀과 관련된 문제 중 하나 (1) 만 해결합니다.

  1. JDK 스레드 풀은 큐에 편향되어 있습니다. 따라서 새 스레드를 생성하는 대신 작업을 대기열에 넣습니다. 큐가 한계에 도달하는 경우에만 스레드 풀이 새 스레드를 생성합니다.

  2. 로드가 가벼워지면 스레드 폐기가 발생하지 않습니다. 예를 들어, 풀에 도달하는 작업이 많아서 풀이 최대가되고 한 번에 최대 2 개의 작업이 경부하되는 경우 풀은 모든 스레드를 사용하여 스레드 만료를 방지하는 경부 하를 처리합니다. (2 개의 스레드 만 필요합니다…)

위의 행동이 마음에 들지 않아 위의 결함을 극복하기 위해 풀을 구현했습니다.

해결하려면 2) Lifo 스케줄링을 사용하면 문제가 해결됩니다. 이 아이디어는 ACM Applicative 2015 컨퍼런스 : Systems @ Facebook 규모 에서 Ben Maurer가 발표했습니다.

그래서 새로운 구현이 탄생했습니다.

LifoThreadPoolExecutorSQP

지금까지이 구현은 ZEL에 대한 비동기 실행 성능을 향상시킵니다 .

구현은 컨텍스트 전환 오버 헤드를 줄일 수있는 스핀 가능하며 특정 사용 사례에 대해 우수한 성능을 제공합니다.

도움이 되었기를 바랍니다 …

추신 : JDK 포크 조인 풀은 ExecutorService를 구현하고 “일반적인”스레드 풀로 작동합니다. 구현은 성능이 뛰어나고 LIFO 스레드 스케줄링을 사용하지만 내부 대기열 크기, 만료 시간 제한을 제어 할 수 없습니다. 가장 중요한 것은 작업이 불가능하다는 것입니다. 취소 할 때 중단됨


답변

참고 : 이제 다른 답변을 선호하고 권장 합니다.

false를 반환하도록 대기열을 변경하는 원래 아이디어에 따라 다른 제안이 있습니다. 이 작업에서는 모든 작업이 대기열에 들어갈 수 있지만 작업이 대기열에 추가 될 때마다 execute()대기열이 거부하는 센티넬 no-op 작업을 따라 가며 새 스레드가 생성되고 즉시 작업이 실행되지 않습니다. 대기열에서 뭔가.

작업자 스레드가 LinkedBlockingQueue새 작업을 위해 폴링 할 수 있기 때문에 사용 가능한 스레드가있는 경우에도 작업이 대기열에 추가 될 수 있습니다. 사용 가능한 스레드가있는 경우에도 새 스레드를 생성하지 않으려면 대기열에서 새 작업을 기다리는 스레드 수를 추적하고 대기중인 스레드보다 대기열에 더 많은 작업이있을 때만 새 스레드를 생성해야합니다.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();
    }
});