blob: 1badca17bcba9498f92a090c6225bc881e2e3c5e [file] [log] [blame]
/* Copyright (c) 2012-2014 LevelUP contributors
* See list at <https://github.com/rvagg/node-levelup#contributing>
* MIT License <https://github.com/rvagg/node-levelup/blob/master/LICENSE.md>
*/
// NOTE: we are fixed to readable-stream@1.0.x for now
// for pure Streams2 across Node versions
import ReadableStreamCore from 'readable-stream';
import inherits from 'inherits';
var Readable = ReadableStreamCore.Readable;
function ReadStream(options, makeData) {
if (!(this instanceof ReadStream)) {
return new ReadStream(options, makeData);
}
Readable.call(this, { objectMode: true, highWaterMark: options.highWaterMark });
// purely to keep `db` around until we're done so it's not GCed if the user doesn't keep a ref
this._waiting = false;
this._options = options;
this._makeData = makeData;
}
inherits(ReadStream, Readable);
ReadStream.prototype.setIterator = function (it) {
this._iterator = it;
/* istanbul ignore if */
if (this._destroyed) {
return it.end(function () {});
}
/* istanbul ignore if */
if (this._waiting) {
this._waiting = false;
return this._read();
}
return this;
};
ReadStream.prototype._read = function read() {
var self = this;
/* istanbul ignore if */
if (self._destroyed) {
return;
}
/* istanbul ignore if */
if (!self._iterator) {
return this._waiting = true;
}
self._iterator.next(function (err, key, value) {
if (err || (key === undefined && value === undefined)) {
if (!err && !self._destroyed) {
self.push(null);
}
return self._cleanup(err);
}
value = self._makeData(key, value);
if (!self._destroyed) {
self.push(value);
}
});
};
ReadStream.prototype._cleanup = function (err) {
if (this._destroyed) {
return;
}
this._destroyed = true;
var self = this;
/* istanbul ignore if */
if (err) {
self.emit('error', err);
}
/* istanbul ignore else */
if (self._iterator) {
self._iterator.end(function () {
self._iterator = null;
self.emit('close');
});
} else {
self.emit('close');
}
};
ReadStream.prototype.destroy = function () {
this._cleanup();
};
export default ReadStream;