Nodejs流学习系列之二: Writable Stream

林光

前言

上一篇文章Nodejs流学习系列之一: Readable Stream我们简单地介绍了Nodejs的流的概念以及分类.然后详细地解释了可读流的两种模式以及缓存,简单地回顾了可读流的API.这一篇我们将介绍可写流,基本概念上一篇已经提过,就不再赘述,直奔主题吧~

1 Writable流的基本形态

我们使用最最基本的语法新建一个可写流,代码如下:

class MyWritable extends Writable {  
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
  _write(chunk, encoding, callback) {
  }
}

const ws = new MyWritable()  

这个实例化后的Writable流,其内部结构是这样的:

MyWritable {  
  _writableState:
   WritableState {
     objectMode: false, // 是否是object mode
     highWaterMark: 16384, // 最高水位,默认是16K
     needDrain: false, // 是否需要发送drain事件
     ending: false, // 是否可写流正在关闭,在调用end()事件之前
     ended: false, // 可写流是否已经停止
     finished: false, // 标识已经发送过finish事件
     decodeStrings: true, // 标识是否我们应该在传递给_write之前编码string为buffer
     defaultEncoding: 'utf8', // 默认编码
     length: 0, // 该字段标识的是等待push到底层socket或者文件的数据大小
     writing: false, // 标识是否正在写缓存到底层资源中
     corked: 0, // 标识是否需要先缓存直到调用uncork()方法才会写到底层资源
     sync: true, // 标识onwrite的cb调用是立即调用还是下一个tick调用
     bufferProcessing: false, // 标识我们是否正在处理之前缓存的items
     onwrite: [Function: bound onwrite], // 这个回调会传递给_write(chunk, cb)的cb参数
     writecb: null, // 用户提供给write(chunk,encoding,cb)的回调
     writelen: 0, // 标识当_write被调用的时候需要写的数据的大小
     bufferedRequest: null, // 等待需要写到底层资源的缓存请求,是一个"链表"
     lastBufferedRequest: null, // 缓存请求的最后一个
     pendingcb: 0, // 用户当前待处理的write回调,该值在finish事件发射之前必须为0
     prefinished: false, // 如果我们等待的唯一一件事就是_write cbs的话就发射`prefinish`事件,该事件供同// 步Transform流使用
     errorEmitted: false, // 标识我们已经发射过错误事件,并且不应该再发射了
     bufferedRequestCount: 0, // 缓存请求的个数
     corkedRequestsFree: // 分配第一个CorkedRequest: {next:null,entry:null,finish:undefined }
      CorkedRequest {
        next: null,
        entry: null,
        finish: [Function: bound onCorkedFinish] } },
  writable: true,
  domain: null,
  _events: {},
  _eventsCount: 0,
  _maxListeners: undefined }

内部结构的每个字段的含义都已经标明,这个时候你可以将其和可读流进行对比,在缓存这块的组织还是基本一样的.

有了内部结构我们再贴上一张可写流的原理:

![]( http://blogimages2016.oss-cn-hangzhou.aliyuncs.com/nodejs/node-stream-3.png)

2 可写流的原理

在讲原理之前我们完善一下上面的demo,如下:

const { Writable } = require('stream')

class MyWritable extends Writable {  
  constructor(options) {
    // Calls the stream.Readable(options) constructor
    super(options);
  }
  _write(chunk, encoding, callback) {
    console.log('we write--', chunk)
    // callback()
  }
}

const ws = new MyWritable()

console.log('writable stream: ', ws)

const ws1 = ws.write('abcdefgh')

console.log('writable stream: ', ws)  
console.log('write buffer return value:', ws1)  
console.log(ws._writableState.getBuffer())

const ws2 = ws.write('ijk')

console.log('writable stream: ', ws)  
console.log('write buffer return value:', ws2)  
console.log(ws._writableState.getBuffer())

const ws3 = ws.write('opq')

console.log('writable stream: ', ws)  
console.log('write buffer return value:', ws3)  
console.log(ws._writableState.getBuffer())  

请关注我们每次写值进去对应的可写流实例的状态变化以及buffer的变化.

结果如下:(结果有点长,为了能够直观看出来就不要介意了~~)

writable stream:  MyWritable {  
  _writableState:
   WritableState {
     objectMode: false,
     highWaterMark: 16384,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     decodeStrings: true,
     defaultEncoding: 'utf8',
     length: 0,
     writing: false,
     corked: 0,
     sync: true,
     bufferProcessing: false,
     onwrite: [Function: bound onwrite],
     writecb: null,
     writelen: 0,
     bufferedRequest: null,
     lastBufferedRequest: null,
     pendingcb: 0,
     prefinished: false,
     errorEmitted: false,
     bufferedRequestCount: 0,
     corkedRequestsFree:
      CorkedRequest {
        next: null,
        entry: null,
        finish: [Function: bound onCorkedFinish] } },
  writable: true,
  domain: null,
  _events: {},
  _eventsCount: 0,
  _maxListeners: undefined }
we write-- <Buffer 61 62 63 64 65 66 67 68>  
writable stream:  MyWritable {  
  _writableState:
   WritableState {
     objectMode: false,
     highWaterMark: 16384,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     decodeStrings: true,
     defaultEncoding: 'utf8',
     length: 8,
     writing: true,
     corked: 0,
     sync: false,
     bufferProcessing: false,
     onwrite: [Function: bound onwrite],
     writecb: [Function: nop],
     writelen: 8,
     bufferedRequest: null,
     lastBufferedRequest: null,
     pendingcb: 1,
     prefinished: false,
     errorEmitted: false,
     bufferedRequestCount: 0,
     corkedRequestsFree:
      CorkedRequest {
        next: null,
        entry: null,
        finish: [Function: bound onCorkedFinish] } },
  writable: true,
  domain: null,
  _events: {},
  _eventsCount: 0,
  _maxListeners: undefined }
write buffer return value: true  
[]
writable stream:  MyWritable {  
  _writableState:
   WritableState {
     objectMode: false,
     highWaterMark: 16384,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     decodeStrings: true,
     defaultEncoding: 'utf8',
     length: 11,
     writing: true,
     corked: 0,
     sync: false,
     bufferProcessing: false,
     onwrite: [Function: bound onwrite],
     writecb: [Function: nop],
     writelen: 8,
     bufferedRequest:
      WriteReq {
        chunk: <Buffer 69 6a 6b>,
        encoding: 'buffer',
        callback: [Function: nop],
        next: null },
     lastBufferedRequest:
      WriteReq {
        chunk: <Buffer 69 6a 6b>,
        encoding: 'buffer',
        callback: [Function: nop],
        next: null },
     pendingcb: 2,
     prefinished: false,
     errorEmitted: false,
     bufferedRequestCount: 1,
     corkedRequestsFree:
      CorkedRequest {
        next: null,
        entry: null,
        finish: [Function: bound onCorkedFinish] } },
  writable: true,
  domain: null,
  _events: {},
  _eventsCount: 0,
  _maxListeners: undefined }
write buffer return value: true  
[ WriteReq {
    chunk: <Buffer 69 6a 6b>,
    encoding: 'buffer',
    callback: [Function: nop],
    next: null } ]
writable stream:  MyWritable {  
  _writableState:
   WritableState {
     objectMode: false,
     highWaterMark: 16384,
     needDrain: false,
     ending: false,
     ended: false,
     finished: false,
     decodeStrings: true,
     defaultEncoding: 'utf8',
     length: 14,
     writing: true,
     corked: 0,
     sync: false,
     bufferProcessing: false,
     onwrite: [Function: bound onwrite],
     writecb: [Function: nop],
     writelen: 8,
     bufferedRequest:
      WriteReq {
        chunk: <Buffer 69 6a 6b>,
        encoding: 'buffer',
        callback: [Function: nop],
        next: [Object] },
     lastBufferedRequest:
      WriteReq {
        chunk: <Buffer 6f 70 71>,
        encoding: 'buffer',
        callback: [Function: nop],
        next: null },
     pendingcb: 3,
     prefinished: false,
     errorEmitted: false,
     bufferedRequestCount: 2,
     corkedRequestsFree:
      CorkedRequest {
        next: null,
        entry: null,
        finish: [Function: bound onCorkedFinish] } },
  writable: true,
  domain: null,
  _events: {},
  _eventsCount: 0,
  _maxListeners: undefined }
write buffer return value: true  
[ WriteReq {
    chunk: <Buffer 69 6a 6b>,
    encoding: 'buffer',
    callback: [Function: nop],
    next:
     WriteReq {
       chunk: <Buffer 6f 70 71>,
       encoding: 'buffer',
       callback: [Function: nop],
       next: null } },
  WriteReq {
    chunk: <Buffer 6f 70 71>,
    encoding: 'buffer',
    callback: [Function: nop],
    next: null } ]

2.1 可写流状态的变化

  1. 第一次打印可读流,大部分的字段都是false或者null
  2. 写入'abcdefgh',这个时候可读流的状态开始发生变化:
    2.1 触发了_write方法,打印了写入的数据 2.2 length/writelen字段都变为8,并且writing=true,pendingcb=1 2.3 write方法返回仍为true 2.4 getBuffer返回的数组为空[]
  3. 写入'ijk',这个时候可写流的状态继续变化:
    3.1 这次没有触发_write方法 3.2 length字段变为11,pendingcb=2 3.3 bufferedRequest/lastBufferedRequest有数据了,结构为{chunk,encoding,callback,next},其中next为null 3.4 bufferedRequestCount=1 3.5 write方法返回仍为true,但是getBuffer返回的不再是空数组
  4. 写入'opq',这个时候的可写流状态有规律可循,基本是在第三个步骤的基础上累加:
    3.1 这次没有触发_write方法 3.2 length字段变为14,pendingcb=3 3.3 bufferedRequest/lastBufferedRequest有数据了,结构为{chunk,encoding,callback,next},并且next有数据了 3.4 bufferedRequestCount=2 3.5 write方法返回仍为true,但是getBuffer返回的不再是空数组

这个时候请问以下问题:

  1. 为什么不再调用_write方法了?
  2. 第一次写入的时候为什么buffer仍然为null?
  3. write方法什么时候会返回false?
  4. writelen为什么一直是8?

想要解答上面的问题,我们需要从源码去解析可写流实现的机制.下图是我画的一个大致的执行流程图

![]( http://blogimages2016.oss-cn-hangzhou.aliyuncs.com/nodejs/node-stream-4.png)

从上面的代码执行流程我们可以清楚地解释上面的四个问题: 1. 因为在write方法中没有调用callback,导致没有调用代码中state.onwrite方法,进而不会更新状态,使writing一直是true
2. 因为第一次是直接调用doWrite方法写到底层资源上,并不会去走缓存的那个判断
3. 当state.length < state.highWaterMark不成立的时候就回返回true
4. 这个也是说明writelen是指的是等待写入底层资源的数据长度。因为没有再触发任何的
write,所以这个值一直是第一次写入的数据的长度

至此我们应该也大致明白了可写流的一些基本逻辑,我们接下去看看buffer的组织。

2.2 buffer的组织

可写流的buffer组织和可读流差不多,都是通过类似队列的形式去组织,形如:

bufferedRequest:  
WriteReq {  
    chunk: <Buffer 69 6a 6b>,
    encoding: 'buffer',
    callback: [Function: nop],
    next: [Object] },
lastBufferedRequest:  
  WriteReq {
    chunk: <Buffer 6f 70 71>,
    encoding: 'buffer',
    callback: [Function: nop],
    next: null },

{chunk: data, next: { chunk: data, next: .....}},然后比可读流多了一个回调函数(callback)和编码(encoding)。组织该buffer的代码如下:

var last = state.lastBufferedRequest;  
state.lastBufferedRequest = {  
  chunk,
  encoding,
  isBuf,
  callback: cb,
  next: null
};
if (last) {  
  last.next = state.lastBufferedRequest;
} else {
  state.bufferedRequest = state.lastBufferedRequest;
}
state.bufferedRequestCount += 1;  

3. 可写流的事件介绍

可写流实现了close/error/drain/finish/pipe/unpipe六种事件.比较生疏的事件是drain

3.1 Drain事件

当你调用stream.write方法的时候返回false,这个时候表明写入的数据超过可写流的阀值.然后等待可写流又能恢复写入数据的时候,该事件将会被出发.

在源码中我们发现有这么一个变量: this.needDrain = false;,该变量标识是否需要发送drain事件. 该变量在下面的条件下会置为true:

function writeOrBuffer(stream, state, isBuf, chunk, encoding, cb) {  
  ...
  // we must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;
  ...
}

之后再等到buffer又重新清空的时候将其置为false:

function onwriteDrain(stream, state) {  
  if (state.length === 0 && state.needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }
}

3.2 其他事件

可写流的写入完成事件是finish,这个与可读流的读完事件end一样,只是名字有所区别而已. 而pipe事件和unpipe事件则是当有可读流将此可写流作为输出的时候会触发此事件.

4。 可写流的水位

水位的判断在writeOrBuffer中判断:

  var ret = state.length < state.highWaterMark;
  // we must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

还记得在上一篇文章说的,可写流的水位只是做为write方法是否返回false的依据,它并不会暂停写数据到缓存中去,所以数据还是会继续保存。因此在代码中你并没有看到直接return的代码。

5. 可写流的方法

可写流的方法有cork/uncork/end/setDefaultEncoding/write/destroy

5.1 cork()/uncork()方法

cork/uncork是组合使用的,前者可以直接强制数据滞留到缓存中,然后当我们再调用uncork或者end的时候再写到具体的底层资源.

这种情况是为了避免我们频繁地写小块的数据到可写流中造成性能的下降,所以我们可以批量写完一批数据后再调用uncork事件来flush数据.在官方文档中建议uncork放在process.nextTick()中延迟调用.

5.2 write(chunk[, encoding][, callback])

该方法用来写指定的数据到可写流中,如果返回true表明内部可用的缓存仍然小于highWaterMark。如果为true的话那就是内部缓存空间不够,此时写入数据就应该暂停直到接收到drain事件。但是并不表示此时你再往可写流写数据会丢数据,Nodejs仍然会缓存所有写入的chunks直到使用完所有的缓存,这个时候将会无条件地终止掉缓存数据.甚至在中断之前,高内存的使用将会导致低效的垃圾回收性能以及高RSS(指的是没有释放内存回系统即使这块内存不再使用).

当可写流没有耗干之前就写数据对于Transform流来说是一个特殊的问题,因为Transform流是默认暂停的直到他有管道输出流或者有data/readable事件处理的时候.

5.3 其他方法

  1. end([chunk][, encoding][, callback])方法: 该方法是用来告知可写流已经没有别的数据可写入了。可选项chunkencoding允许在最后关闭可写流之前再直接写入最后的编码数据。
  2. setDefaultEncoding(encoding)方法: 与可读流的setDefaultEncoding(encoding)方法类似
  3. destroy([error])方法: 与可读流的destroy([error])方法类似

6. 可写流实例需要实现的方法

可读流实例只需要实现一个_read方法,但是可写流可以最多实现三个方法: _write, _writev, _final。我们一一介绍这三个方法都是需要做些什么。

6.1 _write(chunk, encoding, callback)

该方法用来将数据写到底层资源中。其中callback是必须调用以通知写入成功或者出现错误。如果不调用,在之前的例子我们也已经看到其后果了。

如果decodeStrings为true的话,那么chunk也许是一个字符串而不是一个Buffer,encoding将会标识字符串的字符编码。如果为false的话,那么encoding参数可以安全地忽略掉,那么chunk将保留相同的对象。

如果我们想要处理chunk数组的话,那我们需要实现下面的这个方法。

6.2 _writev(chunks, callback)

通过参数我们可以看到该方法是用于一次性处理多个chunk,chunks是一个数组。如果该方法有实现,那么我们会优先使用该方法去实现写缓存到底层资源,因为这是一种更快地方法,具体代码如下:

function clearBuffer(stream, state) {  
  ...
  if (stream._writev && entry && entry.next) {
    // Fast case, write everything using _writev()
    ...
  } else {
    // Slow case, write chunks one-by-one
    ...
  }
}

6.3 _final(callback)

该方法可以在流关闭之前调用。延迟finish事件直到callback方法被调用。这个在我们想要关闭流之前关闭资源或者写入额外数据的时候比较有用。

参考

  1. nodejs Stream