| #!/usr/bin/env node |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| "use strict"; |
| |
| //Check if the environment is Node.js and if not log an error and exit. |
| if (typeof process !== 'object' || typeof require !== 'function') { |
| console.error("Receiver.js should be run in Node.js"); |
| process.exit(-1); |
| } |
| |
| var uuid = require("node-uuid"); |
| |
| var args = process.argv.slice(2); |
| if (args.length !== 4) { |
| console.error("ERROR: Sender.js needs 4 arguments:"); |
| console.error(" 1. Broker address (ip-addr:port)"); |
| console.error(" 2. Queue name"); |
| console.error(" 3. AMQP type"); |
| console.error(" 4. Number of expected values"); |
| process.exit(-1); |
| } |
| |
| function Receiver(brokerAddr, brokerPort, queueName, amqpType, numTestValues) { |
| this.amqpType = amqpType; |
| this.received = 0; |
| this.expected = numTestValues; |
| this.receivedValueList = []; |
| this.container = require('rhea'); |
| |
| this.container.connect({'host':brokerAddr, 'port':brokerPort}).open_receiver(queueName); |
| |
| this.processMessage = function(msgBody) { |
| // console.log("processMessage: amqpType=" + this.amqpType + "; msgBody=" + msgBody); |
| switch(this.amqpType) { |
| case "null": this.receivedValueList.push(this.decodeNull(msgBody)); break; |
| case "boolean": this.receivedValueList.push(this.decodeBoolean(msgBody)); break; |
| case "ubyte": |
| case "ushort": |
| case "uint": |
| case "ulong": |
| case "decimal32": |
| case "decimal64": |
| case "decimal128": |
| case "timestamp": this.receivedValueList.push(this.decodeUnsigned(msgBody)); break; |
| case "byte": |
| case "short": |
| case "int": |
| case "long": this.receivedValueList.push(this.decodeSigned(msgBody)); break; |
| case "float": this.receivedValueList.push(this.decodeFloat(msgBody)); break; |
| case "double": this.receivedValueList.push(this.decodeDouble(msgBody)); break; |
| case "char": this.receivedValueList.push(this.decodeChar(msgBody)); break; |
| case "uuid": this.receivedValueList.push(this.decodeUuid(msgBody)); break; |
| case "binary": this.receivedValueList.push(this.decodeBinary(msgBody)); break; |
| case "string": this.receivedValueList.push(this.decodeString(msgBody)); break; |
| case "symbol": this.receivedValueList.push(this.decodeSymbol(msgBody)); break; |
| case "list": this.receivedValueList.push(this.decodeList(msgBody)); break; |
| case "map": this.receivedValueList.push(this.decodeMap(msgBody)); break; |
| case "array": this.receivedValueList.push(this.decodeArray(msgBody)); break; |
| default: throw "Unknown AMQP type: " + this.amqpType; |
| } |
| }; |
| |
| this.decodeNull = function (msgBody) { |
| return "None"; |
| }; |
| |
| this.decodeBoolean = function(msgBody) { |
| return msgBody ? "True" : "False"; |
| }; |
| |
| this.decodeUnsigned = function(msgBody) { |
| return "0x" + msgBody.toString(Buffer.isBuffer(msgBody) ? 'hex' : 16); |
| }; |
| |
| this.decodeSigned = function(msgBody) { |
| if (Buffer.isBuffer(msgBody)) { |
| if (msgBody[0] & 0x80) { // sign bit set |
| msgBody[0] &= 0x80; |
| return "-0x" + msgBody.toString('hex'); |
| } else { |
| return "0x" + msgBody.toString('hex'); |
| } |
| } else { |
| if (msgBody < 0) { |
| return "-0x" + (-msgBody).toString(16); |
| } else { |
| return "0x" + msgBody.toString(16); |
| } |
| } |
| }; |
| |
| this.decodeFloat = function(msgBody) { |
| // Buffer.writeFloatBE() does not support -NaN (ignores sign) |
| var buf = new Buffer(4); |
| buf.writeFloatBE(msgBody); |
| return "0x" + buf.toString('hex'); |
| }; |
| |
| this.decodeDouble = function(msgBody) { |
| // Buffer.writeDoubleBE() does not support -NaN (ignores sign) |
| var buf = new Buffer(8); |
| buf.writeDoubleBE(msgBody); |
| return "0x" + buf.toString('hex'); |
| }; |
| |
| // UTF32LE char per AMQP spec |
| this.decodeChar = function(msgBody) { |
| if (Buffer.isBuffer(msgBody)) { |
| if (msgBody[0] === 0 && msgBody[1] === 0 && msgBody[2] === 0 && msgBody[3] >= 32 && msgBody[3] <= 126) { |
| // Printable single ASCII char - return just the char |
| return String.fromCharCode(msgBody[3]); |
| } |
| return "0x" + this.buffer2HexString(msgBody, false); |
| } else { |
| throw "AMQP type char message body is not Buffer"; |
| } |
| }; |
| |
| this.decodeUuid = function(msgBody) { |
| return uuid.unparse(msgBody); |
| }; |
| |
| this.decodeBinary = function(msgBody) { |
| return msgBody.toString(); |
| }; |
| |
| this.decodeString = function(msgBody) { |
| return msgBody; |
| }; |
| |
| this.decodeSymbol = function(msgBody) { |
| return msgBody; |
| }; |
| |
| this.decodeList = function(msgBody) { |
| return msgBody; // TODO: decode list |
| }; |
| |
| this.decodeMap = function(msgBody) { |
| return msgBody; // TODO: decode map |
| }; |
| |
| this.decodeArray = function(msgBody) { |
| return msgBody; // TODO: decode array |
| }; |
| |
| this.buffer2HexString = function(buff, pad) { |
| var hexStr = ""; |
| var first = true; |
| for (var i = 0; i < buff.length; i++) { |
| if (pad || buff[i] > 0) { |
| hexStr += (pad || !first) ? ("0" + buff[i].toString(16)).substr(-2) : buff[i].toString(16); |
| first = false; |
| } |
| } |
| return hexStr; |
| }; |
| |
| this.printResult = function() { |
| console.log(this.amqpType); |
| console.log(JSON.stringify(this.receivedValueList)); |
| }; |
| |
| this.container.on('message', function (context) { |
| if (receiver.expected === 0 || receiver.received < receiver.expected) { |
| receiver.processMessage(context.message.body); |
| if (++receiver.received === receiver.expected) { |
| context.receiver.detach(); |
| context.connection.close(); |
| receiver.printResult(); |
| } |
| } |
| }); |
| } |
| |
| var colonPos = args[0].indexOf(":"); |
| var receiver = new Receiver(args[0].slice(0, colonPos), args[0].slice(colonPos+1), args[1], args[2], parseInt(args[3], 10)); |