blob: dc4ec5786aac817128a7443ce6b8d8b0db2f9682 [file] [log] [blame]
#!/usr/bin/env node
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
"use strict";
//Check if the environment is Node.js and if not log an error and exit.
if (typeof process !== 'object' || typeof require !== 'function') {
console.error("Receiver.js should be run in Node.js");
process.exit(-1);
}
var uuid = require("node-uuid");
var args = process.argv.slice(2);
if (args.length !== 4) {
console.error("ERROR: Sender.js needs 4 arguments:");
console.error(" 1. Broker address (ip-addr:port)");
console.error(" 2. Queue name");
console.error(" 3. AMQP type");
console.error(" 4. Number of expected values");
process.exit(-1);
}
function Receiver(brokerAddr, brokerPort, queueName, amqpType, numTestValues) {
this.amqpType = amqpType;
this.received = 0;
this.expected = numTestValues;
this.receivedValueList = [];
this.container = require('rhea');
this.container.connect({'host':brokerAddr, 'port':brokerPort}).open_receiver(queueName);
this.processMessage = function(msgBody) {
// console.log("processMessage: amqpType=" + this.amqpType + "; msgBody=" + msgBody);
switch(this.amqpType) {
case "null": this.receivedValueList.push(this.decodeNull(msgBody)); break;
case "boolean": this.receivedValueList.push(this.decodeBoolean(msgBody)); break;
case "ubyte":
case "ushort":
case "uint":
case "ulong":
case "decimal32":
case "decimal64":
case "decimal128":
case "timestamp": this.receivedValueList.push(this.decodeUnsigned(msgBody)); break;
case "byte":
case "short":
case "int":
case "long": this.receivedValueList.push(this.decodeSigned(msgBody)); break;
case "float": this.receivedValueList.push(this.decodeFloat(msgBody)); break;
case "double": this.receivedValueList.push(this.decodeDouble(msgBody)); break;
case "char": this.receivedValueList.push(this.decodeChar(msgBody)); break;
case "uuid": this.receivedValueList.push(this.decodeUuid(msgBody)); break;
case "binary": this.receivedValueList.push(this.decodeBinary(msgBody)); break;
case "string": this.receivedValueList.push(this.decodeString(msgBody)); break;
case "symbol": this.receivedValueList.push(this.decodeSymbol(msgBody)); break;
case "list": this.receivedValueList.push(this.decodeList(msgBody)); break;
case "map": this.receivedValueList.push(this.decodeMap(msgBody)); break;
case "array": this.receivedValueList.push(this.decodeArray(msgBody)); break;
default: throw "Unknown AMQP type: " + this.amqpType;
}
};
this.decodeNull = function (msgBody) {
return "None";
};
this.decodeBoolean = function(msgBody) {
return msgBody ? "True" : "False";
};
this.decodeUnsigned = function(msgBody) {
return "0x" + msgBody.toString(Buffer.isBuffer(msgBody) ? 'hex' : 16);
};
this.decodeSigned = function(msgBody) {
if (Buffer.isBuffer(msgBody)) {
if (msgBody[0] & 0x80) { // sign bit set
msgBody[0] &= 0x80;
return "-0x" + msgBody.toString('hex');
} else {
return "0x" + msgBody.toString('hex');
}
} else {
if (msgBody < 0) {
return "-0x" + (-msgBody).toString(16);
} else {
return "0x" + msgBody.toString(16);
}
}
};
this.decodeFloat = function(msgBody) {
// Buffer.writeFloatBE() does not support -NaN (ignores sign)
var buf = new Buffer(4);
buf.writeFloatBE(msgBody);
return "0x" + buf.toString('hex');
};
this.decodeDouble = function(msgBody) {
// Buffer.writeDoubleBE() does not support -NaN (ignores sign)
var buf = new Buffer(8);
buf.writeDoubleBE(msgBody);
return "0x" + buf.toString('hex');
};
// UTF32LE char per AMQP spec
this.decodeChar = function(msgBody) {
if (Buffer.isBuffer(msgBody)) {
if (msgBody[0] === 0 && msgBody[1] === 0 && msgBody[2] === 0 && msgBody[3] >= 32 && msgBody[3] <= 126) {
// Printable single ASCII char - return just the char
return String.fromCharCode(msgBody[3]);
}
return "0x" + this.buffer2HexString(msgBody, false);
} else {
throw "AMQP type char message body is not Buffer";
}
};
this.decodeUuid = function(msgBody) {
return uuid.unparse(msgBody);
};
this.decodeBinary = function(msgBody) {
return msgBody.toString();
};
this.decodeString = function(msgBody) {
return msgBody;
};
this.decodeSymbol = function(msgBody) {
return msgBody;
};
this.decodeList = function(msgBody) {
return msgBody; // TODO: decode list
};
this.decodeMap = function(msgBody) {
return msgBody; // TODO: decode map
};
this.decodeArray = function(msgBody) {
return msgBody; // TODO: decode array
};
this.buffer2HexString = function(buff, pad) {
var hexStr = "";
var first = true;
for (var i = 0; i < buff.length; i++) {
if (pad || buff[i] > 0) {
hexStr += (pad || !first) ? ("0" + buff[i].toString(16)).substr(-2) : buff[i].toString(16);
first = false;
}
}
return hexStr;
};
this.printResult = function() {
console.log(this.amqpType);
console.log(JSON.stringify(this.receivedValueList));
};
this.container.on('message', function (context) {
if (receiver.expected === 0 || receiver.received < receiver.expected) {
receiver.processMessage(context.message.body);
if (++receiver.received === receiver.expected) {
context.receiver.detach();
context.connection.close();
receiver.printResult();
}
}
});
}
var colonPos = args[0].indexOf(":");
var receiver = new Receiver(args[0].slice(0, colonPos), args[0].slice(colonPos+1), args[1], args[2], parseInt(args[3], 10));