[java] Java ExecutorService 태스크의 예외 처리

ThreadPoolExecutor고정 된 수의 스레드로 많은 수의 무거운 작업을 실행 하기 위해 Java 클래스 를 사용하려고 합니다. 각 작업에는 예외로 인해 실패 할 수있는 여러 위치가 있습니다.

하위 클래스를 만들었고 작업을 실행하는 동안 발생하지 않은 예외를 제공 해야하는 메소드를 ThreadPoolExecutor재정의했습니다 afterExecute. 그러나 나는 그것이 효과가있는 것처럼 보이지 않습니다.

예를 들면 다음과 같습니다.

public class ThreadPoolErrors extends ThreadPoolExecutor {
    public ThreadPoolErrors() {
        super(  1, // core threads
                1, // max threads
                1, // timeout
                TimeUnit.MINUTES, // timeout units
                new LinkedBlockingQueue<Runnable>() // work queue
        );
    }

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if(t != null) {
            System.out.println("Got an error: " + t);
        } else {
            System.out.println("Everything's fine--situation normal!");
        }
    }

    public static void main( String [] args) {
        ThreadPoolErrors threadPool = new ThreadPoolErrors();
        threadPool.submit(
                new Runnable() {
                    public void run() {
                        throw new RuntimeException("Ouch! Got an error.");
                    }
                }
        );
        threadPool.shutdown();
    }
}

이 프로그램의 결과는 “모든 것이 정상입니다. 상황은 정상입니다!” 스레드 풀에 제출 된 유일한 Runnable에서 예외가 발생하더라도. 여기서 무슨 일이 일어나고 있는지에 대한 단서가 있습니까?

감사!



답변

로부터 문서 :

참고 : 작업이 명시 적으로 또는 제출과 같은 메서드를 통해 작업 (예 : FutureTask)에 포함 된 경우 이러한 작업 개체는 계산 예외를 포착 및 유지하므로 갑작스러운 종료를 일으키지 않으며 내부 예외는이 메서드로 전달되지 않습니다. .

Runnable을 제출하면 미래에 포장됩니다.

afterExecute는 다음과 같아야합니다.

public final class ExtendedExecutor extends ThreadPoolExecutor {

    // ...

    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Future<?> future = (Future<?>) r;
                if (future.isDone()) {
                    future.get();
                }
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            System.out.println(t);
        }
    }
}


답변

경고 :이 솔루션은 호출 스레드를 차단합니다.


작업에서 발생한 예외를 처리하려면 일반적으로보다 사용하는 Callable것이 좋습니다 Runnable.

Callable.call() 확인 된 예외를 throw 할 수 있으며 호출 된 스레드로 다시 전파됩니다.

Callable task = ...
Future future = executor.submit(task);
try {
   future.get();
} catch (ExecutionException ex) {
   ex.getCause().printStackTrace();
}

경우 Callable.call()예외를 throw, 이것은에 싸여됩니다 ExecutionException에 의해 발생합니다 Future.get().

이것은 서브 클래 싱보다 훨씬 바람직 할 것 ThreadPoolExecutor입니다. 또한 예외가 복구 가능한 경우 작업을 다시 제출할 수있는 기회를 제공합니다.


답변

이 동작에 대한 설명은 afterExecute 에 대한 javadoc에 있습니다 .

참고 : 작업이 명시 적으로 또는 제출과 같은 메서드를 통해 작업 (예 : FutureTask)에 포함 된 경우 이러한 작업 개체는 계산 예외를 포착 및 유지하므로 갑작스러운 종료를 일으키지 않으며 내부 예외는이 메서드로 전달되지 않습니다. .


답변

실행기에 제출 된 제공된 실행 파일을 래핑하여 해결했습니다.

CompletableFuture.runAsync(() -> {
        try {
              runnable.run();
        } catch (Throwable e) {
              Log.info(Concurrency.class, "runAsync", e);
        }
}, executorService);


답변

모든 예외를 삼키고 기록 하는 jcabi-logVerboseRunnable 클래스를 사용 하고 있습니다. 예를 들어 매우 편리합니다.

import com.jcabi.log.VerboseRunnable;
scheduler.scheduleWithFixedDelay(
  new VerboseRunnable(
    Runnable() {
      public void run() {
        // the code, which may throw
      }
    },
    true // it means that all exceptions will be swallowed and logged
  ),
  1, 1, TimeUnit.MILLISECONDS
);


답변

또 다른 해결책은 ManagedTaskManagedTaskListener 를 사용하는 것 입니다.

ManagedTask 인터페이스를 구현 하는 Callable 또는 Runnable 이 필요합니다 .

이 메소드 getManagedTaskListener는 원하는 인스턴스를 반환합니다.

public ManagedTaskListener getManagedTaskListener() {

그리고 ManagedTaskListener 에서taskDone 메소드 .

@Override
public void taskDone(Future<?> future, ManagedExecutorService executor, Object task, Throwable exception) {
    if (exception != null) {
        LOGGER.log(Level.SEVERE, exception.getMessage());
    }
}

에 대한 자세한 내용은 관리 작업 수명주기와 청취자 .


답변

이 작동합니다

  • SingleThreadExecutor에서 파생되었지만 쉽게 조정할 수 있습니다.
  • Java 8 람다 코드, 수정하기 쉬운

단일 스레드로 Executor를 작성하여 많은 작업을 수행 할 수 있습니다. 현재 실행이 다음 다음으로 시작될 때까지 기다립니다.

주의하지 않은 오류 또는 예외가 발생한 경우 uncaughtExceptionHandler 를 포착합니다.

공개 최종 클래스 SingleThreadExecutorWithExceptions {

    공개 정적 ExecutorService newSingleThreadExecutorWithExceptions (최종 Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {

        ThreadFactory 팩토리 = (실행 가능한 실행 가능)-> {
            final Thread newThread = 새 스레드 (실행 가능, "SingleThreadExecutorWithExceptions");
            newThread.setUncaughtExceptionHandler ((최종 스레드 caugthThread, 최종 Throwable throwable)-> {
                uncaughtExceptionHandler.uncaughtException (caugthThread, throwable);
            });
            newThread를 반환;
        };
        새로운 FinalizableDelegatedExecutorService를 반환합니다
                (새로운 ThreadPoolExecutor (1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        새로운 LinkedBlockingQueue (),
                        공장){


                    protected void afterExecute (실행 가능 실행 가능, Throwable Throwable) {
                        super.afterExecute (실행 가능, 던지기 가능);
                        if (throwable == null && runnable instance of Future) {
                            {
                                미래 미래 = (미래) 실행 가능;
                                if (future.isDone ()) {
                                    future.get ();
                                }
                            } catch (CancellationException ce) {
                                던질 수있는 = ce;
                            } catch (ExecutionException ee) {
                                throwable = ee.getCause ();
                            } catch (InterruptedException 즉) {
                                Thread.currentThread (). interrupt (); // 무시 / 리셋
                            }
                        }
                        if (throwable! = null) {
                            uncaughtExceptionHandler.uncaughtException (Thread.currentThread (), throwable);
                        }
                    }
                });
    }



    개인 정적 클래스 FinalizableDelegatedExecutorService
            DelegatedExecutorService {
        FinalizableDelegatedExecutorService (ExecutorService executor) {
            슈퍼 (실행자);
        }
        protected void finalize () {
            super.shutdown ();
        }
    }

    / **
     * ExecutorService 메소드 만 노출하는 랩퍼 클래스
     ExecutorService 구현의 *
     * /
    개인 정적 클래스 DelegatedExecutorService는 AbstractExecutorService를 확장합니다 {
        개인 최종 ExecutorService e;
        DelegatedExecutorService (ExecutorService executor) {e = 집행자; }
        공공 무효 실행 (실행 가능한 명령) {e.execute (명령); }
        공공 무효 종료 () {e.shutdown (); }
        공개리스트 shutdownNow () {return e.shutdownNow (); }
        공개 부울 isShutdown () {return e.isShutdown (); }
        공개 부울 isTerminated () {return e.isTerminated (); }
        public boolean awaitTermination (긴 타임 아웃, TimeUnit 단위)
                InterruptedException {
            반환 e.awaitTermination (시간 초과, 단위);
        }
        공개 미래 제출 (실행 가능한 작업) {
            반환 e. 제출 (작업);
        }
        공개 미래 제출 (통화 가능한 작업) {
            반환 e. 제출 (작업);
        }
        공개 미래 제출 (실행 가능한 작업, T 결과) {
            반환 e. 제출 (작업, 결과);
        }
        공개 목록> invokeAll (Collection> tasks)
                InterruptedException {
            반환 e.invokeAll (작업);
        }
        공개 목록> invokeAll (Collection> 작업,
                                             긴 시간 초과, 시간 단위 단위)
                InterruptedException {
            반환 e.invokeAll (작업, 시간 초과, 단위);
        }
        공개 T invokeAny (Collection> tasks)
                InterruptedException, ExecutionException {
            반환 e.invokeAny (작업);
        }
        공개 T invokeAny (Collection> 작업,
                               긴 시간 초과, 시간 단위 단위)
                InterruptedException, ExecutionException, TimeoutException {을 발생시킵니다.
            반환 e.invokeAny (작업, 시간 초과, 단위);
        }
    }



    개인 SingleThreadExecutorWithExceptions () {}
}