blob: cf7201bb51c04e6c8ff90b0a25c4974e00e90ced [file] [log] [blame]
#!/usr/bin/env node
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
/**
* Port of qpid-config to JavaScript for Node.js, mainly intended as a demo to
* illustrate using QMF2 in JavaScript using the proton.Messenger JS binding.
* It illustrates a few things including how to use Messenger completely
* asynchronously including using an async request/response pattern with
* correlation IDs. It also proves interoperability of AMQP Map, List etc.
* between C++ and JavaScript as QMF2 is pretty much all about Lists of Maps.
* <p>
* The actual QMF2 code is pretty simple as we're just doing a basic getObjects
* it's made all the simpler because we can use JavaScript object literals as
* the JavaScript binding serialises and deserialises directly between JavaScript
* Objects and Lists and the AMQP type system so something that can be quite
* involved in languages like C++ and Java becomes quite simple in JavaScript,
* though the asynchronous nature of JavaScript provides its own opportunities
* for complication best illustrated by the need for the correlator object.
*/
// Check if the environment is Node.js and if not log an error and exit.
if (typeof process === 'object' && typeof require === 'function') {
var qmf = {}; // Create qmf namespace object.
qmf.Console = function() { // qmf.Console Constructor.
var proton = require("qpid-proton-messenger");
var message = new proton.Message();
var messenger = new proton.Messenger();
var brokerAddress = '';
var replyTo = '';
/**
* The correlator object is a mechanism used to correlate requests with
* their aynchronous responses. It might possible be better to make use
* of Promises to implement part of this behaviour but a mechanism would
* still be needed to correlate a request with its response callback in
* order to wrap things up in a Promise, so much of the behaviour of this
* object would still be required. In addition it seemed to make sense to
* make this QMF2 implementation fairly free of dependencies and using
* Promises would require external libraries. Instead the correlator
* implements "Promise-like" semantics, you might say a broken Promise :-)
* <p>
* in particular the request method behaves a *bit* like Promise.all()
* though it is mostly fake and takes an array of functions that call
* the add() method which is really the method used to associate response
* objects by correlationID. The then method is used to register a
* listener that will be called when all the requests that have been
* registered have received responses.
* TODO error/timeout handling.
*/
var correlator = {
_resolve: null,
_objects: {},
add: function(id) {
this._objects[id] = {complete: false, list: null};
},
request: function() {
this._resolve = function() {console.log("Warning: No resolver has been set")};
return this;
},
then: function(resolver) {
this._resolve = resolver ? resolver : this._resolve;
},
resolve: function() {
var opcode = message.properties['qmf.opcode'];
var correlationID = message.getCorrelationID();
var resp = this._objects[correlationID];
if (opcode === '_query_response') {
if (resp.list) {
Array.prototype.push.apply(resp.list, message.body); // This is faster than concat.
} else {
resp.list = message.body;
}
var partial = message.properties['partial'];
if (!partial) {
resp.complete = true;
}
this._objects[correlationID] = resp;
this._checkComplete();
} else if (opcode === '_method_response' || opcode === '_exception') {
resp.list = message.body;
resp.complete = true;
this._objects[correlationID] = resp;
this._checkComplete();
} else {
console.error("Bad Message response, qmf.opcode = " + opcode);
}
},
_checkComplete: function() {
var response = {};
for (var id in this._objects) {
var object = this._objects[id];
if (object.complete) {
response[id] = object.list;
} else {
return;
}
}
this._objects = {}; // Clear state ready for next call.
this._resolve(response.method ? response.method : response);
}
}; // End of correlator object definition.
var pumpData = function() {
while (messenger.incoming()) {
// The second parameter forces Binary payloads to be decoded as
// strings this is useful because the broker QMF Agent encodes
// strings as AMQP binary unfortunately.
var t = messenger.get(message, true);
correlator.resolve();
messenger.accept(t);
}
if (messenger.isStopped()) {
message.free();
messenger.free();
}
};
this.getObjects = function(packageName, className) {
message.setAddress(brokerAddress);
message.setSubject('broker');
message.setReplyTo(replyTo);
message.setCorrelationID(className);
message.properties = {
"routing-key": "broker", // Added for Java Broker
"x-amqp-0-10.app-id": "qmf2",
"method": "request",
"qmf.opcode": "_query_request",
};
message.body = {
"_what": "OBJECT",
"_schema_id": {
"_package_name": packageName,
"_class_name": className
}
};
correlator.add(className);
messenger.put(message);
};
this.invokeMethod = function(object, method, arguments) {
var correlationID = 'method';
message.setAddress(brokerAddress);
message.setSubject('broker');
message.setReplyTo(replyTo);
message.setCorrelationID(correlationID);
message.properties = {
"routing-key": "broker", // Added for Java Broker
"x-amqp-0-10.app-id": "qmf2",
"method": "request",
"qmf.opcode": "_method_request",
};
message.body = {
"_object_id": object._object_id,
"_method_name" : method,
"_arguments" : arguments
};
correlator.add(correlationID);
messenger.put(message);
};
this.addConnection = function(addr, callback) {
brokerAddress = addr + '/qmf.default.direct';
var replyAddress = addr + '/#';
messenger.on('subscription', function(subscription) {
var subscriptionAddress = subscription.getAddress();
var splitAddress = subscriptionAddress.split('/');
replyTo = splitAddress[splitAddress.length - 1];
callback();
});
messenger.subscribe(replyAddress);
}
this.destroy = function() {
messenger.stop();
}
this.request = function() {return correlator.request();}
messenger.on('error', function(error) {console.log(error);});
messenger.on('work', pumpData);
messenger.setOutgoingWindow(1024);
messenger.recv(); // Receive as many messages as messenger can buffer.
messenger.start();
}; // End of qmf.Console
/************************* qpid-config business logic ************************/
var brokerAgent = new qmf.Console();
var _usage =
'Usage: qpid-config [OPTIONS]\n' +
' qpid-config [OPTIONS] exchanges [filter-string]\n' +
' qpid-config [OPTIONS] queues [filter-string]\n' +
' qpid-config [OPTIONS] add exchange <type> <name> [AddExchangeOptions]\n' +
' qpid-config [OPTIONS] del exchange <name>\n' +
' qpid-config [OPTIONS] add queue <name> [AddQueueOptions]\n' +
' qpid-config [OPTIONS] del queue <name> [DelQueueOptions]\n' +
' qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]\n' +
' <for type xml> [-f -|filename]\n' +
' <for type header> [all|any] k1=v1 [, k2=v2...]\n' +
' qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]\n' +
' qpid-config [OPTIONS] reload-acl\n' +
' qpid-config [OPTIONS] add <type> <name> [--argument <property-name>=<property-value>]\n' +
' qpid-config [OPTIONS] del <type> <name>\n' +
' qpid-config [OPTIONS] list <type> [--show-property <property-name>]\n';
var usage = function() {
console.log(_usage);
process.exit(-1);
};
var _description =
'Examples:\n' +
'\n' +
'$ qpid-config add queue q\n' +
'$ qpid-config add exchange direct d -a localhost:5672\n' +
'$ qpid-config exchanges -b 10.1.1.7:10000\n' +
'$ qpid-config queues -b guest/guest@broker-host:10000\n' +
'\n' +
'Add Exchange <type> values:\n' +
'\n' +
' direct Direct exchange for point-to-point communication\n' +
' fanout Fanout exchange for broadcast communication\n' +
' topic Topic exchange that routes messages using binding keys with wildcards\n' +
' headers Headers exchange that matches header fields against the binding keys\n' +
' xml XML Exchange - allows content filtering using an XQuery\n' +
'\n' +
'\n' +
'Queue Limit Actions:\n' +
'\n' +
' none (default) - Use broker\'s default policy\n' +
' reject - Reject enqueued messages\n' +
' ring - Replace oldest unacquired message with new\n' +
'\n' +
'Replication levels:\n' +
'\n' +
' none - no replication\n' +
' configuration - replicate queue and exchange existence and bindings, but not messages.\n' +
' all - replicate configuration and messages\n';
var _options =
'Options:\n' +
' -h, --help show this help message and exit\n' +
'\n' +
' General Options:\n' +
' -t <secs>, --timeout=<secs>\n' +
' Maximum time to wait for broker connection (in\n' +
' seconds)\n' +
' -r, --recursive Show bindings in queue or exchange list\n' +
' -b <address>, --broker=<address>\n' +
' Address of qpidd broker with syntax:\n' +
' [username/password@] hostname | ip-address [:<port>]\n' +
' -a <address>, --broker-addr=<address>\n' +
/* TODO Connection options
' --sasl-mechanism=<mech>\n' +
' SASL mechanism for authentication (e.g. EXTERNAL,\n' +
' ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n' +
' automatically picks the most secure available\n' +
' mechanism - use this option to override.\n' +
' --ssl-certificate=<cert>\n' +
' Client SSL certificate (PEM Format)\n' +
' --ssl-key=<key> Client SSL private key (PEM Format)\n' +
' --ha-admin Allow connection to a HA backup broker.\n' +
*/
'\n' +
' Options for Listing Exchanges and Queues:\n' +
' --ignore-default Ignore the default exchange in exchange or queue list\n' +
'\n' +
' Options for Adding Exchanges and Queues:\n' +
' --alternate-exchange=<aexname>\n' +
' Name of the alternate-exchange for the new queue or\n' +
' exchange. Exchanges route messages to the alternate\n' +
' exchange if they are unable to route them elsewhere.\n' +
' Queues route messages to the alternate exchange if\n' +
' they are rejected by a subscriber or orphaned by queue\n' +
' deletion.\n' +
' --durable The new queue or exchange is durable.\n' +
' --replicate=<level>\n' +
' Enable automatic replication in a HA cluster. <level>\n' +
' is \'none\', \'configuration\' or \'all\').\n' +
'\n' +
' Options for Adding Queues:\n' +
' --file-count=<n> Number of files in queue\'s persistence journal\n' +
' --file-size=<n> File size in pages (64KiB/page)\n' +
' --max-queue-size=<n>\n' +
' Maximum in-memory queue size as bytes\n' +
' --max-queue-count=<n>\n' +
' Maximum in-memory queue size as a number of messages\n' +
' --limit-policy=<policy>\n' +
' Action to take when queue limit is reached\n' +
' --lvq-key=<key> Last Value Queue key\n' +
' --generate-queue-events=<n>\n' +
' If set to 1, every enqueue will generate an event that\n' +
' can be processed by registered listeners (e.g. for\n' +
' replication). If set to 2, events will be generated\n' +
' for enqueues and dequeues.\n' +
' --flow-stop-size=<n>\n' +
' Turn on sender flow control when the number of queued\n' +
' bytes exceeds this value.\n' +
' --flow-resume-size=<n>\n' +
' Turn off sender flow control when the number of queued\n' +
' bytes drops below this value.\n' +
' --flow-stop-count=<n>\n' +
' Turn on sender flow control when the number of queued\n' +
' messages exceeds this value.\n' +
' --flow-resume-count=<n>\n' +
' Turn off sender flow control when the number of queued\n' +
' messages drops below this value.\n' +
' --group-header=<header-name>\n' +
' Enable message groups. Specify name of header that\n' +
' holds group identifier.\n' +
' --shared-groups Allow message group consumption across multiple\n' +
' consumers.\n' +
' --argument=<NAME=VALUE>\n' +
' Specify a key-value pair to add to queue arguments\n' +
' --start-replica=<broker-url>\n' +
' Start replication from the same-named queue at\n' +
' <broker-url>\n' +
'\n' +
' Options for Adding Exchanges:\n' +
' --sequence Exchange will insert a \'qpid.msg_sequence\' field in\n' +
' the message header\n' +
' --ive Exchange will behave as an \'initial-value-exchange\',\n' +
' keeping a reference to the last message forwarded and\n' +
' enqueuing that message to newly bound queues.\n' +
'\n' +
' Options for Deleting Queues:\n' +
' --force Force delete of queue even if it\'s currently used or\n' +
' it\'s not empty\n' +
' --force-if-not-empty\n' +
' Force delete of queue even if it\'s not empty\n' +
' --force-if-used Force delete of queue even if it\'s currently used\n' +
'\n' +
' Options for Declaring Bindings:\n' +
' -f <file.xq>, --file=<file.xq>\n' +
' For XML Exchange bindings - specifies the name of a\n' +
' file containing an XQuery.\n' +
'\n' +
' Formatting options for \'list\' action:\n' +
' --show-property=<property-name>\n' +
' Specify a property of an object to be included in\n' +
' output\n';
var REPLICATE_LEVELS = {"none" : true, "configuration": true, "all": true};
var DEFAULT_PROPERTIES = {"exchange": {"name": true, "type": true, "durable": true},
"queue": {"name": true, "durable": true, "autoDelete": true}};
var getValue = function(r) {
var value = null;
if (r.length === 2) {
value = r[1];
if (!isNaN(value)) {
value = parseInt(value);
}
}
return value;
};
var config = {
_recursive : false,
_host : 'guest:guest@localhost:5673', // Note 5673 not 5672 as we use WebSocket transport.
_connTimeout : 10,
_ignoreDefault : false,
_altern_ex : null,
_durable : false,
_replicate : null,
_if_empty : true,
_if_unused : true,
_fileCount : null,
_fileSize : null,
_maxQueueSize : null,
_maxQueueCount : null,
_limitPolicy : null,
_msgSequence : false,
_lvq_key : null,
_ive : null,
_eventGeneration: null,
_file : null,
_flowStopCount : null,
_flowResumeCount: null,
_flowStopSize : null,
_flowResumeSize : null,
_msgGroupHeader : null,
_sharedMsgGroup : false,
_extra_arguments: [],
_start_replica : null,
_returnCode : 0,
_list_properties: null,
getOptions: function() {
var options = {};
for (var a = 0; a < this._extra_arguments.length; a++) {
var r = this._extra_arguments[a].split('=');
options[r[0]] = getValue(r);
}
return options;
}
};
var FILECOUNT = 'qpid.file_count';
var FILESIZE = 'qpid.file_size';
var MAX_QUEUE_SIZE = 'qpid.max_size';
var MAX_QUEUE_COUNT = 'qpid.max_count';
var POLICY_TYPE = 'qpid.policy_type';
var LVQ_KEY = 'qpid.last_value_queue_key';
var MSG_SEQUENCE = 'qpid.msg_sequence';
var IVE = 'qpid.ive';
var QUEUE_EVENT_GENERATION = 'qpid.queue_event_generation';
var FLOW_STOP_COUNT = 'qpid.flow_stop_count';
var FLOW_RESUME_COUNT = 'qpid.flow_resume_count';
var FLOW_STOP_SIZE = 'qpid.flow_stop_size';
var FLOW_RESUME_SIZE = 'qpid.flow_resume_size';
var MSG_GROUP_HDR_KEY = 'qpid.group_header_key';
var SHARED_MSG_GROUP = 'qpid.shared_msg_group';
var REPLICATE = 'qpid.replicate';
/**
* There are various arguments to declare that have specific program
* options in this utility. However there is now a generic mechanism for
* passing arguments as well. The SPECIAL_ARGS list contains the
* arguments for which there are specific program options defined
* i.e. the arguments for which there is special processing on add and
* list
*/
var SPECIAL_ARGS={};
SPECIAL_ARGS[FILECOUNT] = true;
SPECIAL_ARGS[FILESIZE] = true;
SPECIAL_ARGS[MAX_QUEUE_SIZE] = true;
SPECIAL_ARGS[MAX_QUEUE_COUNT] = true;
SPECIAL_ARGS[POLICY_TYPE] = true;
SPECIAL_ARGS[LVQ_KEY] = true;
SPECIAL_ARGS[MSG_SEQUENCE] = true;
SPECIAL_ARGS[IVE] = true;
SPECIAL_ARGS[QUEUE_EVENT_GENERATION] = true;
SPECIAL_ARGS[FLOW_STOP_COUNT] = true;
SPECIAL_ARGS[FLOW_RESUME_COUNT] = true;
SPECIAL_ARGS[FLOW_STOP_SIZE] = true;
SPECIAL_ARGS[FLOW_RESUME_SIZE] = true;
SPECIAL_ARGS[MSG_GROUP_HDR_KEY] = true;
SPECIAL_ARGS[SHARED_MSG_GROUP] = true;
SPECIAL_ARGS[REPLICATE] = true;
// Returns a String representation of an ObjectID.
var oid = function(id) {
return id._agent_epoch + ':' + id._object_name
};
// Check if the supplied name contains the supplied filter String.
var filterMatch = function(name, filter) {
if (filter === '') {
return true;
}
if (name.indexOf(filter) === -1) {
return false;
}
return true;
};
// Take the supplied List of QMF2 Objects and return a Map keyed by ObjectID.
var idMap = function(list) {
var map = {};
for (var i = 0; i < list.length; i++) {
var item = list[i];
map[oid(item._object_id)] = item;
}
return map;
};
// Pretty-print the supplied Object.
var renderObject = function(obj, list) {
if (!obj) {
return '';
}
var string = '';
var addComma = false;
for (var prop in obj) {
if (addComma) {
string += ', ';
}
if (obj.hasOwnProperty(prop)) {
if (list) {
if (SPECIAL_ARGS[prop]) continue;
string += " --argument " + prop + "=" + obj[prop];
} else {
string += "'" + prop + "'" + ": '" + obj[prop] + "'";
addComma = true;
}
}
}
if (addComma) {
return '{' + string + '}';
} else {
if (list) {
return string;
} else {
return '';
}
}
};
/**
* The following methods illustrate the QMF2 class query mechanism which returns
* the list of QMF Objects for the specified class that are currently present
* on the Broker. The Schema <qpid>/cpp/src/qpid/broker/management-schema.xml
* describes the properties and statistics of each Management Object.
* <p>
* One slightly subtle part of QMF is that certain Objects are associated via
* references, for example Binding contains queueRef and exchangeRef, which lets
* Objects link to each other using their _object_id property.
* <p>
* The implementation of these methods attempts to follow the same general flow
* as the equivalent method in the "canonical" python based qpid-config version
* but has the added complication that JavaScript is entirely asynchronous.
* The approach that has been taken is to use the correlator object that lets a
* callback function be registered via the "then" method and actually calls the
* callback when all of the requests specified in the request method have
* returned their results (which get passed as the callback parameter).
*/
var overview = function() {
brokerAgent.request(
// Send the QMF query requests for the specified classes.
brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
).then(function(objects) {
var exchanges = objects.exchange;
var queues = objects.queue;
console.log("Total Exchanges: " + exchanges.length);
var etype = {};
for (var i = 0; i < exchanges.length; i++) {
var exchange = exchanges[i]._values;
if (!etype[exchange.type]) {
etype[exchange.type] = 1;
} else {
etype[exchange.type]++;
}
}
for (var typ in etype) {
var pad = Array(16 - typ.length).join(' ');
console.log(pad + typ + ": " + etype[typ]);
}
console.log("\n Total Queues: " + queues.length);
var durable = 0;
for (var i = 0; i < queues.length; i++) {
var queue = queues[i]._values;
if (queue.durable) {
durable++;
}
}
console.log(" durable: " + durable);
console.log(" non-durable: " + (queues.length - durable));
brokerAgent.destroy();
});
};
var exchangeList = function(filter) {
brokerAgent.request(
// Send the QMF query requests for the specified classes.
brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
).then(function(objects) {
var exchanges = objects.exchange;
var exMap = idMap(exchanges);
var caption1 = "Type ";
var caption2 = "Exchange Name";
var maxNameLen = caption2.length;
var found = false;
for (var i = 0; i < exchanges.length; i++) {
var exchange = exchanges[i]._values;
if (filterMatch(exchange.name, filter)) {
if (exchange.name.length > maxNameLen) {
maxNameLen = exchange.name.length;
}
found = true;
}
}
if (!found) {
config._returnCode = 1;
return;
}
var pad = Array(maxNameLen + 1 - caption2.length).join(' ');
console.log(caption1 + caption2 + pad + " Attributes");
console.log(Array(maxNameLen + caption1.length + 13).join('='));
for (var i = 0; i < exchanges.length; i++) {
var exchange = exchanges[i]._values;
if (config._ignoreDefault && !exchange.name) continue;
if (filterMatch(exchange.name, filter)) {
var pad1 = Array(11 - exchange.type.length).join(' ');
var pad2 = Array(maxNameLen + 2 - exchange.name.length).join(' ');
var string = exchange.type + pad1 + exchange.name + pad2;
var args = exchange.arguments ? exchange.arguments : {};
if (exchange.durable) {
string += ' --durable';
}
if (args[REPLICATE]) {
string += ' --replicate=' + args[REPLICATE];
}
if (args[MSG_SEQUENCE]) {
string += ' --sequence';
}
if (args[IVE]) {
string += ' --ive';
}
if (exchange.altExchange) {
string += ' --alternate-exchange=' + exMap[oid(exchange.altExchange)]._values.name;
}
console.log(string);
}
}
brokerAgent.destroy();
});
};
var exchangeListRecurse = function(filter) {
brokerAgent.request(
// Send the QMF query requests for the specified classes.
brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
).then(function(objects) {
var exchanges = objects.exchange;
var bindings = objects.binding;
var queues = idMap(objects.queue);
for (var i = 0; i < exchanges.length; i++) {
var exchange = exchanges[i];
var exchangeId = oid(exchange._object_id);
exchange = exchange._values;
if (config._ignoreDefault && !exchange.name) continue;
if (filterMatch(exchange.name, filter)) {
console.log("Exchange '" + exchange.name + "' (" + exchange.type + ")");
for (var j = 0; j < bindings.length; j++) {
var bind = bindings[j]._values;
var exchangeRef = oid(bind.exchangeRef);
if (exchangeRef === exchangeId) {
var queue = queues[oid(bind.queueRef)];
var queueName = queue ? queue._values.name : "<unknown>";
console.log(" bind [" + bind.bindingKey + "] => " + queueName +
" " + renderObject(bind.arguments));
}
}
}
}
brokerAgent.destroy();
});
};
var queueList = function(filter) {
brokerAgent.request(
// Send the QMF query requests for the specified classes.
brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
brokerAgent.getObjects('org.apache.qpid.broker', 'exchange')
).then(function(objects) {
var queues = objects.queue;
var exMap = idMap(objects.exchange);
var caption = "Queue Name";
var maxNameLen = caption.length;
var found = false;
for (var i = 0; i < queues.length; i++) {
var queue = queues[i]._values;
if (filterMatch(queue.name, filter)) {
if (queue.name.length > maxNameLen) {
maxNameLen = queue.name.length;
}
found = true;
}
}
if (!found) {
config._returnCode = 1;
return;
}
var pad = Array(maxNameLen + 1 - caption.length).join(' ');
console.log(caption + pad + " Attributes");
console.log(Array(maxNameLen + caption.length + 3).join('='));
for (var i = 0; i < queues.length; i++) {
var queue = queues[i]._values;
if (filterMatch(queue.name, filter)) {
var pad2 = Array(maxNameLen + 2 - queue.name.length).join(' ');
var string = queue.name + pad2;
var args = queue.arguments ? queue.arguments : {};
if (queue.durable) {
string += ' --durable';
}
if (args[REPLICATE]) {
string += ' --replicate=' + args[REPLICATE];
}
if (queue.autoDelete) {
string += ' auto-del';
}
if (queue.exclusive) {
string += ' excl';
}
if (args[FILESIZE]) {
string += ' --file-size=' + args[FILESIZE];
}
if (args[FILECOUNT]) {
string += ' --file-count=' + args[FILECOUNT];
}
if (args[MAX_QUEUE_SIZE]) {
string += ' --max-queue-size=' + args[MAX_QUEUE_SIZE];
}
if (args[MAX_QUEUE_COUNT]) {
string += ' --max-queue-count=' + args[MAX_QUEUE_COUNT];
}
if (args[POLICY_TYPE]) {
string += ' --limit-policy=' + args[POLICY_TYPE].replace("_", "-");
}
if (args[LVQ_KEY]) {
string += ' --lvq-key=' + args[LVQ_KEY];
}
if (args[QUEUE_EVENT_GENERATION]) {
string += ' --generate-queue-events=' + args[QUEUE_EVENT_GENERATION];
}
if (queue.altExchange) {
string += ' --alternate-exchange=' + exMap[oid(queue.altExchange)]._values.name;
}
if (args[FLOW_STOP_SIZE]) {
string += ' --flow-stop-size=' + args[FLOW_STOP_SIZE];
}
if (args[FLOW_RESUME_SIZE]) {
string += ' --flow-resume-size=' + args[FLOW_RESUME_SIZE];
}
if (args[FLOW_STOP_COUNT]) {
string += ' --flow-stop-count=' + args[FLOW_STOP_COUNT];
}
if (args[FLOW_RESUME_COUNT]) {
string += ' --flow-resume-count=' + args[FLOW_RESUME_COUNT];
}
if (args[MSG_GROUP_HDR_KEY]) {
string += ' --group-header=' + args[MSG_GROUP_HDR_KEY];
}
if (args[SHARED_MSG_GROUP] === 1) {
string += ' --shared-groups';
}
string += ' ' + renderObject(args, true);
console.log(string);
}
}
brokerAgent.destroy();
});
};
var queueListRecurse = function(filter) {
brokerAgent.request(
// Send the QMF query requests for the specified classes.
brokerAgent.getObjects('org.apache.qpid.broker', 'queue'),
brokerAgent.getObjects('org.apache.qpid.broker', 'exchange'),
brokerAgent.getObjects('org.apache.qpid.broker', 'binding')
).then(function(objects) {
var queues = objects.queue;
var bindings = objects.binding;
var exchanges = idMap(objects.exchange);
for (var i = 0; i < queues.length; i++) {
var queue = queues[i];
var queueId = oid(queue._object_id);
queue = queue._values;
if (filterMatch(queue.name, filter)) {
console.log("Queue '" + queue.name + "'");
for (var j = 0; j < bindings.length; j++) {
var bind = bindings[j]._values;
var queueRef = oid(bind.queueRef);
if (queueRef === queueId) {
var exchange = exchanges[oid(bind.exchangeRef)];
var exchangeName = "<unknown>";
if (exchange) {
exchangeName = exchange._values.name;
if (exchangeName === '') {
if (config._ignoreDefault) continue;
exchangeName = "''";
}
}
console.log(" bind [" + bind.bindingKey + "] => " + exchangeName +
" " + renderObject(bind.arguments));
}
}
}
}
brokerAgent.destroy();
});
};
/**
* The following methods implement adding and deleting various Broker Management
* Objects via QMF. Although <qpid>/cpp/src/qpid/broker/management-schema.xml
* describes the basic method schema, for example:
* <method name="create" desc="Create an object of the specified type">
* <arg name="type" dir="I" type="sstr" desc="The type of object to create"/>
* <arg name="name" dir="I" type="sstr" desc="The name of the object to create"/>
* <arg name="properties" dir="I" type="map" desc="Type specific object properties"/>
* <arg name="strict" dir="I" type="bool" desc="If specified, treat unrecognised object properties as an error"/ >
* </method>
*
* <method name="delete" desc="Delete an object of the specified type">
* <arg name="type" dir="I" type="sstr" desc="The type of object to delete"/>
* <arg name="name" dir="I" type="sstr" desc="The name of the object to delete"/>
* <arg name="options" dir="I" type="map" desc="Type specific object options for deletion"/>
* </method>
*
* What the schema doesn't do however is to explain what the properties/options
* Map values actually mean, unfortunately these aren't documented anywhere so
* the only option is to look in the code, the best place to look is in:
* <qpid>/cpp/src/qpid/broker/Broker.cpp, the method Broker::ManagementMethod is
* the best place to start, then Broker::createObject and Broker::deleteObject
* even then it's pretty hard to figure out all that is possible.
*/
var handleMethodResponse = function(response, dontStop) {
if (response._arguments) {
//console.log(response._arguments);
} if (response._values) {
console.error("Exception from Agent: " + renderObject(response._values));
}
// Mostly we want to stop the Messenger Event loop and exit when a QMF method
// returns, but sometimes we don't, the dontStop flag prevents this behaviour.
if (!dontStop) {
brokerAgent.destroy();
}
}
var addExchange = function(args) {
if (args.length < 2) {
usage();
}
var etype = args[0];
var ename = args[1];
var declArgs = {};
declArgs['exchange-type'] = etype;
for (var a = 0; a < config._extra_arguments.length; a++) {
var r = config._extra_arguments[a].split('=');
declArgs[r[0]] = getValue(r);
}
if (config._msgSequence) {
declArgs[MSG_SEQUENCE] = 1;
}
if (config._ive) {
declArgs[IVE] = 1;
}
if (config._altern_ex) {
declArgs['alternate-exchange'] = config._altern_ex;
}
if (config._durable) {
declArgs['durable'] = 1;
}
if (config._replicate) {
declArgs[REPLICATE] = config._replicate;
}
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
brokerAgent.invokeMethod(broker, 'create', {
"type": "exchange",
"name": ename,
"properties": declArgs,
"strict": true})
).then(handleMethodResponse);
});
};
var delExchange = function(args) {
if (args.length < 1) {
usage();
}
var ename = args[0];
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
brokerAgent.invokeMethod(broker, 'delete', {
"type": "exchange",
"name": ename})
).then(handleMethodResponse);
});
};
var addQueue = function(args) {
if (args.length < 1) {
usage();
}
var qname = args[0];
var declArgs = {};
for (var a = 0; a < config._extra_arguments.length; a++) {
var r = config._extra_arguments[a].split('=');
declArgs[r[0]] = getValue(r);
}
if (config._durable) {
// allow the default fileCount and fileSize specified
// in qpid config file to take prededence
if (config._fileCount) {
declArgs[FILECOUNT] = config._fileCount;
}
if (config._fileSize) {
declArgs[FILESIZE] = config._fileSize;
}
}
if (config._maxQueueSize != null) {
declArgs[MAX_QUEUE_SIZE] = config._maxQueueSize;
}
if (config._maxQueueCount != null) {
declArgs[MAX_QUEUE_COUNT] = config._maxQueueCount;
}
if (config._limitPolicy) {
if (config._limitPolicy === 'none') {
} else if (config._limitPolicy === 'reject') {
declArgs[POLICY_TYPE] = 'reject';
} else if (config._limitPolicy === 'ring') {
declArgs[POLICY_TYPE] = 'ring';
}
}
if (config._lvq_key) {
declArgs[LVQ_KEY] = config._lvq_key;
}
if (config._eventGeneration) {
declArgs[QUEUE_EVENT_GENERATION] = config._eventGeneration;
}
if (config._flowStopSize != null) {
declArgs[FLOW_STOP_SIZE] = config._flowStopSize;
}
if (config._flowResumeSize != null) {
declArgs[FLOW_RESUME_SIZE] = config._flowResumeSize;
}
if (config._flowStopCount != null) {
declArgs[FLOW_STOP_COUNT] = config._flowStopCount;
}
if (config._flowResumeCount != null) {
declArgs[FLOW_RESUME_COUNT] = config._flowResumeCount;
}
if (config._msgGroupHeader) {
declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader;
}
if (config._sharedMsgGroup) {
declArgs[SHARED_MSG_GROUP] = 1;
}
if (config._altern_ex) {
declArgs['alternate-exchange'] = config._altern_ex;
}
if (config._durable) {
declArgs['durable'] = 1;
}
if (config._replicate) {
declArgs[REPLICATE] = config._replicate;
}
// This block is a little complex and untidy, the real issue is that the
// correlator object isn't as good as a real Promise and doesn't support
// chaining of "then" calls, so where we have complex dependencies we still
// get somewhat into "callback hell". TODO improve the correlator.
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
brokerAgent.invokeMethod(broker, 'create', {
"type": "queue",
"name": qname,
"properties": declArgs,
"strict": true})
).then(function(response) {
if (config._start_replica) {
handleMethodResponse(response, true); // The second parameter prevents exiting.
// TODO test this stuff!
brokerAgent.request(
brokerAgent.getObjects('org.apache.qpid.ha', 'habroker') // Not sure if this is correct
).then(function(objects) {
if (objects.habroker.length > 0) {
var habroker = objects.habroker[0];
brokerAgent.request(
brokerAgent.invokeMethod(habroker, 'replicate', {
"broker": config._start_replica,
"queue": qname})
).then(handleMethodResponse);
} else {
brokerAgent.destroy();
}
});
} else {
handleMethodResponse(response);
}
});
});
};
var delQueue = function(args) {
if (args.length < 1) {
usage();
}
var qname = args[0];
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
brokerAgent.invokeMethod(broker, 'delete', {
"type": "queue",
"name": qname,
"options": {"if_empty": config._if_empty,
"if_unused": config._if_unused}})
).then(handleMethodResponse);
});
};
var snarf_header_args = function(args) {
if (args.length < 2) {
console.log("Invalid args to bind headers: need 'any'/'all' plus conditions");
return false;
}
var op = args[0];
if (op === 'all' || op === 'any') {
kv = {};
var bindings = Array.prototype.slice.apply(args, [1]);
for (var i = 0; i < bindings.length; i++) {
var binding = bindings[i];
binding = binding.split(",")[0];
binding = binding.split("=");
kv[binding[0]] = binding[1];
}
kv['x-match'] = op;
return kv;
} else {
console.log("Invalid condition arg to bind headers, need 'any' or 'all', not '" + op + "'");
return false;
}
};
var bind = function(args) {
if (args.length < 2) {
usage();
}
var ename = args[0];
var qname = args[1];
var key = '';
if (args.length > 2) {
key = args[2];
}
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker'),
brokerAgent.getObjects('org.apache.qpid.broker', 'exchange') // Get exchanges to look up exchange type.
).then(function(objects) {
var exchanges = objects.exchange;
var etype = '';
for (var i = 0; i < exchanges.length; i++) {
var exchange = exchanges[i]._values;
if (exchange.name === ename) {
etype = exchange.type;
break;
}
}
// type of the xchg determines the processing of the rest of
// argv. if it's an xml xchg, we want to find a file
// containing an x-query, and pass that. if it's a headers
// exchange, we need to pass either "any" or all, followed by a
// map containing key/value pairs. if neither of those, extra
// args are ignored.
var declArgs = {};
if (etype === 'xml') {
} else if (etype === 'headers') {
declArgs = snarf_header_args(Array.prototype.slice.apply(args, [3]));
}
//console.log(declArgs);
if (typeof declArgs !== 'object') {
process.exit(1);
}
var broker = objects.broker[0];
brokerAgent.request(
brokerAgent.invokeMethod(broker, 'create', {
"type": "binding",
"name": ename + '/' + qname + '/' + key,
"properties": declArgs,
"strict": true})
).then(handleMethodResponse);
});
/*
ok = True
_args = {}
if not res:
pass
elif res.type == "xml":
# this checks/imports the -f arg
[ok, xquery] = snarf_xquery_args()
_args = { "xquery" : xquery }
else:
if res.type == "headers":
[ok, op, kv] = snarf_header_args(args[3:])
_args = kv
_args["x-match"] = op
if not ok:
sys.exit(1)
self.broker.bind(ename, qname, key, _args)
*/
};
var unbind = function(args) {
if (args.length < 2) {
usage();
}
var ename = args[0];
var qname = args[1];
var key = '';
if (args.length > 2) {
key = args[2];
}
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
brokerAgent.invokeMethod(broker, 'delete', {
"type": "binding",
"name": ename + '/' + qname + '/' + key})
).then(handleMethodResponse);
});
};
/**
* The following methods are "generic" create and delete methods to for arbitrary
* Management Objects e.g. Incoming, Outgoing, Domain, Topic, QueuePolicy,
* TopicPolicy etc. use --argument k1=v1 --argument k2=v2 --argument k3=v3 to
* pass arbitrary arguments as key/value pairs to the Object being created/deleted,
* for example to add a topic object that uses the fanout exchange:
* ./qpid-config.js add topic fanout --argument exchange=amq.fanout \
* --argument qpid.max_size=1000000 --argument qpid.policy_type=ring
*/
var createObject = function(type, name, args) {
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
// Create an object of the specified type.
brokerAgent.invokeMethod(broker, 'create', {
"type": type,
"name": name,
"properties": args,
"strict": true})
).then(handleMethodResponse);
});
};
var deleteObject = function(type, name, args) {
brokerAgent.request(
// We invoke the CRUD methods on the broker object.
brokerAgent.getObjects('org.apache.qpid.broker', 'broker')
).then(function(objects) {
var broker = objects.broker[0];
brokerAgent.request(
// Create an object of the specified type and name.
brokerAgent.invokeMethod(broker, 'delete', {
"type": type,
"name": name,
"options": args})
).then(handleMethodResponse);
});
};
/**
* This is a "generic" mechanism for listing arbitrary Management Objects.
*/
var listObjects = function(type) {
brokerAgent.request(
brokerAgent.getObjects('org.apache.qpid.broker', type)
).then(function(objects) {
// The correlator passes an object containing responses for all of the
// supplied requests so we index it by the supplied type to get our response.
objects = objects[type];
// Collect available attributes, stringify the values and compute the max
// length of the value of each attribute so that we can later create a table.
var attributes = {};
var lengths = {};
for (var i = 0; i < objects.length; i++) {
var object = objects[i];
object = object._values;
for (var prop in object) {
if (typeof object[prop] === 'object') { // Stringify Object properties.
// Check if property is an ObjectID (reference property),
// if so replace with the "name" part of the OID.
if (object[prop]['_object_name']) {
var parts = object[prop]['_object_name'].split(':');
object[prop] = parts[parts.length - 1];
} else {
// Stringify general Object properties.
object[prop] = renderObject(object[prop]);
}
} else {
object[prop] = object[prop].toString(); // Stringify other property types.
}
if (!lengths[prop] || object[prop].length > lengths[prop]) { // Compute lengths.
lengths[prop] = object[prop].length > prop.length ? object[prop].length : prop.length;
}
if (!config._list_properties || config._list_properties[prop]) { // Do we want this property?
attributes[prop] = true;
}
}
}
if (!config._list_properties && DEFAULT_PROPERTIES[type]) {
attributes = DEFAULT_PROPERTIES[type];
}
// Using the information we've previously prepared now render a table
// showing the required property values.
var desired = [];
var header = ''; // Table header showing the property names.
if (attributes['name']) {
desired.push('name');
delete attributes['name'];
header += 'name' + Array(lengths['name'] + 2 - 4).join(' ');
}
for (var prop in attributes) {
desired.push(prop);
header += prop + Array(lengths[prop] + 2 - prop.length).join(' ');
}
console.log("Objects of type '" + type + "'");
console.log(header);
console.log(Array(header.length).join('='));
for (var i = 0; i < objects.length; i++) {
var object = objects[i];
object = object._values;
var string = '';
for (var j = 0; j < desired.length; j++) {
var key = desired[j];
string += object[key] + Array(lengths[key] + 2 - object[key].length).join(' ');
}
console.log(string);
}
brokerAgent.destroy();
});
};
var reloadAcl = function() {
brokerAgent.request(
brokerAgent.getObjects('org.apache.qpid.acl', 'acl')
).then(function(objects) {
if (objects.acl.length > 0) {
var acl = objects.acl[0];
brokerAgent.request(
// Create an object of the specified type.
brokerAgent.invokeMethod(acl, 'reloadACLFile', {})
).then(handleMethodResponse);
} else {
console.log("Failed: No ACL Loaded in Broker");
brokerAgent.destroy();
}
});
};
/********************* process command line options **********************/
var params = [];
var extra_arguments = [];
var args = process.argv.slice(2);
if (args.length > 0) {
if (args[0] === '-h' || args[0] === '--help') {
console.log(_usage);
console.log(_description);
console.log(_options);
process.exit(0);
}
for (var i = 0; i < args.length; i++) {
var arg = args[i];
if (arg === '-r' || arg === '--recursive') {
config._recursive = true;
} else if (arg === '--ignore-default') {
config._ignoreDefault = true;
} else if (arg === '--durable') {
config._durable = true;
} else if (arg === '--shared-groups') {
config._sharedMsgGroup = true;
} else if (arg === '--sequence') {
config._sequence = true;
} else if (arg === '--ive') {
config._ive = true;
} else if (arg === '--force') {
config._if_empty = false;
config._if_unused = false;
} else if (arg === '--force-if-not-empty') {
config._if_empty = false;
} else if (arg === '--force-if-used') {
config._if_unused = false;
} else if (arg === '--sequence') {
config._msgSequence = true;
} else if (arg.charAt(0) === '-') {
i++;
var val = args[i];
if (arg === '-t' || arg === '--timeout') {
config._connTimeout = parseInt(val);
if (config._connTimeout === 0) {
config._connTimeout = null;
}
} else if (arg === '-b' || arg === '--broker' || arg === '-a' || arg === '--broker-addr') {
if (val != null) {
config._host = val;
}
} else if (arg === '--alternate-exchange') {
config._altern_ex = val;
} else if (arg === '--replicate') {
if (!REPLICATE_LEVELS[val]) {
console.error("Invalid replication level " + val + ", should be one of 'none', 'configuration' or 'all'");
}
config._replicate = val;
} else if (arg === '--file-count') {
config._fileCount = parseInt(val);
} else if (arg === '--file-size') {
config._fileSize = parseInt(val);
} else if (arg === '--max-queue-size') {
config._maxQueueSize = parseInt(val);
} else if (arg === '--max-queue-count') {
config._maxQueueCount = parseInt(val);
} else if (arg === '--limit-policy') {
config._limitPolicy = val;
} else if (arg === '--lvq-key') {
config._lvq_key = val;
} else if (arg === '--generate-queue-events') {
config._eventGeneration = parseInt(val);
} else if (arg === '--flow-stop-size') {
config._flowStopSize = parseInt(val);
} else if (arg === '--flow-resume-size') {
config._flowResumeSize = parseInt(val);
} else if (arg === '--flow-stop-count') {
config._flowStopCount = parseInt(val);
} else if (arg === '--flow-resume-count') {
config._flowResumeCount = parseInt(val);
} else if (arg === '--group-header') {
config._msgGroupHeader = val;
} else if (arg === '--argument') {
extra_arguments.push(val);
} else if (arg === '--start-replica') {
config._start_replica = val;
} else if (arg === '--f' || arg === '--file') { // TODO Won't work in node.js
config._file = val;
} else if (arg === '--show-property') {
if (config._list_properties === null) {
config._list_properties = {};
}
config._list_properties[val] = true;
}
} else {
params.push(arg);
}
}
}
config._extra_arguments = extra_arguments;
// The command only *actually* gets called when the QMF connection has actually
// been established so we wrap up the function we want to get called in a lambda.
var command = function() {overview();};
if (params.length > 0) {
var cmd = params[0];
var modifier = '';
if (params.length > 1) {
modifier = params[1];
}
if (cmd === 'exchanges') {
if (config._recursive) {
command = function() {exchangeListRecurse(modifier);};
} else {
command = function() {exchangeList(modifier);};
}
} else if (cmd === 'queues') {
if (config._recursive) {
command = function() {queueListRecurse(modifier);};
} else {
command = function() {queueList(modifier);};
}
} else if (cmd === 'add') {
if (modifier === 'exchange') {
command = function() {addExchange(Array.prototype.slice.apply(params, [2]));};
} else if (modifier === 'queue') {
command = function() {addQueue(Array.prototype.slice.apply(params, [2]));};
} else if (params.length > 2) {
command = function() {createObject(modifier, params[2], config.getOptions());};
} else {
usage();
}
} else if (cmd === 'del') {
if (modifier === 'exchange') {
command = function() {delExchange(Array.prototype.slice.apply(params, [2]));};
} else if (modifier === 'queue') {
command = function() {delQueue(Array.prototype.slice.apply(params, [2]));};
} else if (params.length > 2) {
command = function() {deleteObject(modifier, params[2], {});};
} else {
usage();
}
} else if (cmd === 'bind') {
command = function() {bind(Array.prototype.slice.apply(params, [1]));};
} else if (cmd === 'unbind') {
command = function() {unbind(Array.prototype.slice.apply(params, [1]));};
} else if (cmd === 'reload-acl') {
command = function() {reloadAcl();};
} else if (cmd === 'list' && params.length > 1) {
command = function() {listObjects(modifier);};
} else {
usage();
}
}
//console.log(config._host);
brokerAgent.addConnection(config._host, command);
} else {
console.error("qpid-config.js should be run in Node.js");
}