[angular] RxJs 5에서 Angular Http 네트워크 호출 결과를 공유하는 올바른 방법은 무엇입니까?

Http를 사용하여 네트워크 호출을 수행하고 http 옵저버 블을 반환하는 메소드를 호출합니다.

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json());
}

이 관찰 가능 항목을 가져 와서 여러 구독자를 추가하는 경우 :

let network$ = getCustomer();

let subscriber1 = network$.subscribe(...);
let subscriber2 = network$.subscribe(...);

우리가 원하는 것은 이것이 여러 네트워크 요청을 일으키지 않도록하는 것입니다.

이것은 비정상적인 시나리오처럼 보이지만 실제로는 매우 일반적입니다. 예를 들어 호출자가 오류 메시지를 표시하기 위해 옵저버 블에 가입하고 비동기 파이프를 사용하여 템플릿에 전달하는 경우 이미 두 명의 가입자가 있습니다.

RxJs 5에서 올바른 방법은 무엇입니까?

즉, 이것은 잘 작동하는 것 같습니다 :

getCustomer() {
    return this.http.get('/someUrl').map(res => res.json()).share();
}

그러나 이것이 RxJs 5에서 이것을하는 관용적 인 방법입니까, 아니면 다른 것을해야합니까?

참고 : Angular 5 new 에 따라 JSON 결과가 기본적으로 가정되므로 모든 예제 HttpClient.map(res => res.json())부분은 이제 쓸모가 없습니다.



답변

데이터를 캐시하고 캐시 된 경우 사용 가능한 경우이를 리턴하여 HTTP 요청을 작성하십시오.

import {Injectable} from '@angular/core';
import {Http, Headers} from '@angular/http';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/observable/of'; //proper way to import the 'of' operator
import 'rxjs/add/operator/share';
import 'rxjs/add/operator/map';
import {Data} from './data';

@Injectable()
export class DataService {
  private url: string = 'https://cors-test.appspot.com/test';

  private data: Data;
  private observable: Observable<any>;

  constructor(private http: Http) {}

  getData() {
    if(this.data) {
      // if `data` is available just return it as `Observable`
      return Observable.of(this.data);
    } else if(this.observable) {
      // if `this.observable` is set then the request is in progress
      // return the `Observable` for the ongoing request
      return this.observable;
    } else {
      // example header (not necessary)
      let headers = new Headers();
      headers.append('Content-Type', 'application/json');
      // create the request, store the `Observable` for subsequent subscribers
      this.observable = this.http.get(this.url, {
        headers: headers
      })
      .map(response =>  {
        // when the cached data is available we don't need the `Observable` reference anymore
        this.observable = null;

        if(response.status == 400) {
          return "FAILURE";
        } else if(response.status == 200) {
          this.data = new Data(response.json());
          return this.data;
        }
        // make it shared so more than one subscriber can get the result
      })
      .share();
      return this.observable;
    }
  }
}

플 런커 예

이 기사 https://blog.thoughtram.io/angular/2018/03/05/advanced-caching-with-rxjs.html은로 캐시하는 방법에 대한 훌륭한 설명입니다 shareReplay.


답변

@Cristian 제안에 따르면, 이것은 HTTP Observable에 잘 작동하는 한 가지 방법으로 한 번만 방출 한 다음 완료됩니다.

getCustomer() {
    return this.http.get('/someUrl')
        .map(res => res.json()).publishLast().refCount();
}


답변

업데이트 : Ben Lesh에 따르면 5.2.0 이후의 다음 마이너 릴리스에서는 shareReplay ()를 호출하여 실제로 캐시 할 수 있습니다.

이전 …

첫째, share () 또는 publishReplay (1) .refCount ()를 사용하지 마십시오. 동일하고 문제는, Observable이 활성화 된 상태에서 연결이 완료된 경우에만 공유하고 완료 한 후 연결하면 공유한다는 것입니다 실제로 캐싱이 아닌 새로운 관측 가능 번역을 다시 생성합니다.

Birowski는 ReplaySubject를 사용하는 올바른 솔루션을 제공했습니다. ReplaySubject는 우리의 경우 1에 주어진 값 (bufferSize)을 캐시합니다. refCount가 0에 도달하고 새로운 연결을 설정하면 캐싱에 대한 올바른 동작 인 share ()와 같은 새로운 관찰 가능 값을 생성하지 않습니다.

재사용 가능한 기능은 다음과 같습니다.

export function cacheable<T>(o: Observable<T>): Observable<T> {
  let replay = new ReplaySubject<T>(1);
  o.subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  return replay.asObservable();
}

사용 방법은 다음과 같습니다.

import { Injectable } from '@angular/core';
import { Http } from '@angular/http';
import { Observable } from 'rxjs/Observable';
import { cacheable } from '../utils/rxjs-functions';

@Injectable()
export class SettingsService {
  _cache: Observable<any>;
  constructor(private _http: Http, ) { }

  refresh = () => {
    if (this._cache) {
      return this._cache;
    }
    return this._cache = cacheable<any>(this._http.get('YOUR URL'));
  }
}

다음은 캐시 가능한 기능의 고급 버전입니다.이 기능은 자체 조회 테이블 + 사용자 정의 조회 테이블을 제공 할 수 있습니다. 이런 식으로 위의 예와 같이 this._cache를 확인할 필요가 없습니다. 또한 Observable을 첫 번째 인수로 전달하는 대신 Observable을 반환하는 함수를 전달합니다. 이것은 Angular의 Http가 즉시 실행되기 때문입니다. 우리의 캐시.

let cacheableCache: { [key: string]: Observable<any> } = {};
export function cacheable<T>(returnObservable: () => Observable<T>, key?: string, customCache?: { [key: string]: Observable<T> }): Observable<T> {
  if (!!key && (customCache || cacheableCache)[key]) {
    return (customCache || cacheableCache)[key] as Observable<T>;
  }
  let replay = new ReplaySubject<T>(1);
  returnObservable().subscribe(
    x => replay.next(x),
    x => replay.error(x),
    () => replay.complete()
  );
  let observable = replay.asObservable();
  if (!!key) {
    if (!!customCache) {
      customCache[key] = observable;
    } else {
      cacheableCache[key] = observable;
    }
  }
  return observable;
}

용법:

getData() => cacheable(this._http.get("YOUR URL"), "this is key for my cache")


답변

rxjs 5.4.0 에는 새로운 shareReplay 메소드가 있습니다.

저자는 “AJAX 결과 캐싱과 같은 것을 다루기에 이상적” 이라고 명시 적으로 말한다

rxjs PR # 2443 feat (shareReplay) :의 shareReplay변형 추가publishReplay

shareReplay는 ReplaySubject를 통해 멀티 캐스트 된 소스 인 Observable을 반환합니다. 해당 재생 주제는 소스의 오류에 따라 재활용되지만 소스의 완료에는 없습니다. 이를 통해 shareReplay는 재시도 가능하기 때문에 AJAX 결과 캐싱과 같은 것을 처리하는 데 이상적입니다. 그러나 반복 동작이지만 소스 관찰 가능을 반복하지 않고 소스 관찰 가능 값을 반복한다는 점에서 공유와 다릅니다.


답변

기사 에 따르면

publishReplay (1) 및 refCount를 추가하여 Observable에 캐싱을 쉽게 추가 할 수 있습니다.

그래서 if 문 안에 추가하십시오.

.publishReplay(1)
.refCount();

.map(...)


답변

rxjs 버전 5.4.0 (2017-05-09)shareReplay 지원이 추가 되었습니다 .

shareReplay를 사용하는 이유는 무엇입니까?

일반적으로 여러 구독자간에 실행하지 않으려는 부작용 또는 과세 계산이있는 경우 shareReplay를 사용하려고합니다. 또한 이전에 방출 된 값에 액세스해야하는 스트림에 늦은 구독자가있을 경우 상황에서 유용 할 수 있습니다. 서브 스크립 션에서 가치를 재생하는이 기능은 공유와 shareReplay를 차별화하는 것입니다.

이것을 사용하기 위해 각도 서비스를 쉽게 수정하고 캐시 된 결과로 옵저버 블을 반환하여 http 호출을 한 번만 할 수 있습니다 (첫 번째 호출이 성공했다고 가정).

각도 서비스 예

다음은를 사용하는 매우 간단한 고객 서비스입니다 shareReplay.

customer.service.ts

import { shareReplay } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Injectable()
export class CustomerService {

    private readonly _getCustomers: Observable<ICustomer[]>;

    constructor(private readonly http: HttpClient) {
        this._getCustomers = this.http.get<ICustomer[]>('/api/customers/').pipe(shareReplay());
    }

    getCustomers() : Observable<ICustomer[]> {
        return this._getCustomers;
    }
}

export interface ICustomer {
  /* ICustomer interface fields defined here */
}

생성자의 할당은 메소드로 이동 될 수 있지만 생성자 getCustomers에서 리턴 된 관찰 가능 항목 HttpClient이 “콜드” 이므로 생성자는이 호출을 수행 할 수 있습니다.subscribe 입니다.

또한 여기서 반환 된 초기 데이터는 응용 프로그램 인스턴스 수명 동안 오래되지 않습니다.


답변

나는 그 질문에 별표를 썼다. 그러나 나는 이것을 시도 할 것이다.

//this will be the shared observable that 
//anyone can subscribe to, get the value, 
//but not cause an api request
let customer$ = new Rx.ReplaySubject(1);

getCustomer().subscribe(customer$);

//here's the first subscriber
customer$.subscribe(val => console.log('subscriber 1: ' + val));

//here's the second subscriber
setTimeout(() => {
  customer$.subscribe(val => console.log('subscriber 2: ' + val));
}, 1000);

function getCustomer() {
  return new Rx.Observable(observer => {
    console.log('api request');
    setTimeout(() => {
      console.log('api response');
      observer.next('customer object');
      observer.complete();
    }, 500);
  });
}

여기 증거가 있습니다 🙂

테이크 아웃은 하나뿐입니다. getCustomer().subscribe(customer$)

우리는의 api 응답 getCustomer()을 구독하지 않고 다른 Observable을 구독 할 수있는 ReplaySubject를 구독하고 있으며 (이것은 중요합니다) 마지막으로 방출 된 값을 유지하고 (ReplaySubject의 ) 가입자.