나는 우리 중 많은 사람들이 사용 ThreadPoolExecutor
하는 ExecutorService
스레드 풀을 뒷받침하는 기본 동작에 대해 한동안 좌절 했습니다 . Javadocs에서 인용하려면 :
corePoolSize보다 많지만 maximumPoolSize보다 작은 스레드가 실행중인 경우 대기열이 가득 찬 경우에만 새 스레드가 생성 됩니다 .
이것이 의미하는 바는 다음 코드를 사용하여 스레드 풀을 정의 하면는 제한되지 않기 때문에 두 번째 스레드를 시작 하지 않는다는 것입니다 LinkedBlockingQueue
.
ExecutorService threadPool =
new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));
제한된 대기열이 있고 대기열이 가득 찬 경우에만 코어 번호 이상의 스레드가 시작됩니다. 많은 수의 주니어 자바 멀티 스레드 프로그래머가 ThreadPoolExecutor
.
이제 이것이 최적이 아닌 특정 사용 사례가 있습니다. 나는 내 자신의 TPE 수업을 작성하지 않고 그것을 해결하는 방법을 찾고 있습니다.
내 요구 사항은 신뢰할 수없는 제 3 자에게 콜백하는 웹 서비스에 대한 것입니다.
- 웹 요청과 동 기적으로 콜백을 만들고 싶지 않으므로 스레드 풀을 사용하고 싶습니다.
- 나는 일반적으로이 두 개를 1 분에 얻으므로
newFixedThreadPool(...)
대부분 휴면 상태 인 많은 수의 스레드 를 갖고 싶지 않습니다 . - 자주 나는이 트래픽의 버스트를 얻고 스레드 수를 최대 값 (50이라고 가정 해 보겠습니다)으로 확장하려고합니다.
- 모든 콜백을 수행 하기 위해 최선의 시도를해야하므로 50 이상의 추가 콜백을 대기열에 추가하고 싶습니다.을 사용하여 나머지 웹 서버를 압도하고 싶지는 않습니다
newCachedThreadPool()
.
더 많은 스레드가 시작 되기 전에ThreadPoolExecutor
대기열 이 제한 되고 가득 차야하는 경우이 제한을 어떻게 해결할 수 있습니까? 작업 을 대기열에 추가 하기 전에 더 많은 스레드를 시작하려면 어떻게 해야합니까?
편집하다:
@Flavio는 ThreadPoolExecutor.allowCoreThreadTimeOut(true)
코어 스레드 시간 초과 및 종료 를 위해 를 사용하는 것에 대해 좋은 지적을합니다 . 나는 그것을 고려했지만 여전히 코어 스레드 기능을 원했습니다. 가능한 경우 풀의 스레드 수가 코어 크기 아래로 떨어지기를 원하지 않았습니다.
답변
ThreadPoolExecutor
더 많은 스레드가 시작되기 전에 대기열 이 제한 되고 가득 차야하는 경우이 제한을 어떻게 해결할 수 있습니까 ?
나는 마침내이 제한에 대한 다소 우아한 (아마도 약간 해키) 해결책을 찾았다 고 생각 ThreadPoolExecutor
합니다. 그것은 LinkedBlockingQueue
그것을 반환하도록 확장 하는 것을 포함 false
합니다queue.offer(...)
이미 일부 작업이 대기열 있을 때 . 현재 스레드가 대기중인 작업을 따라 가지 못하는 경우 TPE는 추가 스레드를 추가합니다. 풀이 이미 최대 스레드에있는 경우이 RejectedExecutionHandler
호출됩니다. 그런 다음 put(...)
대기열로 작업 을 수행하는 핸들러입니다 .
offer(...)
반환 할 수 있는 대기열을 작성하는 것은 확실히 이상합니다.false
있고 put()
차단하지 않는 . 그러나 이것은 TPE의 대기열 사용과 잘 작동하므로 이것을 수행하는 데 아무런 문제가 없습니다.
코드는 다음과 같습니다.
// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
private static final long serialVersionUID = -6903933921423432194L;
@Override
public boolean offer(Runnable e) {
// Offer it to the queue if there is 0 items already queued, else
// return false so the TPE will add another thread. If we return false
// and max threads have been reached then the RejectedExecutionHandler
// will be called which will do the put into the queue.
if (size() == 0) {
return super.offer(e);
} else {
return false;
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// This does the actual put into the queue. Once the max threads
// have been reached, the tasks will then queue up.
executor.getQueue().put(r);
// we do this after the put() to stop race conditions
if (executor.isShutdown()) {
throw new RejectedExecutionException(
"Task " + r + " rejected from " + e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
});
이 메커니즘을 사용하면 작업을 대기열에 제출할 때 ThreadPoolExecutor
됩니다.
- 스레드 수를 초기에 코어 크기까지 확장합니다 (여기서는 1).
- 대기열에 제공하십시오. 대기열이 비어 있으면 기존 스레드에서 처리하도록 대기열에 추가됩니다.
- 큐에 이미 하나 이상의 요소가있는 경우
offer(...)
는 false를 반환합니다. - false가 반환되면 최대 수 (여기서는 50)에 도달 할 때까지 풀의 스레드 수를 늘립니다.
- 최대에 있으면
RejectedExecutionHandler
- 그런
RejectedExecutionHandler
다음 FIFO 순서로 사용 가능한 첫 번째 스레드에서 처리 할 작업을 대기열에 넣습니다.
위의 예제 코드에서 큐는 제한되지 않지만 바인딩 된 큐로 정의 할 수도 있습니다. 예를 들어, 1000의 용량을 추가하면 LinkedBlockingQueue
다음이 수행됩니다.
- 스레드를 최대로 확장
- 1000 개의 작업으로 가득 찰 때까지 대기
- 그런 다음 대기열에서 공간을 사용할 수있을 때까지 발신자를 차단합니다.
당신이 사용하는 데 필요한 경우에도 offer(...)
에서
RejectedExecutionHandler
다음은 사용할 수있는 offer(E, long, TimeUnit)
대신과 방법을 Long.MAX_VALUE
시간 제한 등.
경고:
executor 가 종료 된 후 작업이 추가 될 것으로 예상한다면 executor-service가 종료되었을 때 RejectedExecutionException
사용자 지정을 버리는 것이 더 현명 할 수 있습니다 RejectedExecutionHandler
. 이것을 지적한 @RaduToader에게 감사드립니다.
편집하다:
이 답변에 대한 또 다른 조정은 유휴 스레드가 있는지 TPE에 요청하고있는 경우에만 항목을 대기열에 넣는 것입니다. 이것에 대한 진정한 클래스를 만들고 ourQueue.setThreadPoolExecutor(tpe);
그것에 메소드를 추가 해야합니다.
그러면 offer(...)
방법은 다음과 같습니다.
tpe.getPoolSize() == tpe.getMaximumPoolSize()
이 경우 전화 만하는지 확인하십시오super.offer(...)
.- 그렇지 않으면
tpe.getPoolSize() > tpe.getActiveCount()
전화super.offer(...)
유휴 스레드가있는 것처럼 보이므로 . - 그렇지 않으면
false
다른 스레드를 포크로 돌아갑니다 .
아마도 이것은 :
int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
return super.offer(e);
} else {
return false;
}
TPE의 get 메서드는 volatile
필드에 액세스 하거나 (의 경우 getActiveCount()
) TPE를 잠그고 스레드 목록을 확인하기 때문에 비용이 많이 듭니다 . 또한 여기에는 작업이 부적절하게 대기열에 추가되거나 유휴 스레드가있을 때 다른 스레드가 분기 될 수있는 경합 조건이 있습니다.
답변
코어 크기와 최대 크기를 동일한 값으로 설정하고 .NET Core를 사용하여 풀에서 코어 스레드를 제거 할 수 있습니다 allowCoreThreadTimeOut(true)
.
답변
이 질문에 대해 이미 두 가지 다른 답을 얻었지만 이것이 최고라고 생각합니다.
현재 허용되는 답변 의 기술을 기반으로합니다 .
offer()
(때때로) false를 반환하도록 큐의 메서드를 재정의합니다 .- 이로 인해에서
ThreadPoolExecutor
새 스레드를 생성하거나 작업을 거부합니다. - 거부시 실제로 작업을 대기열에 넣
RejectedExecutionHandler
도록 설정합니다 .
문제는 언제 offer()
false를 반환해야 하는가입니다. 대기열에 몇 가지 작업이있을 때 현재 허용되는 답변은 false를 반환하지만, 제가 거기에서 지적했듯이 이로 인해 바람직하지 않은 결과가 발생합니다. 또는 항상 false를 반환하면 대기열에서 대기중인 스레드가 있어도 새 스레드가 계속 생성됩니다.
이 솔루션은 자바 7을 사용하는 것입니다 LinkedTransferQueue
및이 offer()
전화를 tryTransfer()
. 대기중인 소비자 스레드가 있으면 작업이 해당 스레드로 전달됩니다. 그렇지 않으면 offer()
false를 반환 ThreadPoolExecutor
하고 새 스레드를 생성합니다.
BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
return tryTransfer(e);
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
답변
참고 : 이제 다른 답변을 선호하고 권장 합니다.
여기에 훨씬 더 간단한 버전이 있습니다. 새 작업이 실행될 때마다 corePoolSize (maximumPoolSize 제한까지)를 늘린 다음 작업이 완료됩니다.
다시 말해, 실행 중이거나 대기열에 추가 된 작업의 수를 추적하고 사용자가 지정한 “코어 풀 크기”와 maximumPoolSize 사이에있는 한 corePoolSize가 작업 수와 동일한 지 확인합니다.
public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
private int userSpecifiedCorePoolSize;
private int taskCount;
public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
userSpecifiedCorePoolSize = corePoolSize;
}
@Override
public void execute(Runnable runnable) {
synchronized (this) {
taskCount++;
setCorePoolSizeToTaskCountWithinBounds();
}
super.execute(runnable);
}
@Override
protected void afterExecute(Runnable runnable, Throwable throwable) {
super.afterExecute(runnable, throwable);
synchronized (this) {
taskCount--;
setCorePoolSizeToTaskCountWithinBounds();
}
}
private void setCorePoolSizeToTaskCountWithinBounds() {
int threads = taskCount;
if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
setCorePoolSize(threads);
}
}
서면으로 사용자 변경을 지원하지 않는 클래스는 건설 후 corePoolSize를 또는 maximumPoolSize를 지정하고, 직접 또는를 통해 작업 큐를 조작 지원하지 않습니다 remove()
나 purge()
.
답변
ThreadPoolExecutor
추가 creationThreshold
및 재정의 를 받는의 하위 클래스가 있습니다 execute
.
public void execute(Runnable command) {
super.execute(command);
final int poolSize = getPoolSize();
if (poolSize < getMaximumPoolSize()) {
if (getQueue().size() > creationThreshold) {
synchronized (this) {
setCorePoolSize(poolSize + 1);
setCorePoolSize(poolSize);
}
}
}
}
그게 도움이 될 수도 있지만 당연히 더 예술적으로 보입니다 …
답변
권장 답변은 JDK 스레드 풀과 관련된 문제 중 하나 (1) 만 해결합니다.
-
JDK 스레드 풀은 큐에 편향되어 있습니다. 따라서 새 스레드를 생성하는 대신 작업을 대기열에 넣습니다. 큐가 한계에 도달하는 경우에만 스레드 풀이 새 스레드를 생성합니다.
-
로드가 가벼워지면 스레드 폐기가 발생하지 않습니다. 예를 들어, 풀에 도달하는 작업이 많아서 풀이 최대가되고 한 번에 최대 2 개의 작업이 경부하되는 경우 풀은 모든 스레드를 사용하여 스레드 만료를 방지하는 경부 하를 처리합니다. (2 개의 스레드 만 필요합니다…)
위의 행동이 마음에 들지 않아 위의 결함을 극복하기 위해 풀을 구현했습니다.
해결하려면 2) Lifo 스케줄링을 사용하면 문제가 해결됩니다. 이 아이디어는 ACM Applicative 2015 컨퍼런스 : Systems @ Facebook 규모 에서 Ben Maurer가 발표했습니다.
그래서 새로운 구현이 탄생했습니다.
지금까지이 구현은 ZEL에 대한 비동기 실행 성능을 향상시킵니다 .
구현은 컨텍스트 전환 오버 헤드를 줄일 수있는 스핀 가능하며 특정 사용 사례에 대해 우수한 성능을 제공합니다.
도움이 되었기를 바랍니다 …
추신 : JDK 포크 조인 풀은 ExecutorService를 구현하고 “일반적인”스레드 풀로 작동합니다. 구현은 성능이 뛰어나고 LIFO 스레드 스케줄링을 사용하지만 내부 대기열 크기, 만료 시간 제한을 제어 할 수 없습니다. 가장 중요한 것은 작업이 불가능하다는 것입니다. 취소 할 때 중단됨
답변
참고 : 이제 다른 답변을 선호하고 권장 합니다.
false를 반환하도록 대기열을 변경하는 원래 아이디어에 따라 다른 제안이 있습니다. 이 작업에서는 모든 작업이 대기열에 들어갈 수 있지만 작업이 대기열에 추가 될 때마다 execute()
대기열이 거부하는 센티넬 no-op 작업을 따라 가며 새 스레드가 생성되고 즉시 작업이 실행되지 않습니다. 대기열에서 뭔가.
작업자 스레드가 LinkedBlockingQueue
새 작업을 위해 폴링 할 수 있기 때문에 사용 가능한 스레드가있는 경우에도 작업이 대기열에 추가 될 수 있습니다. 사용 가능한 스레드가있는 경우에도 새 스레드를 생성하지 않으려면 대기열에서 새 작업을 기다리는 스레드 수를 추적하고 대기중인 스레드보다 대기열에 더 많은 작업이있을 때만 새 스레드를 생성해야합니다.
final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };
final AtomicInteger waitingThreads = new AtomicInteger(0);
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
@Override
public boolean offer(Runnable e) {
// offer returning false will cause the executor to spawn a new thread
if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
else return super.offer(e);
}
@Override
public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.poll(timeout, unit);
} finally {
waitingThreads.decrementAndGet();
}
}
@Override
public Runnable take() throws InterruptedException {
try {
waitingThreads.incrementAndGet();
return super.take();
} finally {
waitingThreads.decrementAndGet();
}
}
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
@Override
public void execute(Runnable command) {
super.execute(command);
if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
}
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (r == SENTINEL_NO_OP) return;
else throw new RejectedExecutionException();
}
});