| // |
| // |
| // |
| |
| 'use strict'; |
| |
| var defs = require('./defs'); |
| var Promise = require('bluebird'); |
| var inherits = require('util').inherits; |
| var EventEmitter = require('events').EventEmitter; |
| var BaseChannel = require('./channel').BaseChannel; |
| var acceptMessage = require('./channel').acceptMessage; |
| var Args = require('./api_args'); |
| |
| function ChannelModel(connection) { |
| if (!(this instanceof ChannelModel)) |
| return new ChannelModel(connection); |
| EventEmitter.call( this ); |
| this.connection = connection; |
| var self = this; |
| ['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) { |
| connection.on(ev, self.emit.bind(self, ev)); |
| }); |
| } |
| inherits(ChannelModel, EventEmitter); |
| |
| module.exports.ChannelModel = ChannelModel; |
| |
| var CM = ChannelModel.prototype; |
| |
| CM.close = function() { |
| return Promise.fromCallback(this.connection.close.bind(this.connection)); |
| }; |
| |
| // Channels |
| |
| function Channel(connection) { |
| BaseChannel.call(this, connection); |
| this.on('delivery', this.handleDelivery.bind(this)); |
| this.on('cancel', this.handleCancel.bind(this)); |
| } |
| inherits(Channel, BaseChannel); |
| |
| module.exports.Channel = Channel; |
| |
| CM.createChannel = function() { |
| var c = new Channel(this.connection); |
| return c.open().then(function(openOk) { return c; }); |
| }; |
| |
| var C = Channel.prototype; |
| |
| // An RPC that returns a 'proper' promise, which resolves to just the |
| // response's fields; this is intended to be suitable for implementing |
| // API procedures. |
| C.rpc = function(method, fields, expect) { |
| var self = this; |
| return Promise.fromCallback(function(cb) { |
| return self._rpc(method, fields, expect, cb); |
| }) |
| .then(function(f) { |
| return f.fields; |
| }); |
| }; |
| |
| // Do the remarkably simple channel open handshake |
| C.open = function() { |
| return Promise.try(this.allocate.bind(this)).then( |
| function(ch) { |
| return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, |
| defs.ChannelOpenOk); |
| }); |
| }; |
| |
| C.close = function() { |
| var self = this; |
| return Promise.fromCallback(function(cb) { |
| return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, |
| cb); |
| }); |
| }; |
| |
| // === Public API, declaring queues and stuff === |
| |
| C.assertQueue = function(queue, options) { |
| return this.rpc(defs.QueueDeclare, |
| Args.assertQueue(queue, options), |
| defs.QueueDeclareOk); |
| }; |
| |
| C.checkQueue = function(queue) { |
| return this.rpc(defs.QueueDeclare, |
| Args.checkQueue(queue), |
| defs.QueueDeclareOk); |
| }; |
| |
| C.deleteQueue = function(queue, options) { |
| return this.rpc(defs.QueueDelete, |
| Args.deleteQueue(queue, options), |
| defs.QueueDeleteOk); |
| }; |
| |
| C.purgeQueue = function(queue) { |
| return this.rpc(defs.QueuePurge, |
| Args.purgeQueue(queue), |
| defs.QueuePurgeOk); |
| }; |
| |
| C.bindQueue = function(queue, source, pattern, argt) { |
| return this.rpc(defs.QueueBind, |
| Args.bindQueue(queue, source, pattern, argt), |
| defs.QueueBindOk); |
| }; |
| |
| C.unbindQueue = function(queue, source, pattern, argt) { |
| return this.rpc(defs.QueueUnbind, |
| Args.unbindQueue(queue, source, pattern, argt), |
| defs.QueueUnbindOk); |
| }; |
| |
| C.assertExchange = function(exchange, type, options) { |
| // The server reply is an empty set of fields, but it's convenient |
| // to have the exchange name handed to the continuation. |
| return this.rpc(defs.ExchangeDeclare, |
| Args.assertExchange(exchange, type, options), |
| defs.ExchangeDeclareOk) |
| .then(function(_ok) { return { exchange: exchange }; }); |
| }; |
| |
| C.checkExchange = function(exchange) { |
| return this.rpc(defs.ExchangeDeclare, |
| Args.checkExchange(exchange), |
| defs.ExchangeDeclareOk); |
| }; |
| |
| C.deleteExchange = function(name, options) { |
| return this.rpc(defs.ExchangeDelete, |
| Args.deleteExchange(name, options), |
| defs.ExchangeDeleteOk); |
| }; |
| |
| C.bindExchange = function(dest, source, pattern, argt) { |
| return this.rpc(defs.ExchangeBind, |
| Args.bindExchange(dest, source, pattern, argt), |
| defs.ExchangeBindOk); |
| }; |
| |
| C.unbindExchange = function(dest, source, pattern, argt) { |
| return this.rpc(defs.ExchangeUnbind, |
| Args.unbindExchange(dest, source, pattern, argt), |
| defs.ExchangeUnbindOk); |
| }; |
| |
| // Working with messages |
| |
| C.publish = function(exchange, routingKey, content, options) { |
| var fieldsAndProps = Args.publish(exchange, routingKey, options); |
| return this.sendMessage(fieldsAndProps, fieldsAndProps, content); |
| }; |
| |
| C.sendToQueue = function(queue, content, options) { |
| return this.publish('', queue, content, options); |
| }; |
| |
| C.consume = function(queue, callback, options) { |
| var self = this; |
| // NB we want the callback to be run synchronously, so that we've |
| // registered the consumerTag before any messages can arrive. |
| var fields = Args.consume(queue, options); |
| return Promise.fromCallback(function(cb) { |
| self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); |
| }) |
| .then(function(ok) { |
| self.registerConsumer(ok.fields.consumerTag, callback); |
| return ok.fields; |
| }); |
| }; |
| |
| C.cancel = function(consumerTag) { |
| var self = this; |
| return Promise.fromCallback(function(cb) { |
| self._rpc(defs.BasicCancel, Args.cancel(consumerTag), |
| defs.BasicCancelOk, |
| cb); |
| }) |
| .then(function(ok) { |
| self.unregisterConsumer(consumerTag); |
| return ok.fields; |
| }); |
| }; |
| |
| C.get = function(queue, options) { |
| var self = this; |
| var fields = Args.get(queue, options); |
| return Promise.fromCallback(function(cb) { |
| return self.sendOrEnqueue(defs.BasicGet, fields, cb); |
| }) |
| .then(function(f) { |
| if (f.id === defs.BasicGetEmpty) { |
| return false; |
| } |
| else if (f.id === defs.BasicGetOk) { |
| var fields = f.fields; |
| return new Promise(function(resolve) { |
| self.handleMessage = acceptMessage(function(m) { |
| m.fields = fields; |
| resolve(m); |
| }); |
| }); |
| } |
| else { |
| throw new Error("Unexpected response to BasicGet: " + |
| inspect(f)); |
| } |
| }) |
| }; |
| |
| C.ack = function(message, allUpTo) { |
| this.sendImmediately( |
| defs.BasicAck, |
| Args.ack(message.fields.deliveryTag, allUpTo)); |
| }; |
| |
| C.ackAll = function() { |
| this.sendImmediately(defs.BasicAck, Args.ack(0, true)); |
| }; |
| |
| C.nack = function(message, allUpTo, requeue) { |
| this.sendImmediately( |
| defs.BasicNack, |
| Args.nack(message.fields.deliveryTag, allUpTo, requeue)); |
| }; |
| |
| C.nackAll = function(requeue) { |
| this.sendImmediately(defs.BasicNack, |
| Args.nack(0, true, requeue)); |
| }; |
| |
| // `Basic.Nack` is not available in older RabbitMQ versions (or in the |
| // AMQP specification), so you have to use the one-at-a-time |
| // `Basic.Reject`. This is otherwise synonymous with |
| // `#nack(message, false, requeue)`. |
| C.reject = function(message, requeue) { |
| this.sendImmediately( |
| defs.BasicReject, |
| Args.reject(message.fields.deliveryTag, requeue)); |
| }; |
| |
| // There are more options in AMQP than exposed here; RabbitMQ only |
| // implements prefetch based on message count, and only for individual |
| // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch |
| // (without `global` set) as per-consumer (for consumers following), |
| // and prefetch with `global` set as per-channel. |
| C.prefetch = C.qos = function(count, global) { |
| return this.rpc(defs.BasicQos, |
| Args.prefetch(count, global), |
| defs.BasicQosOk); |
| }; |
| |
| C.recover = function() { |
| return this.rpc(defs.BasicRecover, |
| Args.recover(), |
| defs.BasicRecoverOk); |
| }; |
| |
| // Confirm channel. This is a channel with confirms 'switched on', |
| // meaning sent messages will provoke a responding 'ack' or 'nack' |
| // from the server. The upshot of this is that `publish` and |
| // `sendToQueue` both take a callback, which will be called either |
| // with `null` as its argument to signify 'ack', or an exception as |
| // its argument to signify 'nack'. |
| |
| function ConfirmChannel(connection) { |
| Channel.call(this, connection); |
| } |
| inherits(ConfirmChannel, Channel); |
| |
| module.exports.ConfirmChannel = ConfirmChannel; |
| |
| CM.createConfirmChannel = function() { |
| var c = new ConfirmChannel(this.connection); |
| return c.open() |
| .then(function(openOk) { |
| return c.rpc(defs.ConfirmSelect, {nowait: false}, |
| defs.ConfirmSelectOk) |
| }) |
| .then(function() { return c; }); |
| }; |
| |
| var CC = ConfirmChannel.prototype; |
| |
| CC.publish = function(exchange, routingKey, content, options, cb) { |
| this.pushConfirmCallback(cb); |
| return C.publish.call(this, exchange, routingKey, content, options); |
| }; |
| |
| CC.sendToQueue = function(queue, content, options, cb) { |
| return this.publish('', queue, content, options, cb); |
| }; |
| |
| CC.waitForConfirms = function() { |
| var awaiting = []; |
| var unconfirmed = this.unconfirmed; |
| unconfirmed.forEach(function(val, index) { |
| if (val === null); // already confirmed |
| else { |
| var confirmed = new Promise(function(resolve, reject) { |
| unconfirmed[index] = function(err) { |
| if (val) val(err); |
| if (err === null) resolve(); |
| else reject(err); |
| }; |
| }); |
| awaiting.push(confirmed); |
| } |
| }); |
| return Promise.all(awaiting); |
| }; |