[java-8] Java 8 스트림과 RxJava 옵저버 블의 차이점

Java 8 스트림은 RxJava와 비슷합니까?

Java 8 스트림 정의 :

java.util.stream패키지의 클래스 는 요소 스트림에서 기능 스타일 작업을 지원하는 Stream API를 제공합니다.



답변

TL; DR : 모든 시퀀스 / 스트림 프로세싱 라이브러리는 파이프 라인 구축을 위해 매우 유사한 API를 제공합니다. 멀티 스레딩 및 파이프 라인 구성을 처리하기위한 API의 차이점이 있습니다.

RxJava는 Stream과 상당히 다릅니다. 모든 JDK 것들 중, rx.Observable에 가장 가까운 아마도 java.util.stream.Collector 추가 모나드 층 처리의 비용으로 제공 스트림 + CompletableFuture 콤보 (즉 핸들 변환 사이에 필요 Stream<CompletableFuture<T>>하고CompletableFuture<Stream<T>> ).

Observable과 Stream 사이에는 중요한 차이점이 있습니다.

  • 스트림은 풀 기반이며, Observable은 푸시 기반입니다. 이것은 너무 추상적으로 들릴지 모르지만 매우 구체적인 결과를 초래합니다.
  • 스트림은 한 번만 사용할 수 있으며 Observable은 여러 번 구독 할 수 있습니다
  • Stream#parallel()시퀀스를 파티션으로 분할 Observable#subscribeOn()하고 Observable#observeOn()그렇지 않습니다. Stream#parallel()Observable 을 사용하여 동작 을 에뮬레이트하는 것은 까다 롭고 한 번 .parallel()방법이 있었지만이 방법은 너무 혼란 .parallel()스러워서 지원이 github RxJavaParallel의 별도 저장소로 이동되었습니다. 자세한 내용은 다른 답변에 있습니다.
  • Stream#parallel()선택적 스케줄러를 허용하는 대부분의 RxJava 메소드와 달리 사용할 스레드 풀을 지정할 수 없습니다. 이후 모든 JVM을에서 스트림 인스턴스가 동일한 사용 추가, 수영장 포크 – 조인 .parallel()실수로 프로그램의 다른 모듈의 동작에 영향을 미칠 수
  • 스트림은 같은 시간 관련 작업 부족 Observable#interval(), Observable#window()및 많은 다른 사람을; 스트림은 풀 기반이며 업스트림은 다음 요소 다운 스트림 을 언제 방출해야하는지 제어 수 없기 때문입니다
  • 스트림은 RxJava와 비교하여 제한된 조작 세트를 제공합니다. 예를 들어 스트림에는 컷오프 작업이 없습니다 ( takeWhile(), takeUntil()). 해결 방법 Stream#anyMatch()은 제한적입니다. 터미널 작업이므로 스트림 당 두 번 이상 사용할 수 없습니다.
  • JDK 8부터는 Stream # zip 작업이 없으므로 때로는 유용합니다.
  • 스트림은 스스로 구성하기가 어렵습니다. Observable은 여러 가지 방법으로 구성 할 수 있습니다 . 편집 : 의견에서 언급했듯이 스트림을 구성하는 방법이 있습니다. 그러나 비단 말 단락이 없기 때문에 파일에서 쉽게 라인 스트림을 생성 할 수 없습니다 (JDK는 즉시 파일 # 라인 및 BufferedReader # 라인을 제공하지만 스트림을 구성하여 다른 유사한 시나리오를 관리 할 수 ​​있음) Iterator에서).
  • 관찰 가능한 자원 관리 시설 ( Observable#using()); IO 스트림 또는 뮤텍스를 랩핑하고 사용자가 리소스를 해제하는 것을 잊지 않도록해야합니다. 구독이 종료되면 자동으로 처리됩니다. 스트림에는 onClose(Runnable)메서드가 있지만 리소스를 사용하거나 리소스를 통해 직접 호출해야합니다. 예 : Files # lines () try-with-resources 블록으로 묶어야 한다는 것을 명심 해야합니다 .
  • Observables는 끝까지 동기화됩니다 (실제로 Streams가 동일한 지 여부는 실제로 확인하지 않았습니다). 이렇게하면 기본 작업이 스레드 안전인지 여부를 생각할 필요가 없습니다 (버그가 없으면 대답은 항상 ‘예’입니다).하지만 코드의 필요 여부에 관계없이 동시성 관련 오버 헤드가 있습니다.

반올림 : RxJava는 Streams와 크게 다릅니다. Real RxJava 대안은 Akka의 관련 부분과 같은 ReactiveStreams의 다른 구현입니다 .

업데이트 . 에 대한 기본이 아닌 포크 조인 풀을 사용하는 트릭이 있습니다. Java 8 병렬 스트림의 사용자 정의 스레드 풀을Stream#parallel 참조하십시오.

업데이트 . 위의 모든 내용은 RxJava 1.x 경험을 기반으로합니다. 지금 RxJava 2.x에서이 여기 ,이 답변이 오래된 될 수있다.


답변

Java 8 Stream과 RxJava는 매우 비슷합니다. 그들은 비슷한 연산자 (필터, 맵, flatMap …)를 가지고 있지만 동일한 사용법으로 만들어지지 않았습니다.

RxJava를 사용하여 비동기 작업을 수행 할 수 있습니다.

Java 8 스트림을 사용하면 컬렉션의 항목을 순회합니다.

RxJava (컬렉션의 트래버스 항목)에서 거의 동일한 작업을 수행 할 수 있지만 RxJava는 동시 작업에 중점을 두므로 동기화, 래치를 사용합니다. 따라서 RxJava를 사용하는 동일한 작업이 느릴 수 있습니다. Java 8 스트림으로.

RxJava는와 비교할 수 CompletableFuture있지만 둘 이상의 값을 계산할 수 있습니다.


답변

기술적으로나 개념적으로 약간의 차이가 있습니다. 예를 들어, Java 8 스트림은 단일 사용, 풀 기반 동기 값 시퀀스이며 RxJava Observables는 재확인 가능하고 적응 적으로 푸시 풀 기반이며 잠재적으로 비동기 값 시퀀스입니다. RxJava는 Java 6 이상을 목표로하며 Android에서도 작동합니다.


답변

Java 8 스트림은 풀 기반입니다. 각 항목을 소비하는 Java 8 스트림을 반복합니다. 그리고 그것은 끝없는 흐름이 될 수 있습니다.

RXJava Observable는 기본적으로 푸시 기반입니다. Observable에 가입하면 다음 항목이 도착 onNext하거나 ( ), 스트림이 완료 onCompleted될 때 ( ), 오류가 발생했을 때 ( ) 알림을받습니다 onError. 함께하기 때문에 Observable당신이받는 onNext, onCompleted, onError이벤트, 당신은 다른 결합과 같은 몇 가지 강력한 기능을 할 수있는 Observable새에들 ( zip, merge, concat). 당신이 할 수있는 다른 일은 캐싱, 조절, … 그리고 그것은 다른 언어 (RxJava, C #의 RX, RxJS 등)에서 거의 동일한 API를 사용합니다.

기본적으로 RxJava는 단일 스레드입니다. 스케줄러를 사용하지 않으면 모든 것이 동일한 스레드에서 발생합니다.


답변

기존 답변은 포괄적이고 정확하지만 초보자에게는 명확한 예가 부족합니다. “푸시 / 풀 기반”및 “재 관측 가능”과 같은 용어 뒤에 구체적인 내용을 설명하겠습니다. 참고 : 나는 Observable(하늘을위한 스트림) 이라는 용어를 싫어 하므로 J8 대 RX 스트림을 간단히 언급 할 것입니다.

정수 목록을 고려하십시오.

digits = [1,2,3,4,5]

J8 스트림은 콜렉션을 수정하는 유틸리티입니다. 예를 들어 짝수는 다음과 같이 추출 할 수 있습니다.

evens = digits.stream().filter(x -> x%2).collect(Collectors.toList())

이것은 기본적으로 Python의 map, filter, reduce 이며 Java에 매우 훌륭하고 오래 연체되었습니다. 그러나 미리 숫자가 수집되지 않은 경우-앱이 실행되는 동안 숫자가 스트리밍되는 경우-짝수를 실시간으로 필터링 할 수 있습니까?

앱이 실행되는 동안 별도의 스레드 프로세스가 임의의 시간에 정수를 출력한다고 상상해보십시오 ( ---시간을 나타냄)

digits = 12345---6------7--8--9-10--------11--12

RX에서는 각각의 새로운 숫자에 반응 하고 실시간으로 필터를 적용 even할 수 있습니다

even = -2-4-----6---------8----10------------12

입력 및 출력 목록을 저장할 필요가 없습니다. 출력 목록 을 원한다면 스트리밍 가능한 문제도 없습니다. 실제로 모든 것이 스트림입니다.

evens_stored = even.collect()  

이것이 “stateless”및 “functional”과 같은 용어가 RX와 더 관련이있는 이유입니다.


답변

RxJava는 또한 리 액티브 스트림 이니셔티브 와 밀접한 관련이 있으며 액티브 스트림 API의 간단한 구현으로 간주됩니다 (예 : Akka 스트림 구현 과 비교 ). 주요 차이점은 반응 스트림이 역압을 처리 할 수 ​​있도록 설계되었다는 것입니다. 그러나 반응 스트림 페이지를 보면 아이디어를 얻을 수 있습니다. 그들은 그들의 목표를 아주 잘 묘사하고 있으며, 시냇물도 반응성 선언문 과 밀접한 관련이 있습니다 .

Java 8 스트림은 Scala Stream 또는 Clojure lazy seq 와 매우 유사한 무제한 컬렉션 구현입니다 .


답변

Java 8 Streams는 멀티 코어 아키텍처를 활용하면서 매우 큰 컬렉션을 효율적으로 처리 할 수 ​​있습니다. 반대로 RxJava는 기본적으로 단일 스레드입니다 (스케줄러 없음). 따라서 RxJava는 해당 로직을 직접 코딩하지 않으면 멀티 코어 머신을 이용하지 않습니다.