| // |
| // |
| // |
| |
| 'use strict'; |
| |
| /* |
| The channel (promise) and callback APIs have similar signatures, and |
| in particular, both need AMQP fields prepared from the same arguments |
| and options. The arguments marshalling is done here. Each of the |
| procedures below takes arguments and options (the latter in an object) |
| particular to the operation it represents, and returns an object with |
| fields for handing to the encoder. |
| */ |
| |
| // A number of AMQP methods have a table-typed field called |
| // `arguments`, that is intended to carry extension-specific |
| // values. RabbitMQ uses this in a number of places; e.g., to specify |
| // an 'alternate exchange'. |
| // |
| // Many of the methods in this API have an `options` argument, from |
| // which I take both values that have a default in AMQP (e.g., |
| // autoDelete in QueueDeclare) *and* values that are specific to |
| // RabbitMQ (e.g., 'alternate-exchange'), which would normally be |
| // supplied in `arguments`. So that extensions I don't support yet can |
| // be used, I include `arguments` itself among the options. |
| // |
| // The upshot of this is that I often need to prepare an `arguments` |
| // value that has any values passed in `options.arguments` as well as |
| // any I've promoted to being options themselves. Since I don't want |
| // to mutate anything passed in, the general pattern is to create a |
| // fresh object with the `arguments` value given as its prototype; all |
| // fields in the supplied value will be serialised, as well as any I |
| // set on the fresh object. What I don't want to do, however, is set a |
| // field to undefined by copying possibly missing field values, |
| // because that will mask a value in the prototype. |
| // |
| // NB the `arguments` field already has a default value of `{}`, so |
| // there's no need to explicitly default it unless I'm setting values. |
| function setIfDefined(obj, prop, value) { |
| if (value != undefined) obj[prop] = value; |
| } |
| |
| var EMPTY_OPTIONS = Object.freeze({}); |
| |
| var Args = {}; |
| |
| Args.assertQueue = function(queue, options) { |
| queue = queue || ''; |
| options = options || EMPTY_OPTIONS; |
| |
| var argt = Object.create(options.arguments || null); |
| setIfDefined(argt, 'x-expires', options.expires); |
| setIfDefined(argt, 'x-message-ttl', options.messageTtl); |
| setIfDefined(argt, 'x-dead-letter-exchange', |
| options.deadLetterExchange); |
| setIfDefined(argt, 'x-dead-letter-routing-key', |
| options.deadLetterRoutingKey); |
| setIfDefined(argt, 'x-max-length', options.maxLength); |
| setIfDefined(argt, 'x-max-priority', options.maxPriority); |
| |
| return { |
| queue: queue, |
| exclusive: !!options.exclusive, |
| durable: (options.durable === undefined) ? true : options.durable, |
| autoDelete: !!options.autoDelete, |
| arguments: argt, |
| passive: false, |
| // deprecated but we have to include it |
| ticket: 0, |
| nowait: false |
| }; |
| }; |
| |
| Args.checkQueue = function(queue) { |
| return { |
| queue: queue, |
| passive: true, // switch to "completely different" mode |
| nowait: false, |
| durable: true, autoDelete: false, exclusive: false, // ignored |
| ticket: 0, |
| }; |
| }; |
| |
| Args.deleteQueue = function(queue, options) { |
| options = options || EMPTY_OPTIONS; |
| return { |
| queue: queue, |
| ifUnused: !!options.ifUnused, |
| ifEmpty: !!options.ifEmpty, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| Args.purgeQueue = function(queue) { |
| return { |
| queue: queue, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| Args.bindQueue = function(queue, source, pattern, argt) { |
| return { |
| queue: queue, |
| exchange: source, |
| routingKey: pattern, |
| arguments: argt, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| Args.unbindQueue = function(queue, source, pattern, argt) { |
| return { |
| queue: queue, |
| exchange: source, |
| routingKey: pattern, |
| arguments: argt, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| Args.assertExchange = function(exchange, type, options) { |
| options = options || EMPTY_OPTIONS; |
| var argt = Object.create(options.arguments || null); |
| setIfDefined(argt, 'alternate-exchange', options.alternateExchange); |
| return { |
| exchange: exchange, |
| ticket: 0, |
| type: type, |
| passive: false, |
| durable: (options.durable === undefined) ? true : options.durable, |
| autoDelete: !!options.autoDelete, |
| internal: !!options.internal, |
| nowait: false, |
| arguments: argt |
| }; |
| }; |
| |
| Args.checkExchange = function(exchange) { |
| return { |
| exchange: exchange, |
| passive: true, // switch to 'may as well be another method' mode |
| nowait: false, |
| // ff are ignored |
| durable: true, internal: false, type: '', autoDelete: false, |
| ticket: 0 |
| }; |
| }; |
| |
| Args.deleteExchange = function(exchange, options) { |
| options = options || EMPTY_OPTIONS; |
| return { |
| exchange: exchange, |
| ifUnused: !!options.ifUnused, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| Args.bindExchange = function(dest, source, pattern, argt) { |
| return { |
| source: source, |
| destination: dest, |
| routingKey: pattern, |
| arguments: argt, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| Args.unbindExchange = function(dest, source, pattern, argt) { |
| return { |
| source: source, |
| destination: dest, |
| routingKey: pattern, |
| arguments: argt, |
| ticket: 0, nowait: false |
| }; |
| }; |
| |
| // It's convenient to construct the properties and the method fields |
| // at the same time, since in the APIs, values for both can appear in |
| // `options`. Since the property or mthod field names don't overlap, I |
| // just return one big object that can be used for both purposes, and |
| // the encoder will pick out what it wants. |
| Args.publish = function(exchange, routingKey, options) { |
| options = options || EMPTY_OPTIONS; |
| |
| // The CC and BCC fields expect an array of "longstr", which would |
| // normally be buffer values in JavaScript; however, since a field |
| // array (or table) cannot have shortstr values, the codec will |
| // encode all strings as longstrs anyway. |
| function convertCC(cc) { |
| if (cc === undefined) { |
| return undefined; |
| } |
| else if (Array.isArray(cc)) { |
| return cc.map(String); |
| } |
| else return [String(cc)]; |
| } |
| |
| var headers = Object.create(options.headers || null); |
| setIfDefined(headers, 'CC', convertCC(options.CC)); |
| setIfDefined(headers, 'BCC', convertCC(options.BCC)); |
| |
| var deliveryMode; // undefined will default to 1 (non-persistent) |
| |
| // Previously I overloaded deliveryMode be a boolean meaning |
| // 'persistent or not'; better is to name this option for what it |
| // is, but I need to have backwards compatibility for applications |
| // that either supply a numeric or boolean value. |
| if (options.persistent !== undefined) |
| deliveryMode = (options.persistent) ? 2 : 1; |
| else if (typeof options.deliveryMode === 'number') |
| deliveryMode = options.deliveryMode; |
| else if (options.deliveryMode) // is supplied and truthy |
| deliveryMode = 2; |
| |
| var expiration = options.expiration; |
| if (expiration !== undefined) expiration = expiration.toString(); |
| |
| return { |
| // method fields |
| exchange: exchange, |
| routingKey: routingKey, |
| mandatory: !!options.mandatory, |
| immediate: false, // RabbitMQ doesn't implement this any more |
| ticket: undefined, |
| // properties |
| contentType: options.contentType, |
| contentEncoding: options.contentEncoding, |
| headers: headers, |
| deliveryMode: deliveryMode, |
| priority: options.priority, |
| correlationId: options.correlationId, |
| replyTo: options.replyTo, |
| expiration: expiration, |
| messageId: options.messageId, |
| timestamp: options.timestamp, |
| type: options.type, |
| userId: options.userId, |
| appId: options.appId, |
| clusterId: undefined |
| }; |
| }; |
| |
| Args.consume = function(queue, options) { |
| options = options || EMPTY_OPTIONS; |
| var argt = Object.create(options.arguments || null); |
| setIfDefined(argt, 'x-priority', options.priority); |
| return { |
| ticket: 0, |
| queue: queue, |
| consumerTag: options.consumerTag || '', |
| noLocal: !!options.noLocal, |
| noAck: !!options.noAck, |
| exclusive: !!options.exclusive, |
| nowait: false, |
| arguments: argt |
| }; |
| }; |
| |
| Args.cancel = function(consumerTag) { |
| return { |
| consumerTag: consumerTag, |
| nowait: false |
| }; |
| }; |
| |
| Args.get = function(queue, options) { |
| options = options || EMPTY_OPTIONS; |
| return { |
| ticket: 0, |
| queue: queue, |
| noAck: !!options.noAck |
| }; |
| }; |
| |
| Args.ack = function(tag, allUpTo) { |
| return { |
| deliveryTag: tag, |
| multiple: !!allUpTo |
| }; |
| }; |
| |
| Args.nack = function(tag, allUpTo, requeue) { |
| return { |
| deliveryTag: tag, |
| multiple: !!allUpTo, |
| requeue: (requeue === undefined) ? true : requeue |
| }; |
| }; |
| |
| Args.reject = function(tag, requeue) { |
| return { |
| deliveryTag: tag, |
| requeue: (requeue === undefined) ? true : requeue |
| }; |
| }; |
| |
| Args.prefetch = function(count, global) { |
| return { |
| prefetchCount: count || 0, |
| prefetchSize: 0, |
| global: !!global |
| }; |
| }; |
| |
| Args.recover = function() { |
| return {requeue: true}; |
| }; |
| |
| module.exports = Object.freeze(Args); |