[java] Java Future를 CompletableFuture로 전환

Java 8 CompletableFuture은 구성 가능한 Future의 새로운 구현 인을 도입했습니다 (thenXxx 메서드 여러 개 포함). 이것을 독점적으로 사용하고 싶지만 사용하려는 많은 라이브러리는 구성 할 수없는 Future인스턴스 만 반환 합니다.

반환 된 Future인스턴스를 내부에 래핑하여 CompleteableFuture구성 할 수있는 방법이 있습니까?



답변

방법이 있지만 마음에 들지 않을 것입니다. 다음 메서드는 a Future<T>를 a로 변환 합니다 CompletableFuture<T>.

public static <T> CompletableFuture<T> makeCompletableFuture(Future<T> future) {
  if (future.isDone())
    return transformDoneFuture(future);
  return CompletableFuture.supplyAsync(() -> {
    try {
      if (!future.isDone())
        awaitFutureIsDoneInForkJoinPool(future);
      return future.get();
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    } catch (InterruptedException e) {
      // Normally, this should never happen inside ForkJoinPool
      Thread.currentThread().interrupt();
      // Add the following statement if the future doesn't have side effects
      // future.cancel(true);
      throw new RuntimeException(e);
    }
  });
}

private static <T> CompletableFuture<T> transformDoneFuture(Future<T> future) {
  CompletableFuture<T> cf = new CompletableFuture<>();
  T result;
  try {
    result = future.get();
  } catch (Throwable ex) {
    cf.completeExceptionally(ex);
    return cf;
  }
  cf.complete(result);
  return cf;
}

private static void awaitFutureIsDoneInForkJoinPool(Future<?> future)
    throws InterruptedException {
  ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
    @Override public boolean block() throws InterruptedException {
      try {
        future.get();
      } catch (ExecutionException e) {
        throw new RuntimeException(e);
      }
      return true;
    }
    @Override public boolean isReleasable() {
      return future.isDone();
    }
  });
}

분명히이 접근 방식의 문제는 각 Future 에 대해 스레드가 Future 의 결과를 기다리도록 차단 된다는 것입니다. 어떤 경우에는 더 잘할 수 있습니다. 그러나 일반적으로 Future 의 결과를 적극적으로 기다리지 않고서는 해결책이 없습니다 .


답변

사용하려는 라이브러리가 Future 스타일 외에 콜백 스타일 메서드도 제공하는 경우 추가 스레드 차단없이 CompletableFuture를 완료하는 핸들러를 제공 할 수 있습니다. 이렇게 :

    AsynchronousFileChannel open = AsynchronousFileChannel.open(Paths.get("/some/file"));
    // ... 
    CompletableFuture<ByteBuffer> completableFuture = new CompletableFuture<ByteBuffer>();
    open.read(buffer, position, null, new CompletionHandler<Integer, Void>() {
        @Override
        public void completed(Integer result, Void attachment) {
            completableFuture.complete(buffer);
        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            completableFuture.completeExceptionally(exc);
        }
    });
    completableFuture.thenApply(...)

콜백이 없으면이 문제를 해결하는 유일한 방법은 모든 Future.isDone()검사를 단일 스레드에 배치 한 다음 Future가 얻을 수있을 때마다 완료를 호출 하는 폴링 루프를 사용하는 것 입니다.


답변

당신이 경우 Future에 대한 호출의 결과입니다 ExecutorService방법 (예를 들어 submit()), 가장 쉬운 방법은을 사용하는 것입니다CompletableFuture.runAsync(Runnable, Executor) 대신에 방법을.

에서

Runnbale myTask = ... ;
Future<?> future = myExecutor.submit(myTask);

…에

Runnbale myTask = ... ;
CompletableFuture<?> future = CompletableFuture.runAsync(myTask, myExecutor);

그만큼 CompletableFuture 다음 “기본적으로”생성됩니다.

편집 : @MartinAndersson이 수정 한 @SamMefford의 주석을 추구하면을 전달 Callable하려면을 호출 supplyAsync()하여를 Callable<T>으로 변환 해야합니다 Supplier<T>.

CompletableFuture.supplyAsync(() -> {
    try { return myCallable.call(); }
    catch (Exception ex) { throw new RuntimeException(ex); } // Or return default value
}, myExecutor);

때문에 T Callable.call() throws Exception;예외를 던져 T Supplier.get();하지 않습니다, 당신은 프로토 타입이 호환되도록 예외를 catch해야합니다.


답변

나는 단순한 방법 보다 더 나은 것을 만들려고 노력 하는 작은 미래 프로젝트를 발표했습니다. 대답 .

주요 아이디어는 내부의 모든 Futures 상태를 확인하기 위해 단 하나의 스레드 (물론 스핀 루프가 아닌)를 사용하는 것입니다. 이는 각 Future-> CompletableFuture 변환에 대해 풀에서 스레드를 차단하는 것을 방지하는 데 도움이됩니다.

사용 예 :

Future oldFuture = ...;
CompletableFuture profit = Futurity.shift(oldFuture);


답변

암시:

http://www.thedevpiece.com/converting-old-java-future-to-completablefuture/

그러나 기본적으로 :

public class CompletablePromiseContext {
    private static final ScheduledExecutorService SERVICE = Executors.newSingleThreadScheduledExecutor();

    public static void schedule(Runnable r) {
        SERVICE.schedule(r, 1, TimeUnit.MILLISECONDS);
    }
}

그리고 CompletablePromise :

public class CompletablePromise<V> extends CompletableFuture<V> {
    private Future<V> future;

    public CompletablePromise(Future<V> future) {
        this.future = future;
        CompletablePromiseContext.schedule(this::tryToComplete);
    }

    private void tryToComplete() {
        if (future.isDone()) {
            try {
                complete(future.get());
            } catch (InterruptedException e) {
                completeExceptionally(e);
            } catch (ExecutionException e) {
                completeExceptionally(e.getCause());
            }
            return;
        }

        if (future.isCancelled()) {
            cancel(true);
            return;
        }

        CompletablePromiseContext.schedule(this::tryToComplete);
    }
}

예:

public class Main {
    public static void main(String[] args) {
        final ExecutorService service = Executors.newSingleThreadExecutor();
        final Future<String> stringFuture = service.submit(() -> "success");
        final CompletableFuture<String> completableFuture = new CompletablePromise<>(stringFuture);

        completableFuture.whenComplete((result, failure) -> {
            System.out.println(result);
        });
    }
}


답변

다른 옵션을 제안하겠습니다.
https://github.com/vsilaev/java-async-await/tree/master/com.farata.lang.async.examples/src/main/java/com/farata /병발 사정

간단히 말해서 아이디어는 다음과 같습니다.

  1. CompletableTask<V>인터페이스 소개 – CompletionStage<V>+ 의 결합
    RunnableFuture<V>
  2. 워프 ExecutorService반환 CompletableTask에서 submit(...)방법 (대신Future<V> )
  3. 완료되었습니다. 실행 가능하고 구성 가능한 선물이 있습니다.

구현은 대체 CompletionStage 구현을 사용합니다 (주의, CompletionStage 오히려 CompletableFuture 이상) :

용법:

J8ExecutorService exec = J8Executors.newCachedThreadPool();
CompletionStage<String> = exec
   .submit( someCallableA )
   .thenCombineAsync( exec.submit(someCallableB), (a, b) -> a + " " + b)
   .thenCombine( exec.submit(someCallableC), (ab, b) -> ab + " " + c);


답변