https://nodejs.org/api/stream.html 정독하고 코드 확인해보는 시간을 가졌습니다. 너무 당연한건 적진 않았습니다.
모든 스트림은 EventEmitter 객체입니다. (EventEmitter도 파해쳐 봐야겠어요)
Types of Stream
- writable
- readable
- duplex: both writable and readable. ex) net.Socket
- transfrom: 데이터를 수정 할 수 있는 duplex streams ex) zlib.createDeflate()
Streams Promises API
callback functions을 이용하는 것이 아니라, Promise를 반환하는 비동기 스트림 함수
const {pipeline} = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
await pipeline(
fs.createReadStream('archive.tar'), // Read Stream
zlib.createGzip(), // Transform
fs.createWriteStream('archive.tar.gz') // Write Stream
)
console.log('Pipeline succeeded')
}
run().catch(console.error)
abort signal를 사용하여 stream을 중간에 중단할 수 있습니다.
abort signal이 발생되면 pipeline 안에서 destory가 호출 됩니다.
const {pipeline} = require('node:stream/promises')
const fs = require('node:fs')
const zlib = require('node:zlib')
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(
fs.createReadStream('archive.tar'), // Read Stream
zlib.createGzip(), // Transform
fs.createWriteStream('archive.tar.gz'), // Write Stream
{ signal }
)
console.log('Pipeline succeeded')
}
run().catch(console.error)
또한, abort signal을 이용하면 작업이 오래걸리는 작업을 중간에 중지 할 수 있습니다. (Promise.race를 이용)
const {pipeline} = require('node:stream/promises')
const fs = require('node:fs')
function processChunk(chuck, {signal}) {
return Promise.race([
new Promise((resolve)=> {
setTimeout(() => {
resolve((chuck +'').toUpperCase())
}, 500)
}),
new Promise((resolve, reject) => setTimeout(reject, 100))
]).catch(e => {
console.error(e)
signal.abort()
})
}
async function run() {
const ac = new AbortController()
const signal = ac.signal
setImmediate(() => ac.abort())
await pipeline(
fs.createReadStream('big.file'), // Read Stream
async function* (source, { signal }) {
source.setEncoding('utf8')
for await (const chunk of source) {
await processChunk(chunk, {signal})
yield 'asd'
}
},
fs.createWriteStream('big-uppercase.txt'),
{signal}
)
console.log('Pipeline succeeded')
}
run().catch(console.error)
finished
stream에서 더이상 읽거나 쓸수 있는 데이터가 없으면 해당 함수가 호출됩니다.
const { finished} = require('node:stream/promises')
const fs = require('node:fs')
const rs = fs.createReadStream('lowercase.txt')
async function run() {
await finished(rs)
console.log('Stream is done reading.');
}
run().catch(console.error)
rs.resume()
.on('readable', () => {console.log(rs.read())})
.on('end', () => {console.log('end')})
/**
* <Buffer 68 65 6c 6c 6f 20 77 6f 72 6c 64 0a>
* null
* end
* Stream is done reading.
*/
'IT > node.js' 카테고리의 다른 글
[node.js] stream - three states (5) (0) | 2024.05.05 |
---|---|
[node.js] readable stream - two modes (4) (0) | 2024.05.05 |
[node.js] stream - buffering (3) (0) | 2024.05.04 |
[node.js] stream - object mode (2) (0) | 2024.05.03 |
node.js, single thread 아닌가요? (1) | 2024.05.03 |