[java] Observable vs Flowable rxJava2

나는 새로운 rx java 2를보고 있었고 backpressure더 이상 아이디어를 이해하고 있는지 잘 모르겠습니다 …

나는 우리 Observablebackpressure지원 하지 않는 것과 그것을 가지고 있다는 것을 알고 Flowable있습니다.

따라서 예를 들어 다음 flowableinterval같이 말합니다 .

        Flowable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

이것은 약 128 개의 값 후에 충돌 할 것이며, 항목을 얻는 것보다 더 느리게 소비하고 있음이 분명합니다.

그러나 우리는 Observable

     Observable.interval(1, TimeUnit.MILLISECONDS, Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    // do smth
                }
            });

소비에 약간의 지연을 두어도 여전히 작동하지만 전혀 충돌하지 않습니다. Flowable작업을 수행 하려면 onBackpressureDrop연산자를 넣으면 충돌이 사라졌지 만 모든 값이 방출되는 것은 아닙니다.

그래서 내가 현재 내 머리 속에 답을 찾을 수없는 기본 질문은 왜 내가 backpressure평범한 것을 Observable관리하지 않고 여전히 모든 값을받을 수있을 때 신경을 써야 buffer합니까? 아니면 다른 측면 backpressure에서 소비를 관리하고 처리하는 데 어떤 이점 이 있습니까?



답변

실제로 백 프레셔가 나타나는 것은 바운드 버퍼이며 Flowable.observeOn, dowstream이 취할 수있는 한 빨리 드레인되는 128 개의 요소 버퍼를 가지고 있습니다. 버스트 소스를 처리하기 위해이 버퍼 크기를 개별적으로 늘릴 수 있으며 모든 배압 관리 관행은 1.x부터 계속 적용됩니다. Observable.observeOn요소를 계속 수집하는 제한되지 않은 버퍼가 있으며 앱의 메모리가 부족할 수 있습니다.

Observable예를 들어 다음을 사용할 수 있습니다 .

  • GUI 이벤트 처리
  • 짧은 시퀀스 작업 (총 요소 수가 1000 개 미만)

Flowable예를 들어 다음을 사용할 수 있습니다 .

  • 추위 및 시간 제한이없는 소스
  • 소스와 같은 생성기
  • 네트워크 및 데이터베이스 접근 자

답변

배압은 관찰 가능 (게시자)이 구독자가 처리 할 수있는 것보다 더 많은 이벤트를 생성하는 경우입니다. 따라서 구독자가 누락 된 이벤트를 얻거나 결국 메모리 부족으로 이어지는 거대한 이벤트 대기열을 얻을 수 있습니다. Flowable배압을 고려합니다. Observable하지 않습니다. 그게 다야.

액체가 너무 많으면 넘쳐나는 깔때기를 생각 나게합니다. Flowable은 그런 일이 일어나지 않도록 도와줍니다.

엄청난 배압으로 :

여기에 이미지 설명 입력

그러나 flowable을 사용하면 배압이 훨씬 적습니다.

여기에 이미지 설명 입력

Rxjava2에는 사용 사례에 따라 사용할 수있는 몇 가지 역압 전략이 있습니다. 전략상 Rxjava2는 오버플로 (역압)로 인해 처리 할 수없는 객체를 처리하는 방법을 제공합니다.

여기에 전략이 있습니다.
나는 그것들을 모두 살펴 보지는 않을 것이지만, 예를 들어 넘친 아이템에 대해 걱정하지 않으려면 다음과 같은 드롭 전략을 사용할 수 있습니다.

observable.toFlowable (BackpressureStrategy.DROP)

내가 아는 한 대기열에 128 개 항목 제한이 있어야하며 그 후에 오버플로 (역압)가 발생할 수 있습니다. 128이 아니더라도 그 숫자에 가깝습니다. 이것이 누군가를 돕기를 바랍니다.

버퍼 크기를 128에서 변경해야하는 경우 다음과 같이 수행 할 수있는 것처럼 보입니다 (하지만 메모리 제약을 확인하십시오.

myObservable.toFlowable(BackpressureStrategy.MISSING).buffer(256); //but using MISSING might be slower.  

소프트웨어 개발에서 일반적으로 배압 전략은 소비자가 방출하는 이벤트의 속도를 처리 할 수 ​​없기 때문에 방출기에 약간의 속도를 늦추라고 말하는 것을 의미합니다.


답변

Flowable배압 처리없이 128 개 값 을 내 보낸 후 충돌이 발생 했다고 해서 항상 정확히 128 개 값 후에 충돌이 발생한다는 의미는 아닙니다. 때로는 10 개 후에 충돌하고 어떤 경우에는 전혀 충돌하지 않습니다. 나는 이것이 당신이 예제를 시도했을 때 일어난 일이라고 믿습니다 Observable-배압이 없었으므로 코드가 정상적으로 작동했지만 다음에 그렇지 않을 수도 있습니다. RxJava 2의 차이점은 Observables에는 더 이상 배압 개념이 없으며 이를 처리 할 방법이 없다는 것입니다. 명시적인 배압 처리가 필요할 수있는 반응 시퀀스를 설계하는 경우 Flowable최선의 선택입니다.


답변