List
선물 을 돌려주는 방법이 있습니다
List<Future<O>> futures = getFutures();
이제 모든 미래가 성공적으로 처리되거나 미래에 의해 출력이 반환되는 작업이 예외를 throw 할 때까지 기다립니다. 하나의 과제가 예외를 던지더라도 다른 미래를 기다릴 필요는 없습니다.
간단한 접근 방식은
wait() {
For(Future f : futures) {
try {
f.get();
} catch(Exception e) {
//TODO catch specific exception
// this future threw exception , means somone could not do its task
return;
}
}
}
그러나 여기서 문제는 예를 들어 4 번째 미래에 예외가 발생하면 처음 3 개의 미래가 사용 가능할 때까지 불필요하게 기다릴 것입니다.
이것을 해결하는 방법? 카운트 다운이 어떤 식 으로든 도움이 되나요? isDone
Java 문서가 말하기 때문에 Future를 사용할 수 없습니다.
boolean isDone()
Returns true if this task completed. Completion may be due to normal termination, an exception, or cancellation -- in all of these cases, this method will return true.
답변
CompletionService 를 사용 하여 선물이 준비 되 자마자 선물을 받고 예외 중 하나가 예외를 처리하면 처리를 취소 할 수 있습니다. 이 같은:
Executor executor = Executors.newFixedThreadPool(4);
CompletionService<SomeResult> completionService =
new ExecutorCompletionService<SomeResult>(executor);
//4 tasks
for(int i = 0; i < 4; i++) {
completionService.submit(new Callable<SomeResult>() {
public SomeResult call() {
...
return result;
}
});
}
int received = 0;
boolean errors = false;
while(received < 4 && !errors) {
Future<SomeResult> resultFuture = completionService.take(); //blocks if none available
try {
SomeResult result = resultFuture.get();
received ++;
... // do something with the result
}
catch(Exception e) {
//log
errors = true;
}
}
여전히 실행중인 작업 중 하나에 오류가 발생하면 취소하도록 추가 개선 할 수 있다고 생각합니다.
답변
Java 8 을 사용하는 경우 제공된 모든 CompletableFuture가 완료된 후에 만 콜백을 적용하는 CompletableFuture 및 CompletableFuture.allOf를 사용하여이 작업을 쉽게 수행 할 수 있습니다 .
// Waits for *all* futures to complete and returns a list of results.
// If *any* future completes exceptionally then the resulting future will also complete exceptionally.
public static <T> CompletableFuture<List<T>> all(List<CompletableFuture<T>> futures) {
CompletableFuture[] cfs = futures.toArray(new CompletableFuture[futures.size()]);
return CompletableFuture.allOf(cfs)
.thenApply(ignored -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
답변
를 사용하여 CompletableFuture
자바 (8)
// Kick of multiple, asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("Test1");
CompletableFuture<User> page2 = gitHubLookupService.findUser("Test2");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Test3");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
logger.info("--> " + page1.get());
답변
ExecutorCompletionService 를 사용할 수 있습니다 . 문서에는 정확한 사용 사례에 대한 예가 있습니다.
대신, 태스크 세트의 첫 번째 널이 아닌 결과를 사용하고 예외가 발생하는 것을 무시하고 첫 번째 태스크가 준비되면 다른 모든 태스크를 취소한다고 가정하십시오.
void solve(Executor e, Collection<Callable<Result>> solvers) throws InterruptedException {
CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
int n = solvers.size();
List<Future<Result>> futures = new ArrayList<Future<Result>>(n);
Result result = null;
try {
for (Callable<Result> s : solvers)
futures.add(ecs.submit(s));
for (int i = 0; i < n; ++i) {
try {
Result r = ecs.take().get();
if (r != null) {
result = r;
break;
}
} catch (ExecutionException ignore) {
}
}
} finally {
for (Future<Result> f : futures)
f.cancel(true);
}
if (result != null)
use(result);
}
여기서 주목해야 할 것은 ecs.take ()는 처음 제출 된 작업뿐만 아니라 첫 번째로 완료된 작업을 가져옵니다 . 따라서 실행을 마치거나 예외를 던지는 순서대로 가져와야합니다.
답변
Java 8을 사용하고 있고 CompletableFuture
s 를 조작하지 않으려면 List<Future<T>>
스트리밍 사용 결과를 검색하는 도구를 작성했습니다 . 열쇠는 map(Future::get)
던질 때 금지되어 있다는 것 입니다.
public final class Futures
{
private Futures()
{}
public static <E> Collector<Future<E>, Collection<E>, List<E>> present()
{
return new FutureCollector<>();
}
private static class FutureCollector<T> implements Collector<Future<T>, Collection<T>, List<T>>
{
private final List<Throwable> exceptions = new LinkedList<>();
@Override
public Supplier<Collection<T>> supplier()
{
return LinkedList::new;
}
@Override
public BiConsumer<Collection<T>, Future<T>> accumulator()
{
return (r, f) -> {
try
{
r.add(f.get());
}
catch (InterruptedException e)
{}
catch (ExecutionException e)
{
exceptions.add(e.getCause());
}
};
}
@Override
public BinaryOperator<Collection<T>> combiner()
{
return (l1, l2) -> {
l1.addAll(l2);
return l1;
};
}
@Override
public Function<Collection<T>, List<T>> finisher()
{
return l -> {
List<T> ret = new ArrayList<>(l);
if (!exceptions.isEmpty())
throw new AggregateException(exceptions, ret);
return ret;
};
}
@Override
public Set<java.util.stream.Collector.Characteristics> characteristics()
{
return java.util.Collections.emptySet();
}
}
이것은 AggregateException
C #처럼 작동 하는 것이 필요합니다.
public class AggregateException extends RuntimeException
{
/**
*
*/
private static final long serialVersionUID = -4477649337710077094L;
private final List<Throwable> causes;
private List<?> successfulElements;
public AggregateException(List<Throwable> causes, List<?> l)
{
this.causes = causes;
successfulElements = l;
}
public AggregateException(List<Throwable> causes)
{
this.causes = causes;
}
@Override
public synchronized Throwable getCause()
{
return this;
}
public List<Throwable> getCauses()
{
return causes;
}
public List<?> getSuccessfulElements()
{
return successfulElements;
}
public void setSuccessfulElements(List<?> successfulElements)
{
this.successfulElements = successfulElements;
}
}
이 구성 요소는 C #의 Task.WaitAll 과 동일하게 작동합니다 . 나는 CompletableFuture.allOf
(와 동등한 Task.WhenAll
) 변형을 연구하고 있습니다.
내가 이것을 한 이유는 내가 스프링을 사용 하고 있고 더 표준적인 방법 임에도 불구하고 ListenableFuture
포팅하고 싶지 않기 CompletableFuture
때문입니다.
답변
CompletableFutures 목록을 결합하려는 경우 다음을 수행 할 수 있습니다.
List<CompletableFuture<Void>> futures = new ArrayList<>();
// ... Add futures to this ArrayList of CompletableFutures
// CompletableFuture.allOf() method demand a variadic arguments
// You can use this syntax to pass a List instead
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()]));
// Wait for all individual CompletableFuture to complete
// All individual CompletableFutures are executed in parallel
allFutures.get();
Future & CompletableFuture에 대한 자세한 내용, 유용한 링크 :
1. 미래 : https://www.baeldung.com/java-future
2. CompletableFuture : https://www.baeldung.com/java-completablefuture
3. CompletableFuture : https : //www.callicoder.com/java-8-completablefuture-tutorial/
답변
어쩌면 이것은 도움이 될 것입니다 (아무것도 원시 스레드로 대체되지 않습니다. 그렇습니다!) 각 Future
스레드를 분리 된 스레드로 실행하는 것이 좋습니다 (병렬로 진행됨). 오류가 발생하면 관리자에게 신호를 보냅니다 ( Handler
클래스).
class Handler{
//...
private Thread thisThread;
private boolean failed=false;
private Thread[] trds;
public void waitFor(){
thisThread=Thread.currentThread();
List<Future<Object>> futures = getFutures();
trds=new Thread[futures.size()];
for (int i = 0; i < trds.length; i++) {
RunTask rt=new RunTask(futures.get(i), this);
trds[i]=new Thread(rt);
}
synchronized (this) {
for(Thread tx:trds){
tx.start();
}
}
for(Thread tx:trds){
try {tx.join();
} catch (InterruptedException e) {
System.out.println("Job failed!");break;
}
}if(!failed){System.out.println("Job Done");}
}
private List<Future<Object>> getFutures() {
return null;
}
public synchronized void cancelOther(){if(failed){return;}
failed=true;
for(Thread tx:trds){
tx.stop();//Deprecated but works here like a boss
}thisThread.interrupt();
}
//...
}
class RunTask implements Runnable{
private Future f;private Handler h;
public RunTask(Future f,Handler h){this.f=f;this.h=h;}
public void run(){
try{
f.get();//beware about state of working, the stop() method throws ThreadDeath Error at any thread state (unless it blocked by some operation)
}catch(Exception e){System.out.println("Error, stopping other guys...");h.cancelOther();}
catch(Throwable t){System.out.println("Oops, some other guy has stopped working...");}
}
}
위의 코드가 오류가 발생했다고 말해야하지만 (확인하지 않았 음) 해결책을 설명 할 수 있기를 바랍니다. 시도하십시오.