본문 바로가기
IT/node.js

[node.js] stream api - sample code (1)

by 내일은교양왕 2024. 5. 3.

https://nodejs.org/api/stream.html 정독하고 코드 확인해보는 시간을 가졌습니다. 너무 당연한건 적진 않았습니다.


모든 스트림은 EventEmitter 객체입니다. (EventEmitter도 파해쳐 봐야겠어요)

 

Types of Stream

 - writable

 - readable

 - duplex: both writable and readable. ex) net.Socket

 - transfrom: 데이터를 수정 할 수 있는 duplex streams ex) zlib.createDeflate()

 

Streams Promises API

callback functions을 이용하는 것이 아니라, Promise를 반환하는 비동기 스트림 함수

 

const {pipeline} = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  await pipeline(
      fs.createReadStream('archive.tar'), // Read Stream
      zlib.createGzip(), // Transform
      fs.createWriteStream('archive.tar.gz') // Write Stream
  )
  console.log('Pipeline succeeded')
}

run().catch(console.error)

 

 

abort signal를 사용하여 stream을 중간에 중단할 수 있습니다.

abort signal이 발생되면 pipeline 안에서 destory가 호출 됩니다.

const {pipeline} = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(
      fs.createReadStream('archive.tar'), // Read Stream
      zlib.createGzip(), // Transform
      fs.createWriteStream('archive.tar.gz'), // Write Stream
      { signal }
  )
  console.log('Pipeline succeeded')
}

run().catch(console.error)

 

 

 

또한, abort signal을 이용하면 작업이 오래걸리는 작업을 중간에 중지 할 수 있습니다. (Promise.race를 이용)

const {pipeline} = require('node:stream/promises')
const fs = require('node:fs')

function processChunk(chuck, {signal}) {
    return Promise.race([
        new Promise((resolve)=> {
            setTimeout(() => {
                resolve((chuck +'').toUpperCase())
            }, 500)

        }),
        new Promise((resolve, reject) => setTimeout(reject, 100))
    ]).catch(e => {
        console.error(e)
        signal.abort()
    })

}

async function run() {
  const ac = new AbortController()
  const signal = ac.signal

  setImmediate(() => ac.abort())
  await pipeline(
      fs.createReadStream('big.file'), // Read Stream
      async function* (source, { signal }) {
        source.setEncoding('utf8')
        for await (const chunk of source) {
          await processChunk(chunk, {signal})
          yield 'asd'
        }
      },
      fs.createWriteStream('big-uppercase.txt'),
      {signal}
  )
  console.log('Pipeline succeeded')
}

run().catch(console.error)

 

 

finished

stream에서 더이상 읽거나 쓸수 있는 데이터가 없으면 해당 함수가 호출됩니다.

const { finished} = require('node:stream/promises')
const fs = require('node:fs')

const rs = fs.createReadStream('lowercase.txt')

async function run() {
    await finished(rs)
    console.log('Stream is done reading.');

}

run().catch(console.error)
rs.resume()
    .on('readable', () => {console.log(rs.read())})
    .on('end', () => {console.log('end')})

/**
 * <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0a>
 * null
 * end
 * Stream is done reading.
 */

 

'IT > node.js' 카테고리의 다른 글

[node.js] stream - three states (5)  (0) 2024.05.05
[node.js] readable stream - two modes (4)  (0) 2024.05.05
[node.js] stream - buffering (3)  (0) 2024.05.04
[node.js] stream - object mode (2)  (0) 2024.05.03
node.js, single thread 아닌가요?  (1) 2024.05.03