| 'use strict' |
| |
| /* eslint-disable no-var */ |
| |
| var reusify = require('reusify') |
| |
| function fastqueue (context, worker, concurrency) { |
| if (typeof context === 'function') { |
| concurrency = worker |
| worker = context |
| context = null |
| } |
| |
| if (concurrency < 1) { |
| throw new Error('fastqueue concurrency must be greater than 1') |
| } |
| |
| var cache = reusify(Task) |
| var queueHead = null |
| var queueTail = null |
| var _running = 0 |
| var errorHandler = null |
| |
| var self = { |
| push: push, |
| drain: noop, |
| saturated: noop, |
| pause: pause, |
| paused: false, |
| concurrency: concurrency, |
| running: running, |
| resume: resume, |
| idle: idle, |
| length: length, |
| getQueue: getQueue, |
| unshift: unshift, |
| empty: noop, |
| kill: kill, |
| killAndDrain: killAndDrain, |
| error: error |
| } |
| |
| return self |
| |
| function running () { |
| return _running |
| } |
| |
| function pause () { |
| self.paused = true |
| } |
| |
| function length () { |
| var current = queueHead |
| var counter = 0 |
| |
| while (current) { |
| current = current.next |
| counter++ |
| } |
| |
| return counter |
| } |
| |
| function getQueue () { |
| var current = queueHead |
| var tasks = [] |
| |
| while (current) { |
| tasks.push(current.value) |
| current = current.next |
| } |
| |
| return tasks |
| } |
| |
| function resume () { |
| if (!self.paused) return |
| self.paused = false |
| for (var i = 0; i < self.concurrency; i++) { |
| _running++ |
| release() |
| } |
| } |
| |
| function idle () { |
| return _running === 0 && self.length() === 0 |
| } |
| |
| function push (value, done) { |
| var current = cache.get() |
| |
| current.context = context |
| current.release = release |
| current.value = value |
| current.callback = done || noop |
| current.errorHandler = errorHandler |
| |
| if (_running === self.concurrency || self.paused) { |
| if (queueTail) { |
| queueTail.next = current |
| queueTail = current |
| } else { |
| queueHead = current |
| queueTail = current |
| self.saturated() |
| } |
| } else { |
| _running++ |
| worker.call(context, current.value, current.worked) |
| } |
| } |
| |
| function unshift (value, done) { |
| var current = cache.get() |
| |
| current.context = context |
| current.release = release |
| current.value = value |
| current.callback = done || noop |
| |
| if (_running === self.concurrency || self.paused) { |
| if (queueHead) { |
| current.next = queueHead |
| queueHead = current |
| } else { |
| queueHead = current |
| queueTail = current |
| self.saturated() |
| } |
| } else { |
| _running++ |
| worker.call(context, current.value, current.worked) |
| } |
| } |
| |
| function release (holder) { |
| if (holder) { |
| cache.release(holder) |
| } |
| var next = queueHead |
| if (next) { |
| if (!self.paused) { |
| if (queueTail === queueHead) { |
| queueTail = null |
| } |
| queueHead = next.next |
| next.next = null |
| worker.call(context, next.value, next.worked) |
| if (queueTail === null) { |
| self.empty() |
| } |
| } else { |
| _running-- |
| } |
| } else if (--_running === 0) { |
| self.drain() |
| } |
| } |
| |
| function kill () { |
| queueHead = null |
| queueTail = null |
| self.drain = noop |
| } |
| |
| function killAndDrain () { |
| queueHead = null |
| queueTail = null |
| self.drain() |
| self.drain = noop |
| } |
| |
| function error (handler) { |
| errorHandler = handler |
| } |
| } |
| |
| function noop () {} |
| |
| function Task () { |
| this.value = null |
| this.callback = noop |
| this.next = null |
| this.release = noop |
| this.context = null |
| this.errorHandler = null |
| |
| var self = this |
| |
| this.worked = function worked (err, result) { |
| var callback = self.callback |
| var errorHandler = self.errorHandler |
| var val = self.value |
| self.value = null |
| self.callback = noop |
| if (self.errorHandler) { |
| errorHandler(err, val) |
| } |
| callback.call(self.context, err, result) |
| self.release(self) |
| } |
| } |
| |
| function queueAsPromised (context, worker, concurrency) { |
| if (typeof context === 'function') { |
| concurrency = worker |
| worker = context |
| context = null |
| } |
| |
| function asyncWrapper (arg, cb) { |
| worker.call(this, arg) |
| .then(function (res) { |
| cb(null, res) |
| }, cb) |
| } |
| |
| var queue = fastqueue(context, asyncWrapper, concurrency) |
| |
| var pushCb = queue.push |
| var unshiftCb = queue.unshift |
| |
| queue.push = push |
| queue.unshift = unshift |
| queue.drained = drained |
| |
| return queue |
| |
| function push (value) { |
| var p = new Promise(function (resolve, reject) { |
| pushCb(value, function (err, result) { |
| if (err) { |
| reject(err) |
| return |
| } |
| resolve(result) |
| }) |
| }) |
| |
| // Let's fork the promise chain to |
| // make the error bubble up to the user but |
| // not lead to a unhandledRejection |
| p.catch(noop) |
| |
| return p |
| } |
| |
| function unshift (value) { |
| var p = new Promise(function (resolve, reject) { |
| unshiftCb(value, function (err, result) { |
| if (err) { |
| reject(err) |
| return |
| } |
| resolve(result) |
| }) |
| }) |
| |
| // Let's fork the promise chain to |
| // make the error bubble up to the user but |
| // not lead to a unhandledRejection |
| p.catch(noop) |
| |
| return p |
| } |
| |
| function drained () { |
| var previousDrain = queue.drain |
| |
| var p = new Promise(function (resolve) { |
| queue.drain = function () { |
| previousDrain() |
| resolve() |
| } |
| }) |
| |
| return p |
| } |
| } |
| |
| module.exports = fastqueue |
| module.exports.promise = queueAsPromised |