Nodejs流学习系列之一: Readable Stream

林光

前言

nodejs网关开发或多或少都会牵扯到流的使用,但每次的使用都是迷迷糊糊、懵懵懂懂,总是踩完坑后才知道怎么使用,所以有必要深入学习一下Nodejs的流。学习是为了实践,因此这篇文章将利用两个很常用的demo来学习理论并实践。本来只打算一篇文章就写完的,后来看官网文档,越看越觉得一篇文章解决不来,于是打算使用3篇文章来阐述清楚nodejs流的原理.三篇文章如下:

  1. Nodejs流学习系列之一: Readable Stream
  2. Nodejs流学习系列之二: Writable Stream
  3. Nodejs流学习系列之三: Duplex Stream & Transform Stream

外加一篇应用篇:

  1. Nodejs流学习系列之四: Nodejs流的应用例子

本系列的所有代码都可以在这个github地址上看到: node-stream-demo

1 nodejs流的简介

为什么要有流这种形态存在,估计大家应该都清楚,在linux中能用流解决的事情是不会使用别的方法.为什么?因为流的效率高并且占用内存小.相信没有人会直接去cat|less一个2/3GB的日志文件(敢这么做,信不信分分钟钟爆掉你的内存卡),而是会采用|管道来做出一个流,然后在流中查看我们想要的信息.

Nodejs的流也是这么一个目的,在有限的内存中实现我们操作"海量"数据的目标.

在当前的nodejs文档中,介绍的流主要分4种:readable,writable,transform,duplex。(through流就算了)

  • readable: 可以读取数据的流(比如fs.createReadStream)
  • writable: 可以写入数据的流(比如fs.createWriteStream)
  • duplex: 可以读写数据的流(比如net.Socket)
  • transform: 是双工流的一种特殊模式,与duplex的区别在于它可以对数据进行加工.(比如zlib streams/crypto streams)

在介绍可读流之前,我们先将官网的两个通用的概念大致翻译一下,如果翻译后的仍然不解,我们在后面的可读流分析中会继续解释的:

Object Mode

凡是通过nodejs API创建的流都是专用于string和Buffer上的,如果需要操作其他类型的JS数据(除了JS的null类型,因为这个是另有用途的),这样的流被认为是操作在Object Mode.

流实例通过在创建之初配置objectMode字段为true来切换模式.不建议在一条已经存在的流中切换模式.

Buffer

可写流和可读流都是存储在内部的缓存区中,并且可以通过writable._writableState.getBuffer()readable._readableState.buffer分别获取.

提到缓存就涉及到highWaterMark字段,对于正常的流(objectMode=false),highWaterMark字段指定的便是字节数.如果是操作在objectMode那么highWaterMark字段指定的是objects的总数.

当实现调用stream.push(chunk)的时候数据被缓存到可写流中.如果消费者没有调用stream.read(),那么数据将会一直停留在内部队列中直到被消费掉.

一旦内部可读缓存的大小超过了highWaterMark字段指定的阀值,那么可读流将会暂时停止从底层数据中读取数据直到当前缓存的数据被消耗掉.(也就是可读流将会停止调用内部readable_read()方法,该方法是用于填充可读缓存的)

当不断调用writable.write(chunk)方法的时候,数据就会不断缓存到可写流中,如果内部缓存的数据仍然小于highWaterMark,调用该方法会返回 true.一旦超过阀值,就会返回false.

流API的一个关键目标,特别是stream.pipe()方法,是将数据的缓冲限制到可接受的级别,这样一来,不同速度的源和目标就不会淹没可用的内存

因为duplexTransform流都是可读和可写的,每个都维护两个独立的内部缓冲区,用于读取和写入,允许每一方能够独立地操作,同时还保持适当和高效的数据流。例如,net.socket实例就是duplex流,其可读流允许消费从套接字接收到的数据,而其可写流允许将数据写入到套接字中。因为数据可能以比接收到的数据更快或更慢的速度写入套接字,因此对于双方来说能够独立的操作(和缓冲区)都很重要。

读完这两段内容不知是否对什么object Mode以及水位有一定的了解了?如果仍然不清楚的话,没事,我们下面还会继续阐述.

2 Readable流的基本形态

我们使用最最基本的语法新建一个可读流,代码如下:

const { Readable } = require('stream')

class MyReadable extends Readable {  
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
  _read(size) {

  }
}

const rs = new MyReadable()  

这个实例化后的Readable流,其内部结构是这样的:

Readable {  
  _readableState:
   ReadableState {
     objectMode: false, // 是否是object mode
     highWaterMark: 16384, // 最高水位点,默认是16KB,最大为8MB
     buffer: BufferList { head: null, tail: null, length: 0 }, // _read函数读取数据的地方
     length: 0, // 整个可读流的数据大小(如果是object mode,则该值与buffer.length相等)
     pipes: null, // 存储该可读流建立的所有的管道,如果多于1个,则是一个数组
     pipesCount: 0, // 管道的个数
     flowing: null, // 标识可读流的模式
     ended: false, // 是否已经消费了所有的数据
     endEmitted: false, // 是否已经发送了结束的事件
     reading: false, // 是否正在读取数据
     sync: true, // 标识'readable/data'事件是立即发送的还是等到下一个tick,我们默认设置为true.
     needReadable: false, // 是否需要发送readable事件
     emittedReadable: false, // 是否发送过readable事件
     readableListening: false, // 是否有readable监听事件
     resumeScheduled: false, // 标识有调用过resume方法
     defaultEncoding: 'utf8', // 默认编码
     ranOut: false, // 没有用到
     awaitDrain: 0, // 标识在.pipe()s中等待一个drain事件的可写流的个数
     readingMore: false, // 标识是否可以读取更多数据
     decoder: null, // 解码器,如果encoding有配置,那么该值是实例化一个string_decoder解码器
     encoding: null // 编码配置
  },
  readable: true,
  domain: null,
  _events: {},
  _eventsCount: 0,
  _maxListeners: undefined,
  _read: [Function]
}

内部结构的每个字段的含义都已经标明,一个实例化的Readable流就是一个小型状态机,里面的各种状态的转变以及对应触发的事件一两句话是道不清说不明的,以后有时间我打算画一张状态机的图片,立个flag!

我们先关注这个:buffer: BufferList { head: null, tail: null, length: 0 }

3 可读流的Buffer组织

因为js没有指针的概念,所以使用了上面这种结构来实现buffer链表数据结构,现在我们开始往该可读流写入数据:

rs.push('1234567')  
rs.push('890abcd')  

接下去打印rs._readableState.buffer

于是有:

BufferList {  
  head:
   { data: <Buffer 31 32 33 34 35 36 37>,
     next: { data: <Buffer 38 39 30 61 62 63 64>, next: null } },
  tail: { data: <Buffer 38 39 30 61 62 63 64>, next: null },
  length: 2 }

可以看到对应的数据都放在对应的位置,我们再写一组数据:

rs.push('efghijk')  

结果是:

BufferList {  
  head:
   { data: <Buffer 31 32 33 34 35 36 37>,
     next: { data: <Buffer 38 39 30 61 62 63 64>, next: [Object] } },
  tail: { data: <Buffer 65 66 67 68 69 6a 6b>, next: null },
  length: 3 }

这样大家应该都清楚可读流内部的buffer组织形式了吧?使用了对象的嵌套,一层一层地往里嵌套,并且同时保存着最后1块buffer的内容以及整个buffer的块数---也就是length字段.从nodejs源码的commit记录来看,可读流的缓存实现修改为链表形式是在这个commit中实现的: stream: improve Readable.read() performance

3 可读流的两种模式

在内部结构中有一个字段叫做flowing.该字段可以有三种配置:

  1. null: 表明当前没有任何机制去消费流数据,所以流也就不会产生数据.在这个状态下,增加一个data事件或者调用readable.pipe()/readable.resume()方法的时候将会切换该字段为true.
  2. true: 表明当前处于flowing模式,当我们调用readable.pause(), readable.unpipe()或者接收到背板压力的时候会导致该字段切换为false,这样会暂时终止事件流但是不会终止生成数据.在此种状态下,重新监听data事件是不会切换flowing字段为true的,具体的案例请参考官网文档: Readable
  3. false: 表明当前处于paused模式,数据在内部缓存中会不断累积

因为有这三种不同的状态,可读流是可以作用在两种模式下的: flowing modepause mode.在flowing模式下,数据可以自动从底层资源中读取.在paused模式下,我们必须明确地调用stream.read()来从流中读取数据,二者模式可以自由切换,默认新建的流是在paused模式,当使用下面的任意一种方法就可以切换到flowing模式:

  1. 添加一个data事件处理
  2. 调用stream.resume()方法
  3. 调用stream.pipe()方法发送数据到一个可写的流

从flowing模式切换回来可以通过下面的方法:

  1. 如果当前没有任何pipe,那么直接调用stream.pause()即可
  2. 如果当前有pipe,那么需要删掉所有的data事件以及删掉所有pipe,然后再调用stream.pause()

二者的图示如下:

因此我们有这样的一个归纳: 我们使用_read()方法将数据从底层资源中读取,然后使用push()方法将数据保存到内部的缓存中,再使用read()方法将数据从内部缓存中读取到消费者中去

3.1 pause模式

首先我们来看一个pause模式的简单demo:

const { Readable } = require('stream')

class MyReadable extends Readable {  
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
  _read(size) {

  }
}

const rs = new MyReadable()  
// const rs = new MyReadable({ objectMode: true })

rs.on('readable', () => {  
  let chunk;
  while (null !== (chunk = rs.read(5))) {
    console.log('Received bytes of data.', chunk);
  }
})

rs.push('1234567')  
rs.push('890abcd')  
rs.push('efghijk')

console.log(rs._readableState.buffer)  

大家可以心里面想一下这个demo会输出些什么?

3

2

1

答案是:

BufferList {  
  head:
   { data: <Buffer 31 32 33 34 35 36 37>,
     next: { data: <Buffer 38 39 30 61 62 63 64>, next: [Object] } },
  tail: { data: <Buffer 65 66 67 68 69 6a 6b>, next: null },
  length: 3 }
Received bytes of data. <Buffer 31 32 33 34 35>  
Received bytes of data. <Buffer 36 37 38 39 30>  
Received bytes of data. <Buffer 61 62 63 64 65>  
Received bytes of data. <Buffer 66 67 68 69 6a>  

针对上面的结果我们有很多信息,也会有很多疑问.那么在解答疑问之前,先了解一下上面涉及到的一个事件readable和一个方法read().

3.1.1. Readable事件

readable事件会在当流中有数据可读的时候触发.在大部分的情况下,监听该事件会导致从内部缓存中读取数据.另外还有一种情况会触发该事件---流数据的末端已经达到,并且在end事件触发之前.

也就是说该事件会表明可写流有新的信息: 要么是有新数据可用要么是已经到达流的末端.在第一种情况下使用stream.read()会返回可用的数据,第二种情况stream.read()会返回null

readable事件的回调函数不会传递任何参数给回调函数.

3.1.2. read([size])方法

该方法用于从内部缓存中读取数据然后返回,如果没有可用数据可读的话,直接返回null.默认数据返回是以Buffer对象的格式返回,除非指定编码或者操作在object模式.

该方法应该只用在paused模式下.在flowing模式下,该方法是直接自动调用的直到内部缓存全部耗光.

然后传递给该方法的size参数,根据不同值或不同场景有对应的不同返回值:

下面的情况前置条件是objectMode=false

  1. 不传size
    如果将上面的例子修改成:
rs.on('readable', () => {  
  let chunk;
  while (null !== (chunk = readable.read())) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
})

那么如果此时调用read()方法不传递size的话,那么将会返回当前可读流中的所有数据,在源码中有所解释:

function howMuchToRead(n, state) {  
  ...
  if (n !== n) { // 等同于(n === null || isNaN(n))
    // Only flow one buffer at a time
    if (state.flowing && state.length)
      return state.buffer.head.data.length;
    else
      return state.length;
  }
  ...
}
  1. n = 0: 在实际应用情况中不建议传递此参数,这种情况下会触发一次底层可读流的刷新但不会消费任何数据.此时返回值永远为null
  2. n > highWaterMark: 这种情况下会修改掉你之前设置的highWaterMark,对应的代码如下:
function howMuchToRead(n, state) {  
  ...
// If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);
  ...
}
  1. n > buffer的总数据数: 可以返回null也可以返回现存的buffer所有的数据(条件是可读流已经ended),对应的代码如下:
function howMuchToRead(n, state) {  
  ...
  // Don't have enough
  if (!state.ended) {
    state.needReadable = true;
    return 0;
  }
  return state.length;
}
  1. n < buffer的总数据数: 直接返回n个字节的数据

如果我们将读取数据改成如下:

rs.on('readable', () => {  
  let chunk;
  while (null !== (chunk = readable.read(5))) {
    console.log(`Received ${chunk.length} bytes of data.`);
  }
})

那么打印如下:

BufferList {  
  head:
   { data: <Buffer 31 32 33 34 35 36 37>,
     next: { data: <Buffer 38 39 30 61 62 63 64>, next: [Object] } },
  tail: { data: <Buffer 65 66 67 68 69 6a 6b>, next: null },
  length: 3 }
Received bytes of data. <Buffer 31 32 33 34 35>  
Received bytes of data. <Buffer 36 37 38 39 30>  
Received bytes of data. <Buffer 61 62 63 64 65>  
Received bytes of data. <Buffer 66 67 68 69 6a>  

每次读取5个字节,最后剩余1个字节没有读取出来,根据上面的传参规则可以明显知道这点.

但是当我们设置objectMode=true之后呢?这时就没有那么多种情况,结果如下:

BufferList {  
  head: { data: '1234567', next: { data: '890abcd', next: [Object] } },
  tail: { data: 'efghijk', next: null },
  length: 3 }
Received bytes of data. 1234567  
Received bytes of data. 890abcd  
Received bytes of data. efghijk  

可以发现此时直接忽略size参数,并且是按buffer中list的一块块输出的,并且还帮忙做了编码,貌似这种使用方式大部分适合我们的应用场景.对应的源码处理如下:

function howMuchToRead(n, state) {  
  ...
  if (state.objectMode)
    return 1;
  ...
}
function fromList(n, state) {  
  ...
  if (state.objectMode)
    ret = state.buffer.shift();
  ...
}

现在对objectMode应该清楚了吧?

3.2 flowing模式

根据上面的阐述,flowing模式是自动化的,你可以使用监听data方法来触发,也可以调用pipe方法或者resume方法.我们看一下下面的demo:

const { Readable } = require('stream')

class MyReadable extends Readable {  
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
  _read(size) {

  }
}

const rs = new MyReadable()  
// const rs = new MyReadable({ objectMode: true })

rs.on('data', (data) => {  
  console.log('Received data: ', data)
})

rs.push('1234567')

setTimeout(() => {  
  rs.push('890abcd')
}, 1000)

setTimeout(() => {  
  rs.push('efghijk')
}, 2000)

console.log(rs._readableState.buffer)  

该demo输出如下:

BufferList {  
  head: { data: <Buffer 31 32 33 34 35 36 37>, next: null },
  tail: { data: <Buffer 31 32 33 34 35 36 37>, next: null },
  length: 1 }
Received data:  <Buffer 31 32 33 34 35 36 37>  
Received data:  <Buffer 38 39 30 61 62 63 64>  
Received data:  <Buffer 65 66 67 68 69 6a 6b>  

这个demo比较简单,但同时也涉及到一个事件data.另外还有两个方法,读者可以自己修改demo即可.

3.2.1. data事件

当流需要传输数据给消费者的时候会触发该事件.另外无论readable.read()什么时候被调用也会触发该事件.回调函数中会返回对应的数据.

附加一个data的事件侦听器到一个并没有明确paused的流上会切换流为flowing模式,数据一旦可用便会开始传输.

3.2.2. pipe方法和unpipe方法

pipe方法会附加一个Writable流到可读流中,导致可读流自动切换到flowing模式并且push它的所有数据到连接的可写流中去.流中的数据将会自动管理所以目标可写流不会被更快的可读流所淹没.

因为pipe方法返回了目标可写流的引用,所以我们可以直接对其做链式调用,也就可以附加多条可写流,因为在实例化的可读流中就会维护这么一个目标可写流的列表,对应的字段是:

pipes: null, // 存储该可读流建立的所有的管道,如果多于1个,则是一个数组  
pipesCount: 0, // 管道的个数  

默认情况下当可读流触发了end事件之后,将会在目标可写流中调用stream.end(),所以目标可写流不再可写.不过这个行为可以直接禁用掉,只需要传递字段end为false即可让目标可写流保持打开状态.

NOTE 一个最重要的点是如果在可读流处理的时候发生错误,可写流将不会自动关闭掉.如果发生错误,有必要我们自己手动去关掉每一条流以防内存泄漏!

unpipe方法则是将之前连接的可写流剥离掉.如果有指定的可写流则传参进去,不传的话默认剥离所有的.

3.2.3. resume方法

该方法会导致一个明确停止的可读流开始恢复发送data事件并切换为flowing模式.该方法可以用于完全地从流中消耗数据而不需要处理任何数据,比如:

getReadableStreamSomehow()  
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  });

4. 可读流的水位简介

在前文中提及到的两个点:object mode和highwaterMark,Object Mode已经介绍过了,该值会影响数据的编码以及读取方式,大家也见过其效果,应该算是明白一二了。另外一个概念就是水位。可写流的水位不难理解,唯独可读流的水位我始终不明白官网的介绍。因为从源码中我是没有找到可读流对于水位中的处理,下面这个链接是我发起的issue,但是仍然没有得到明确的答复: A question about highWaterMark in readable stream module

抛开可读流水位的介绍,我们从水位中需要清楚两点即可: 1. 可以直接在新建可读流的时候配置水位
2. 修改后的水位并不会一成不变,它会因为你调用read(size)方法发生改变

比如下面的demo:

const { Readable } = require('stream')

class MyReadable extends Readable {  
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
  _read(size) {

  }
}

const rs = new MyReadable({ highWaterMark: 20 })

rs.on('readable', () => {  
  const chunk = rs.read(50)
  console.log('hwm has changed!', rs._readableState.highWaterMark)
})
console.log('hwm must be equal to 20!', rs._readableState.highWaterMark)  
rs.push('1234567')  
const r1 = rs.push('890abcd')  
const r2 = rs.push('efghijk')

console.log(`r1=${r1}, r2=${r2}`)

const r3 = rs.push("lmnopqr")

console.log(`r3=${r3}`)  

结果是:

hwm must be equal to 20! 20  
r1=true, r2=false  
r3=false  
hwm has changed! 64  

可以看到我们上面说的两点都分别验证了,第二点的原因在前文已经说过了,就不再赘述。

5. 可读流的其他API简介

除了上面介绍的data/readable事件,还有另外end/error事件没有介绍。

方法除了上面的read()/resume/pipe/unpipe之外,还有isPaused()/pause()/setEncoding(encoding)/unshift(chunk)/wrap(stream)/destroy([error])方法,我们简单介绍一下这些事件和方法。

  1. end事件: 当可读流中没有任何数据可以消费的时候就回触发这个事件
  2. error事件: 当可读流在执行的时候发生错误都会触发这个事件
  3. isPaused()方法: 返回当前可读流的操作状态,是否处于暂停状态。一般不会使用到这个方法。
  4. pause()方法: 调用该方法会导致一个处于flowing模式的流停止触发data事件,并离开flowing模式。所有有效的数据都会滞留在内部缓存中
  5. setEncoding(encoding)方法: 设置从可读流读取的数据的编码方式。默认是Buffer对象,你可以设置为utf-8之类的编码。
  6. unshift(chunk)方法: 该方法是将数据重新返回到缓存中的头部位置上,利用源码中的addToFront字段来操作。
  7. wrap(stream)方法: 该方法是兼容以前老版本的,用的地方基本很少,不用太关注这个。
  8. destroy([error])方法: 该方法用于销毁可读流,然后触发error事件。切记不要重写该方法,而是应该实现readable._destroy

参考

  1. nodejs Stream