// blob: 38d46cacd617d9ea819d33a61ac887fff884a978 [file] [log] [blame]
'use strict';
var STREAM = require('stream'),
UTIL = require('util'),
StringDecoder = require('string_decoder').StringDecoder;
/**
 * Read-only in-memory stream built on stream.Readable.
 *
 * May be invoked with or without `new`; a plain call re-enters the
 * constructor with `new` and returns the resulting instance.
 *
 * @param {*} data - initial content, handed unchanged to `init`.
 * @param {Object} [options] - passed to the parent stream constructor and
 *   to `init`.
 */
function MemoryReadableStream(data, options) {
  if (this instanceof MemoryReadableStream) {
    // Parent constructor first, then the module's own queue/option setup.
    MemoryReadableStream.super_.call(this, options);
    this.init(data, options);
    return;
  }
  return new MemoryReadableStream(data, options);
}
UTIL.inherits(MemoryReadableStream, STREAM.Readable);
/**
 * Write-only in-memory stream built on stream.Writable.
 *
 * May be invoked with or without `new`; a plain call re-enters the
 * constructor with `new` and returns the resulting instance.
 *
 * @param {*} data - initial content, handed unchanged to `init`.
 * @param {Object} [options] - passed to the parent stream constructor and
 *   to `init`.
 */
function MemoryWritableStream(data, options) {
  if (this instanceof MemoryWritableStream) {
    // Parent constructor first, then the module's own queue/option setup.
    MemoryWritableStream.super_.call(this, options);
    this.init(data, options);
    return;
  }
  return new MemoryWritableStream(data, options);
}
UTIL.inherits(MemoryWritableStream, STREAM.Writable);
/**
 * Readable-and-writable in-memory stream built on stream.Duplex.
 *
 * May be invoked with or without `new`; a plain call re-enters the
 * constructor with `new` and returns the resulting instance.
 *
 * @param {*} data - initial content, handed unchanged to `init`.
 * @param {Object} [options] - passed to the parent stream constructor and
 *   to `init`.
 */
function MemoryDuplexStream(data, options) {
  if (this instanceof MemoryDuplexStream) {
    // Parent constructor first, then the module's own queue/option setup.
    MemoryDuplexStream.super_.call(this, options);
    this.init(data, options);
    return;
  }
  return new MemoryDuplexStream(data, options);
}
UTIL.inherits(MemoryDuplexStream, STREAM.Duplex);
/**
 * Shared initialiser for all three stream flavours: fills the internal
 * chunk queue from `data` and copies the module-specific options onto the
 * instance.
 *
 * @param {*} [data] - a single chunk or an array of chunks. Falsy values
 *   leave the queue empty. Chunks that are not Buffers are converted to
 *   Buffers.
 * @param {Object} [options] - only own properties are honoured:
 *   `maxbufsize`, `bufoverflow` and `frequence`. Each is stored on the
 *   instance under the same name and defaults to `null` when absent.
 */
MemoryReadableStream.prototype.init =
MemoryWritableStream.prototype.init =
MemoryDuplexStream.prototype.init = function init (data, options) {
  var self = this,
    // Borrowed so option objects without Object.prototype
    // (e.g. Object.create(null)) do not throw.
    hasOwn = Object.prototype.hasOwnProperty;

  this.queue = [];

  if (data) {
    if (!Array.isArray(data)) {
      data = [ data ];
    }
    data.forEach(function (chunk) {
      if (!Buffer.isBuffer(chunk)) {
        // `new Buffer()` is deprecated. A numeric chunk keeps its legacy
        // meaning (a buffer of that many bytes) but is always zero-filled.
        chunk = typeof chunk === 'number' ? Buffer.alloc(chunk)
                                          : Buffer.from(chunk);
      }
      self.queue.push(chunk);
    });
  }

  options = options || {};

  this.maxbufsize = hasOwn.call(options, 'maxbufsize') ? options.maxbufsize
                                                       : null;
  this.bufoverflow = hasOwn.call(options, 'bufoverflow') ? options.bufoverflow
                                                         : null;
  this.frequence = hasOwn.call(options, 'frequence') ? options.frequence
                                                     : null;
};
/**
 * Factory that picks the concrete in-memory stream class from the
 * `readable` / `writable` options (both default to `true` when they are not
 * own properties of `options`).
 *
 * Callable with or without `new`; either way the returned object is an
 * instance of one of the concrete stream classes, not of MemoryStream.
 *
 * @param {*} data - initial content forwarded to the chosen constructor.
 * @param {Object} [options] - forwarded to the chosen constructor.
 * @returns {MemoryDuplexStream|MemoryReadableStream|MemoryWritableStream}
 * @throws {Error} when both `readable` and `writable` are falsy.
 */
function MemoryStream (data, options) {
  if (!(this instanceof MemoryStream)) {
    return new MemoryStream(data, options);
  }

  options = options || {};

  var wantsRead = true,
    wantsWrite = true;

  if (options.hasOwnProperty('readable')) {
    wantsRead = options.readable;
  }
  if (options.hasOwnProperty('writable')) {
    wantsWrite = options.writable;
  }

  if (!wantsRead && !wantsWrite) {
    throw new Error("Unknown stream type  Readable, Writable or Duplex ");
  }
  if (!wantsRead) {
    return new MemoryWritableStream(data, options);
  }
  if (!wantsWrite) {
    return new MemoryReadableStream(data, options);
  }
  return new MemoryDuplexStream(data, options);
}
/**
 * Creates a read-only memory stream.
 *
 * The caller's `options` object is not modified: its own enumerable
 * properties are copied and `readable` / `writable` are forced on the copy.
 *
 * @param {*} data - initial content of the stream.
 * @param {Object} [options] - extra options forwarded to MemoryStream.
 * @returns {Object} whatever `new MemoryStream(data, …)` returns for
 *   `readable: true, writable: false`.
 */
MemoryStream.createReadStream = function (data, options) {
  var opts = {};

  if (options) {
    // Shallow copy so the forced flags below never leak back to the caller.
    Object.keys(options).forEach(function (key) {
      opts[key] = options[key];
    });
  }

  opts.readable = true;
  opts.writable = false;

  return new MemoryStream(data, opts);
};
/**
 * Creates a write-only memory stream.
 *
 * The caller's `options` object is not modified: its own enumerable
 * properties are copied and `readable` / `writable` are forced on the copy.
 *
 * @param {*} data - initial content of the stream.
 * @param {Object} [options] - extra options forwarded to MemoryStream.
 * @returns {Object} whatever `new MemoryStream(data, …)` returns for
 *   `readable: false, writable: true`.
 */
MemoryStream.createWriteStream = function (data, options) {
  var opts = {};

  if (options) {
    // Shallow copy so the forced flags below never leak back to the caller.
    Object.keys(options).forEach(function (key) {
      opts[key] = options[key];
    });
  }

  opts.readable = false;
  opts.writable = true;

  return new MemoryStream(data, opts);
};
/**
 * Readable-side implementation: hands at most one queued chunk to the
 * consumer per call, after a delay of `frequence` milliseconds (0 when the
 * option is unset).
 *
 * When the queue is empty the stream is ended, unless this is a Duplex
 * whose writable side has not finished yet; in that case nothing is done
 * here and data is expected to arrive through `_write`.
 *
 * @param {number} n - size hint supplied by the stream machinery; unused.
 */
MemoryReadableStream.prototype._read =
MemoryDuplexStream.prototype._read = function _read (n) {
  var self = this,
    frequence = self.frequence || 0,
    wait_data = this instanceof STREAM.Duplex && ! this._writableState.finished;

  if (this.queue.length) {
    setTimeout(function () {
      var chunk;

      // The queue may have been drained while the timer was pending.
      if (!self.queue.length) {
        return;
      }

      chunk = self.queue.shift();
      if (chunk && ! self._readableState.ended) {
        // push() always accepts the chunk; a `false` return only asks the
        // producer to pause until the next _read(). Putting the chunk back
        // in the queue (as was done previously) delivered it twice.
        self.push(chunk);
      }
    }, frequence);
  } else if (!wait_data) {
    this.push(null);// finish stream
  }
};
/**
 * Writable-side implementation.
 *
 * On a Duplex stream the chunk (preceded by anything still waiting in the
 * queue) is pushed straight to the readable side; on a plain writable
 * stream it is appended to the queue.
 *
 * When `maxbufsize` is set and the queued size plus this chunk would exceed
 * it, the chunk is not stored: with a truthy `bufoverflow` the write fails
 * with an Error, otherwise the chunk is dropped silently.
 *
 * @param {Buffer|string} chunk - data to store.
 * @param {string} encoding - encoding reported by the stream machinery.
 * @param {Function} cb - completion callback; receives an Error on failure.
 */
MemoryWritableStream.prototype._write =
MemoryDuplexStream.prototype._write = function _write (chunk, encoding, cb) {
  var decoder = null;
  try {
    // NOTE(review): nothing in this file sets `this.decodeStrings`, so the
    // decoder is only created if a caller assigns that property itself.
    decoder = this.decodeStrings && encoding ? new StringDecoder(encoding) : null;
  } catch (err){
    return cb(err);
  }

  var decoded_chunk = decoder ? decoder.write(chunk) : chunk,
    queue_size = this._getQueueSize(),
    chunk_size = decoded_chunk.length;

  if (this.maxbufsize && (queue_size + chunk_size) > this.maxbufsize ) {
    if (this.bufoverflow) {
      // The _write callback contract expects an Error object, not a string.
      return cb(new Error("Buffer overflowed (" + this.bufoverflow + "/" + queue_size + ")"));
    }
    // Overflow without `bufoverflow`: deliberately drop the chunk.
    return cb();
  }

  if (this instanceof STREAM.Duplex) {
    // Flush anything still queued first so ordering is preserved.
    while (this.queue.length) {
      this.push(this.queue.shift());
    }
    this.push(decoded_chunk);
  } else {
    this.queue.push(decoded_chunk);
  }

  cb();
};
/**
 * Ends the writable side and, once that has completed, the readable side
 * as well.
 *
 * Accepts the same overloads as Writable#end: `end()`, `end(cb)`,
 * `end(chunk)`, `end(chunk, cb)`, `end(chunk, encoding)` and
 * `end(chunk, encoding, cb)`.
 *
 * @param {Buffer|string} [chunk] - final chunk to write.
 * @param {string} [encoding] - encoding of `chunk` when it is a string.
 * @param {Function} [cb] - called after the readable side has been ended.
 * @returns {*} the return value of the parent `end` implementation.
 */
MemoryDuplexStream.prototype.end = function (chunk, encoding, cb) {
  var self = this;

  // Normalise the optional-argument overloads here. Otherwise the parent
  // implementation treats a function passed as `chunk` or `encoding` as the
  // callback and ignores the wrapper below, so the readable side would
  // never be ended.
  if (typeof chunk === 'function') {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  return MemoryDuplexStream.super_.prototype.end.call(this, chunk, encoding, function () {
    self.push(null);//finish readable stream too
    if (cb) cb.apply(null, arguments);
  });
};
/**
 * Sums the `length` of every entry currently held in the queue.
 *
 * For an entry that is itself an array, the length of its first element is
 * counted instead of the array's own length.
 *
 * @returns {number} total of the entry lengths; 0 for an empty queue.
 */
MemoryReadableStream.prototype._getQueueSize =
MemoryWritableStream.prototype._getQueueSize =
MemoryDuplexStream.prototype._getQueueSize = function () {
  return this.queue.reduce(function (total, entry) {
    var measured = Array.isArray(entry) ? entry[0] : entry;
    return total + measured.length;
  }, 0);
};
/**
 * Returns everything currently held in the queue as one string
 * (`toString` and `getAll` are the same function).
 *
 * Buffer entries are decoded as UTF-8 through a single StringDecoder, so a
 * multi-byte character whose bytes are split across two consecutive Buffer
 * chunks is decoded correctly instead of being turned into replacement
 * characters. Non-Buffer entries are appended using normal string
 * conversion.
 *
 * @returns {string} concatenated queue content; '' for an empty queue.
 */
MemoryWritableStream.prototype.toString =
MemoryDuplexStream.prototype.toString =
MemoryReadableStream.prototype.toString =
MemoryWritableStream.prototype.getAll =
MemoryDuplexStream.prototype.getAll =
MemoryReadableStream.prototype.getAll = function () {
  var decoder = new StringDecoder('utf8'),
    ret = '';

  this.queue.forEach(function (data) {
    if (Buffer.isBuffer(data)) {
      ret += decoder.write(data);
    } else {
      // Flush any bytes still pending from preceding Buffers before
      // appending a non-Buffer entry, so the order is preserved.
      ret += decoder.end() + data;
    }
  });

  return ret + decoder.end();
};
/**
 * Returns everything currently held in the queue as one newly allocated
 * Buffer.
 *
 * Non-Buffer entries are converted to Buffers first, and the result is
 * sized from the converted byte lengths. A string entry containing
 * multi-byte characters is therefore copied in full rather than truncated
 * or overlapped by the next entry.
 *
 * @returns {Buffer} concatenated queue content; empty for an empty queue.
 */
MemoryWritableStream.prototype.toBuffer =
MemoryDuplexStream.prototype.toBuffer =
MemoryReadableStream.prototype.toBuffer = function () {
  var parts = this.queue.map(function (data) {
    return Buffer.isBuffer(data) ? data : Buffer.from(data);
  });

  return Buffer.concat(parts);
};
// Public entry point: the MemoryStream factory, which also carries the
// createReadStream / createWriteStream helpers as properties.
module.exports = MemoryStream;