| 'use strict'; |
| |
/**
 * A worker that does nothing but pass chunks to the next one. This is like
 * a nodejs stream but with some differences. On the good side :
 * - it works on IE 6-9 without any issue / polyfill
 * - it weighs less than the full dependencies bundled with browserify
 * - it forwards errors (no need to declare an error handler EVERYWHERE)
 *
 * A chunk is an object with 2 attributes : `meta` and `data`. The former is an
 * object containing anything (`percent` for example), see each worker for more
 * details. The latter is the real data (String, Uint8Array, etc).
 *
 * A freshly created worker is paused : call `resume()` on the last worker of
 * the chain to start it.
 *
 * @constructor
 * @param {String} name the name of the stream (mainly used for debugging purposes)
 */
function GenericWorker(name) {
    // the name of the worker
    this.name = name || "default";
    // an object containing metadata about the workers chain
    this.streamInfo = {};
    // an error which happened when the worker was paused
    this.generatedError = null;
    // an object containing metadata to be merged by this worker into the general metadata
    this.extraStreamInfo = {};
    // true if the stream is paused (and should not do anything), false otherwise
    this.isPaused = true;
    // true if the stream is finished (and should not do anything), false otherwise
    this.isFinished = false;
    // true if the stream is locked to prevent further structure updates (pipe), false otherwise
    this.isLocked = false;
    // the event listeners, indexed by event name
    this._listeners = {
        'data':[],
        'end':[],
        'error':[]
    };
    // the previous worker, if any
    this.previous = null;
}

GenericWorker.prototype = {
    /**
     * Push a chunk to the next workers (triggers the `data` event).
     * @param {Object} chunk the chunk to push
     */
    push : function (chunk) {
        this.emit("data", chunk);
    },
    /**
     * End the stream: flush the remaining data, trigger the `end` event and
     * release the references held by this worker.
     * @return {Boolean} true if this call ended the worker, false otherwise.
     */
    end : function () {
        if (this.isFinished) {
            return false;
        }

        this.flush();
        try {
            this.emit("end");
            this.cleanUp();
            this.isFinished = true;
        } catch (e) {
            // NOTE(review): on this path the worker is neither cleaned up nor
            // marked as finished, and `true` is still returned below.
            this.emit("error", e);
        }
        return true;
    },
    /**
     * End the stream with an error.
     * If the worker is paused, the error is only stored : it will be
     * triggered by the next call to `resume()`.
     * @param {Error} e the error which caused the premature end.
     * @return {Boolean} true if this call ended the worker with an error, false otherwise.
     */
    error : function (e) {
        if (this.isFinished) {
            return false;
        }

        if(this.isPaused) {
            this.generatedError = e;
        } else {
            // set the flag first : the calls below can come back to this
            // worker (downward via the listeners, upward via `previous`).
            this.isFinished = true;

            this.emit("error", e);

            // if the workers chain exploded in the middle of the chain,
            // the error event will go downward but we also need to notify
            // workers upward that there has been an error.
            if(this.previous) {
                this.previous.error(e);
            }

            this.cleanUp();
        }
        return true;
    },
    /**
     * Add a callback on an event.
     * @param {String} name the name of the event (data, end, error)
     * @param {Function} listener the function to call when the event is triggered
     * @return {GenericWorker} the current object for chainability
     */
    on : function (name, listener) {
        this._listeners[name].push(listener);
        return this;
    },
    /**
     * Clean any references when a worker is ending.
     */
    cleanUp : function () {
        this.streamInfo = this.generatedError = this.extraStreamInfo = null;
        // a fresh, empty registry : `emit` relies on the replacement of this
        // object to detect that the worker ended while dispatching an event.
        this._listeners = {};
    },
    /**
     * Trigger an event. This will call registered callback with the provided arg.
     * Each callback is called with the worker as `this`.
     * @param {String} name the name of the event (data, end, error)
     * @param {Object} arg the argument to call the callback with.
     */
    emit : function (name, arg) {
        // Keep a reference on the list : a listener can end this worker (for
        // example a downstream worker calling `error()`, which goes upward),
        // and `cleanUp()` then replaces `this._listeners`. Reading
        // `this._listeners[name]` again would throw a TypeError.
        var listeners = this._listeners[name];
        if (!listeners) {
            return;
        }
        for(var i = 0; i < listeners.length; i++) {
            listeners[i].call(this, arg);
            if (this._listeners[name] !== listeners) {
                // the worker has been cleaned up, it must not do anything more.
                break;
            }
        }
    },
    /**
     * Chain a worker with an other.
     * @param {Worker} next the worker receiving events from the current one.
     * @return {worker} the next worker for chainability
     */
    pipe : function (next) {
        return next.registerPrevious(this);
    },
    /**
     * Same as `pipe` in the other direction.
     * Using an API with `pipe(next)` is very easy.
     * Implementing the API with the point of view of the next one registering
     * a source is easier, see the ZipFileWorker.
     * @param {Worker} previous the previous worker, sending events to this one
     * @return {Worker} the current worker for chainability
     * @throws {Error} if this worker is locked.
     */
    registerPrevious : function (previous) {
        if (this.isLocked) {
            throw new Error("The stream '" + this + "' has already been used.");
        }

        // sharing the streamInfo...
        this.streamInfo = previous.streamInfo;
        // ... and adding our own bits
        this.mergeStreamInfo();
        this.previous = previous;
        var self = this;
        previous.on('data', function (chunk) {
            self.processChunk(chunk);
        });
        previous.on('end', function () {
            self.end();
        });
        previous.on('error', function (e) {
            self.error(e);
        });
        return this;
    },
    /**
     * Pause the stream so it doesn't send events anymore.
     * The previous workers of the chain are paused too.
     * @return {Boolean} true if this call paused the worker, false otherwise.
     */
    pause : function () {
        if(this.isPaused || this.isFinished) {
            return false;
        }
        this.isPaused = true;

        if(this.previous) {
            this.previous.pause();
        }
        return true;
    },
    /**
     * Resume a paused stream.
     * An error stored while the worker was paused is triggered now. The
     * previous workers of the chain are resumed too.
     * @return {Boolean} true if this call resumed the worker, false otherwise.
     */
    resume : function () {
        if(!this.isPaused || this.isFinished) {
            return false;
        }
        this.isPaused = false;

        // if true, the worker tried to resume but failed
        var withError = false;
        if(this.generatedError) {
            this.error(this.generatedError);
            withError = true;
        }
        if(this.previous) {
            this.previous.resume();
        }

        return !withError;
    },
    /**
     * Flush any remaining bytes as the stream is ending.
     * Does nothing here, called by `end()` before the `end` event.
     */
    flush : function () {},
    /**
     * Process a chunk. This is usually the method overridden.
     * The default implementation pushes the chunk unchanged.
     * @param {Object} chunk the chunk to process.
     */
    processChunk : function(chunk) {
        this.push(chunk);
    },
    /**
     * Add a key/value to be added in the workers chain streamInfo once activated.
     * @param {String} key the key to use
     * @param {Object} value the associated value
     * @return {Worker} the current worker for chainability
     */
    withStreamInfo : function (key, value) {
        this.extraStreamInfo[key] = value;
        this.mergeStreamInfo();
        return this;
    },
    /**
     * Merge this worker's streamInfo into the chain's streamInfo.
     */
    mergeStreamInfo : function () {
        for(var key in this.extraStreamInfo) {
            // don't call `hasOwnProperty` on the object itself : the keys are
            // chosen by the callers and could shadow it.
            if (!Object.prototype.hasOwnProperty.call(this.extraStreamInfo, key)) {
                continue;
            }
            this.streamInfo[key] = this.extraStreamInfo[key];
        }
    },

    /**
     * Lock the stream to prevent further updates on the workers chain.
     * After calling this method, all calls to pipe will fail.
     * The previous workers of the chain are locked too.
     * @throws {Error} if this worker is already locked.
     */
    lock: function () {
        if (this.isLocked) {
            throw new Error("The stream '" + this + "' has already been used.");
        }
        this.isLocked = true;
        if (this.previous) {
            this.previous.lock();
        }
    },

    /**
     * Pretty print the workers chain, from the first worker to this one.
     * @return {String} the names of the workers, for example "Worker a -> Worker b".
     */
    toString : function () {
        var me = "Worker " + this.name;
        if (this.previous) {
            return this.previous + " -> " + me;
        } else {
            return me;
        }
    }
};
| |
| module.exports = GenericWorker; |