Skip to content

理解 Node.js 流机制

流的基本概念

流(Stream)是 Node.js 中处理流式数据的抽象接口,它可以让我们以高效的方式处理读写文件、网络通信等 I/O 操作。

流的类型

1. 可读流(Readable Stream)

javascript
const fs = require('fs')

// 创建可读流
const readStream = fs.createReadStream('large-file.txt', {
  highWaterMark: 64 * 1024, // 64KB 的缓冲区
  encoding: 'utf8'
})

// 处理数据块
readStream.on('data', (chunk) => {
  console.log('接收到数据块:', chunk.length, '字节')
})

// 处理流结束
readStream.on('end', () => {
  console.log('数据读取完成')
})

// 错误处理
readStream.on('error', (error) => {
  console.error('读取错误:', error)
})

2. 可写流(Writable Stream)

javascript
const fs = require('fs')

// 创建可写流
const writeStream = fs.createWriteStream('output.txt', {
  flags: 'w',
  encoding: 'utf8'
})

// 写入数据
writeStream.write('Hello, ')
writeStream.write('World!')

// 结束写入
writeStream.end(() => {
  console.log('写入完成')
})

// 错误处理
writeStream.on('error', (error) => {
  console.error('写入错误:', error)
})

3. 双工流(Duplex Stream)

javascript
const { Duplex } = require('stream')

class MyDuplex extends Duplex {
  constructor(options) {
    super(options)
    this.data = ['Hello', 'World', 'Stream']
  }

  _read(size) {
    const data = this.data.shift()
    if (data) {
      this.push(data)
    } else {
      this.push(null)
    }
  }

  _write(chunk, encoding, callback) {
    console.log('写入数据:', chunk.toString())
    callback()
  }
}

const duplex = new MyDuplex()

duplex.on('data', (chunk) => {
  console.log('读取数据:', chunk.toString())
})

duplex.write('测试数据')

流的应用场景

1. 文件压缩

javascript
const fs = require('fs')
const zlib = require('zlib')

// 使用管道链接多个流
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'))
  .on('finish', () => {
    console.log('文件压缩完成')
  })

2. HTTP 流式响应

javascript
const http = require('http')
const fs = require('fs')

const server = http.createServer((req, res) => {
  // 流式处理大文件
  if (req.url === '/video') {
    const videoStream = fs.createReadStream('large-video.mp4')
    res.writeHead(200, {
      'Content-Type': 'video/mp4'
    })
    videoStream.pipe(res)
  }
})

server.listen(3000)

性能优化

1. 内存管理

javascript
const fs = require('fs')
const { Transform } = require('stream')

// 创建转换流进行数据处理
const processChunk = new Transform({
  transform(chunk, encoding, callback) {
    // 处理数据块,避免一次性加载大量数据到内存
    const processed = someHeavyProcessing(chunk)
    this.push(processed)
    callback()
  }
})

// 流式处理大文件
fs.createReadStream('large-file.txt').pipe(processChunk).pipe(fs.createWriteStream('output.txt'))

function someHeavyProcessing(data) {
  // 实际的数据处理逻辑
  return data
}

2. 背压处理

javascript
const fs = require('fs')

const readStream = fs.createReadStream('source.txt')
const writeStream = fs.createWriteStream('destination.txt')

// 手动控制数据流动
readStream.on('data', (chunk) => {
  // 检查写入流是否已满
  const canContinue = writeStream.write(chunk)

  if (!canContinue) {
    // 暂停读取
    readStream.pause()

    // 等待 drain 事件后继续
    writeStream.once('drain', () => {
      readStream.resume()
    })
  }
})

最佳实践

  1. 错误处理
javascript
function handleStream(stream) {
  return new Promise((resolve, reject) => {
    // 始终处理错误事件
    stream.on('error', reject)

    // 根据流的类型处理完成事件
    if (stream.readable) {
      stream.on('end', resolve)
    } else if (stream.writable) {
      stream.on('finish', resolve)
    }
  })
}

// 使用 async/await 处理流
async function processStream() {
  const readStream = fs.createReadStream('input.txt')
  const writeStream = fs.createWriteStream('output.txt')

  try {
    await Promise.all([handleStream(readStream), handleStream(writeStream)])
    console.log('流处理完成')
  } catch (error) {
    console.error('流处理错误:', error)
  }
}
  1. 资源清理
javascript
const cleanup = (stream) => {
  return new Promise((resolve) => {
    stream.on('close', () => {
      resolve()
    })
    stream.destroy()
  })
}

async function safelyProcessStream() {
  const stream = fs.createReadStream('file.txt')

  try {
    await processStreamData(stream)
  } finally {
    await cleanup(stream)
  }
}

总结

TIP

Node.js 的流机制是一个强大的功能,它可以帮助我们:

  • 高效处理大文件
  • 优化内存使用
  • 提供更好的数据处理方式
  • 实现复杂的数据管道

在实际开发中,合理使用流可以显著提升应用性能和用户体验。记住要始终:

  • 正确处理错误
  • 实现背压机制
  • 及时清理资源
  • 选择合适的缓冲区大小