ThreadPoolExecutor
최대 크기에 도달하고 대기열이 가득 차면 새 작업을 추가하려고 할 때 submit()
메서드가 차단 되도록 만들고 싶습니다 . 이를 위해 사용자 정의를 구현해야 RejectedExecutionHandler
합니까 아니면 표준 Java 라이브러리를 사용하여이를 수행하는 기존 방법이 있습니까?
답변
방금 찾은 가능한 해결책 중 하나 :
public class BoundedExecutor {
private final Executor exec;
private final Semaphore semaphore;
public BoundedExecutor(Executor exec, int bound) {
this.exec = exec;
this.semaphore = new Semaphore(bound);
}
public void submitTask(final Runnable command)
throws InterruptedException, RejectedExecutionException {
semaphore.acquire();
try {
exec.execute(new Runnable() {
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
semaphore.release();
throw e;
}
}
}
다른 해결책이 있습니까? RejectedExecutionHandler
이러한 상황을 처리하는 표준 방법처럼 보이기 때문에 기반으로하는 것을 선호 합니다.
답변
ThreadPoolExecutor 및 blockingQueue를 사용할 수 있습니다.
public class ImageManager {
BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(blockQueueSize);
RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.CallerRunsPolicy();
private ExecutorService executorService = new ThreadPoolExecutor(numOfThread, numOfThread,
0L, TimeUnit.MILLISECONDS, blockingQueue, rejectedExecutionHandler);
private int downloadThumbnail(String fileListPath){
executorService.submit(new yourRunnable());
}
}
답변
CallerRunsPolicy
호출 스레드에서 거부 된 작업을 실행하는를 사용해야합니다 . 이렇게하면 해당 작업이 완료 될 때까지 실행 프로그램에 새 작업을 제출할 수 없습니다.이 시점에서 여유 풀 스레드가 생기거나 프로세스가 반복됩니다.
문서에서 :
거부 된 작업
execute (java.lang.Runnable) 메서드에 제출 된 새 작업은 Executor가 종료 될 때와 Executor가 최대 스레드와 작업 대기열 용량 모두에 대해 유한 경계를 사용하고 포화 상태 일 때 거부됩니다. 두 경우 모두 execute 메소드는 RejectedExecutionHandler의 RejectedExecutionHandler.rejectedExecution (java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 메소드를 호출합니다. 4 개의 사전 정의 된 핸들러 정책이 제공됩니다.
- 기본 ThreadPoolExecutor.AbortPolicy에서 핸들러는 거부시 런타임 RejectedExecutionException을 발생시킵니다.
- ThreadPoolExecutor.CallerRunsPolicy에서 실행을 호출하는 스레드가 작업을 실행합니다. 이것은 새로운 작업이 제출되는 속도를 늦추는 간단한 피드백 제어 메커니즘을 제공합니다.
- ThreadPoolExecutor.DiscardPolicy에서 실행할 수없는 작업은 단순히 삭제됩니다.
- ThreadPoolExecutor.DiscardOldestPolicy에서 실행 프로그램이 종료되지 않으면 작업 대기열의 헤드에있는 작업이 삭제 된 다음 실행이 다시 시도됩니다 (다시 실패하여이 작업이 반복 될 수 있음).
또한 ThreadPoolExecutor
생성자를 호출 할 때 ArrayBlockingQueue와 같은 제한된 큐를 사용해야합니다 . 그렇지 않으면 아무것도 거부되지 않습니다.
편집 : 귀하의 의견에 대한 응답으로 ArrayBlockingQueue의 크기를 스레드 풀의 최대 크기와 동일하게 설정하고 AbortPolicy를 사용하십시오.
편집 2 : 좋아, 당신이 무엇을 얻고 있는지 봅니다. 이건 어떨까요 :를 초과하지 않는지 beforeExecute()
확인하기 위해 메서드를 재정의하고 getActiveCount()
초과하지 않으면 getMaximumPoolSize()
잠자고 다시 시도합니까?
답변
Hibernate BlockPolicy
는 간단하고 원하는 것을 할 수 있습니다 :
참조 : Executors.java
/**
* A handler for rejected tasks that will have the caller block until
* space is available.
*/
public static class BlockPolicy implements RejectedExecutionHandler {
/**
* Creates a <tt>BlockPolicy</tt>.
*/
public BlockPolicy() { }
/**
* Puts the Runnable to the blocking queue, effectively blocking
* the delegating thread until space is available.
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
try {
e.getQueue().put( r );
}
catch (InterruptedException e1) {
log.error( "Work discarded, thread was interrupted while waiting for space to schedule: {}", r );
}
}
}
답변
BoundedExecutor
에서 위의 인용 답변을 연습 자바 동시성 실행 프로그램에 대한 무제한의 큐를 사용하거나 결합 세마포어는 큐의 크기보다 더 큰없는 경우에만 제대로 작동합니다. 세마포어는 제출 스레드와 풀의 스레드간에 공유되는 상태이므로 큐 크기 <바운드 <= (큐 크기 + 풀 크기) 인 경우에도 실행기를 포화시킬 수 있습니다.
사용 CallerRunsPolicy
은 작업이 영원히 실행되지 않는 경우에만 유효합니다.이 경우 제출 스레드가 rejectedExecution
영원히 유지되며 제출 스레드가 새 작업을 제출할 수 없기 때문에 작업을 실행하는 데 오랜 시간이 걸리는 경우에는 좋지 않습니다. 작업 자체를 실행하는 경우 다른 작업을 수행하십시오.
허용되지 않는 경우 작업을 제출하기 전에 실행자의 제한된 대기열 크기를 확인하는 것이 좋습니다. 대기열이 가득 찬 경우 다시 제출하기 전에 잠시 기다리십시오. 처리량은 저하되지만 제안 된 다른 많은 솔루션보다 더 간단한 솔루션이며 거부되는 작업이 없음을 보장합니다.
답변
나는 그것이 해킹이라는 것을 알고 있지만 내 생각에는 여기에서 제공되는 것들 사이의 가장 깨끗한 해킹 😉
ThreadPoolExecutor는 “put”대신 차단 대기열 “offer”를 사용하므로 차단 대기열의 “offer”동작을 재정의 할 수 있습니다.
class BlockingQueueHack<T> extends ArrayBlockingQueue<T> {
BlockingQueueHack(int size) {
super(size);
}
public boolean offer(T task) {
try {
this.put(task);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return true;
}
}
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 2, 1, TimeUnit.MINUTES, new BlockingQueueHack(5));
나는 그것을 테스트했고 작동하는 것 같습니다. 시간 제한 정책을 구현하는 것은 독자의 연습으로 남겨집니다.
답변
다음 클래스는 ThreadPoolExecutor를 감싸고 Semaphore를 사용하여 차단하면 작업 대기열이 가득 찼습니다.
public final class BlockingExecutor {
private final Executor executor;
private final Semaphore semaphore;
public BlockingExecutor(int queueSize, int corePoolSize, int maxPoolSize, int keepAliveTime, TimeUnit unit, ThreadFactory factory) {
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
this.executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit, queue, factory);
this.semaphore = new Semaphore(queueSize + maxPoolSize);
}
private void execImpl (final Runnable command) throws InterruptedException {
semaphore.acquire();
try {
executor.execute(new Runnable() {
@Override
public void run() {
try {
command.run();
} finally {
semaphore.release();
}
}
});
} catch (RejectedExecutionException e) {
// will never be thrown with an unbounded buffer (LinkedBlockingQueue)
semaphore.release();
throw e;
}
}
public void execute (Runnable command) throws InterruptedException {
execImpl(command);
}
}
이 래퍼 클래스는 Brian Goetz의 Java Concurrency in Practice 책에 제공된 솔루션을 기반으로합니다. 이 책의 솔루션은 두 개의 생성자 매개 변수 인 an Executor
과 세마포에 사용되는 경계 만 사용합니다. 이것은 Fixpoint가 제공하는 답변에 나와 있습니다. 이 접근 방식에는 문제가 있습니다. 풀 스레드가 사용 중이고 큐가 꽉 찼지만 세마포어가 방금 허가를 해제 한 상태가 될 수 있습니다. ( semaphore.release()
finally 블록에서). 이 상태에서 새 작업은 방금 해제 된 허가를 얻을 수 있지만 작업 대기열이 가득 차서 거부됩니다. 물론 이것은 당신이 원하는 것이 아닙니다. 이 경우 차단하고 싶습니다.
이를 해결하려면 JCiP가 명확하게 언급했듯이 제한되지 않은 대기열을 사용해야합니다. 세마포어는 가드 역할을하여 가상 큐 크기의 영향을줍니다. 이것은 유닛이 maxPoolSize + virtualQueueSize + maxPoolSize
작업 을 포함 할 수 있다는 부작용이 있습니다 . 왜 그런 겁니까? 왜냐하면
semaphore.release()
finally 블록에서. 모든 풀 스레드가 동시에이 문을 호출하면 maxPoolSize
허용이 해제되어 동일한 수의 작업이 장치에 들어갈 수 있습니다. 제한된 대기열을 사용하는 경우 여전히 가득 차서 작업이 거부됩니다. 이제 이것은 풀 스레드가 거의 완료되었을 때만 발생한다는 것을 알고 있으므로 문제가되지 않습니다. 풀 스레드가 차단되지 않으므로 곧 대기열에서 작업을 가져옵니다.
그래도 제한된 대기열을 사용할 수 있습니다. 크기가 virtualQueueSize + maxPoolSize
. 더 큰 크기는 쓸모가 없으며 세마포어는 더 많은 항목을 허용하지 않습니다. 크기가 작을수록 작업이 거부됩니다. 작업이 거부 될 가능성은 크기가 줄어들수록 증가합니다. 예를 들어, maxPoolSize = 2 및 virtualQueueSize = 5 인 제한된 실행기를 원한다고 가정합니다. 그런 다음 5 + 2 = 7 허용과 5 + 2 = 7의 실제 큐 크기로 세마포어를 가져옵니다. 유닛에있을 수있는 실제 작업 수는 2 + 5 + 2 = 9입니다. 실행기가 가득 차고 (대기열에 5 개 작업, 스레드 풀에 2 개, 따라서 0 개 허용 가능) 모든 풀 스레드가 허용을 해제하면 들어오는 작업이 정확히 2 개 허용을 가져올 수 있습니다.
이제 JCiP의 솔루션은 이러한 모든 제약 (제한되지 않은 대기열 또는 해당 수학 제한 등)을 적용하지 않기 때문에 사용하기가 다소 번거 롭습니다. 나는 이것이 이미 사용 가능한 부분을 기반으로 새 스레드 안전 클래스를 빌드 할 수있는 방법을 보여주는 좋은 예일 뿐이지 만 완전히 성장하고 재사용 가능한 클래스가 아니라고 생각합니다. 나는 후자가 저자의 의도라고 생각하지 않습니다.