| 'use strict'; |
| |
| var Queue = require('double-ended-queue'); |
| var utils = require('./utils'); |
| var Command = require('./command'); |
| |
/**
 * Collects commands for one client in a queue so they can be sent together.
 *
 * @param {object} client - Stored as `this._client`; used by the exec methods to send the commands.
 * @param {Array<Array>} [args] - Optional list of commands to queue right away. Each entry is an
 *   array whose first element is either a method name or an array of the form
 *   `[methodName, ...leadingArgs]`; the remaining elements of the entry are appended as arguments.
 */
function Multi (client, args) {
    this._client = client;
    this.queue = new Queue();
    // Either undefined or an array. Fail hard if it's not an array
    if (!args) {
        return;
    }
    for (var i = 0; i < args.length; i++) {
        var head = args[i][0];
        var rest = args[i].slice(1);
        var nested = Array.isArray(head);
        // A nested head carries the method name plus leading arguments of its own
        var method_name = nested ? head[0] : head;
        var call_args = nested ? head.slice(1).concat(rest) : rest;
        // The named methods live outside this file; an unknown name throws here on purpose
        this[method_name].apply(this, call_args);
    }
}
| |
/**
 * Send one command that belongs to a transaction, temporarily replacing its
 * callback with a wrapper that does the transaction bookkeeping.
 *
 * When the wrapper is called it restores the original callback on the command
 * object. For queued commands it also stores the command's `buffer_args` in
 * `self.wants_buffers[index]`, and on error annotates the error with its
 * `position`, records it in `self.errors` and then forwards it to the
 * command's own callback, if any.
 *
 * @param {Multi} self - Object holding `errors`, `wants_buffers` and `_client`.
 * @param {object} command_obj - The command to send; its `callback` is swapped until the wrapper ran.
 * @param {number} index - Position of the command in the queue, or -1 for the internally added MULTI command.
 */
function pipeline_transaction_command (self, command_obj, index) {
    // Queueing is done first, then the commands are executed
    var user_callback = command_obj.callback;
    // The multi command is applied by node_redis itself and the user does not benefit from its result
    var is_internal = index === -1;
    command_obj.callback = function (err) {
        // Restore the callback up front so a throwing user callback can't leave the wrapper installed
        command_obj.callback = user_callback;
        if (is_internal) {
            // Nothing to track: writing wants_buffers[-1] would only add a stray property to the array
            return;
        }
        // Keep track of who wants buffer responses:
        // By the time the callback is called the command_obj got the buffer_args attribute attached
        self.wants_buffers[index] = command_obj.buffer_args;
        if (err) {
            // Annotate and record before notifying, so the callback sees the position
            // and the error is kept even if the callback throws
            err.position = index;
            self.errors.push(err);
            if (user_callback) {
                user_callback(err);
            }
        }
    };
    self._client.internal_send_command(command_obj);
}
| |
/**
 * Run the queued commands, using `exec_batch` directly when fewer than two
 * commands are queued and `this.exec` otherwise.
 *
 * NOTE(review): in this file the prototype's `exec` is the batch implementation;
 * presumably callers that want a transaction rebind `exec` on the instance - confirm
 * against the code that creates Multi objects.
 *
 * @param {Function} [callback] - Passed through unchanged to the chosen exec method.
 * @returns {*} Whatever the chosen exec method returns.
 */
Multi.prototype.exec_atomic = Multi.prototype.EXEC_ATOMIC = Multi.prototype.execAtomic = function exec_atomic (callback) {
    var is_single_command = this.queue.length < 2;
    return is_single_command ? this.exec_batch(callback) : this.exec(callback);
};
| |
/**
 * Handle the outcome of the EXEC command sent by `exec_transaction`.
 *
 * On error the collected `self.errors` are attached to the error, which is then passed
 * to `self.callback` or, without one, emitted on the client (unless its code is
 * 'CONNECTION_BROKEN'). Otherwise every queued command is shifted off the queue and
 * paired with the reply at the same position: Error replies are annotated and passed
 * to the command's callback as its only argument; all other replies are run through
 * the client's `handle_reply`, written back into `replies` and passed as
 * `(null, reply)`. Finally `self.callback` receives `(null, replies)`.
 *
 * @param {Multi} self - Object holding `queue`, `errors`, `wants_buffers`, `callback` and `_client`.
 * @param {?Error} err - Error reported for the EXEC command, if any.
 * @param {?Array} replies - Replies by queue position; may be falsy, in which case the queue is left untouched.
 */
function multi_callback (self, err, replies) {
    if (err) {
        err.errors = self.errors;
        if (self.callback) {
            self.callback(err);
        } else if (err.code !== 'CONNECTION_BROKEN') {
            // Exclude connection errors so that those errors won't be emitted twice
            self._client.emit('error', err);
        }
        return;
    }

    if (replies) {
        var position = 0;
        var command_obj = self.queue.shift();
        while (command_obj) {
            var reply = replies[position];
            if (reply instanceof Error) {
                // LUA script could return user errors that don't behave like all other errors!
                var code_match = reply.message.match(utils.err_code);
                if (code_match) {
                    reply.code = code_match[1];
                }
                reply.command = command_obj.command.toUpperCase();
                if (typeof command_obj.callback === 'function') {
                    command_obj.callback(reply);
                }
            } else {
                // If we asked for strings, even in detect_buffers mode, then return strings:
                reply = self._client.handle_reply(reply, command_obj.command, self.wants_buffers[position]);
                replies[position] = reply;
                if (typeof command_obj.callback === 'function') {
                    command_obj.callback(null, reply);
                }
            }
            position += 1;
            command_obj = self.queue.shift();
        }
    }

    if (self.callback) {
        self.callback(null, replies);
    }
}
| |
/**
 * Send the queued commands wrapped in MULTI ... EXEC.
 *
 * Refuses to run with an EXECABORT RangeError (delivered through
 * `utils.reply_in_order`) when this object or its client is flagged as monitoring.
 * Otherwise the bookkeeping fields (`errors`, `callback`, `wants_buffers`) are reset,
 * and between `cork()` and `uncork()` the MULTI command, every queued command and
 * finally EXEC are handed to the client. The EXEC reply is processed by `multi_callback`.
 *
 * @param {Function} [callback] - Stored as `this.callback` for `multi_callback` to invoke.
 * @returns {*} `!client.should_buffer`, or in monitor mode whatever `utils.reply_in_order` returns.
 */
Multi.prototype.exec_transaction = function exec_transaction (callback) {
    var multi = this;
    var client = multi._client;

    if (multi.monitoring || client.monitoring) {
        var monitor_err = new RangeError(
            'Using transaction with a client that is in monitor mode does not work due to faulty return values of Redis.'
        );
        monitor_err.command = 'EXEC';
        monitor_err.code = 'EXECABORT';
        return utils.reply_in_order(client, callback, monitor_err);
    }

    // Forwards the EXEC result together with this Multi instance
    function on_exec (err, replies) {
        multi_callback(multi, err, replies);
    }

    var queued = multi.queue.length;
    multi.errors = [];
    multi.callback = callback;
    client.cork();
    multi.wants_buffers = new Array(queued);
    pipeline_transaction_command(multi, new Command('multi', []), -1);
    // Drain queue, callback will catch 'QUEUED' or error
    for (var position = 0; position < queued; position += 1) {
        // The commands may not be shifted off, since they are needed in the result handler
        pipeline_transaction_command(multi, multi.queue.get(position), position);
    }

    client.internal_send_command(new Command('exec', [], on_exec));
    client.uncork();
    return !client.should_buffer;
};
| |
/**
 * Wrap a command's own callback so that its outcome is also stored in
 * `self.results` at a fixed position.
 *
 * @param {Multi} self - Object whose `results` array receives the outcome.
 * @param {Function} cb - The command's own callback; always called with the unchanged `(err, res)`.
 * @param {number} i - Slot in `self.results`; also written to `err.position` on failure.
 * @returns {Function} The wrapping callback.
 */
function batch_callback (self, cb, i) {
    return function batch_callback (err, res) {
        if (!err) {
            self.results[i] = res;
        } else {
            self.results[i] = err;
            // Add the position to the error
            err.position = i;
        }
        cb(err, res);
    };
}
| |
/**
 * Send all queued commands to the client in one corked write, without MULTI/EXEC.
 *
 * - Empty queue: `callback` is answered with `[]` through `utils.reply_in_order`.
 * - No `callback`: the commands are sent untouched.
 * - Otherwise every command's outcome (reply or error) is stored in `this.results`
 *   at the command's queue position, errors get that position as `err.position`,
 *   and after the last command's reply `callback(null, this.results)` is invoked.
 *
 * The queue is emptied in the process.
 *
 * @param {Function} [callback] - Receives `(null, results)` once the last command has replied.
 * @returns {boolean} `!client.should_buffer`, read after the commands were handed to the client.
 */
Multi.prototype.exec = Multi.prototype.EXEC = Multi.prototype.exec_batch = function exec_batch (callback) {
    var self = this;
    var len = self.queue.length;
    var index = 0;
    var command_obj;
    if (len === 0) {
        utils.reply_in_order(self._client, callback, null, []);
        return !self._client.should_buffer;
    }
    self._client.cork();
    if (!callback) {
        while ((command_obj = self.queue.shift())) {
            self._client.internal_send_command(command_obj);
        }
        self._client.uncork();
        return !self._client.should_buffer;
    }
    // Result recorder for commands without a callback of their own. It is bound to the
    // command's queue position (like batch_callback) instead of relying on how many
    // results happen to be stored already, so results and err.position always line up.
    var record_at = function (position) {
        return function (err, res) {
            if (err) {
                self.results[position] = err;
                // Add the position to the error
                err.position = position;
            } else {
                self.results[position] = res;
            }
            // Do not emit an error here. Otherwise each error would result in one emit.
            // The errors will be returned in the result anyway
        };
    };
    // Runs the wrapped callback first and then reports the collected results
    var last_callback = function (cb) {
        return function (err, res) {
            cb(err, res);
            callback(null, self.results);
        };
    };
    var reports_results = typeof callback === 'function';
    self.results = [];
    while ((command_obj = self.queue.shift())) {
        if (typeof command_obj.callback === 'function') {
            command_obj.callback = batch_callback(self, command_obj.callback, index);
        } else {
            command_obj.callback = record_at(index);
        }
        if (reports_results && index === len - 1) {
            command_obj.callback = last_callback(command_obj.callback);
        }
        self._client.internal_send_command(command_obj);
        index++;
    }
    self._client.uncork();
    return !self._client.should_buffer;
};
| |
| module.exports = Multi; |