[java] ExecutorService, 모든 작업이 완료 될 때까지 기다리는 방법

모든 작업이 ExecutorService완료 될 때까지 기다리는 가장 간단한 방법은 무엇입니까 ? 내 작업은 주로 계산 작업이므로 각 코어마다 하나씩 많은 수의 작업을 실행하고 싶습니다. 지금 내 설정은 다음과 같습니다.

ExecutorService es = Executors.newFixedThreadPool(2);
for (DataTable singleTable : uniquePhrases) {
    es.execute(new ComputeDTask(singleTable));
}
try{
    es.wait();
}
catch (InterruptedException e){
    e.printStackTrace();
}

ComputeDTask실행 가능을 구현합니다. 이 올바르게 작업을 실행하는 것처럼 보이지만 코드에 충돌 wait()과 함께 IllegalMonitorStateException. 나는 장난감 예제를 가지고 놀았고 작동하는 것처럼 보였기 때문에 이상합니다.

uniquePhrases수만 개의 요소를 포함합니다. 다른 방법을 사용해야합니까? 가능한 한 간단한 것을 찾고 있습니다



답변

가장 간단한 방법은 ExecutorService.invokeAll()하나의 라이너로 원하는 것을 사용 하는 것입니다. 당신의 말로, 당신은 ComputeDTask구현 하기 위해 수정하거나 줄 바꿈해야 Callable<>합니다. 아마도 앱에는의 의미있는 구현이 Callable.call()있지만를 사용하지 않는 경우 랩핑하는 방법이 Executors.callable()있습니다.

ExecutorService es = Executors.newFixedThreadPool(2);
List<Callable<Object>> todo = new ArrayList<Callable<Object>>(singleTable.size());

for (DataTable singleTable: uniquePhrases) {
    todo.add(Executors.callable(new ComputeDTask(singleTable)));
}

List<Future<Object>> answers = es.invokeAll(todo);

다른 사람들이 지적했듯이 invokeAll()적절한 경우 시간 초과 버전을 사용할 수 있습니다 . 이 예에서는 null을 반환하는 s answers뭉치를 포함합니다 Future(의 정의 참조 Executors.callable(). 아마 당신이하고 싶은 것은 약간의 리팩토링이므로 유용한 답변을 얻거나 기본에 대한 참조를 ComputeDTask얻을 수 있지만 당신의 모범에서 말하지 마십시오.

확실 invokeAll()하지 않으면 모든 작업이 완료 될 때까지 반환되지 않습니다. (즉, 컬렉션 Future의 모든 s가 요청 answers되면보고 .isDone()합니다.) 이렇게하면 모든 수동 종료, awaitTermination 등을 피할 수 있으며 ExecutorService원하는 경우 여러주기 동안 깔끔하게 재사용 할 수 있습니다 .

SO에 대한 몇 가지 관련 질문이 있습니다.

이 중 어느 것도 엄격에 포인트 질문에 대한 없지만, 그들은 사람들이 생각하는 방법에 대한 색의 약간 제공 할 Executor/가 ExecutorService사용되어야한다을.


답변

모든 작업이 완료 될 때까지 기다리려면 shutdown대신 방법을 사용하십시오 wait. 그런 다음로 수행하십시오 awaitTermination.

또한 Runtime.availableProcessors스레드 풀 수를 올바르게 초기화 할 수 있도록 하드웨어 스레드 수를 얻는 데 사용할 수 있습니다 .


답변

ExecutorService완료 할 때까지의 모든 작업을 기다리는 것이 정확히 목표가 아니라 특정 작업 이 완료 될 때까지 기다리는 경우, 특히CompletionService -를 사용할 수 있습니다 ExecutorCompletionService.

아이디어는이 만드는 것입니다 ExecutorCompletionService귀하의 포장 Executor, 제출 일부 알려진 번호를 관통 작업을 CompletionService한 후, 그 그릴 같은 번호 로부터 결과의 완성 대기열 중 하나를 사용하여 take()(이 블록) 또는 poll()(하지 않음). 제출 한 작업에 해당하는 모든 예상 결과를 가져 오면 모두 완료된 것입니다.

인터페이스에서 명확하지 않기 때문에 이것을 다시 한 번 언급하겠습니다 CompletionService. 이것은 특히 take()메소드 와 관련이 있습니다. 한 번 너무 많이 호출하면 다른 스레드가 다른 작업을 동일한 작업에 제출 할 때까지 호출 스레드를 차단합니다 CompletionService.

Java Concurrency in Practice 책에서 사용하는 방법을 보여주는 몇 가지 예가CompletionService 있습니다 .


답변

당신이 실행을 완료하기 위해 실행 프로그램 서비스를 대기하는 경우, 전화를 shutdown()한 후, awaitTermination (단위 UNITTYPE)awaitTermination(1, MINUTE). ExecutorService는 자체 모니터를 차단하지 않으므로 사용할 수 없습니다 wait.


답변

특정 간격으로 작업이 완료 될 때까지 기다릴 수 있습니다.

int maxSecondsPerComputeDTask = 20;
try {
    while (!es.awaitTermination(uniquePhrases.size() * maxSecondsPerComputeDTask, TimeUnit.SECONDS)) {
        // consider giving up with a 'break' statement under certain conditions
    }
} catch (InterruptedException e) {
    throw new RuntimeException(e);
}

또는 ExecutorService를 사용할 수 있습니다 . ( Runnable )을 제출 하고 반환 하는 Future 객체를 수집하고 각각 get () 을 호출 하여 완료 될 때까지 기다립니다.

ExecutorService es = Executors.newFixedThreadPool(2);
Collection<Future<?>> futures = new LinkedList<<Future<?>>();
for (DataTable singleTable : uniquePhrases) {
    futures.add(es.submit(new ComputeDTask(singleTable)));
}
for (Future<?> future : futures) {
   try {
       future.get();
   } catch (InterruptedException e) {
       throw new RuntimeException(e);
   } catch (ExecutionException e) {
       throw new RuntimeException(e);
   }
}

올바르게 처리하려면 InterruptedException 이 매우 중요합니다. 그것은 당신이나 당신의 라이브러리 사용자가 긴 프로세스를 안전하게 종료 할 수있게합니다.


답변

그냥 사용

latch = new CountDownLatch(noThreads)

각 실에서

latch.countDown();

그리고 장벽으로

latch.await();


답변

IllegalMonitorStateException의 근본 원인 :

스레드가 객체의 모니터에서 대기를 시도했거나 지정된 모니터를 소유하지 않고 객체의 모니터에서 대기중인 다른 스레드에게 알리려고 시도했음을 나타냅니다.

코드에서 잠금을 소유하지 않고 ExecutorService에서 wait ()를 호출했습니다.

아래 코드는 수정됩니다 IllegalMonitorStateException

try
{
    synchronized(es){
        es.wait(); // Add some condition before you call wait()
    }
} 

에 제출 된 모든 작업이 완료 될 때까지 아래 방법 중 하나를 따르십시오 ExecutorService.

  1. Futuresubmit에서 모든 작업을 반복 하고 객체 에 대한 ExecutorService호출 get()을 차단하여 상태를 확인하십시오.Future

  2. 사용 invokeAll을을ExecutorService

  3. CountDownLatch 사용

  4. 사용 ForkJoinPool 또는 newWorkStealingPool을 의을 Executors(자바 8부터)

  5. Oracle 설명서 페이지 에서 권장하는대로 풀 종료

    void shutdownAndAwaitTermination(ExecutorService pool) {
       pool.shutdown(); // Disable new tasks from being submitted
       try {
       // Wait a while for existing tasks to terminate
       if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
           pool.shutdownNow(); // Cancel currently executing tasks
           // Wait a while for tasks to respond to being cancelled
           if (!pool.awaitTermination(60, TimeUnit.SECONDS))
           System.err.println("Pool did not terminate");
       }
    } catch (InterruptedException ie) {
         // (Re-)Cancel if current thread also interrupted
         pool.shutdownNow();
         // Preserve interrupt status
         Thread.currentThread().interrupt();
    }

    옵션 1-4 대신 옵션 5를 사용할 때 모든 작업이 완료 될 때까지 기다리려면 변경하십시오.

    if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {

    while(condition)되는 매 1 분 체크한다.