// blob: 008de035c2edca32dbb619daf441893e0e548845 [file] [log] [blame]
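// Write data to it, and it'll emit data in fixed-size blocks (512 bytes
// unless a different size is passed to the constructor).
// If you .end() or .flush(), it'll emit whatever it's got, padded with
// nulls to a whole block (unless the `nopad` option is set, in which case
// the leftover bytes are emitted as one short chunk).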
module.exports = BlockStream
var Stream = require("stream").Stream
, inherits = require("inherits")
, assert = require("assert").ok
, debug = process.env.DEBUG ? console.error : function () {}
// BlockStream(size, opt)
//
// A readable + writable legacy Stream that regroups everything written to
// it into blocks of exactly `size` bytes and emits each block as a "data"
// event.
//
//   size       block size in bytes; any falsy value selects 512
//   opt.nopad  when truthy, flush()/end() emit the leftover bytes as one
//              short chunk instead of zero-padding them to a full block
//
// Events emitted: "data" (Buffer), "drain", "end".
function BlockStream (size, opt) {
  // run the Stream (EventEmitter) initialisation for this instance
  Stream.call(this)

  this.writable = this.readable = true
  this._opt = opt || {}
  this._chunkSize = size || 512

  // number of bytes at the front of this._buffer[0] that were already emitted
  this._offset = 0
  // queue of pending Buffers, and how many not-yet-emitted bytes they hold
  this._buffer = []
  this._bufferLength = 0

  // state flags, spelled out so every method reads a defined value
  this._paused = false
  this._ended = false
  this._emitting = false
  this._needDrain = false
  this._endEmitted = false

  // Source of padding bytes. Buffer.alloc() zero-fills, so no manual
  // clearing loop is needed. `false` means "never pad".
  this._zeroes = this._opt.nopad ? false : Buffer.alloc(this._chunkSize)
}

inherits(BlockStream, Stream)

// write(c)
//
// Queue `c` and emit as many whole blocks as are available.
// Truthy non-Buffer values are converted with `c + ""`; falsy values
// (undefined, null, "", 0) and empty buffers queue nothing.
//
// Returns false when at least one whole block is queued but could not be
// emitted because the stream is paused, or when a "data" listener paused
// the stream during emission; a "drain" event follows once emission has
// caught up after resume(). Returns true otherwise.
// Throws an Error if called after end().
BlockStream.prototype.write = function (c) {
  if (this._ended) throw new Error("BlockStream: write after end")

  if (c && !Buffer.isBuffer(c)) c = Buffer.from(c + "")

  // guard on `c` itself so write() / write(null) is a harmless no-op
  if (c && c.length) {
    this._buffer.push(c)
    this._bufferLength += c.length
  }

  // not enough for a whole block yet: nothing to emit, no back-pressure
  if (this._bufferLength < this._chunkSize) return true

  if (this._paused) {
    this._needDrain = true
    return false
  }

  this._emitChunk()
  // _emitChunk() leaves _needDrain set when a listener paused us midway
  return !this._needDrain
}

// pause()
//
// Stop emitting "data" until resume() is called. Writes are still queued.
BlockStream.prototype.pause = function () {
  this._paused = true
}

// resume()
//
// Leave the paused state and emit whatever whole blocks are queued.
BlockStream.prototype.resume = function () {
  this._paused = false
  return this._emitChunk()
}

// end([chunk], [cb])
//
// Optionally write a final chunk, mark the stream as ended and flush.
// `cb`, if given (as the only or as the second argument), is called once
// "end" has been emitted; if "end" was already emitted it is called
// immediately.
BlockStream.prototype.end = function (chunk, cb) {
  if (typeof chunk === "function") {
    cb = chunk
    chunk = null
  }

  if (chunk) this.write(chunk)

  // register before flushing: "end" may be emitted synchronously by flush()
  if (typeof cb === "function") {
    if (this._endEmitted) cb()
    else this.once("end", cb)
  }

  this._ended = true
  this.flush()
}

// flush()
//
// Emit everything queued right now: padded with zeroes up to a block
// boundary, or (with `nopad`) followed by one short chunk.
// NOTE: with `nopad`, a flush() requested while paused (and before end())
// is not remembered; the leftover stays queued.
BlockStream.prototype.flush = function () {
  this._emitChunk(true)
}

// _emitChunk(flush)
//
// Internal workhorse: emits whole blocks from the queue, then "drain" and
// "end" as appropriate. `flush` truthy means "also get rid of a trailing
// partial block" (by padding it, or by emitting it short with `nopad`).
BlockStream.prototype._emitChunk = function (flush) {
  // On flush, top the queue up with zeroes to the next block boundary.
  // This is done before the paused / re-entrancy check on purpose, so the
  // padding is already queued when emission picks up again later.
  if (flush && this._zeroes) {
    var remainder = this._bufferLength % this._chunkSize
    if (remainder !== 0) {
      var padBytes = this._chunkSize - remainder
      this._buffer.push(this._zeroes.slice(0, padBytes))
      this._bufferLength += padBytes
    }
  }

  // _emitting guards against re-entry from a listener that calls write(),
  // resume() or flush(); the outer invocation picks up what was queued.
  if (this._emitting || this._paused) return
  this._emitting = true

  // index of the first queue entry that has not been fully consumed
  var bufferIndex = 0

  // A flushing pass keeps going even if a listener pauses the stream;
  // a normal pass stops after the block during which pause() was called.
  while (this._bufferLength >= this._chunkSize &&
         (flush || !this._paused)) {
    var out = null
    var outOffset = 0
    var outHas = this._chunkSize

    // Assemble one block. No events are emitted in here, so the paused
    // state cannot change until the block is complete.
    while (outHas > 0) {
      var cur = this._buffer[bufferIndex]
      var curHas = cur.length - this._offset

      // If cur is not big enough to fill the whole block, several buffers
      // must be copied into one. If it is big enough, just slice out the
      // wanted part to avoid copying. Once copying has started it has to
      // continue, since buffers cannot be joined without a copy.
      if (out || curHas < outHas) {
        // allocUnsafe is fine: every byte is overwritten before emission
        out = out || Buffer.allocUnsafe(this._chunkSize)
        cur.copy(out, outOffset,
                 this._offset, this._offset + Math.min(curHas, outHas))
      } else if (cur.length === outHas && this._offset === 0) {
        // shortcut: cur is exactly one block, with nothing consumed yet
        out = cur
      } else {
        out = cur.slice(this._offset, this._offset + outHas)
      }

      if (curHas > outHas) {
        // cur still holds unread bytes; remember how far we got
        this._offset += outHas
        outHas = 0
      } else {
        // cur is used up; move on to the next queue entry
        outHas -= curHas
        outOffset += curHas
        bufferIndex ++
        this._offset = 0
      }
    }

    this._bufferLength -= this._chunkSize
    assert(out.length === this._chunkSize)
    this.emit("data", out)
  }

  // drop the queue entries that were consumed completely
  this._buffer = this._buffer.slice(bufferIndex)

  if (this._paused) {
    // a listener paused us mid-emission: a "drain" is owed after resume()
    this._needDrain = true
    this._emitting = false
    return
  }

  // Without padding, a flush has to emit the trailing partial block as is.
  // It is known to be shorter than a block, otherwise the loop above would
  // have emitted it. `_ended` is checked as well because the flush that
  // end() requested may have been skipped (stream paused, or end() called
  // from inside a listener); without that the tail would be stuck forever
  // and "end" would never be emitted.
  if ((flush || this._ended) && !this._zeroes && this._buffer.length) {
    var pending = this._buffer
    if (this._offset) {
      // only the first entry can be partially consumed
      pending = pending.slice()
      pending[0] = pending[0].slice(this._offset)
    }
    var tail = pending.length === 1 ? pending[0] : Buffer.concat(pending)

    // reset the queue before emitting, so that anything a listener writes
    // during the emit is kept rather than truncated away
    this._buffer = []
    this._bufferLength = 0
    this._offset = 0

    this.emit("data", tail)
  }

  // everything that can be emitted so far has been emitted
  if (this._needDrain) {
    this._needDrain = false
    this.emit("drain")
  }

  if (this._bufferLength === 0 && this._ended && !this._endEmitted) {
    this._endEmitted = true
    this.emit("end")
  }

  this._emitting = false
}