[javascript] promise.then ()과 동일한 RxJS 시퀀스?

나는 약속을 가지고 많은 것을 개발하곤했고 지금은 RxJS로 이동하고 있습니다. RxJS의 문서는 프라 미스 체인에서 옵저버 시퀀스로 이동하는 방법에 대한 명확한 예를 제공하지 않습니다.

예를 들어, 저는 보통 다음과 같이 여러 단계로 프로 미스 체인을 작성합니다.

// a function that returns a promise
getPromise()
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.then(function(result) {
   // do something
})
.catch(function(err) {
    // handle error
});

이 약속 체인을 RxJS 스타일로 어떻게 다시 작성해야합니까?



답변

데이터 흐름의 경우 (에 해당 then) :

Rx.Observable.fromPromise(...)
  .flatMap(function(result) {
   // do something
  })
  .flatMap(function(result) {
   // do something
  })
  .subscribe(function onNext(result) {
    // end of chain
  }, function onError(error) {
    // process the error
  });

약속은를 사용하여 관찰 가능으로 변환 할 수 있습니다 Rx.Observable.fromPromise.

일부 약속 연산자는 직접 번역합니다. 예를 들어 RSVP.all, 또는 jQuery.when로 대체 될 수 있습니다 Rx.Observable.forkJoin.

데이터를 비동기 적으로 변환하고 약속으로 수행 할 수 없거나 수행하기 매우 어려운 작업을 수행 할 수있는 많은 연산자가 있다는 점을 기억하십시오. Rxjs는 데이터의 비동기 시퀀스 (즉, 하나 이상의 비동기 값)로 모든 능력을 보여줍니다.

오류 관리의 경우 주제는 조금 더 복잡합니다.

  • 거기 캐치마지막으로 통신 사업자도
  • retryWhen 오류가 발생한 경우 시퀀스를 반복하는 데 도움이 될 수도 있습니다.
  • onError기능 을 사용하여 구독자 자체의 오류를 처리 할 수도 있습니다.

정확한 의미 체계를 위해 웹에서 찾을 수있는 문서와 예제를 더 자세히 살펴 보거나 여기에서 특정 질문을 할 수 있습니다.

이것은 Rxjs를 사용하여 오류 관리를 더 깊게하기위한 좋은 출발점이 될 것입니다 : https://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/creating_and_querying_observable_sequences/error_handling.html


답변

보다 현대적인 대안 :

import {from as fromPromise} from 'rxjs';
import {catchError, flatMap} from 'rxjs/operators';

fromPromise(...).pipe(
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   flatMap(result => {
       // do something
   }),
   catchError(error => {
       // handle error
   })
)

또한이 모든 것이 작동하려면 어딘가에 subscribe파이프 가 필요 Observable하지만 응용 프로그램의 다른 부분에서 처리된다고 가정합니다.


답변

RxJs 6을 사용하여 2019 년 5 월 업데이트

위에 제공된 답변에 동의하고, 명확성을 추가하기 위해 RxJs v6 을 사용하여 장난감 데이터 및 간단한 약속 (setTimeout 포함)으로 구체적인 예제를 추가하고 싶습니다 .

전달 된 ID (현재 하드 코딩 됨 1)를 존재하지 않는 것으로 업데이트하여 오류 처리 로직도 실행하십시오. 중요한 것은 ofwith catchErrormessage 의 사용에도 유의하십시오 .

import { from as fromPromise, of } from "rxjs";
import { catchError, flatMap, tap } from "rxjs/operators";

const posts = [
  { title: "I love JavaScript", author: "Wes Bos", id: 1 },
  { title: "CSS!", author: "Chris Coyier", id: 2 },
  { title: "Dev tools tricks", author: "Addy Osmani", id: 3 }
];

const authors = [
  { name: "Wes Bos", twitter: "@wesbos", bio: "Canadian Developer" },
  {
    name: "Chris Coyier",
    twitter: "@chriscoyier",
    bio: "CSS Tricks and CodePen"
  },
  { name: "Addy Osmani", twitter: "@addyosmani", bio: "Googler" }
];

function getPostById(id) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const post = posts.find(post => post.id === id);
      if (post) {
        console.log("ok, post found!");
        resolve(post);
      } else {
        reject(Error("Post not found!"));
      }
    }, 200);
  });
}

function hydrateAuthor(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      const authorDetails = authors.find(person => person.name === post.author);
      if (authorDetails) {
        post.author = authorDetails;
        console.log("ok, post hydrated with author info");
        resolve(post);
      } else {
        reject(Error("Author not Found!"));
      }
    }, 200);
  });
}

function dehydratePostTitle(post) {
  return new Promise((resolve, reject) => {
    setTimeout(() => {
      delete post.title;
      console.log("ok, applied transformation to remove title");
      resolve(post);
    }, 200);
  });
}

// ok, here is how it looks regarding this question..
let source$ = fromPromise(getPostById(1)).pipe(
  flatMap(post => {
    return hydrateAuthor(post);
  }),
  flatMap(post => {
    return dehydratePostTitle(post);
  }),
  catchError(error => of(`Caught error: ${error}`))
);

source$.subscribe(console.log);

출력 데이터 :

ok, post found!
ok, post hydrated with author info
ok, applied transformation to remove title
{ author:
   { name: 'Wes Bos',
     twitter: '@wesbos',
     bio: 'Canadian Developer' },
  id: 1 }

핵심 부분은 일반 약속 제어 흐름을 사용하는 다음과 같습니다.

getPostById(1)
  .then(post => {
    return hydrateAuthor(post);
  })
  .then(post => {
    return dehydratePostTitle(post);
  })
  .then(author => {
    console.log(author);
  })
  .catch(err => {
    console.error(err);
  });


답변

내가 올바르게 이해했다면 값을 소비하는 것을 의미합니다.이 경우 sbuscribe를 사용합니다.

const arrObservable = from([1,2,3,4,5,6,7,8]);
arrObservable.subscribe(number => console.log(num) );

또한 다음과 같이 toPromise ()를 사용하여 Observable을 promise로 바꿀 수 있습니다.

arrObservable.toPromise().then()


답변

경우 getPromise함수는 스트림 파이프의 중간에 당신이해야 기능 중 하나에 간단한 포장을 mergeMap, switchMap또는 concatMap(보통 mergeMap) :

stream$.pipe(
   mergeMap(data => getPromise(data)),
   filter(...),
   map(...)
 ).subscribe(...);

스트림을 시작하려면 함수 getPromise()로 래핑하십시오 from.

import {from} from 'rxjs';

from(getPromise()).pipe(
   filter(...)
   map(...)
).subscribe(...);


답변

내가 방금 알아 낸 한 flatMap에서 결과를 반환하면 문자열을 반환하더라도 배열로 변환합니다.

그러나 Observable을 반환하면 해당 Observable은 문자열을 반환 할 수 있습니다.


답변

이것이 내가 한 방법입니다.

이전

  public fetchContacts(onCompleteFn: (response: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => void) {
    const request = gapi.client.people.people.connections.list({
      resourceName: 'people/me',
      pageSize: 100,
      personFields: 'phoneNumbers,organizations,emailAddresses,names'
    }).then(response => {
      onCompleteFn(response as gapi.client.Response<gapi.client.people.ListConnectionsResponse>);
    });
  }

// caller:

  this.gapi.fetchContacts((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      // handle rsp;
  });

이후 (ly?)

public fetchContacts(): Observable<gapi.client.Response<gapi.client.people.ListConnectionsResponse>> {
    return from(
      new Promise((resolve, reject) => {
        gapi.client.people.people.connections.list({
          resourceName: 'people/me',
          pageSize: 100,
          personFields: 'phoneNumbers,organizations,emailAddresses,names'
        }).then(result => {
          resolve(result);
        });
      })
    ).pipe(map((result: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
      return result; //map is not really required if you not changing anything in the response. you can just return the from() and caller would subscribe to it.
    }));
  }

// caller

this.gapi.fetchContacts().subscribe(((rsp: gapi.client.Response<gapi.client.people.ListConnectionsResponse>) => {
  // handle rsp
}), (error) => {
  // handle error
});