[node.js] node.js 스트림으로 오류 처리

스트림 오류를 처리하는 올바른 방법은 무엇입니까? 나는 여러분이들을 수있는 ‘오류’이벤트가 있다는 것을 이미 알고 있지만, 임의로 복잡한 상황에 대한 자세한 내용을 알고 싶습니다.

우선 간단한 파이프 체인을 원할 때 무엇을합니까?

input.pipe(transformA).pipe(transformB).pipe(transformC)...

오류를 올바르게 처리하기 위해 이러한 변환 중 하나를 올바르게 작성하는 방법은 무엇입니까?

더 많은 관련 질문 :

  • 오류가 발생하면 ‘종료’이벤트는 어떻게됩니까? 절대 해고되지 않습니까? 때때로 해고됩니까? 변환 / 스트림에 의존합니까? 여기에 표준은 무엇입니까?
  • 파이프를 통해 오류를 전파하는 메커니즘이 있습니까?
  • 도메인이이 문제를 효과적으로 해결합니까? 예가 좋을 것입니다.
  • ‘오류’이벤트에서 발생하는 오류에 스택 추적이 있습니까? 때때로? 못? 그들에게서 하나를 얻는 방법이 있습니까?


답변

변환

변환 스트림은 읽기 및 쓰기가 가능하므로 정말 좋은 ‘중간’스트림입니다. 이러한 이유로 때때로 through스트림 이라고합니다 . 데이터를 전송하는 것보다 데이터를 조작 할 수있는 훌륭한 인터페이스를 제공한다는 점을 제외하면이 방식에서는 이중 스트림과 비슷합니다. 변환 스트림의 목적은 데이터가 스트림을 통해 파이프 될 때 데이터를 조작하는 것입니다. 예를 들어 비동기 호출을 수행하거나 몇 개의 필드를 파생 시키거나 일부를 다시 매핑하는 등의 작업을 수행 할 수 있습니다.


변환 스트림을 넣을 수있는 곳


변환 스트림을 만드는 방법은 herehere을 참조 하십시오 . 당신이해야 할 일은 :

  1. 스트림 모듈을 포함
  2. Transform 클래스를 인스턴스화 (또는 상속)
  3. _transform을 취하는 메소드를 구현하십시오 (chunk, encoding, callback).

청크는 당신의 데이터입니다. 대부분의 경우에 작업중인 경우 인코딩에 대해 걱정할 필요가 없습니다 objectMode = true. 청크 처리가 완료되면 콜백이 호출됩니다. 그런 다음이 청크가 다음 스트림으로 푸시됩니다.

스트림을 정말 쉽게 처리 할 수있는 멋진 도우미 모듈을 원한다면 through2를 제안 합니다.

오류 처리를 위해 계속 읽으십시오.

파이프

파이프 체인에서 핸들링 오류는 실제로 사소한 것이 아닙니다. 이 스레드 에 따르면 .pipe ()는 오류를 전달하도록 빌드되지 않았습니다. 그래서 …

var a = createStream();
a.pipe(b).pipe(c).on('error', function(e){handleError(e)});

… 스트림에서 오류 만 수신합니다 c. 에 오류 이벤트가 발생 a하면 전달되지 않으며 실제로 발생합니다. 이를 올바르게 수행하려면 다음을 수행하십시오.

var a = createStream();
a.on('error', function(e){handleError(e)})
.pipe(b)
.on('error', function(e){handleError(e)})
.pipe(c)
.on('error', function(e){handleError(e)});

이제 두 번째 방법은 더 장황하지만 최소한 오류가 발생한 위치의 컨텍스트를 유지할 수 있습니다. 이것은 일반적으로 좋은 것입니다.

목적지의 오류 만 캡처하고 이벤트 발생 위치에 대해 신경 쓰지 않는 경우가 도움이되는 라이브러리 하나가 도움이됩니다 .

종료

오류 이벤트가 발생하면 종료 이벤트가 (명시 적으로) 발생하지 않습니다. 오류 이벤트가 발생하면 스트림이 종료됩니다.

도메인

내 경험상 도메인은 대부분 잘 작동합니다. 처리되지 않은 오류 이벤트 (예 : 리스너가없는 스트림에서 오류 발생)가있는 경우 서버가 중단 될 수 있습니다. 위의 기사에서 지적했듯이 모든 오류를 올바르게 포착 해야하는 도메인에서 스트림을 래핑 할 수 있습니다.

var d = domain.create();
 d.on('error', handleAllErrors);
 d.run(function() {
     fs.createReadStream(tarball)
       .pipe(gzip.Gunzip())
       .pipe(tar.Extract({ path: targetPath }))
       .on('close', cb);
 });

도메인의 장점은 스택 추적을 보존한다는 것입니다. 이벤트 스트림도 이것의 좋은 역할을합니다.

자세한 내용은 stream-handbook을 확인하십시오 . 깊이는 있지만 매우 유용하며 많은 유용한 모듈에 대한 훌륭한 링크를 제공합니다.


답변

node> = v10.0.0을 사용하는 경우 stream.pipelinestream.finished를 사용할 수 있습니다 .

예를 들면 다음과 같습니다.

const { pipeline, finished } = require('stream');

pipeline(
  input,
  transformA,
  transformB,
  transformC,
  (err) => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
});


finished(input, (err) => {
  if (err) {
    console.error('Stream failed', err);
  } else {
    console.log('Stream is done reading');
  }
});

자세한 내용은이 github PR 을 참조하십시오 .


답변

도메인은 더 이상 사용되지 않습니다. 당신은 그것들이 필요하지 않습니다.

이 질문에 대해 변환 또는 쓰기 가능의 구별은 그렇게 중요하지 않습니다.

mshell_lauren의 대답은 훌륭하지만 대안으로 오류가 있다고 생각되는 각 스트림에서 오류 이벤트를 명시 적으로 수신 할 수도 있습니다. 원하는 경우 핸들러 함수를 재사용하십시오.

var a = createReadableStream()
var b = anotherTypeOfStream()
var c = createWriteStream()

a.on('error', handler)
b.on('error', handler)
c.on('error', handler)

a.pipe(b).pipe(c)

function handler (err) { console.log(err) }

이렇게하면 해당 스트림 중 하나가 오류 이벤트를 발생시키는 경우 악명 높은 잡히지 않은 예외가 발생하지 않습니다.


답변

간단한 함수를 사용하여 전체 체인의 오류를 가장 오른쪽 스트림으로 전파 할 수 있습니다.

function safePipe (readable, transforms) {
    while (transforms.length > 0) {
        var new_readable = transforms.shift();
        readable.on("error", function(e) { new_readable.emit("error", e); });
        readable.pipe(new_readable);
        readable = new_readable;
    }
    return readable;
}

다음과 같이 사용할 수 있습니다.

safePipe(readable, [ transform1, transform2, ... ]);


답변

.on("error", handler)스트림 오류 만 처리하지만 사용자 정의 변환 스트림을 사용하는 경우 함수 .on("error", handler)내부에서 발생하는 오류를 포착하지 마십시오 _transform. 따라서 응용 프로그램 흐름을 제어하기 위해 이와 같은 작업을 수행 할 수 있습니다.

this_transform함수의 키워드는 Stream자체를 나타냅니다 EventEmitter. 따라서 try catch아래와 같이 오류를 잡은 다음 나중에 사용자 지정 이벤트 처리기에 전달할 수 있습니다.

// CustomTransform.js
CustomTransformStream.prototype._transform = function (data, enc, done) {
  var stream = this
  try {
    // Do your transform code
  } catch (e) {
    // Now based on the error type, with an if or switch statement
    stream.emit("CTError1", e)
    stream.emit("CTError2", e)
  }
  done()
}

// StreamImplementation.js
someReadStream
  .pipe(CustomTransformStream)
  .on("CTError1", function (e) { console.log(e) })
  .on("CTError2", function (e) { /*Lets do something else*/ })
  .pipe(someWriteStream)

이런 식으로 논리 및 오류 처리기를 개별적으로 유지할 수 있습니다. 또한 일부 오류 만 처리하고 다른 오류는 무시하도록 선택할 수 있습니다.

업데이트
대안 : RXJS 관찰 가능


답변

다중 파이프 패키지를 사용 하여 여러 스트림을 하나의 이중 스트림으로 결합 하십시오 . 한 곳에서 오류를 처리하십시오.

const pipe = require('multipipe')

// pipe streams
const stream = pipe(streamA, streamB, streamC)


// centralized error handling
stream.on('error', fn)


답변

Transform 스트림 메커니즘을 생성하고 done인수와 함께 콜백 을 호출하여 Node.js 패턴을 사용 하여 오류를 전파하십시오.

var transformStream1 = new stream.Transform(/*{objectMode: true}*/);

transformStream1.prototype._transform = function (chunk, encoding, done) {
  //var stream = this;

  try {
    // Do your transform code
    /* ... */
  } catch (error) {
    // nodejs style for propagating an error
    return done(error);
  }

  // Here, everything went well
  done();
}

// Let's use the transform stream, assuming `someReadStream`
// and `someWriteStream` have been defined before
someReadStream
  .pipe(transformStream1)
  .on('error', function (error) {
    console.error('Error in transformStream1:');
    console.error(error);
    process.exit(-1);
   })
  .pipe(someWriteStream)
  .on('close', function () {
    console.log('OK.');
    process.exit();
  })
  .on('error', function (error) {
    console.error(error);
    process.exit(-1);
   });