var stream = require("stream"), | |
constants = require("./constants"), | |
util = require("util"); | |
var ReadableStreamBuffer = module.exports = function(opts) { | |
var that = this; | |
stream.Stream.call(this); | |
opts = opts || {}; | |
var frequency = opts.hasOwnProperty("frequency") ? opts.frequency : constants.DEFAULT_FREQUENCY; | |
var chunkSize = opts.chunkSize || constants.DEFAULT_CHUNK_SIZE; | |
var initialSize = opts.initialSize || constants.DEFAULT_INITIAL_SIZE; | |
var incrementAmount = opts.incrementAmount || constants.DEFAULT_INCREMENT_AMOUNT; | |
var size = 0; | |
var buffer = new Buffer(initialSize); | |
var encoding = null; | |
this.readable = true; | |
this.writable = false; | |
var sendData = function() { | |
if(!size) { | |
that.emit("end"); | |
return; | |
} | |
var amount = Math.min(chunkSize, size); | |
var chunk = null; | |
if(encoding) { | |
chunk = buffer.toString(encoding, 0, amount); | |
} | |
else { | |
chunk = new Buffer(amount); | |
buffer.copy(chunk, 0, 0, amount); | |
} | |
that.emit("data", chunk); | |
if(amount < buffer.length) | |
buffer.copy(buffer, 0, amount, size); | |
size -= amount; | |
}; | |
this.size = function() { | |
return size; | |
}; | |
this.maxSize = function() { | |
return buffer.length; | |
}; | |
var increaseBufferIfNecessary = function(incomingDataSize) { | |
if((buffer.length - size) < incomingDataSize) { | |
var factor = Math.ceil((incomingDataSize - (buffer.length - size)) / incrementAmount); | |
var newBuffer = new Buffer(buffer.length + (incrementAmount * factor)); | |
buffer.copy(newBuffer, 0, 0, size); | |
buffer = newBuffer; | |
} | |
}; | |
this.put = function(data, encoding) { | |
if(!that.readable) return; | |
if(Buffer.isBuffer(data)) { | |
increaseBufferIfNecessary(data.length); | |
data.copy(buffer, size, 0); | |
size += data.length; | |
} | |
else { | |
data = data + ""; | |
var dataSizeInBytes = Buffer.byteLength(data); | |
increaseBufferIfNecessary(dataSizeInBytes); | |
buffer.write(data, size, encoding || "utf8"); | |
size += dataSizeInBytes; | |
} | |
if (!this.isPaused && !frequency) { | |
while (size > 0) { | |
sendData(); | |
} | |
} | |
}; | |
this.pause = function() { | |
this.isPaused = true; | |
if(sendData && sendData.interval) { | |
clearInterval(sendData.interval); | |
delete sendData.interval; | |
} | |
}; | |
this.resume = function() { | |
this.isPaused = false; | |
if(sendData && !sendData.interval && frequency > 0) { | |
sendData.interval = setInterval(sendData, frequency); | |
} | |
}; | |
this.destroy = function() { | |
that.emit("end"); | |
if(sendData.interval) clearTimeout(sendData.interval); | |
sendData = null; | |
that.readable = false; | |
that.emit("close"); | |
}; | |
this.setEncoding = function(_encoding) { | |
encoding = _encoding; | |
}; | |
this.resume(); | |
}; | |
util.inherits(ReadableStreamBuffer, stream.Stream); |