| "use strict"; |
| var fs = require('fs') |
| , zlib = require('zlib') |
| , debug = require('debug')('streamroller:BaseRollingFileStream') |
| , mkdirp = require('mkdirp') |
| , path = require('path') |
| , util = require('util') |
| , stream = require('readable-stream'); |
| |
| module.exports = BaseRollingFileStream; |
| |
/**
 * Writable stream that appends to a file and tracks how many bytes the file
 * currently holds. Subclasses decide when to roll by overriding
 * `shouldRoll` / `roll`.
 *
 * @constructor
 * @param {string} filename - path of the file to write to (required).
 * @param {Object} [options] - passed to `fs.createWriteStream`; `encoding`
 *   defaults to 'utf8', `mode` to 0644 and `flags` to 'a'.
 * @throws {Error} when `filename` is missing or empty.
 */
function BaseRollingFileStream(filename, options) {
  debug("In BaseRollingFileStream");

  // Validate before touching any state so a bad call has no side effects.
  if (!filename) {
    throw new Error("You must specify a filename");
  }

  // Work on a shallow copy: the defaults below must not leak into (or fail
  // on) the object owned by the caller. A plain for-in keeps inherited
  // enumerable settings visible, exactly as property lookup on the original
  // object would have.
  var settings = {};
  for (var key in options) {
    settings[key] = options[key];
  }
  settings.encoding = settings.encoding || 'utf8';
  settings.mode = settings.mode || parseInt('0644', 8);
  settings.flags = settings.flags || 'a';

  this.filename = filename;
  this.options = settings;
  this.currentSize = 0;

  // Size of the file already on disk, or 0 when it cannot be determined.
  function currentFileSize(file) {
    try {
      return fs.statSync(file).size;
    } catch (e) {
      // A missing file is the normal "fresh log" case; anything else is
      // still treated as size 0 (best effort) but is worth a trace.
      if (e.code !== 'ENOENT') {
        debug('could not stat ', file, ': ', e);
      }
      return 0;
    }
  }

  debug("Calling BaseRollingFileStream.super");
  BaseRollingFileStream.super_.call(this);
  this.openTheStream();
  this.currentSize = currentFileSize(this.filename);
}
util.inherits(BaseRollingFileStream, stream.Writable);
| |
/**
 * Passes one chunk to the underlying file stream and adds its length to
 * `currentSize`.
 *
 * When the underlying `write` reports a full buffer (returns false) the
 * callback is deferred until that stream emits 'drain'; otherwise it is
 * scheduled on the next tick. An exception thrown while writing is handed
 * to the callback instead of propagating.
 *
 * @param {Buffer|string} chunk - data to write.
 * @param {string} encoding - encoding forwarded to the underlying stream.
 * @param {Function} callback - completion callback, receives an error on failure.
 */
BaseRollingFileStream.prototype._writeTheChunk = function(chunk, encoding, callback) {
  debug("writing the chunk to the underlying stream");
  this.currentSize = this.currentSize + chunk.length;
  try {
    var hasCapacity = this.theStream.write(chunk, encoding);
    if (hasCapacity) {
      process.nextTick(callback);
    } else {
      debug('waiting for drain event');
      this.theStream.once('drain', callback);
    }
    debug("chunk written");
  } catch (failure) {
    debug(failure);
    if (!callback) {
      return;
    }
    callback(failure);
  }
};
| |
/**
 * Writable `_write` hook: rolls the file first when `shouldRoll()` says so,
 * then writes the chunk.
 *
 * When rolling, `currentSize` is reset to 0 before `roll` is invoked and the
 * actual write is passed to `roll` as its continuation.
 *
 * @param {Buffer|string} chunk - data to write.
 * @param {string} encoding - encoding of `chunk`.
 * @param {Function} callback - called once the chunk has been handled.
 */
BaseRollingFileStream.prototype._write = function(chunk, encoding, callback) {
  debug("in _write");

  if (!this.shouldRoll()) {
    this._writeTheChunk(chunk, encoding, callback);
    return;
  }

  this.currentSize = 0;
  var writeAfterRoll = this._writeTheChunk.bind(this, chunk, encoding, callback);
  this.roll(this.filename, writeAfterRoll);
};
| |
/**
 * Creates the directory for `this.filename` if needed, opens a write stream
 * on the file with `this.options`, and stores it in `this.theStream`.
 * Errors from the file stream are re-emitted on this stream.
 *
 * @param {Function} [cb] - registered as a listener for the file stream's
 *   "open" event.
 */
BaseRollingFileStream.prototype.openTheStream = function(cb) {
  debug("opening the underlying stream");
  var owner = this;
  var target = this.filename;

  mkdirp.sync(path.dirname(target));

  var fileStream = fs.createWriteStream(target, this.options);
  this.theStream = fileStream;

  // Surface file-level failures to whoever listens on the rolling stream.
  fileStream.on('error', function forwardError(err) {
    owner.emit('error', err);
  });

  if (cb) {
    fileStream.on("open", cb);
  }
};
| |
/**
 * Ends the underlying file stream held in `this.theStream`.
 *
 * @param {Function} [cb] - forwarded unchanged to the underlying stream's
 *   `end()`.
 */
BaseRollingFileStream.prototype.closeTheStream = function(cb) {
  debug("closing the underlying stream");
  var fileStream = this.theStream;
  fileStream.end(cb);
};
| |
/**
 * Gzips `filename` into `filename + ".gz"` and then deletes the original.
 *
 * Any error raised by the read, gzip or write stream is reported through
 * `cb` (exactly once) instead of surfacing as an unhandled 'error' event,
 * and the streams are torn down so no file descriptor is left open.
 *
 * @param {string} filename - path of the file to compress.
 * @param {Function} cb - called with an error on failure, otherwise with the
 *   result of removing the original file.
 */
BaseRollingFileStream.prototype.compress = function(filename, cb) {
  debug('Compressing ', filename, ' -> ', filename, '.gz');
  var gzip = zlib.createGzip();
  var inp = fs.createReadStream(filename);
  var out = fs.createWriteStream(filename+".gz");
  var stages = [inp, gzip, out];
  var settled = false;

  // Report the outcome once, no matter how many events fire afterwards.
  function settle(err) {
    if (settled) {
      return;
    }
    settled = true;
    if (cb) {
      cb(err);
    }
  }

  // Stop the whole pipeline on the first failure. The listeners stay
  // attached so late errors from the teardown cannot crash the process.
  function fail(err) {
    if (settled) {
      return;
    }
    debug('Compressing ', filename, ' failed: ', err);
    stages.forEach(function(stage) {
      if (typeof stage.destroy === 'function') {
        stage.destroy();
      }
    });
    settle(err);
  }

  // pipe() does not forward errors, so every stage needs its own listener.
  stages.forEach(function(stage) {
    stage.on('error', fail);
  });

  out.on('finish', function() {
    if (settled) {
      return;
    }
    debug('Removing original ', filename);
    fs.unlink(filename, settle);
  });

  inp.pipe(gzip).pipe(out);
};
| |
/**
 * Rolling hook consulted by `_write` before each chunk.
 * The base implementation never asks for a roll.
 *
 * @returns {boolean} always false here.
 */
BaseRollingFileStream.prototype.shouldRoll = function() {
  var rollNeeded = false; // base stream never rolls
  return rollNeeded;
};
| |
/**
 * Rolling hook invoked by `_write` when `shouldRoll()` returns true.
 * The base implementation performs no rolling and continues immediately.
 *
 * @param {string} filename - file that would be rolled (unused here).
 * @param {Function} callback - continuation, invoked synchronously with no
 *   arguments.
 */
BaseRollingFileStream.prototype.roll = function(filename, callback) {
  // Nothing to roll in the base stream: hand control straight back.
  var proceed = callback;
  proceed();
};
| |
/**
 * Ends this stream and then the underlying file stream.
 *
 * The Writable side is ended first; once it reports completion, the
 * underlying file stream is ended with the optional final chunk.
 *
 * Accepts the same call shapes as `Writable#end`: `end()`, `end(callback)`,
 * `end(chunk, callback)` and `end(chunk, encoding, callback)`. The
 * overloads are resolved here rather than by passing a misplaced function
 * through to the underlying stream.
 *
 * @param {Buffer|string} [chunk] - final data, written straight to the
 *   underlying stream.
 * @param {string} [encoding] - encoding of `chunk` when it is a string.
 * @param {Function} [callback] - called when the underlying stream has
 *   ended, with an error if it reported one.
 * @returns {BaseRollingFileStream} this stream, as `Writable#end` does.
 */
BaseRollingFileStream.prototype.end = function(chunk, encoding, callback) {
  var self = this;
  var finalChunk = chunk;
  var finalEncoding = encoding;
  var done = callback;

  // Resolve the optional-argument overloads up front.
  if (typeof chunk === 'function') {
    done = chunk;
    finalChunk = undefined;
    finalEncoding = undefined;
  } else if (typeof encoding === 'function') {
    done = encoding;
    finalEncoding = undefined;
  }

  debug('end called - first close myself');
  stream.Writable.prototype.end.call(self, function() {
    debug('writable end callback, now close underlying stream');
    self.theStream.end(finalChunk, finalEncoding, function(err) {
      debug('underlying stream closed');
      if (done) {
        done(err);
      }
    });
  });
  return self;
};