다음 Observable
과 같이.
var one = someObservable.take(1);
one.subscribe(function(){ /* do something */ });
그런 다음 두 번째가 있습니다 Observable
.
var two = someOtherObservable.take(1);
지금, 나는 원하는 subscribe()
에 two
,하지만 난이 있는지 확인하려면 one
전과 완료 two
가입자가 발생합니다.
two
두 번째가 첫 번째가 완료 될 때까지 기다리도록하려면 어떤 종류의 버퍼링 방법을 사용할 수 있습니까?
완료 two
될 때까지 일시 중지하려고합니다 one
.
답변
내가 생각할 수있는 몇 가지 방법
import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'
//Method one
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));
답변
실행 순서가 유지되는지 확인하려면 다음 예제와 같이 flatMap을 사용할 수 있습니다.
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
결과는 다음과 같습니다.
"1"
"11"
"111"
"finished"
답변
last ()와 함께 skipUntil ()
skipUntil : 다른 Observable이 방출 될 때까지 방출 된 항목을 무시합니다.
last : 시퀀스에서 마지막 값을 방출합니다 (즉, 완료 될 때까지 기다린 다음 방출)
에 전달 된 Observable에서 방출되는 모든 skipUntil
것은 건너 뛰기를 취소 last()
하므로 스트림이 완료 될 때까지 기다릴 추가가 필요합니다 .
main$.skipUntil(sequence2$.pipe(last()))
공식 : https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
가능한 문제 : 아무것도 내 보내지 않으면 last()
자체적으로 오류 가 발생합니다. last()
연산자는 가지고 default
술어와 함께 사용되는 매개 변수 만 때를. 이 상황이 당신에게 문제라면 ( sequence2$
방출하지 않고 완료 될 수 있다면 ) 다음 중 하나가 작동해야한다고 생각합니다 (현재 테스트되지 않음).
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
내보낼 수 undefined
있는 유효한 항목이지만 실제로는 모든 값이 될 수 있습니다. 또한 이것은 파이프가 sequence2$
아닌 연결된 main$
파이프입니다.
답변
다음은 switchMap의 결과 선택기를 활용하는 또 다른 가능성입니다.
var one$ = someObservable.take(1);
var two$ = someOtherObservable.take(1);
two$.switchMap(
/** Wait for first Observable */
() => one$,
/** Only return the value we're actually interested in */
(value2, value1) => value2
)
.subscribe((value2) => {
/* do something */
});
switchMap의 결과 선택기가 감가 상각되었으므로 다음은 업데이트 된 버전입니다.
const one$ = someObservable.pipe(take(1));
const two$ = someOtherObservable.pipe(
take(1),
switchMap(value2 => one$.map(_ => value2))
);
two$.subscribe(value2 => {
/* do something */
});
답변
다음은 재사용 가능한 방법입니다 (타입 스크립트이지만 js에 적용 할 수 있음).
export function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) =>
new Observable<T>(observer =>
signal.pipe(first())
.subscribe(_ =>
source.subscribe(observer)
)
);
}
모든 연산자처럼 사용할 수 있습니다.
var two = someOtherObservable.pipe(waitFor(one), take(1));
기본적으로 관찰 가능한 신호가 첫 번째 이벤트를 방출 할 때까지 관찰 가능한 소스에 대한 구독을 연기하는 연산자입니다.
답변
두 번째 Observable이 hot 이면 일시 중지 / 재개 를 수행하는 다른 방법 이 있습니다 .
var pauser = new Rx.Subject();
var source1 = Rx.Observable.interval(1000).take(1);
/* create source and pause */
var source2 = Rx.Observable.interval(1000).pausable(pauser);
source1.doOnCompleted(function () {
/* resume paused source2 */
pauser.onNext(true);
}).subscribe(function(){
// do something
});
source2.subscribe(function(){
// start to recieve data
});
또한 버퍼링 된 버전 pausableBuffered 를 사용 하여 일시 중지가 켜져있는 동안 데이터를 유지할 수 있습니다 .
답변
여기에 또 다른 것이 있지만 더 간단하고 직관적입니다 (또는 약속에 익숙하다면 적어도 자연 스럽습니다), 접근 방식입니다. 기본적으로 Observable.create()
랩핑 one
및 two
단일 Observable로 사용하여 Observable을 만듭니다 . 이것은 Promise.all()
작동 방식과 매우 유사합니다 .
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
// observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
});
그래서 여기서 무슨 일이 일어나고 있습니까? 먼저 새로운 Observable을 생성합니다. 에 전달 된 함수는 Observable.create()
적절하게 이름이 지정 onSubscription
되며 관찰자 (에 전달한 매개 변수에서 빌드 됨 subscribe()
)로 전달되며 , 이는 새 Promise를 만들 때 단일 객체 resolve
와 유사 하고 reject
결합됩니다. 이것이 우리가 마법을 작동시키는 방법입니다.
에 onSubscription
첫 번째 Observable을 구독합니다 (위의 예에서는라고 함 one
). 우리가 처리하는 방법 next
과 error
당신에게 달려 있지만, 제 샘플에 제공된 기본값은 일반적으로 적절할 것입니다. 그러나 complete
이벤트를 받으면 one
이제 완료 되었음을 의미 합니다. 다음 Observable을 구독 할 수 있습니다. 따라서 첫 번째 Observable이 완료된 후 두 번째 Observable을 발사합니다.
두 번째 Observable에 제공된 예제 옵저버는 매우 간단합니다. 원래,second
이제 two
OP에서 예상 하는 것처럼 작동합니다. 더 구체적으로, 오류가 없다고 가정 second
하여 someOtherObservable
(때문에 take(1)
) 에서 내 보낸 첫 번째 값만 내 보낸 다음 완료됩니다.
예
실생활에서 작동하는 내 예제를 보려면 복사 / 붙여 넣기 할 수있는 전체 작동 예제가 있습니다.
var someObservable = Observable.from([1, 2, 3, 4, 5]);
var someOtherObservable = Observable.from([6, 7, 8, 9]);
var first = someObservable.take(1);
var second = Observable.create((observer) => {
return first.subscribe(
function onNext(value) {
/* do something with value like: */
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
someOtherObservable.take(1).subscribe(
function onNext(value) {
observer.next(value);
},
function onError(error) {
observer.error(error);
},
function onComplete() {
observer.complete();
}
);
}
);
}).subscribe(
function onNext(value) {
console.log(value);
},
function onError(error) {
console.error(error);
},
function onComplete() {
console.log("Done!");
}
);
콘솔을 보면 위의 예가 인쇄됩니다.
1
6
끝난!