blob: f374175889eca07c3a80bf70894750730fa4014e [file] [log] [blame]
//
//
//
'use strict';
/*
The channel (promise) and callback APIs have similar signatures, and
in particular, both need AMQP fields prepared from the same arguments
and options. The arguments marshalling is done here. Each of the
procedures below takes arguments and options (the latter in an object)
particular to the operation it represents, and returns an object with
fields for handing to the encoder.
*/
// A number of AMQP methods have a table-typed field called
// `arguments`, that is intended to carry extension-specific
// values. RabbitMQ uses this in a number of places; e.g., to specify
// an 'alternate exchange'.
//
// Many of the methods in this API have an `options` argument, from
// which I take both values that have a default in AMQP (e.g.,
// autoDelete in QueueDeclare) *and* values that are specific to
// RabbitMQ (e.g., 'alternate-exchange'), which would normally be
// supplied in `arguments`. So that extensions I don't support yet can
// be used, I include `arguments` itself among the options.
//
// The upshot of this is that I often need to prepare an `arguments`
// value that has any values passed in `options.arguments` as well as
// any I've promoted to being options themselves. Since I don't want
// to mutate anything passed in, the general pattern is to create a
// fresh object with the `arguments` value given as its prototype; all
// fields in the supplied value will be serialised, as well as any I
// set on the fresh object. What I don't want to do, however, is set a
// field to undefined by copying possibly missing field values,
// because that will mask a value in the prototype.
//
// NB the `arguments` field already has a default value of `{}`, so
// there's no need to explicitly default it unless I'm setting values.
function setIfDefined(obj, prop, value) {
if (value != undefined) obj[prop] = value;
}
var EMPTY_OPTIONS = Object.freeze({});
var Args = {};
Args.assertQueue = function(queue, options) {
queue = queue || '';
options = options || EMPTY_OPTIONS;
var argt = Object.create(options.arguments || null);
setIfDefined(argt, 'x-expires', options.expires);
setIfDefined(argt, 'x-message-ttl', options.messageTtl);
setIfDefined(argt, 'x-dead-letter-exchange',
options.deadLetterExchange);
setIfDefined(argt, 'x-dead-letter-routing-key',
options.deadLetterRoutingKey);
setIfDefined(argt, 'x-max-length', options.maxLength);
setIfDefined(argt, 'x-max-priority', options.maxPriority);
return {
queue: queue,
exclusive: !!options.exclusive,
durable: (options.durable === undefined) ? true : options.durable,
autoDelete: !!options.autoDelete,
arguments: argt,
passive: false,
// deprecated but we have to include it
ticket: 0,
nowait: false
};
};
Args.checkQueue = function(queue) {
return {
queue: queue,
passive: true, // switch to "completely different" mode
nowait: false,
durable: true, autoDelete: false, exclusive: false, // ignored
ticket: 0,
};
};
Args.deleteQueue = function(queue, options) {
options = options || EMPTY_OPTIONS;
return {
queue: queue,
ifUnused: !!options.ifUnused,
ifEmpty: !!options.ifEmpty,
ticket: 0, nowait: false
};
};
Args.purgeQueue = function(queue) {
return {
queue: queue,
ticket: 0, nowait: false
};
};
Args.bindQueue = function(queue, source, pattern, argt) {
return {
queue: queue,
exchange: source,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
Args.unbindQueue = function(queue, source, pattern, argt) {
return {
queue: queue,
exchange: source,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
Args.assertExchange = function(exchange, type, options) {
options = options || EMPTY_OPTIONS;
var argt = Object.create(options.arguments || null);
setIfDefined(argt, 'alternate-exchange', options.alternateExchange);
return {
exchange: exchange,
ticket: 0,
type: type,
passive: false,
durable: (options.durable === undefined) ? true : options.durable,
autoDelete: !!options.autoDelete,
internal: !!options.internal,
nowait: false,
arguments: argt
};
};
Args.checkExchange = function(exchange) {
return {
exchange: exchange,
passive: true, // switch to 'may as well be another method' mode
nowait: false,
// ff are ignored
durable: true, internal: false, type: '', autoDelete: false,
ticket: 0
};
};
Args.deleteExchange = function(exchange, options) {
options = options || EMPTY_OPTIONS;
return {
exchange: exchange,
ifUnused: !!options.ifUnused,
ticket: 0, nowait: false
};
};
Args.bindExchange = function(dest, source, pattern, argt) {
return {
source: source,
destination: dest,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
Args.unbindExchange = function(dest, source, pattern, argt) {
return {
source: source,
destination: dest,
routingKey: pattern,
arguments: argt,
ticket: 0, nowait: false
};
};
// It's convenient to construct the properties and the method fields
// at the same time, since in the APIs, values for both can appear in
// `options`. Since the property or mthod field names don't overlap, I
// just return one big object that can be used for both purposes, and
// the encoder will pick out what it wants.
Args.publish = function(exchange, routingKey, options) {
options = options || EMPTY_OPTIONS;
// The CC and BCC fields expect an array of "longstr", which would
// normally be buffer values in JavaScript; however, since a field
// array (or table) cannot have shortstr values, the codec will
// encode all strings as longstrs anyway.
function convertCC(cc) {
if (cc === undefined) {
return undefined;
}
else if (Array.isArray(cc)) {
return cc.map(String);
}
else return [String(cc)];
}
var headers = Object.create(options.headers || null);
setIfDefined(headers, 'CC', convertCC(options.CC));
setIfDefined(headers, 'BCC', convertCC(options.BCC));
var deliveryMode; // undefined will default to 1 (non-persistent)
// Previously I overloaded deliveryMode be a boolean meaning
// 'persistent or not'; better is to name this option for what it
// is, but I need to have backwards compatibility for applications
// that either supply a numeric or boolean value.
if (options.persistent !== undefined)
deliveryMode = (options.persistent) ? 2 : 1;
else if (typeof options.deliveryMode === 'number')
deliveryMode = options.deliveryMode;
else if (options.deliveryMode) // is supplied and truthy
deliveryMode = 2;
var expiration = options.expiration;
if (expiration !== undefined) expiration = expiration.toString();
return {
// method fields
exchange: exchange,
routingKey: routingKey,
mandatory: !!options.mandatory,
immediate: false, // RabbitMQ doesn't implement this any more
ticket: undefined,
// properties
contentType: options.contentType,
contentEncoding: options.contentEncoding,
headers: headers,
deliveryMode: deliveryMode,
priority: options.priority,
correlationId: options.correlationId,
replyTo: options.replyTo,
expiration: expiration,
messageId: options.messageId,
timestamp: options.timestamp,
type: options.type,
userId: options.userId,
appId: options.appId,
clusterId: undefined
};
};
Args.consume = function(queue, options) {
options = options || EMPTY_OPTIONS;
var argt = Object.create(options.arguments || null);
setIfDefined(argt, 'x-priority', options.priority);
return {
ticket: 0,
queue: queue,
consumerTag: options.consumerTag || '',
noLocal: !!options.noLocal,
noAck: !!options.noAck,
exclusive: !!options.exclusive,
nowait: false,
arguments: argt
};
};
Args.cancel = function(consumerTag) {
return {
consumerTag: consumerTag,
nowait: false
};
};
Args.get = function(queue, options) {
options = options || EMPTY_OPTIONS;
return {
ticket: 0,
queue: queue,
noAck: !!options.noAck
};
};
Args.ack = function(tag, allUpTo) {
return {
deliveryTag: tag,
multiple: !!allUpTo
};
};
Args.nack = function(tag, allUpTo, requeue) {
return {
deliveryTag: tag,
multiple: !!allUpTo,
requeue: (requeue === undefined) ? true : requeue
};
};
Args.reject = function(tag, requeue) {
return {
deliveryTag: tag,
requeue: (requeue === undefined) ? true : requeue
};
};
Args.prefetch = function(count, global) {
return {
prefetchCount: count || 0,
prefetchSize: 0,
global: !!global
};
};
Args.recover = function() {
return {requeue: true};
};
module.exports = Object.freeze(Args);