| 'use strict' |
| const MiniPass = require('minipass') |
| const EE = require('events').EventEmitter |
| const fs = require('fs') |
| |
// Vectored write: use fs.writev when the running Node.js provides it,
// otherwise fall back to the internal fs binding below.
let writev = fs.writev
/* istanbul ignore next */
if (!writev) {
  // This entire block can be removed if support for earlier than Node.js
  // 12.9.0 is not needed.
  const binding = process.binding('fs')
  // The request constructor is exposed under one of these two names.
  const FSReqWrap = binding.FSReqWrap || binding.FSReqCallback

  // Same calling convention as fs.writev: cb(er, bytesWritten, buffers).
  writev = (fd, iovec, pos, cb) => {
    const req = new FSReqWrap()
    req.oncomplete = (er, bw) => cb(er, bw, iovec)
    binding.writeBuffers(fd, iovec, pos, req)
  }
}
| |
// Symbol keys for the internal state and internal methods of the stream
// classes in this file, so none of it shows up as a string-named property.

// Keys used by both the read and the write streams.
const _autoClose = Symbol('_autoClose')
const _close = Symbol('_close')
const _errored = Symbol('_errored')
const _fd = Symbol('_fd')
const _onerror = Symbol('_onerror')
const _onopen = Symbol('_onopen')
const _open = Symbol('_open')
const _path = Symbol('_path')

// Keys used only by the read streams.
const _handleChunk = Symbol('_handleChunk')
const _makeBuf = Symbol('_makeBuf')
const _onread = Symbol('_onread')
const _read = Symbol('_read')
const _readSize = Symbol('_readSize')
const _reading = Symbol('_reading')
const _remain = Symbol('_remain')
const _size = Symbol('_size')

// Keys used only by the write streams.
const _defaultFlag = Symbol('_defaultFlag')
const _ended = Symbol('_ended')
const _finished = Symbol('_finished')
const _flags = Symbol('_flags')
const _flush = Symbol('_flush')
const _mode = Symbol('_mode')
const _needDrain = Symbol('_needDrain')
const _onwrite = Symbol('_onwrite')
const _pos = Symbol('_pos')
const _queue = Symbol('_queue')
const _write = Symbol('_write')
const _writing = Symbol('_writing')
| |
/**
 * Readable file stream built on MiniPass.
 *
 * Opens `path` for reading (unless a numeric `opt.fd` is supplied), then
 * reads chunks of at most `readSize` bytes and writes them into the
 * underlying MiniPass until a zero-byte read occurs or `size` bytes have
 * been consumed, at which point the descriptor is closed (if `autoClose`)
 * and the MiniPass is ended.
 *
 * Emits `'open'` with the descriptor once the file is opened here, and
 * `'close'` after the descriptor has been closed by this stream.
 */
class ReadStream extends MiniPass {
  /**
   * @param {string} path - File path; must be a string even when `opt.fd`
   *   is given.
   * @param {object} [opt] - Options, also handed to the MiniPass constructor.
   * @param {number} [opt.fd] - Already-open descriptor to read from.
   * @param {number} [opt.readSize] - Maximum bytes per read (default 16 MiB).
   * @param {number} [opt.size] - Total bytes to read (default Infinity).
   * @param {boolean} [opt.autoClose] - Close the descriptor when done or on
   *   error (default true).
   * @throws {TypeError} When `path` is not a string.
   */
  constructor (path, opt) {
    const options = opt || {}
    super(options)

    this.readable = true
    this.writable = false

    if (typeof path !== 'string') {
      throw new TypeError('path must be a string')
    }

    this[_errored] = false
    this[_fd] = typeof options.fd === 'number' ? options.fd : null
    this[_path] = path
    this[_readSize] = options.readSize || 16 * 1024 * 1024
    this[_reading] = false
    this[_size] = typeof options.size === 'number' ? options.size : Infinity
    this[_remain] = this[_size]
    this[_autoClose] = typeof options.autoClose === 'boolean'
      ? options.autoClose
      : true

    // With a descriptor in hand reading starts right away; otherwise the
    // file has to be opened first.
    if (typeof this[_fd] === 'number') {
      this[_read]()
    } else {
      this[_open]()
    }
  }

  /** The current file descriptor, or null when none is held. */
  get fd () {
    return this[_fd]
  }

  /** The path given to the constructor. */
  get path () {
    return this[_path]
  }

  /** Not supported on this class; always throws. */
  write () {
    throw new TypeError('this is a readable stream')
  }

  /** Not supported on this class; always throws. */
  end () {
    throw new TypeError('this is a readable stream')
  }

  // Open the file read-only and pass the outcome to the open handler.
  [_open] () {
    fs.open(this[_path], 'r', (er, fd) => this[_onopen](er, fd))
  }

  // Open handler: report a failure, or record the descriptor, announce it
  // and start reading.
  [_onopen] (er, fd) {
    if (er) {
      this[_onerror](er)
      return
    }
    this[_fd] = fd
    this.emit('open', fd)
    this[_read]()
  }

  // Allocate a buffer for the next read, never larger than what remains.
  [_makeBuf] () {
    const length = Math.min(this[_readSize], this[_remain])
    return Buffer.allocUnsafe(length)
  }

  // Begin one read unless one is already in progress.
  [_read] () {
    if (this[_reading]) {
      return
    }
    this[_reading] = true
    const target = this[_makeBuf]()
    /* istanbul ignore if */
    if (target.length === 0) {
      // Nothing left to ask for: report a zero-byte read on the next tick.
      return process.nextTick(() => this[_onread](null, 0, target))
    }
    fs.read(this[_fd], target, 0, target.length, null, (er, br, buf) =>
      this[_onread](er, br, buf))
  }

  // Read handler: report a failure, or pass the chunk on and keep reading
  // for as long as the chunk handler asks for more.
  [_onread] (er, br, buf) {
    this[_reading] = false
    if (er) {
      this[_onerror](er)
      return
    }
    if (this[_handleChunk](br, buf)) {
      this[_read]()
    }
  }

  // Close the descriptor when this stream is responsible for it; emits
  // 'close' on success or 'error' when closing fails.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const fd = this[_fd]
    this[_fd] = null
    fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
  }

  // Failure path: leave the reading flag set so no new read can start,
  // release the descriptor, then surface the error.
  [_onerror] (er) {
    this[_reading] = true
    this[_close]()
    this.emit('error', er)
  }

  // Push `br` bytes of `buf` downstream. Returns the result of the MiniPass
  // write, or false once the stream is finished (zero-byte read or the
  // requested size has been reached).
  [_handleChunk] (br, buf) {
    // no effect if infinite
    this[_remain] -= br

    const wantsMore = br > 0
      ? super.write(br < buf.length ? buf.slice(0, br) : buf)
      : false

    if (br === 0 || this[_remain] <= 0) {
      this[_close]()
      super.end()
      return false
    }

    return wantsMore
  }

  /**
   * Event filter in front of MiniPass's emit.
   *
   * - 'prefinish' and 'finish' are dropped.
   * - 'drain' is not forwarded; it triggers another read while a
   *   descriptor is held.
   * - only the first 'error' is forwarded.
   * - everything else passes straight through.
   */
  emit (ev, data) {
    if (ev === 'prefinish' || ev === 'finish') {
      return
    }

    if (ev === 'drain') {
      if (typeof this[_fd] === 'number') {
        this[_read]()
      }
      return
    }

    if (ev === 'error') {
      if (this[_errored]) {
        return
      }
      this[_errored] = true
    }

    return super.emit(ev, data)
  }
}
| |
/**
 * Synchronous variant of ReadStream: opening, reading and closing all use
 * the blocking fs calls. Exceptions from those calls propagate to the
 * caller; before they do, the descriptor is closed (subject to autoClose).
 */
class ReadStreamSync extends ReadStream {
  // Open the file with openSync and hand the descriptor to the shared open
  // handler. A flag plus `finally` is used instead of catch/rethrow so the
  // original exception propagates untouched.
  [_open] () {
    let failed = true
    try {
      const fd = fs.openSync(this[_path], 'r')
      this[_onopen](null, fd)
      failed = false
    } finally {
      if (failed) {
        this[_close]()
      }
    }
  }

  // Read chunk after chunk with readSync until the chunk handler stops
  // asking for more. Does nothing when a read loop is already running.
  [_read] () {
    let failed = true
    try {
      if (!this[_reading]) {
        this[_reading] = true
        let keepGoing = true
        while (keepGoing) {
          const chunk = this[_makeBuf]()
          /* istanbul ignore next */
          const bytesRead = chunk.length === 0
            ? 0
            : fs.readSync(this[_fd], chunk, 0, chunk.length, null)
          keepGoing = this[_handleChunk](bytesRead, chunk)
        }
        this[_reading] = false
      }
      failed = false
    } finally {
      if (failed) {
        this[_close]()
      }
    }
  }

  // Close the descriptor with closeSync when this stream is responsible
  // for it, then emit 'close'.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const fd = this[_fd]
    this[_fd] = null
    fs.closeSync(fd)
    this.emit('close')
  }
}
| |
/**
 * Writable file stream implemented directly on EventEmitter.
 *
 * Chunks written while the file is still being opened, or while another
 * write is in flight, are queued; the queue is flushed with a single
 * vectored write when it holds more than one chunk.
 *
 * Emits `'open'` with the descriptor, `'drain'` once the queue has emptied
 * after a write() returned false for queueing, `'finish'` once after end()
 * when everything has been written, and `'close'` after the descriptor has
 * been closed by this stream.
 */
class WriteStream extends EE {
  /**
   * @param {string} path - File to write to.
   * @param {object} [opt]
   * @param {number} [opt.fd] - Already-open descriptor; skips opening.
   * @param {number} [opt.mode] - Mode passed to open (default 0o666).
   * @param {number} [opt.start] - Position of the first write; later writes
   *   continue from where the previous one stopped.
   * @param {boolean} [opt.autoClose] - Close the descriptor on finish or
   *   error (default true).
   * @param {string} [opt.flags] - Open flags. Default is 'r+' when `start`
   *   is given, otherwise 'w'.
   */
  constructor (path, opt) {
    const options = opt || {}
    super(options)
    this.readable = false
    this.writable = true
    this[_errored] = false
    this[_writing] = false
    this[_ended] = false
    this[_needDrain] = false
    this[_queue] = []
    this[_path] = path
    this[_fd] = typeof options.fd === 'number' ? options.fd : null
    this[_mode] = options.mode === undefined ? 0o666 : options.mode
    this[_pos] = typeof options.start === 'number' ? options.start : null
    this[_autoClose] = typeof options.autoClose === 'boolean'
      ? options.autoClose
      : true

    // truncating makes no sense when writing into the middle
    const fallbackFlag = this[_pos] !== null ? 'r+' : 'w'
    // Remember whether the caller left the flags to us: only then may an
    // 'r+' open be retried with 'w'.
    this[_defaultFlag] = options.flags === undefined
    this[_flags] = this[_defaultFlag] ? fallbackFlag : options.flags

    if (this[_fd] === null) {
      this[_open]()
    }
  }

  /**
   * Same as EventEmitter's emit, except that only the first 'error' event
   * is delivered; later ones are dropped.
   */
  emit (ev, data) {
    const isError = ev === 'error'
    if (isError && this[_errored]) {
      return
    }
    if (isError) {
      this[_errored] = true
    }
    return super.emit(ev, data)
  }

  /** The current file descriptor, or null when none is held. */
  get fd () {
    return this[_fd]
  }

  /** The path given to the constructor. */
  get path () {
    return this[_path]
  }

  // Failure path: release the descriptor, leave the writing flag set so
  // later write() calls are only queued, then surface the error.
  [_onerror] (er) {
    this[_close]()
    this[_writing] = true
    this.emit('error', er)
  }

  // Open the file with the current flags and mode.
  [_open] () {
    fs.open(this[_path], this[_flags], this[_mode],
      (er, fd) => this[_onopen](er, fd))
  }

  // Open handler. A missing file under the default 'r+' flag is retried
  // with 'w'; any other failure is reported. On success the descriptor is
  // recorded, announced, and queued chunks are flushed.
  [_onopen] (er, fd) {
    if (er) {
      const retryWithCreate = this[_defaultFlag] &&
        this[_flags] === 'r+' &&
        er.code === 'ENOENT'
      if (retryWithCreate) {
        this[_flags] = 'w'
        this[_open]()
      } else {
        this[_onerror](er)
      }
      return
    }

    this[_fd] = fd
    this.emit('open', fd)
    this[_flush]()
  }

  /**
   * Optionally write a final chunk, then mark the stream as ended.
   *
   * @param {Buffer|string} [buf] - Final chunk.
   * @param {string} [enc] - Encoding when `buf` is a string.
   * @returns {this}
   */
  end (buf, enc) {
    if (buf) {
      this.write(buf, enc)
    }

    this[_ended] = true

    // synthetic after-write logic, where drain/finish live
    const idle = !this[_writing] && !this[_queue].length
    if (idle && typeof this[_fd] === 'number') {
      this[_onwrite](null, 0)
    }
    return this
  }

  /**
   * Write a chunk.
   *
   * @param {Buffer|string} buf - Data; strings are converted to a Buffer.
   * @param {string} [enc] - Encoding when `buf` is a string.
   * @returns {boolean} true when the chunk was handed straight to the file
   *   system; false when it was queued or when the stream has ended (in
   *   which case an 'error' is emitted).
   */
  write (buf, enc) {
    const chunk = typeof buf === 'string' ? Buffer.from(buf, enc) : buf

    if (this[_ended]) {
      this.emit('error', new Error('write() after end()'))
      return false
    }

    // Not open yet, or something is already in flight or waiting: queue.
    if (this[_fd] === null || this[_writing] || this[_queue].length) {
      this[_queue].push(chunk)
      this[_needDrain] = true
      return false
    }

    this[_writing] = true
    this[_write](chunk)
    return true
  }

  // Write one buffer at the current position.
  [_write] (buf) {
    fs.write(this[_fd], buf, 0, buf.length, this[_pos], (er, bw) =>
      this[_onwrite](er, bw))
  }

  // Write handler: advance the position, then flush more queued data, or
  // finish, or signal drain.
  [_onwrite] (er, bw) {
    if (er) {
      this[_onerror](er)
      return
    }

    if (this[_pos] !== null) {
      this[_pos] += bw
    }

    if (this[_queue].length) {
      this[_flush]()
      return
    }

    this[_writing] = false

    if (this[_ended] && !this[_finished]) {
      this[_finished] = true
      this[_close]()
      this.emit('finish')
    } else if (this[_needDrain]) {
      this[_needDrain] = false
      this.emit('drain')
    }
  }

  // Write out whatever is queued: nothing (but run the after-write logic
  // if ended), a single buffer, or all buffers in one vectored write.
  [_flush] () {
    const pending = this[_queue].length

    if (pending === 0) {
      if (this[_ended]) {
        this[_onwrite](null, 0)
      }
      return
    }

    if (pending === 1) {
      this[_write](this[_queue].pop())
      return
    }

    const iovec = this[_queue]
    this[_queue] = []
    writev(this[_fd], iovec, this[_pos],
      (er, bw) => this[_onwrite](er, bw))
  }

  // Close the descriptor when this stream is responsible for it; emits
  // 'close' on success or 'error' when closing fails.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const fd = this[_fd]
    this[_fd] = null
    fs.close(fd, er => er ? this.emit('error', er) : this.emit('close'))
  }
}
| |
/**
 * Synchronous variant of WriteStream: opening, single-buffer writes and
 * closing use the blocking fs calls, and their exceptions propagate to
 * the caller.
 */
class WriteStreamSync extends WriteStream {
  // Open the file with openSync and hand the descriptor to the shared open
  // handler. With the default 'r+' flag a missing file is retried with 'w'.
  [_open] () {
    // only wrap in a try{} block if we know we'll retry, to avoid
    // the rethrow obscuring the error's source frame in most cases.
    const mayRetry = this[_defaultFlag] && this[_flags] === 'r+'

    if (!mayRetry) {
      this[_onopen](null, fs.openSync(this[_path], this[_flags], this[_mode]))
      return
    }

    let fd
    try {
      fd = fs.openSync(this[_path], this[_flags], this[_mode])
    } catch (er) {
      if (er.code !== 'ENOENT') {
        throw er
      }
      this[_flags] = 'w'
      return this[_open]()
    }

    this[_onopen](null, fd)
  }

  // Close the descriptor with closeSync when this stream is responsible
  // for it, then emit 'close'.
  [_close] () {
    if (!this[_autoClose] || typeof this[_fd] !== 'number') {
      return
    }
    const fd = this[_fd]
    this[_fd] = null
    fs.closeSync(fd)
    this.emit('close')
  }

  // Write one buffer with writeSync and run the shared write handler.
  [_write] (buf) {
    // throw the original, but try to close if it fails
    let failed = true
    try {
      const written = fs.writeSync(this[_fd], buf, 0, buf.length, this[_pos])
      this[_onwrite](null, written)
      failed = false
    } finally {
      if (failed) {
        // Best effort only: a failure to close must not replace the
        // exception that is already propagating.
        try {
          this[_close]()
        } catch (_) {}
      }
    }
  }
}
| |
// Public API: the readable streams ...
exports.ReadStream = ReadStream
exports.ReadStreamSync = ReadStreamSync

// ... and the writable streams.
exports.WriteStream = WriteStream
exports.WriteStreamSync = WriteStreamSync