blob: dca4ff70ef1ba73340abfdcf9adb2888355fe14a [file] [log] [blame]
/* Copyright (c) 2012-2014 LevelUP contributors
* See list at <https://github.com/rvagg/node-levelup#contributing>
* MIT License <https://github.com/rvagg/node-levelup/blob/master/LICENSE.md>
*/
// NOTE: we are fixed to readable-stream@1.0.x for now
// for pure Streams2 across Node versions
import ReadableStreamCore from 'readable-stream';
var Readable = ReadableStreamCore.Readable;
function createClass(parent, init) {
let klass = function (...args) {
if (!(this instanceof klass)) {
return new klass(...args);
}
init.apply(this, args);
};
klass.prototype = Object.create(parent.prototype, {
constructor: { value: klass }
});
return klass;
}
class ReadStreamInternal extends Readable {
constructor(options, makeData) {
super({ objectMode: true, highWaterMark: options.highWaterMark });
this._setup(options, makeData);
}
_setup(options, makeData) {
super.constructor({ objectMode: true, highWaterMark: options.highWaterMark });
// purely to keep `db` around until we're done so it's not GCed if the user doesn't keep a ref
this._waiting = false;
this._options = options;
this._makeData = makeData;
}
setIterator(it) {
this._iterator = it;
/* istanbul ignore if */
if (this._destroyed) {
return it.end(function () {});
}
/* istanbul ignore if */
if (this._waiting) {
this._waiting = false;
return this._read();
}
return this;
}
_cleanup(err) {
if (this._destroyed) {
return;
}
this._destroyed = true;
var self = this;
/* istanbul ignore if */
if (err && err.message !== 'iterator has ended') {
self.emit('error', err);
}
/* istanbul ignore else */
if (self._iterator) {
self._iterator.end(function () {
self._iterator = null;
self.emit('close');
});
} else {
self.emit('close');
}
}
destroy() {
this._cleanup();
}
_read() {
var self = this;
/* istanbul ignore if */
if (self._destroyed) {
return;
}
/* istanbul ignore if */
if (!self._iterator) {
return this._waiting = true;
}
self._iterator.next(function (err, key, value) {
if (err || (key === undefined && value === undefined)) {
if (!err && !self._destroyed) {
self.push(null);
}
return self._cleanup(err);
}
value = self._makeData(key, value);
if (!self._destroyed) {
self.push(value);
}
});
}
}
const ReadStream = createClass(ReadStreamInternal, function (options, makeData) {
ReadStreamInternal.prototype._setup.call(this, options, makeData);
});
export default ReadStream;