[java] ExecutorService를 통해 CompletionService를 언제 사용해야합니까?

방금 이 블로그 게시물 에서 CompletionService를 찾았 습니다 . 그러나 이것은 표준 ExecutorService에 비해 CompletionService의 장점을 실제로 보여주지는 않습니다. 둘 중 하나를 사용하여 동일한 코드를 작성할 수 있습니다. 그렇다면 CompletionService는 언제 유용할까요?

명확하게하기 위해 짧은 코드 샘플을 제공 할 수 있습니까? 예를 들어,이 코드 샘플은 CompletionService가 필요하지 않은 경우 (= ExecutorService와 동일)를 보여줍니다.

    ExecutorService taskExecutor = Executors.newCachedThreadPool();
    //        CompletionService<Long> taskCompletionService =
    //                new ExecutorCompletionService<Long>(taskExecutor);
    Callable<Long> callable = new Callable<Long>() {
        @Override
        public Long call() throws Exception {
            return 1L;
        }
    };

    Future<Long> future = // taskCompletionService.submit(callable);
        taskExecutor.submit(callable);

    while (!future.isDone()) {
        // Do some work...
        System.out.println("Working on something...");
    }
    try {
        System.out.println(future.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }



답변

를 사용하면 ExecutorService실행할 작업을 제출 한 후에는 완료된 작업의 결과를 효율적으로 가져 오기 위해 수동으로 코딩해야합니다.

를 사용 CompletionService하면 이것은 거의 자동화됩니다. 하나의 작업 만 제출하기 때문에 제시 한 코드에서 그 차이가 분명하지 않습니다. 그러나 제출할 작업 목록이 있다고 가정합니다. 아래 예에서는 여러 작업이 CompletionService에 제출됩니다. 그런 다음 (결과를 얻기 위해) 완료된 작업을 찾는 대신 CompletionService 인스턴스에 결과가 사용 가능 해지면 반환하도록 요청합니다.

public class CompletionServiceTest {

        class CalcResult {
             long result ;

             CalcResult(long l) {
                 result = l;
             }
        }

        class CallableTask implements Callable<CalcResult> {
            String taskName ;
            long  input1 ;
            int input2 ;

            CallableTask(String name , long v1 , int v2 ) {
                taskName = name;
                input1 = v1;
                input2 = v2 ;
            }

            public CalcResult call() throws Exception {
                System.out.println(" Task " + taskName + " Started -----");
                for(int i=0;i<input2 ;i++) {
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        System.out.println(" Task " + taskName + " Interrupted !! ");
                        e.printStackTrace();
                    }
                    input1 += i;
                }
                System.out.println(" Task " + taskName + " Completed @@@@@@");
                return new CalcResult(input1) ;
            }

        }

        public void test(){
            ExecutorService taskExecutor = Executors.newFixedThreadPool(3);
            CompletionService<CalcResult> taskCompletionService = new ExecutorCompletionService<CalcResult>(taskExecutor);

            int submittedTasks = 5;
            for (int i=0;i< submittedTasks;i++) {
                taskCompletionService.submit(new CallableTask (
                        String.valueOf(i),
                            (i * 10),
                            ((i * 10) + 10  )
                        ));
               System.out.println("Task " + String.valueOf(i) + "subitted");
            }
            for (int tasksHandled=0;tasksHandled<submittedTasks;tasksHandled++) {
                try {
                    System.out.println("trying to take from Completion service");
                    Future<CalcResult> result = taskCompletionService.take();
                    System.out.println("result for a task availble in queue.Trying to get()");
                    // above call blocks till atleast one task is completed and results availble for it
                    // but we dont have to worry which one

                    // process the result here by doing result.get()
                    CalcResult l = result.get();
                    System.out.println("Task " + String.valueOf(tasksHandled) + "Completed - results obtained : " + String.valueOf(l.result));

                } catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }
            }
        }
    }


답변

많은 세부 사항 생략 :

  • ExecutorService = 들어오는 큐 + 작업자 스레드
  • CompletionService = 수신 대기열 + 작업자 스레드 + 출력 대기열


답변

기본적으로 CompletionService여러 작업을 병렬로 실행 한 다음 완료 순서대로 작업 하려면 a를 사용합니다 . 그래서 제가 5 개의 작업을 실행하면 CompletionService에서 첫 번째 작업 이 완료됩니다. 작업이 하나 뿐인 예 ExecutorCallable.


답변

나는 javadoc이 언제 CompletionService가 유용 ExecutorService하지 않은지 에 대한 질문에 가장 잘 대답한다고 생각합니다 .

완료된 작업의 결과 소비에서 새로운 비동기 작업의 생산을 분리하는 서비스입니다.

기본적으로이 인터페이스를 사용하면 프로그램이 해당 작업의 결과에 대한 다른 소비자에 대해 알지 못해도 작업을 만들고 제출하는 (그리고 해당 제출의 결과를 조사하는) 생산자를 가질 수 있습니다. 한편, 작업을 제출하는 생산자에 대해 알지 못하는 상태 에서 CompletionServicepoll또는 take결과 를 알고있는 소비자 .

기록을 위해, 다소 늦기 때문에 틀릴 수 있지만 그 블로그 게시물의 샘플 코드가 메모리 누수를 일으킨다는 것은 상당히 확신합니다. 적극적인 소비자가 ExecutorCompletionService의 내부 대기열에서 결과를 가져 오지 않으면 블로거가 해당 대기열이 어떻게 소모 될 것으로 예상했는지 확신 할 수 없습니다.


답변

우선, 프로세서 시간을 낭비하지 않으려면

while (!future.isDone()) {
        // Do some work...
}

우리는 사용해야합니다

service.shutdown();
service.awaitTermination(14, TimeUnit.DAYS);

이 코드의 나쁜 점은 종료된다는 것입니다 ExecutorService. 작업을 계속하려면 (예 : 반복 작업 생성이있는 경우) invokeAll 또는 ExecutorService.

invokeAll모든 작업이 완료 될 때까지 기다립니다. ExecutorService결과를 하나씩 가져 오거나 투표 할 수 있습니다.

그리고 마지막으로 재귀적인 예 :

ExecutorService executorService = Executors.newFixedThreadPool(THREAD_NUMBER);
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);

while (Tasks.size() > 0) {
    for (final Task task : Tasks) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return DoTask(task);
            }
        });
    }

    try {
        int taskNum = Tasks.size();
        Tasks.clear();
        for (int i = 0; i < taskNum; ++i) {
            Result result = completionService.take().get();
            if (result != null)
                Tasks.add(result.toTask());
        }
    } catch (InterruptedException e) {
    //  error :(
    } catch (ExecutionException e) {
    //  error :(
    }
}


답변

런타임에 직접 확인하고 두 솔루션 (Executorservice 및 Completionservice)을 구현해보십시오. 그러면 두 솔루션이 어떻게 다른지 알 수 있으며 둘 중 하나를 사용할시기가 더 명확해질 것입니다. http://rdafbn.blogspot.co.uk/2013/01/executorservice-vs-completionservice-vs.html 을 원한다면 여기에 예가 있습니다.


답변

5 개의 장기 실행 작업 (호출 가능한 작업)이 있고 해당 작업을 실행자 서비스에 제출했다고 가정 해 보겠습니다. 이제 5 개의 작업이 모두 경쟁 할 때까지 기다리지 않고 하나가 완료되면 이러한 작업에 대해 일종의 처리를하고 싶다고 상상해보십시오. 이제 향후 개체에 폴링 논리를 작성하거나이 API를 사용하여이 작업을 수행 할 수 있습니다.