| /** |
| * Support for concurrent task management and synchronization in web |
| * applications. |
| * |
| * @author Dave Longley |
| * @author David I. Lehn <dlehn@digitalbazaar.com> |
| * |
| * Copyright (c) 2009-2013 Digital Bazaar, Inc. |
| */ |
| (function() { |
| /* ########## Begin module implementation ########## */ |
| function initModule(forge) { |
| |
| // logging category |
| var cat = 'forge.task'; |
| |
| // verbose level |
| // 0: off, 1: a little, 2: a whole lot |
| // Verbose debug logging is surrounded by a level check to avoid the |
| // performance issues with even calling the logging code regardless if it |
| // is actually logged. For performance reasons this should not be set to 2 |
| // for production use. |
| // ex: if(sVL >= 2) forge.log.verbose(....) |
| var sVL = 0; |
| |
| // track tasks for debugging |
| var sTasks = {}; |
| var sNextTaskId = 0; |
| // debug access |
| forge.debug.set(cat, 'tasks', sTasks); |
| |
| // a map of task type to task queue |
| var sTaskQueues = {}; |
| // debug access |
| forge.debug.set(cat, 'queues', sTaskQueues); |
| |
| // name for unnamed tasks |
| var sNoTaskName = '?'; |
| |
| // maximum number of doNext() recursions before a context swap occurs |
| // FIXME: might need to tweak this based on the browser |
| var sMaxRecursions = 30; |
| |
| // time slice for doing tasks before a context swap occurs |
| // FIXME: might need to tweak this based on the browser |
| var sTimeSlice = 20; |
| |
| /** |
| * Task states. |
| * |
| * READY: ready to start processing |
| * RUNNING: task or a subtask is running |
| * BLOCKED: task is waiting to acquire N permits to continue |
| * SLEEPING: task is sleeping for a period of time |
| * DONE: task is done |
| * ERROR: task has an error |
| */ |
| var READY = 'ready'; |
| var RUNNING = 'running'; |
| var BLOCKED = 'blocked'; |
| var SLEEPING = 'sleeping'; |
| var DONE = 'done'; |
| var ERROR = 'error'; |
| |
| /** |
| * Task actions. Used to control state transitions. |
| * |
| * STOP: stop processing |
| * START: start processing tasks |
| * BLOCK: block task from continuing until 1 or more permits are released |
| * UNBLOCK: release one or more permits |
| * SLEEP: sleep for a period of time |
| * WAKEUP: wakeup early from SLEEPING state |
| * CANCEL: cancel further tasks |
| * FAIL: a failure occured |
| */ |
| var STOP = 'stop'; |
| var START = 'start'; |
| var BLOCK = 'block'; |
| var UNBLOCK = 'unblock'; |
| var SLEEP = 'sleep'; |
| var WAKEUP = 'wakeup'; |
| var CANCEL = 'cancel'; |
| var FAIL = 'fail'; |
| |
| /** |
| * State transition table. |
| * |
| * nextState = sStateTable[currentState][action] |
| */ |
| var sStateTable = {}; |
| |
| sStateTable[READY] = {}; |
| sStateTable[READY][STOP] = READY; |
| sStateTable[READY][START] = RUNNING; |
| sStateTable[READY][CANCEL] = DONE; |
| sStateTable[READY][FAIL] = ERROR; |
| |
| sStateTable[RUNNING] = {}; |
| sStateTable[RUNNING][STOP] = READY; |
| sStateTable[RUNNING][START] = RUNNING; |
| sStateTable[RUNNING][BLOCK] = BLOCKED; |
| sStateTable[RUNNING][UNBLOCK] = RUNNING; |
| sStateTable[RUNNING][SLEEP] = SLEEPING; |
| sStateTable[RUNNING][WAKEUP] = RUNNING; |
| sStateTable[RUNNING][CANCEL] = DONE; |
| sStateTable[RUNNING][FAIL] = ERROR; |
| |
| sStateTable[BLOCKED] = {}; |
| sStateTable[BLOCKED][STOP] = BLOCKED; |
| sStateTable[BLOCKED][START] = BLOCKED; |
| sStateTable[BLOCKED][BLOCK] = BLOCKED; |
| sStateTable[BLOCKED][UNBLOCK] = BLOCKED; |
| sStateTable[BLOCKED][SLEEP] = BLOCKED; |
| sStateTable[BLOCKED][WAKEUP] = BLOCKED; |
| sStateTable[BLOCKED][CANCEL] = DONE; |
| sStateTable[BLOCKED][FAIL] = ERROR; |
| |
| sStateTable[SLEEPING] = {}; |
| sStateTable[SLEEPING][STOP] = SLEEPING; |
| sStateTable[SLEEPING][START] = SLEEPING; |
| sStateTable[SLEEPING][BLOCK] = SLEEPING; |
| sStateTable[SLEEPING][UNBLOCK] = SLEEPING; |
| sStateTable[SLEEPING][SLEEP] = SLEEPING; |
| sStateTable[SLEEPING][WAKEUP] = SLEEPING; |
| sStateTable[SLEEPING][CANCEL] = DONE; |
| sStateTable[SLEEPING][FAIL] = ERROR; |
| |
| sStateTable[DONE] = {}; |
| sStateTable[DONE][STOP] = DONE; |
| sStateTable[DONE][START] = DONE; |
| sStateTable[DONE][BLOCK] = DONE; |
| sStateTable[DONE][UNBLOCK] = DONE; |
| sStateTable[DONE][SLEEP] = DONE; |
| sStateTable[DONE][WAKEUP] = DONE; |
| sStateTable[DONE][CANCEL] = DONE; |
| sStateTable[DONE][FAIL] = ERROR; |
| |
| sStateTable[ERROR] = {}; |
| sStateTable[ERROR][STOP] = ERROR; |
| sStateTable[ERROR][START] = ERROR; |
| sStateTable[ERROR][BLOCK] = ERROR; |
| sStateTable[ERROR][UNBLOCK] = ERROR; |
| sStateTable[ERROR][SLEEP] = ERROR; |
| sStateTable[ERROR][WAKEUP] = ERROR; |
| sStateTable[ERROR][CANCEL] = ERROR; |
| sStateTable[ERROR][FAIL] = ERROR; |
| |
| /** |
| * Creates a new task. |
| * |
| * @param options options for this task |
| * run: the run function for the task (required) |
| * name: the run function for the task (optional) |
| * parent: parent of this task (optional) |
| * |
| * @return the empty task. |
| */ |
| var Task = function(options) { |
| // task id |
| this.id = -1; |
| |
| // task name |
| this.name = options.name || sNoTaskName; |
| |
| // task has no parent |
| this.parent = options.parent || null; |
| |
| // save run function |
| this.run = options.run; |
| |
| // create a queue of subtasks to run |
| this.subtasks = []; |
| |
| // error flag |
| this.error = false; |
| |
| // state of the task |
| this.state = READY; |
| |
| // number of times the task has been blocked (also the number |
| // of permits needed to be released to continue running) |
| this.blocks = 0; |
| |
| // timeout id when sleeping |
| this.timeoutId = null; |
| |
| // no swap time yet |
| this.swapTime = null; |
| |
| // no user data |
| this.userData = null; |
| |
| // initialize task |
| // FIXME: deal with overflow |
| this.id = sNextTaskId++; |
| sTasks[this.id] = this; |
| if(sVL >= 1) { |
| forge.log.verbose(cat, '[%s][%s] init', this.id, this.name, this); |
| } |
| }; |
| |
| /** |
| * Logs debug information on this task and the system state. |
| */ |
| Task.prototype.debug = function(msg) { |
| msg = msg || ''; |
| forge.log.debug(cat, msg, |
| '[%s][%s] task:', this.id, this.name, this, |
| 'subtasks:', this.subtasks.length, |
| 'queue:', sTaskQueues); |
| }; |
| |
| /** |
| * Adds a subtask to run after task.doNext() or task.fail() is called. |
| * |
| * @param name human readable name for this task (optional). |
| * @param subrun a function to run that takes the current task as |
| * its first parameter. |
| * |
| * @return the current task (useful for chaining next() calls). |
| */ |
| Task.prototype.next = function(name, subrun) { |
| // juggle parameters if it looks like no name is given |
| if(typeof(name) === 'function') { |
| subrun = name; |
| |
| // inherit parent's name |
| name = this.name; |
| } |
| // create subtask, set parent to this task, propagate callbacks |
| var subtask = new Task({ |
| run: subrun, |
| name: name, |
| parent: this |
| }); |
| // start subtasks running |
| subtask.state = RUNNING; |
| subtask.type = this.type; |
| subtask.successCallback = this.successCallback || null; |
| subtask.failureCallback = this.failureCallback || null; |
| |
| // queue a new subtask |
| this.subtasks.push(subtask); |
| |
| return this; |
| }; |
| |
| /** |
| * Adds subtasks to run in parallel after task.doNext() or task.fail() |
| * is called. |
| * |
| * @param name human readable name for this task (optional). |
| * @param subrun functions to run that take the current task as |
| * their first parameter. |
| * |
| * @return the current task (useful for chaining next() calls). |
| */ |
| Task.prototype.parallel = function(name, subrun) { |
| // juggle parameters if it looks like no name is given |
| if(forge.util.isArray(name)) { |
| subrun = name; |
| |
| // inherit parent's name |
| name = this.name; |
| } |
| // Wrap parallel tasks in a regular task so they are started at the |
| // proper time. |
| return this.next(name, function(task) { |
| // block waiting for subtasks |
| var ptask = task; |
| ptask.block(subrun.length); |
| |
| // we pass the iterator from the loop below as a parameter |
| // to a function because it is otherwise included in the |
| // closure and changes as the loop changes -- causing i |
| // to always be set to its highest value |
| var startParallelTask = function(pname, pi) { |
| forge.task.start({ |
| type: pname, |
| run: function(task) { |
| subrun[pi](task); |
| }, |
| success: function(task) { |
| ptask.unblock(); |
| }, |
| failure: function(task) { |
| ptask.unblock(); |
| } |
| }); |
| }; |
| |
| for(var i = 0; i < subrun.length; i++) { |
| // Type must be unique so task starts in parallel: |
| // name + private string + task id + sub-task index |
| // start tasks in parallel and unblock when the finish |
| var pname = name + '__parallel-' + task.id + '-' + i; |
| var pi = i; |
| startParallelTask(pname, pi); |
| } |
| }); |
| }; |
| |
| /** |
| * Stops a running task. |
| */ |
| Task.prototype.stop = function() { |
| this.state = sStateTable[this.state][STOP]; |
| }; |
| |
| /** |
| * Starts running a task. |
| */ |
| Task.prototype.start = function() { |
| this.error = false; |
| this.state = sStateTable[this.state][START]; |
| |
| // try to restart |
| if(this.state === RUNNING) { |
| this.start = new Date(); |
| this.run(this); |
| runNext(this, 0); |
| } |
| }; |
| |
| /** |
| * Blocks a task until it one or more permits have been released. The |
| * task will not resume until the requested number of permits have |
| * been released with call(s) to unblock(). |
| * |
| * @param n number of permits to wait for(default: 1). |
| */ |
| Task.prototype.block = function(n) { |
| n = typeof(n) === 'undefined' ? 1 : n; |
| this.blocks += n; |
| if(this.blocks > 0) { |
| this.state = sStateTable[this.state][BLOCK]; |
| } |
| }; |
| |
| /** |
| * Releases a permit to unblock a task. If a task was blocked by |
| * requesting N permits via block(), then it will only continue |
| * running once enough permits have been released via unblock() calls. |
| * |
| * If multiple processes need to synchronize with a single task then |
| * use a condition variable (see forge.task.createCondition). It is |
| * an error to unblock a task more times than it has been blocked. |
| * |
| * @param n number of permits to release (default: 1). |
| * |
| * @return the current block count (task is unblocked when count is 0) |
| */ |
| Task.prototype.unblock = function(n) { |
| n = typeof(n) === 'undefined' ? 1 : n; |
| this.blocks -= n; |
| if(this.blocks === 0 && this.state !== DONE) { |
| this.state = RUNNING; |
| runNext(this, 0); |
| } |
| return this.blocks; |
| }; |
| |
| /** |
| * Sleep for a period of time before resuming tasks. |
| * |
| * @param n number of milliseconds to sleep (default: 0). |
| */ |
| Task.prototype.sleep = function(n) { |
| n = typeof(n) === 'undefined' ? 0 : n; |
| this.state = sStateTable[this.state][SLEEP]; |
| var self = this; |
| this.timeoutId = setTimeout(function() { |
| self.timeoutId = null; |
| self.state = RUNNING; |
| runNext(self, 0); |
| }, n); |
| }; |
| |
| /** |
| * Waits on a condition variable until notified. The next task will |
| * not be scheduled until notification. A condition variable can be |
| * created with forge.task.createCondition(). |
| * |
| * Once cond.notify() is called, the task will continue. |
| * |
| * @param cond the condition variable to wait on. |
| */ |
| Task.prototype.wait = function(cond) { |
| cond.wait(this); |
| }; |
| |
| /** |
| * If sleeping, wakeup and continue running tasks. |
| */ |
| Task.prototype.wakeup = function() { |
| if(this.state === SLEEPING) { |
| cancelTimeout(this.timeoutId); |
| this.timeoutId = null; |
| this.state = RUNNING; |
| runNext(this, 0); |
| } |
| }; |
| |
| /** |
| * Cancel all remaining subtasks of this task. |
| */ |
| Task.prototype.cancel = function() { |
| this.state = sStateTable[this.state][CANCEL]; |
| // remove permits needed |
| this.permitsNeeded = 0; |
| // cancel timeouts |
| if(this.timeoutId !== null) { |
| cancelTimeout(this.timeoutId); |
| this.timeoutId = null; |
| } |
| // remove subtasks |
| this.subtasks = []; |
| }; |
| |
| /** |
| * Finishes this task with failure and sets error flag. The entire |
| * task will be aborted unless the next task that should execute |
| * is passed as a parameter. This allows levels of subtasks to be |
| * skipped. For instance, to abort only this tasks's subtasks, then |
| * call fail(task.parent). To abort this task's subtasks and its |
| * parent's subtasks, call fail(task.parent.parent). To abort |
| * all tasks and simply call the task callback, call fail() or |
| * fail(null). |
| * |
| * The task callback (success or failure) will always, eventually, be |
| * called. |
| * |
| * @param next the task to continue at, or null to abort entirely. |
| */ |
| Task.prototype.fail = function(next) { |
| // set error flag |
| this.error = true; |
| |
| // finish task |
| finish(this, true); |
| |
| if(next) { |
| // propagate task info |
| next.error = this.error; |
| next.swapTime = this.swapTime; |
| next.userData = this.userData; |
| |
| // do next task as specified |
| runNext(next, 0); |
| } else { |
| if(this.parent !== null) { |
| // finish root task (ensures it is removed from task queue) |
| var parent = this.parent; |
| while(parent.parent !== null) { |
| // propagate task info |
| parent.error = this.error; |
| parent.swapTime = this.swapTime; |
| parent.userData = this.userData; |
| parent = parent.parent; |
| } |
| finish(parent, true); |
| } |
| |
| // call failure callback if one exists |
| if(this.failureCallback) { |
| this.failureCallback(this); |
| } |
| } |
| }; |
| |
| /** |
| * Asynchronously start a task. |
| * |
| * @param task the task to start. |
| */ |
| var start = function(task) { |
| task.error = false; |
| task.state = sStateTable[task.state][START]; |
| setTimeout(function() { |
| if(task.state === RUNNING) { |
| task.swapTime = +new Date(); |
| task.run(task); |
| runNext(task, 0); |
| } |
| }, 0); |
| }; |
| |
| /** |
| * Run the next subtask or finish this task. |
| * |
| * @param task the task to process. |
| * @param recurse the recursion count. |
| */ |
| var runNext = function(task, recurse) { |
| // get time since last context swap (ms), if enough time has passed set |
| // swap to true to indicate that doNext was performed asynchronously |
| // also, if recurse is too high do asynchronously |
| var swap = |
| (recurse > sMaxRecursions) || |
| (+new Date() - task.swapTime) > sTimeSlice; |
| |
| var doNext = function(recurse) { |
| recurse++; |
| if(task.state === RUNNING) { |
| if(swap) { |
| // update swap time |
| task.swapTime = +new Date(); |
| } |
| |
| if(task.subtasks.length > 0) { |
| // run next subtask |
| var subtask = task.subtasks.shift(); |
| subtask.error = task.error; |
| subtask.swapTime = task.swapTime; |
| subtask.userData = task.userData; |
| subtask.run(subtask); |
| if(!subtask.error) { |
| runNext(subtask, recurse); |
| } |
| } else { |
| finish(task); |
| |
| if(!task.error) { |
| // chain back up and run parent |
| if(task.parent !== null) { |
| // propagate task info |
| task.parent.error = task.error; |
| task.parent.swapTime = task.swapTime; |
| task.parent.userData = task.userData; |
| |
| // no subtasks left, call run next subtask on parent |
| runNext(task.parent, recurse); |
| } |
| } |
| } |
| } |
| }; |
| |
| if(swap) { |
| // we're swapping, so run asynchronously |
| setTimeout(doNext, 0); |
| } else { |
| // not swapping, so run synchronously |
| doNext(recurse); |
| } |
| }; |
| |
| /** |
| * Finishes a task and looks for the next task in the queue to start. |
| * |
| * @param task the task to finish. |
| * @param suppressCallbacks true to suppress callbacks. |
| */ |
| var finish = function(task, suppressCallbacks) { |
| // subtask is now done |
| task.state = DONE; |
| |
| delete sTasks[task.id]; |
| if(sVL >= 1) { |
| forge.log.verbose(cat, '[%s][%s] finish', |
| task.id, task.name, task); |
| } |
| |
| // only do queue processing for root tasks |
| if(task.parent === null) { |
| // report error if queue is missing |
| if(!(task.type in sTaskQueues)) { |
| forge.log.error(cat, |
| '[%s][%s] task queue missing [%s]', |
| task.id, task.name, task.type); |
| } else if(sTaskQueues[task.type].length === 0) { |
| // report error if queue is empty |
| forge.log.error(cat, |
| '[%s][%s] task queue empty [%s]', |
| task.id, task.name, task.type); |
| } else if(sTaskQueues[task.type][0] !== task) { |
| // report error if this task isn't the first in the queue |
| forge.log.error(cat, |
| '[%s][%s] task not first in queue [%s]', |
| task.id, task.name, task.type); |
| } else { |
| // remove ourselves from the queue |
| sTaskQueues[task.type].shift(); |
| // clean up queue if it is empty |
| if(sTaskQueues[task.type].length === 0) { |
| if(sVL >= 1) { |
| forge.log.verbose(cat, '[%s][%s] delete queue [%s]', |
| task.id, task.name, task.type); |
| } |
| /* Note: Only a task can delete a queue of its own type. This |
| is used as a way to synchronize tasks. If a queue for a certain |
| task type exists, then a task of that type is running. |
| */ |
| delete sTaskQueues[task.type]; |
| } else { |
| // dequeue the next task and start it |
| if(sVL >= 1) { |
| forge.log.verbose(cat, |
| '[%s][%s] queue start next [%s] remain:%s', |
| task.id, task.name, task.type, |
| sTaskQueues[task.type].length); |
| } |
| sTaskQueues[task.type][0].start(); |
| } |
| } |
| |
| if(!suppressCallbacks) { |
| // call final callback if one exists |
| if(task.error && task.failureCallback) { |
| task.failureCallback(task); |
| } else if(!task.error && task.successCallback) { |
| task.successCallback(task); |
| } |
| } |
| } |
| }; |
| |
| /* Tasks API */ |
| forge.task = forge.task || {}; |
| |
| /** |
| * Starts a new task that will run the passed function asynchronously. |
| * |
| * In order to finish the task, either task.doNext() or task.fail() |
| * *must* be called. |
| * |
| * The task must have a type (a string identifier) that can be used to |
| * synchronize it with other tasks of the same type. That type can also |
| * be used to cancel tasks that haven't started yet. |
| * |
| * To start a task, the following object must be provided as a parameter |
| * (each function takes a task object as its first parameter): |
| * |
| * { |
| * type: the type of task. |
| * run: the function to run to execute the task. |
| * success: a callback to call when the task succeeds (optional). |
| * failure: a callback to call when the task fails (optional). |
| * } |
| * |
| * @param options the object as described above. |
| */ |
| forge.task.start = function(options) { |
| // create a new task |
| var task = new Task({ |
| run: options.run, |
| name: options.name || sNoTaskName |
| }); |
| task.type = options.type; |
| task.successCallback = options.success || null; |
| task.failureCallback = options.failure || null; |
| |
| // append the task onto the appropriate queue |
| if(!(task.type in sTaskQueues)) { |
| if(sVL >= 1) { |
| forge.log.verbose(cat, '[%s][%s] create queue [%s]', |
| task.id, task.name, task.type); |
| } |
| // create the queue with the new task |
| sTaskQueues[task.type] = [task]; |
| start(task); |
| } else { |
| // push the task onto the queue, it will be run after a task |
| // with the same type completes |
| sTaskQueues[options.type].push(task); |
| } |
| }; |
| |
| /** |
| * Cancels all tasks of the given type that haven't started yet. |
| * |
| * @param type the type of task to cancel. |
| */ |
| forge.task.cancel = function(type) { |
| // find the task queue |
| if(type in sTaskQueues) { |
| // empty all but the current task from the queue |
| sTaskQueues[type] = [sTaskQueues[type][0]]; |
| } |
| }; |
| |
| /** |
| * Creates a condition variable to synchronize tasks. To make a task wait |
| * on the condition variable, call task.wait(condition). To notify all |
| * tasks that are waiting, call condition.notify(). |
| * |
| * @return the condition variable. |
| */ |
| forge.task.createCondition = function() { |
| var cond = { |
| // all tasks that are blocked |
| tasks: {} |
| }; |
| |
| /** |
| * Causes the given task to block until notify is called. If the task |
| * is already waiting on this condition then this is a no-op. |
| * |
| * @param task the task to cause to wait. |
| */ |
| cond.wait = function(task) { |
| // only block once |
| if(!(task.id in cond.tasks)) { |
| task.block(); |
| cond.tasks[task.id] = task; |
| } |
| }; |
| |
| /** |
| * Notifies all waiting tasks to wake up. |
| */ |
| cond.notify = function() { |
| // since unblock() will run the next task from here, make sure to |
| // clear the condition's blocked task list before unblocking |
| var tmp = cond.tasks; |
| cond.tasks = {}; |
| for(var id in tmp) { |
| tmp[id].unblock(); |
| } |
| }; |
| |
| return cond; |
| }; |
| |
| } // end module implementation |
| |
| /* ########## Begin module wrapper ########## */ |
| var name = 'task'; |
| if(typeof define !== 'function') { |
| // NodeJS -> AMD |
| if(typeof module === 'object' && module.exports) { |
| var nodeJS = true; |
| define = function(ids, factory) { |
| factory(require, module); |
| }; |
| } else { |
| // <script> |
| if(typeof forge === 'undefined') { |
| forge = {}; |
| } |
| return initModule(forge); |
| } |
| } |
| // AMD |
| var deps; |
| var defineFunc = function(require, module) { |
| module.exports = function(forge) { |
| var mods = deps.map(function(dep) { |
| return require(dep); |
| }).concat(initModule); |
| // handle circular dependencies |
| forge = forge || {}; |
| forge.defined = forge.defined || {}; |
| if(forge.defined[name]) { |
| return forge[name]; |
| } |
| forge.defined[name] = true; |
| for(var i = 0; i < mods.length; ++i) { |
| mods[i](forge); |
| } |
| return forge[name]; |
| }; |
| }; |
| var tmpDefine = define; |
| define = function(ids, factory) { |
| deps = (typeof ids === 'string') ? factory.slice(2) : ids.slice(2); |
| if(nodeJS) { |
| delete define; |
| return tmpDefine.apply(null, Array.prototype.slice.call(arguments, 0)); |
| } |
| define = tmpDefine; |
| return define.apply(null, Array.prototype.slice.call(arguments, 0)); |
| }; |
| define(['require', 'module', './debug', './log', './util'], function() { |
| defineFunc.apply(null, Array.prototype.slice.call(arguments, 0)); |
| }); |
| })(); |