blob: 138b54ef98d7a887c5758e49a6cce12c008601d1 [file] [log] [blame]
"use strict";
var fs = require('fs')
, zlib = require('zlib')
, debug = require('debug')('streamroller:BaseRollingFileStream')
, mkdirp = require('mkdirp')
, path = require('path')
, util = require('util')
, stream = require('readable-stream');
module.exports = BaseRollingFileStream;
function BaseRollingFileStream(filename, options) {
debug("In BaseRollingFileStream");
this.filename = filename;
this.options = options || {};
this.options.encoding = this.options.encoding || 'utf8';
this.options.mode = this.options.mode || parseInt('0644', 8);
this.options.flags = this.options.flags || 'a';
this.currentSize = 0;
function currentFileSize(file) {
var fileSize = 0;
try {
fileSize = fs.statSync(file).size;
} catch (e) {
// file does not exist
}
return fileSize;
}
function throwErrorIfArgumentsAreNotValid() {
if (!filename) {
throw new Error("You must specify a filename");
}
}
throwErrorIfArgumentsAreNotValid();
debug("Calling BaseRollingFileStream.super");
BaseRollingFileStream.super_.call(this);
this.openTheStream();
this.currentSize = currentFileSize(this.filename);
}
util.inherits(BaseRollingFileStream, stream.Writable);
BaseRollingFileStream.prototype._writeTheChunk = function(chunk, encoding, callback) {
debug("writing the chunk to the underlying stream");
this.currentSize += chunk.length;
try {
if (!this.theStream.write(chunk,encoding)) {
debug('waiting for drain event');
this.theStream.once('drain',callback);
} else {
process.nextTick(callback);
}
debug("chunk written");
} catch (err) {
debug(err);
if (callback) {
callback(err);
}
}
};
BaseRollingFileStream.prototype._write = function(chunk, encoding, callback) {
debug("in _write");
if (this.shouldRoll()) {
this.currentSize = 0;
this.roll(this.filename, this._writeTheChunk.bind(this, chunk, encoding, callback));
} else {
this._writeTheChunk(chunk, encoding, callback);
}
};
BaseRollingFileStream.prototype.openTheStream = function(cb) {
debug("opening the underlying stream");
var that = this;
mkdirp.sync(path.dirname(this.filename));
this.theStream = fs.createWriteStream(this.filename, this.options);
this.theStream.on('error', function(err) {
that.emit('error', err);
});
if (cb) {
this.theStream.on("open", cb);
}
};
BaseRollingFileStream.prototype.closeTheStream = function(cb) {
debug("closing the underlying stream");
this.theStream.end(cb);
};
BaseRollingFileStream.prototype.compress = function(filename, cb) {
debug('Compressing ', filename, ' -> ', filename, '.gz');
var gzip = zlib.createGzip();
var inp = fs.createReadStream(filename);
var out = fs.createWriteStream(filename+".gz");
inp.pipe(gzip).pipe(out);
out.on('finish', function(err) {
debug('Removing original ', filename);
fs.unlink(filename, cb);
});
};
BaseRollingFileStream.prototype.shouldRoll = function() {
return false; // default behaviour is never to roll
};
BaseRollingFileStream.prototype.roll = function(filename, callback) {
callback(); // default behaviour is not to do anything
};
BaseRollingFileStream.prototype.end = function(chunk, encoding, callback) {
var self = this;
debug('end called - first close myself');
stream.Writable.prototype.end.call(self, function() {
debug('writable end callback, now close underlying stream');
self.theStream.end(chunk, encoding, function(err) {
debug('underlying stream closed');
if (callback) {
callback(err);
}
});
});
};