[javascript] 하나의 Observable 시퀀스가 ​​방출하기 전에 다른 시퀀스가 ​​완료 될 때까지 기다리는 방법은 무엇입니까?

다음 Observable과 같이.

var one = someObservable.take(1);

one.subscribe(function(){ /* do something */ });

그런 다음 두 번째가 있습니다 Observable.

var two = someOtherObservable.take(1);

지금, 나는 원하는 subscribe()two,하지만 난이 있는지 확인하려면 one전과 완료 two가입자가 발생합니다.

two두 번째가 첫 번째가 완료 될 때까지 기다리도록하려면 어떤 종류의 버퍼링 방법을 사용할 수 있습니까?

완료 two될 때까지 일시 중지하려고합니다 one.



답변

내가 생각할 수있는 몇 가지 방법

import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'

//Method one

var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});

//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));


답변

실행 순서가 유지되는지 확인하려면 다음 예제와 같이 flatMap을 사용할 수 있습니다.

const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));

first
  .flatMap(() => second)
  .flatMap(() => third)
  .subscribe(()=> console.log('finished'));

결과는 다음과 같습니다.

"1"
"11"
"111"
"finished"


답변

last ()와 함께 skipUntil ()

skipUntil : 다른 Observable이 방출 될 때까지 방출 된 항목을 무시합니다.

last : 시퀀스에서 마지막 값을 방출합니다 (즉, 완료 될 때까지 기다린 다음 방출)

에 전달 된 Observable에서 방출되는 모든 skipUntil것은 건너 뛰기를 취소 last()하므로 스트림이 완료 될 때까지 기다릴 추가가 필요합니다 .

main$.skipUntil(sequence2$.pipe(last()))

공식 : https://rxjs-dev.firebaseapp.com/api/operators/skipUntil


가능한 문제 : 아무것도 내 보내지 않으면 last()자체적으로 오류 가 발생합니다. last()연산자는 가지고 default술어와 함께 사용되는 매개 변수 만 때를. 이 상황이 당신에게 문제라면 ( sequence2$방출하지 않고 완료 될 수 있다면 ) 다음 중 하나가 작동해야한다고 생각합니다 (현재 테스트되지 않음).

main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))

내보낼 수 undefined있는 유효한 항목이지만 실제로는 모든 값이 될 수 있습니다. 또한 이것은 파이프가 sequence2$아닌 연결된 main$파이프입니다.


답변

다음은 switchMap의 결과 선택기를 활용하는 또 다른 가능성입니다.

var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
    /** Wait for first Observable */
    () => one$,
    /** Only return the value we're actually interested in */
    (value2, value1) => value2
  )
  .subscribe((value2) => {
    /* do something */
  });

switchMap의 결과 선택기가 감가 상각되었으므로 다음은 업데이트 된 버전입니다.

const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
  take(1),
  switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
  /* do something */
});


답변

다음은 재사용 가능한 방법입니다 (타입 스크립트이지만 js에 적용 할 수 있음).

export function waitFor<T>(signal: Observable<any>) {
    return (source: Observable<T>) =>
        new Observable<T>(observer =>
            signal.pipe(first())
                .subscribe(_ =>
                    source.subscribe(observer)
                )
        );
}

모든 연산자처럼 사용할 수 있습니다.

var two = someOtherObservable.pipe(waitFor(one), take(1));

기본적으로 관찰 가능한 신호가 첫 번째 이벤트를 방출 할 때까지 관찰 가능한 소스에 대한 구독을 연기하는 연산자입니다.


답변

두 번째 Observable이 hot 이면 일시 중지 / 재개 를 수행하는 다른 방법 이 있습니다 .

var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);

source1.doOnCompleted(function () {
  /* resume paused source2 */
  pauser.onNext(true);
}).subscribe(function(){
  // do something
});

source2.subscribe(function(){
  // start to recieve data 
});

또한 버퍼링 된 버전 pausableBuffered 를 사용 하여 일시 중지가 켜져있는 동안 데이터를 유지할 수 있습니다 .


답변

여기에 또 다른 것이 있지만 더 간단하고 직관적입니다 (또는 약속에 익숙하다면 적어도 자연 스럽습니다), 접근 방식입니다. 기본적으로 Observable.create()랩핑 onetwo단일 Observable로 사용하여 Observable을 만듭니다 . 이것은 Promise.all()작동 방식과 매우 유사합니다 .

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      // observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
});

그래서 여기서 무슨 일이 일어나고 있습니까? 먼저 새로운 Observable을 생성합니다. 에 전달 된 함수는 Observable.create()적절하게 이름이 지정 onSubscription되며 관찰자 (에 전달한 매개 변수에서 빌드 됨 subscribe())로 전달되며 , 이는 새 Promise를 만들 때 단일 객체 resolve와 유사 하고 reject결합됩니다. 이것이 우리가 마법을 작동시키는 방법입니다.

onSubscription 첫 번째 Observable을 구독합니다 (위의 예에서는라고 함 one). 우리가 처리하는 방법 nexterror당신에게 달려 있지만, 제 샘플에 제공된 기본값은 일반적으로 적절할 것입니다. 그러나 complete이벤트를 받으면 one이제 완료 되었음을 의미 합니다. 다음 Observable을 구독 할 수 있습니다. 따라서 첫 번째 Observable이 완료된 후 두 번째 Observable을 발사합니다.

두 번째 Observable에 제공된 예제 옵저버는 매우 간단합니다. 원래,second 이제 twoOP에서 예상 하는 것처럼 작동합니다. 더 구체적으로, 오류가 없다고 가정 second하여 someOtherObservable(때문에 take(1)) 에서 내 보낸 첫 번째 값만 내 보낸 다음 완료됩니다.

실생활에서 작동하는 내 예제를 보려면 복사 / 붙여 넣기 할 수있는 전체 작동 예제가 있습니다.

var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);

var first = someObservable.take(1);
var second = Observable.create((observer) => {
  return first.subscribe(
    function onNext(value) {
      /* do something with value like: */
      observer.next(value);
    },
    function onError(error) {
      observer.error(error);
    },
    function onComplete() {
      someOtherObservable.take(1).subscribe(
        function onNext(value) {
          observer.next(value);
        },
        function onError(error) {
          observer.error(error);
        },
        function onComplete() {
          observer.complete();
        }
      );
    }
  );
}).subscribe(
  function onNext(value) {
    console.log(value);
  },
  function onError(error) {
    console.error(error);
  },
  function onComplete() {
    console.log("Done!");
  }
);

콘솔을 보면 위의 예가 인쇄됩니다.

1

6

끝난!