내가 여러 개의 미래를 가지고 있고 그중 하나 가 실패 하거나 모두 성공할 때까지 기다려야한다고 가정 합니다.
예를 들면 다음과 같습니다하자 3 개 선물이있다 : f1
, f2
, f3
.
-
경우
f1
성공 및f2
실패 나는 기다리지 않는다f3
(반환 실패를 클라이언트로). -
경우
f2
동안 실패f1
하고f3
아직 가동 나는 그들 (그리고 반환을 기다리지 않는다 실패 ) -
경우
f1
성공 후f2
성공 나는 기다리고 계속f3
.
어떻게 구현 하시겠습니까?
답변
대신 다음과 같이 이해를 위해 사용할 수 있습니다.
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
이 예에서 선물 1, 2, 3은 병렬로 시작됩니다. 그런 다음 for comprehension에서 결과 1, 2, 3이 나올 때까지 기다립니다. 1 또는 2가 실패하면 더 이상 3을 기다리지 않습니다. 3 개가 모두 성공하면 aggFut
val은 3 개의 Future의 결과에 따라 3 개의 슬롯이있는 튜플을 보유합니다.
이제 fut2가 먼저 실패한다고 말하면 기다리지 않으려는 동작이 필요하면 상황이 조금 더 까다로워집니다. 위의 예에서 fut2가 실패했음을 인식하기 전에 fut1이 완료 될 때까지 기다려야합니다. 이를 해결하기 위해 다음과 같이 시도 할 수 있습니다.
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
이제 이것은 올바르게 작동하지만 문제는 성공적으로 완료되었을 때 Future
제거 할 항목을 아는 데서 발생 Map
합니다. 결과를 해당 결과를 생성 한 Future와 적절하게 연관시킬 수있는 방법이있는 한, 이와 같은 것이 작동합니다. 맵에서 완료된 Future를 계속 제거 Future.firstCompletedOf
하고 남은 Futures
것이 없을 때까지 계속 호출 하여 결과를 수집합니다. 예쁘지는 않지만 당신이 말하는 행동이 정말로 필요하다면 이것 또는 비슷한 것이 작동 할 수 있습니다.
답변
약속을 사용하고 첫 번째 실패 또는 마지막으로 완료된 집계 성공을 보낼 수 있습니다.
def sequenceOrBailOut[A, M[_] <: TraversableOnce[_]](in: M[Future[A]] with TraversableOnce[Future[A]])(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]], executor: ExecutionContext): Future[M[A]] = {
val p = Promise[M[A]]()
// the first Future to fail completes the promise
in.foreach(_.onFailure{case i => p.tryFailure(i)})
// if the whole sequence succeeds (i.e. no failures)
// then the promise is completed with the aggregated success
Future.sequence(in).foreach(p trySuccess _)
p.future
}
그런 다음 차단하려는 경우 Await
그 결과 로 만들거나 다른 것으로 만들 수 있습니다.Future
map
for comprehension과의 차이점은 여기서 첫 번째 오류가 실패하는 반면, comprehension을 사용하면 입력 컬렉션의 순회 순서에서 첫 번째 오류가 발생합니다 (다른 것이 먼저 실패하더라도). 예를 들면 :
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
Future.sequence(List(f1,f2,f3)).onFailure{case i => println(i)}
// this waits one second, then prints "java.lang.ArithmeticException: / by zero"
// the first to fail in traversal order
과:
val f1 = Future { Thread.sleep(1000) ; 5 / 0 }
val f2 = Future { 5 }
val f3 = Future { None.get }
sequenceOrBailOut(List(f1,f2,f3)).onFailure{case i => println(i)}
// this immediately prints "java.util.NoSuchElementException: None.get"
// the 'actual' first to fail (usually...)
// and it returns early (it does not wait 1 sec)
답변
액터를 사용하지 않는 솔루션입니다.
import scala.util._
import scala.concurrent._
import java.util.concurrent.atomic.AtomicInteger
// Nondeterministic.
// If any failure, return it immediately, else return the final success.
def allSucceed[T](fs: Future[T]*): Future[T] = {
val remaining = new AtomicInteger(fs.length)
val p = promise[T]
fs foreach {
_ onComplete {
case s @ Success(_) => {
if (remaining.decrementAndGet() == 0) {
// Arbitrarily return the final success
p tryComplete s
}
}
case f @ Failure(_) => {
p tryComplete f
}
}
}
p.future
}
답변
당신은 선물만으로 이것을 할 수 있습니다. 다음은 한 가지 구현입니다. 실행이 일찍 종료되지는 않습니다! 이 경우 좀 더 정교한 작업을 수행해야합니다 (그리고 직접 중단을 구현할 수도 있습니다). 그러나 작동하지 않을 일을 계속 기다리고 싶지 않다면, 핵심은 첫 번째 일이 끝날 때까지 계속 기다렸다가 아무것도 남지 않았거나 예외가 발생하면 중지하는 것입니다.
import scala.annotation.tailrec
import scala.util.{Try, Success, Failure}
import scala.concurrent._
import scala.concurrent.duration.Duration
import ExecutionContext.Implicits.global
@tailrec def awaitSuccess[A](fs: Seq[Future[A]], done: Seq[A] = Seq()):
Either[Throwable, Seq[A]] = {
val first = Future.firstCompletedOf(fs)
Await.ready(first, Duration.Inf).value match {
case None => awaitSuccess(fs, done) // Shouldn't happen!
case Some(Failure(e)) => Left(e)
case Some(Success(_)) =>
val (complete, running) = fs.partition(_.isCompleted)
val answers = complete.flatMap(_.value)
answers.find(_.isFailure) match {
case Some(Failure(e)) => Left(e)
case _ =>
if (running.length > 0) awaitSuccess(running, answers.map(_.get) ++: done)
else Right( answers.map(_.get) ++: done )
}
}
}
다음은 모든 것이 정상적으로 작동 할 때 작동하는 예입니다.
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); println("Fancy meeting you here!") },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
Fancy meeting you here!
Bye!
res1: Either[Throwable,Seq[Unit]] = Right(List((), (), ()))
그러나 무언가 잘못되면 :
scala> awaitSuccess(Seq(Future{ println("Hi!") },
Future{ Thread.sleep(1000); throw new Exception("boo"); () },
Future{ Thread.sleep(2000); println("Bye!") }
))
Hi!
res2: Either[Throwable,Seq[Unit]] = Left(java.lang.Exception: boo)
scala> Bye!
답변
이를 위해 Akka 배우를 사용합니다. for-comprehension과는 달리, 미래가 실패하자마자 실패하므로 그런 의미에서 좀 더 효율적입니다.
class ResultCombiner(futs: Future[_]*) extends Actor {
var origSender: ActorRef = null
var futsRemaining: Set[Future[_]] = futs.toSet
override def receive = {
case () =>
origSender = sender
for(f <- futs)
f.onComplete(result => self ! if(result.isSuccess) f else false)
case false =>
origSender ! SomethingFailed
case f: Future[_] =>
futsRemaining -= f
if(futsRemaining.isEmpty) origSender ! EverythingSucceeded
}
}
sealed trait Result
case object SomethingFailed extends Result
case object EverythingSucceeded extends Result
그런 다음 액터를 만들고 메시지를 보내고 (응답을 보낼 위치를 알 수 있도록) 응답을 기다립니다.
val actor = actorSystem.actorOf(Props(new ResultCombiner(f1, f2, f3)))
try {
val f4: Future[Result] = actor ? ()
implicit val timeout = new Timeout(30 seconds) // or whatever
Await.result(f4, timeout.duration).asInstanceOf[Result] match {
case SomethingFailed => println("Oh noes!")
case EverythingSucceeded => println("It all worked!")
}
} finally {
// Avoid memory leaks: destroy the actor
actor ! PoisonPill
}
답변
이 질문에 대한 답변이 있지만 여기에 하나가 없기 때문에 내 가치 등급 솔루션 (2.10에서 추가 된 가치 등급)을 게시하고 있습니다. 자유롭게 비판 해주십시오.
implicit class Sugar_PimpMyFuture[T](val self: Future[T]) extends AnyVal {
def concurrently = ConcurrentFuture(self)
}
case class ConcurrentFuture[A](future: Future[A]) extends AnyVal {
def map[B](f: Future[A] => Future[B]) : ConcurrentFuture[B] = ConcurrentFuture(f(future))
def flatMap[B](f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = concurrentFutureFlatMap(this, f) // work around no nested class in value class
}
def concurrentFutureFlatMap[A,B](outer: ConcurrentFuture[A], f: Future[A] => ConcurrentFuture[B]) : ConcurrentFuture[B] = {
val p = Promise[B]()
val inner = f(outer.future)
inner.future onFailure { case t => p.tryFailure(t) }
outer.future onFailure { case t => p.tryFailure(t) }
inner.future onSuccess { case b => p.trySuccess(b) }
ConcurrentFuture(p.future)
}
ConcurrentFuture는 기본 Future 맵 / flatMap을 do-this-then-that에서 Combine-all-and-fail-if-any-fail로 변경하는 오버 헤드없는 Future 래퍼입니다. 용법:
def func1 : Future[Int] = Future { println("f1!");throw new RuntimeException; 1 }
def func2 : Future[String] = Future { Thread.sleep(2000);println("f2!");"f2" }
def func3 : Future[Double] = Future { Thread.sleep(2000);println("f3!");42.0 }
val f : Future[(Int,String,Double)] = {
for {
f1 <- func1.concurrently
f2 <- func2.concurrently
f3 <- func3.concurrently
} yield for {
v1 <- f1
v2 <- f2
v3 <- f3
} yield (v1,v2,v3)
}.future
f.onFailure { case t => println("future failed $t") }
위의 예에서 f1, f2 및 f3은 동시에 실행되며 어떤 순서로든 실패하면 튜플의 미래는 즉시 실패합니다.
답변
Twitter의 Future API를 확인하고 싶을 수 있습니다. 특히 Future.collect 메소드입니다. 원하는 것을 정확히 수행합니다 : https://twitter.github.io/scala_school/finagle.html
소스 코드 Future.scala는 여기에서 사용할 수 있습니다 :
https://github.com/twitter/util/blob/master/util-core/src/main/scala/com/twitter/util/Future.scala