| const debug = require("debug")("streamroller:RollingFileWriteStream"); |
| const fs = require("fs-extra"); |
| const path = require("path"); |
| const newNow = require("./now"); |
| const format = require("date-format"); |
| const { Writable } = require("stream"); |
| const fileNameFormatter = require("./fileNameFormatter"); |
| const fileNameParser = require("./fileNameParser"); |
| const moveAndMaybeCompressFile = require("./moveAndMaybeCompressFile"); |
| |
| /** |
| * RollingFileWriteStream is mainly used when writing to a file rolling by date or size. |
| * RollingFileWriteStream inherits from stream.Writable |
| */ |
| class RollingFileWriteStream extends Writable { |
| /** |
| * Create a RollingFileWriteStream |
| * @constructor |
| * @param {string} filePath - The file path to write. |
| * @param {object} options - The extra options |
| * @param {number} options.numToKeep - The max numbers of files to keep. |
| * @param {number} options.maxSize - The maxSize one file can reach. Unit is Byte. |
| * This should be more than 1024. The default is Number.MAX_SAFE_INTEGER. |
| * @param {string} options.mode - The mode of the files. The default is '0644'. Refer to stream.writable for more. |
| * @param {string} options.flags - The default is 'a'. Refer to stream.flags for more. |
| * @param {boolean} options.compress - Whether to compress backup files. |
| * @param {boolean} options.keepFileExt - Whether to keep the file extension. |
| * @param {string} options.pattern - The date string pattern in the file name. |
| * @param {boolean} options.alwaysIncludePattern - Whether to add date to the name of the first file. |
| */ |
| constructor(filePath, options) { |
| debug(`constructor: creating RollingFileWriteStream. path=${filePath}`); |
| super(options); |
| this.options = this._parseOption(options); |
| this.fileObject = path.parse(filePath); |
| if (this.fileObject.dir === "") { |
| this.fileObject = path.parse(path.join(process.cwd(), filePath)); |
| } |
| this.fileFormatter = fileNameFormatter({ |
| file: this.fileObject, |
| alwaysIncludeDate: this.options.alwaysIncludePattern, |
| needsIndex: this.options.maxSize < Number.MAX_SAFE_INTEGER, |
| compress: this.options.compress, |
| keepFileExt: this.options.keepFileExt |
| }); |
| |
| this.fileNameParser = fileNameParser({ |
| file: this.fileObject, |
| keepFileExt: this.options.keepFileExt, |
| pattern: this.options.pattern |
| }); |
| |
| this.state = { |
| currentSize: 0 |
| }; |
| |
| if (this.options.pattern) { |
| this.state.currentDate = format(this.options.pattern, newNow()); |
| } |
| |
| this.filename = this.fileFormatter({ |
| index: 0, |
| date: this.state.currentDate |
| }); |
| if (["a", "a+", "as", "as+"].includes(this.options.flags)) { |
| this._setExistingSizeAndDate(); |
| } |
| |
| debug( |
| `constructor: create new file ${this.filename}, state=${JSON.stringify( |
| this.state |
| )}` |
| ); |
| this._renewWriteStream(); |
| } |
| |
| _setExistingSizeAndDate() { |
| try { |
| const stats = fs.statSync(this.filename); |
| this.state.currentSize = stats.size; |
| if (this.options.pattern) { |
| this.state.currentDate = format(this.options.pattern, stats.mtime); |
| } |
| } catch (e) { |
| //file does not exist, that's fine - move along |
| return; |
| } |
| } |
| |
| _parseOption(rawOptions) { |
| const defaultOptions = { |
| maxSize: Number.MAX_SAFE_INTEGER, |
| numToKeep: Number.MAX_SAFE_INTEGER, |
| encoding: "utf8", |
| mode: parseInt("0644", 8), |
| flags: "a", |
| compress: false, |
| keepFileExt: false, |
| alwaysIncludePattern: false |
| }; |
| const options = Object.assign({}, defaultOptions, rawOptions); |
| if (options.maxSize <= 0) { |
| throw new Error(`options.maxSize (${options.maxSize}) should be > 0`); |
| } |
| if (options.numToKeep <= 0) { |
| throw new Error(`options.numToKeep (${options.numToKeep}) should be > 0`); |
| } |
| debug( |
| `_parseOption: creating stream with option=${JSON.stringify(options)}` |
| ); |
| return options; |
| } |
| |
| _final(callback) { |
| this.currentFileStream.end("", this.options.encoding, callback); |
| } |
| |
| _write(chunk, encoding, callback) { |
| this._shouldRoll().then(() => { |
| debug( |
| `_write: writing chunk. ` + |
| `file=${this.currentFileStream.path} ` + |
| `state=${JSON.stringify(this.state)} ` + |
| `chunk=${chunk}` |
| ); |
| this.currentFileStream.write(chunk, encoding, e => { |
| this.state.currentSize += chunk.length; |
| callback(e); |
| }); |
| }); |
| } |
| |
| async _shouldRoll() { |
| if (this._dateChanged() || this._tooBig()) { |
| debug( |
| `_shouldRoll: rolling because dateChanged? ${this._dateChanged()} or tooBig? ${this._tooBig()}` |
| ); |
| await this._roll(); |
| } |
| } |
| |
| _dateChanged() { |
| return ( |
| this.state.currentDate && |
| this.state.currentDate !== format(this.options.pattern, newNow()) |
| ); |
| } |
| |
| _tooBig() { |
| return this.state.currentSize >= this.options.maxSize; |
| } |
| |
| _roll() { |
| debug(`_roll: closing the current stream`); |
| return new Promise((resolve, reject) => { |
| this.currentFileStream.end("", this.options.encoding, () => { |
| this._moveOldFiles() |
| .then(resolve) |
| .catch(reject); |
| }); |
| }); |
| } |
| |
| async _moveOldFiles() { |
| const files = await this._getExistingFiles(); |
| const todaysFiles = this.state.currentDate |
| ? files.filter(f => f.date === this.state.currentDate) |
| : files; |
| for (let i = todaysFiles.length; i >= 0; i--) { |
| debug(`_moveOldFiles: i = ${i}`); |
| const sourceFilePath = this.fileFormatter({ |
| date: this.state.currentDate, |
| index: i |
| }); |
| const targetFilePath = this.fileFormatter({ |
| date: this.state.currentDate, |
| index: i + 1 |
| }); |
| |
| await moveAndMaybeCompressFile( |
| sourceFilePath, |
| targetFilePath, |
| this.options.compress && i === 0 |
| ); |
| } |
| |
| this.state.currentSize = 0; |
| this.state.currentDate = this.state.currentDate |
| ? format(this.options.pattern, newNow()) |
| : null; |
| debug( |
| `_moveOldFiles: finished rolling files. state=${JSON.stringify( |
| this.state |
| )}` |
| ); |
| this._renewWriteStream(); |
| // wait for the file to be open before cleaning up old ones, |
| // otherwise the daysToKeep calculations can be off |
| await new Promise((resolve, reject) => { |
| this.currentFileStream.write("", "utf8", () => { |
| this._clean() |
| .then(resolve) |
| .catch(reject); |
| }); |
| }); |
| } |
| |
| // Sorted from the oldest to the latest |
| async _getExistingFiles() { |
| const files = await fs.readdir(this.fileObject.dir).catch(() => []); |
| |
| debug(`_getExistingFiles: files=${files}`); |
| const existingFileDetails = files |
| .map(n => this.fileNameParser(n)) |
| .filter(n => n); |
| |
| const getKey = n => |
| (n.timestamp ? n.timestamp : newNow().getTime()) - n.index; |
| existingFileDetails.sort((a, b) => getKey(a) - getKey(b)); |
| |
| return existingFileDetails; |
| } |
| |
| _renewWriteStream() { |
| fs.ensureDirSync(this.fileObject.dir); |
| const filePath = this.fileFormatter({ |
| date: this.state.currentDate, |
| index: 0 |
| }); |
| const ops = { |
| flags: this.options.flags, |
| encoding: this.options.encoding, |
| mode: this.options.mode |
| }; |
| this.currentFileStream = fs.createWriteStream(filePath, ops); |
| this.currentFileStream.on("error", e => { |
| this.emit("error", e); |
| }); |
| } |
| |
| async _clean() { |
| const existingFileDetails = await this._getExistingFiles(); |
| debug( |
| `_clean: numToKeep = ${this.options.numToKeep}, existingFiles = ${existingFileDetails.length}` |
| ); |
| debug("_clean: existing files are: ", existingFileDetails); |
| if (this._tooManyFiles(existingFileDetails.length)) { |
| const fileNamesToRemove = existingFileDetails |
| .slice(0, existingFileDetails.length - this.options.numToKeep - 1) |
| .map(f => path.format({ dir: this.fileObject.dir, base: f.filename })); |
| await deleteFiles(fileNamesToRemove); |
| } |
| } |
| |
| _tooManyFiles(numFiles) { |
| return this.options.numToKeep > 0 && numFiles > this.options.numToKeep; |
| } |
| } |
| |
| const deleteFiles = fileNames => { |
| debug(`deleteFiles: files to delete: ${fileNames}`); |
| return Promise.all(fileNames.map(f => fs.unlink(f).catch((e) => { |
| debug(`deleteFiles: error when unlinking ${f}, ignoring. Error was ${e}`); |
| }))); |
| }; |
| |
| module.exports = RollingFileWriteStream; |