실행기 서비스에 제출해야 할 작업으로 가득 찬 대기열이 있다고 가정 해보십시오. 한 번에 하나씩 처리하기를 원합니다. 내가 생각할 수있는 가장 간단한 방법은 다음과 같습니다.
- 대기열에서 작업 수행
- 유언 집행자에게 제출
- 반환 된 Future에서 .get을 호출하고 결과가 제공 될 때까지 차단
- 대기열에서 다른 작업을 수행하십시오 …
그러나 완전히 차단하지 않으려 고합니다. 작업이 한 번에 하나씩 처리되어야하는 10,000 개의 대기열이있는 경우 대부분 차단 된 스레드를 유지하므로 스택 공간이 부족합니다.
내가 원하는 것은 작업을 제출하고 작업이 완료되면 호출되는 콜백을 제공하는 것입니다. 이 콜백 알림을 플래그로 사용하여 다음 작업을 보냅니다. (functionaljava와 jetlang은 분명히 그러한 비 차단 알고리즘을 사용하지만 코드를 이해할 수 없습니다)
내 실행 프로그램을 작성하는 데 부족한 JDK의 java.util.concurrent를 사용하여 어떻게 할 수 있습니까?
(이 작업을 제공하는 대기열은 자체적으로 차단 될 수 있지만 나중에 해결해야 할 문제입니다)
답변
완료 알림에 전달할 매개 변수를 수신하도록 콜백 인터페이스를 정의하십시오. 그런 다음 작업이 끝날 때 호출하십시오.
Runnable 태스크에 대한 일반 랩퍼를 작성하여에 제출할 수도 ExecutorService
있습니다. 또는 Java 8에 내장 된 메커니즘에 대해서는 아래를 참조하십시오.
class CallbackTask implements Runnable {
private final Runnable task;
private final Callback callback;
CallbackTask(Runnable task, Callback callback) {
this.task = task;
this.callback = callback;
}
public void run() {
task.run();
callback.complete();
}
}
와 함께 CompletableFuture
Java 8에는 프로세스를 비동기식 및 조건부로 완료 할 수있는 파이프 라인을 작성하는보다 정교한 수단이 포함되었습니다. 여기에는 생각이 있지만 완전한 알림의 예가 있습니다.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
public class GetTaskNotificationWithoutBlocking {
public static void main(String... argv) throws Exception {
ExampleService svc = new ExampleService();
GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
f.thenAccept(listener::notify);
System.out.println("Exiting main()");
}
void notify(String msg) {
System.out.println("Received message: " + msg);
}
}
class ExampleService {
String work() {
sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
char[] str = new char[5];
ThreadLocalRandom current = ThreadLocalRandom.current();
for (int idx = 0; idx < str.length; ++idx)
str[idx] = (char) ('A' + current.nextInt(26));
String msg = new String(str);
System.out.println("Generated message: " + msg);
return msg;
}
public static void sleep(long average, TimeUnit unit) {
String name = Thread.currentThread().getName();
long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
try {
unit.sleep(timeout);
System.out.println(name + " awoke.");
} catch (InterruptedException abort) {
Thread.currentThread().interrupt();
System.out.println(name + " interrupted.");
}
}
public static long exponential(long avg) {
return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
}
}
답변
Java 8에서는 CompletableFuture 를 사용할 수 있습니다 . 다음은 코드를 사용하여 사용자 서비스에서 사용자를 가져 와서 내보기 객체에 매핑 한 다음 내보기를 업데이트하거나 오류 대화 상자를 표시하는 예제입니다 (GUI 응용 프로그램).
CompletableFuture.supplyAsync(
userService::listUsers
).thenApply(
this::mapUsersToUserViews
).thenAccept(
this::updateView
).exceptionally(
throwable -> { showErrorDialogFor(throwable); return null; }
);
비동기 적으로 실행됩니다. 두 가지 개인 방법을 사용 mapUsersToUserViews
하고 updateView
있습니다 : 및 .
답변
Guava의 청취 가능한 미래 API를 사용 하고 콜백을 추가하십시오. Cf. 웹 사이트에서 :
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(explosion, new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
});
답변
당신은 확장 할 수 FutureTask
클래스를하고, 무시 done()
하고 추가 방법을 FutureTask
받는 개체를 ExecutorService
소위, done()
때 방법은 호출합니다 FutureTask
즉시 완료.
답변
ThreadPoolExecutor
또한이 beforeExecute
와 afterExecute
당신이 무시하고 사용을 할 수 후크 방법. 여기에서 설명이다 ThreadPoolExecutor
의 Javadoc과는 .
후크 방법
이 클래스는 보호 된 재정의 가능
beforeExecute(java.lang.Thread, java.lang.Runnable)
및afterExecute(java.lang.Runnable, java.lang.Throwable)
각 작업 실행 전후에 호출되는 메서드를 제공합니다 . 이것들은 실행 환경을 조작하는 데 사용될 수 있습니다. 예를 들어, 다시 초기화ThreadLocals
, 통계 수집 또는 로그 항목 추가. 또한terminated()
일단Executor
종료 된 후에 수행해야하는 특수 처리를 수행하기 위해 메소드 를 대체 할 수 있습니다 . 후크 또는 콜백 메소드에서 예외가 발생하면 내부 작업자 스레드가 실패하고 갑자기 종료 될 수 있습니다.
답변
를 사용하십시오 CountDownLatch
.
그것은 시작되었으며 java.util.concurrent
계속하기 전에 여러 스레드가 실행을 완료 할 때까지 기다리는 방법입니다.
당신이 찾고있는 콜백 효과를 얻으려면 약간의 추가 작업이 필요합니다. 즉, 이것을 사용하고 CountDownLatch
대기 하는 별도의 스레드에서 직접 처리 한 다음 통지해야 할 내용을 알리는 작업을 계속합니다. 콜백이나 그 효과와 유사한 것은 기본적으로 지원되지 않습니다.
편집 : 이제 귀하의 질문을 더 이해했기 때문에 불필요하게 너무 멀리 도달하고 있다고 생각합니다. 정기적 SingleThreadExecutor
인 경우 모든 작업을 수행하면 기본적으로 큐잉이 수행됩니다.
답변
작업이 동시에 실행되지 않도록하려면 SingleThreadedExecutor 를 사용하십시오 . 작업은 제출 된 순서대로 처리됩니다. 작업을 보류 할 필요조차 없으며 exec에 제출하십시오.
