| 'use strict'; |
| |
| var RingBuffer = require('./ring_buffer'); |
| |
| var Functor = function(session, method) { |
| this._session = session; |
| this._method = method; |
| this._queue = new RingBuffer(Functor.QUEUE_SIZE); |
| this._stopped = false; |
| this.pending = 0; |
| }; |
| |
| Functor.QUEUE_SIZE = 8; |
| |
| Functor.prototype.call = function(error, message, callback, context) { |
| if (this._stopped) return; |
| |
| var record = { error: error, message: message, callback: callback, context: context, done: false }, |
| called = false, |
| self = this; |
| |
| this._queue.push(record); |
| |
| if (record.error) { |
| record.done = true; |
| this._stop(); |
| return this._flushQueue(); |
| } |
| |
| var handler = function(err, msg) { |
| if (!(called ^ (called = true))) return; |
| |
| if (err) { |
| self._stop(); |
| record.error = err; |
| record.message = null; |
| } else { |
| record.message = msg; |
| } |
| |
| record.done = true; |
| self._flushQueue(); |
| }; |
| |
| try { |
| this._session[this._method](message, handler); |
| } catch (err) { |
| handler(err); |
| } |
| }; |
| |
| Functor.prototype._stop = function() { |
| this.pending = this._queue.length; |
| this._stopped = true; |
| }; |
| |
| Functor.prototype._flushQueue = function() { |
| var queue = this._queue, record; |
| |
| while (queue.length > 0 && queue.peek().done) { |
| record = queue.shift(); |
| if (record.error) { |
| this.pending = 0; |
| queue.clear(); |
| } else { |
| this.pending -= 1; |
| } |
| record.callback.call(record.context, record.error, record.message); |
| } |
| }; |
| |
| module.exports = Functor; |