| 'use strict'; |
| |
| const { Duplex } = require('stream'); |
| |
| /** |
| * Emits the `'close'` event on a stream. |
| * |
| * @param {Duplex} stream The stream. |
| * @private |
| */ |
| function emitClose(stream) { |
| stream.emit('close'); |
| } |
| |
| /** |
| * The listener of the `'end'` event. |
| * |
| * @private |
| */ |
| function duplexOnEnd() { |
| if (!this.destroyed && this._writableState.finished) { |
| this.destroy(); |
| } |
| } |
| |
| /** |
| * The listener of the `'error'` event. |
| * |
| * @param {Error} err The error |
| * @private |
| */ |
| function duplexOnError(err) { |
| this.removeListener('error', duplexOnError); |
| this.destroy(); |
| if (this.listenerCount('error') === 0) { |
| // Do not suppress the throwing behavior. |
| this.emit('error', err); |
| } |
| } |
| |
| /** |
| * Wraps a `WebSocket` in a duplex stream. |
| * |
| * @param {WebSocket} ws The `WebSocket` to wrap |
| * @param {Object} [options] The options for the `Duplex` constructor |
| * @return {Duplex} The duplex stream |
| * @public |
| */ |
| function createWebSocketStream(ws, options) { |
| let resumeOnReceiverDrain = true; |
| let terminateOnDestroy = true; |
| |
| function receiverOnDrain() { |
| if (resumeOnReceiverDrain) ws._socket.resume(); |
| } |
| |
| if (ws.readyState === ws.CONNECTING) { |
| ws.once('open', function open() { |
| ws._receiver.removeAllListeners('drain'); |
| ws._receiver.on('drain', receiverOnDrain); |
| }); |
| } else { |
| ws._receiver.removeAllListeners('drain'); |
| ws._receiver.on('drain', receiverOnDrain); |
| } |
| |
| const duplex = new Duplex({ |
| ...options, |
| autoDestroy: false, |
| emitClose: false, |
| objectMode: false, |
| writableObjectMode: false |
| }); |
| |
| ws.on('message', function message(msg) { |
| if (!duplex.push(msg)) { |
| resumeOnReceiverDrain = false; |
| ws._socket.pause(); |
| } |
| }); |
| |
| ws.once('error', function error(err) { |
| if (duplex.destroyed) return; |
| |
| // Prevent `ws.terminate()` from being called by `duplex._destroy()`. |
| // |
| // - If the `'error'` event is emitted before the `'open'` event, then |
| // `ws.terminate()` is a noop as no socket is assigned. |
| // - Otherwise, the error is re-emitted by the listener of the `'error'` |
| // event of the `Receiver` object. The listener already closes the |
| // connection by calling `ws.close()`. This allows a close frame to be |
| // sent to the other peer. If `ws.terminate()` is called right after this, |
| // then the close frame might not be sent. |
| terminateOnDestroy = false; |
| duplex.destroy(err); |
| }); |
| |
| ws.once('close', function close() { |
| if (duplex.destroyed) return; |
| |
| duplex.push(null); |
| }); |
| |
| duplex._destroy = function (err, callback) { |
| if (ws.readyState === ws.CLOSED) { |
| callback(err); |
| process.nextTick(emitClose, duplex); |
| return; |
| } |
| |
| let called = false; |
| |
| ws.once('error', function error(err) { |
| called = true; |
| callback(err); |
| }); |
| |
| ws.once('close', function close() { |
| if (!called) callback(err); |
| process.nextTick(emitClose, duplex); |
| }); |
| |
| if (terminateOnDestroy) ws.terminate(); |
| }; |
| |
| duplex._final = function (callback) { |
| if (ws.readyState === ws.CONNECTING) { |
| ws.once('open', function open() { |
| duplex._final(callback); |
| }); |
| return; |
| } |
| |
| // If the value of the `_socket` property is `null` it means that `ws` is a |
| // client websocket and the handshake failed. In fact, when this happens, a |
| // socket is never assigned to the websocket. Wait for the `'error'` event |
| // that will be emitted by the websocket. |
| if (ws._socket === null) return; |
| |
| if (ws._socket._writableState.finished) { |
| callback(); |
| if (duplex._readableState.endEmitted) duplex.destroy(); |
| } else { |
| ws._socket.once('finish', function finish() { |
| // `duplex` is not destroyed here because the `'end'` event will be |
| // emitted on `duplex` after this `'finish'` event. The EOF signaling |
| // `null` chunk is, in fact, pushed when the websocket emits `'close'`. |
| callback(); |
| }); |
| ws.close(); |
| } |
| }; |
| |
| duplex._read = function () { |
| if ( |
| (ws.readyState === ws.OPEN || ws.readyState === ws.CLOSING) && |
| !resumeOnReceiverDrain |
| ) { |
| resumeOnReceiverDrain = true; |
| if (!ws._receiver._writableState.needDrain) ws._socket.resume(); |
| } |
| }; |
| |
| duplex._write = function (chunk, encoding, callback) { |
| if (ws.readyState === ws.CONNECTING) { |
| ws.once('open', function open() { |
| duplex._write(chunk, encoding, callback); |
| }); |
| return; |
| } |
| |
| ws.send(chunk, callback); |
| }; |
| |
| duplex.on('end', duplexOnEnd); |
| duplex.on('error', duplexOnError); |
| return duplex; |
| } |
| |
| module.exports = createWebSocketStream; |