blob: 475d6b5cd978ff08f0f04a360ef9b8c02543bc9f [file] [log] [blame]
#!/usr/bin/env node
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
"use strict";
var uuid = require("node-uuid");
var amqp_types = require('rhea/lib/types.js');
//Check if the environment is Node.js and if not log an error and exit.
if (typeof process !== 'object' || typeof require !== 'function') {
console.error("Receiver.js should be run in Node.js");
process.exit(-1);
}
var args = process.argv.slice(2);
if (args.length !== 4) {
console.error("ERROR: Sender.js needs 4 arguments:");
console.error(" 1. Broker address (ip-addr:port)");
console.error(" 2. Queue name");
console.error(" 3. AMQP type");
console.error(" 4. JSON string containing test values");
process.exit(-1);
}
var AmqpArrayTypes = {
"boolean": 0x56,
"ubyte": 0x50,
"ushort": 0x60,
"uint": 0x70,
"ulong": 0x80,
"byte": 0x51,
"short": 0x61,
"int": 0x71,
"long": 0x81,
"float": 0x72,
"double": 0x82,
"decimal32": 0x74,
"decimal64": 0x84,
"decimal128": 0x94,
"char": 0x73,
"timestamp": 0x83,
"uuid": 0x98,
"binary": 0xb0,
"string": 0xb1,
"symbol": 0xb3
};
function Sender(brokerAddr, brokerPort, queueName, amqpType, testValues) {
this.amqpType = amqpType;
this.testValues = testValues;
this.sent = 0;
this.confirmed = 0;
this.total = testValues.length;
this.container = require('rhea');
this.connection = this.container.connect({'host':brokerAddr, 'port':brokerPort}); //.open_sender(queueName);
this.connection.open_sender(queueName);
this.createMessage = function(testValue) {
//console.log("createMessage: amqpType=" + this.amqpType + "; testValue=" + testValue);
switch(this.amqpType) {
case "null": return {body: this.encodeNull(testValue)};
case "boolean": return {body: amqp_types.wrap_boolean(this.encodeBoolean(testValue))};
case "ubyte": return {body: amqp_types.wrap_ubyte(this.encodeUbyte(testValue))};
case "ushort": return {body: amqp_types.wrap_ushort(this.encodeUshort(testValue))};
case "uint": return {body: amqp_types.wrap_uint(this.encodeUint(testValue))};
case "ulong": return {body: amqp_types.wrap_ulong(this.encodeUlong(testValue))};
case "byte": return {body: amqp_types.wrap_byte(this.encodeByte(testValue))};
case "short": return {body: amqp_types.wrap_short(this.encodeShort(testValue))};
case "int": return {body: amqp_types.wrap_int(this.encodeInt(testValue))};
case "long": return {body: amqp_types.wrap_long(this.encodeLong(testValue))};
case "float": return {body: amqp_types.wrap_float(this.encodeFloat(testValue))};
case "double": return {body: amqp_types.wrap_double(this.encodeDouble(testValue))};
case "decimal32": return {body: new amqp_types.Decimal32(this.encodeDecimal32(testValue))};
case "decimal64": return {body: new amqp_types.Decimal64(this.encodeDecimal64(testValue))};
case "decimal128": return {body: new amqp_types.Decimal128(this.encodeDecimal128(testValue))};
case "char": return {body: new amqp_types.CharUTF32(this.encodeChar(testValue))};
case "timestamp": return {body: amqp_types.wrap_timestamp(this.encodeTimestamp(testValue))};
case "uuid": return {body: new amqp_types.Uuid(this.encodeUuid(testValue))};
case "binary": return {body: amqp_types.wrap_binary(this.encodeBinary(testValue))};
case "string": return {body: amqp_types.wrap_string(this.encodeString(testValue))};
case "symbol": return {body: amqp_types.wrap_symbol(this.encodeSymbol(testValue))};
case "list": return {body: amqp_types.wrap_list(this.encodeList(testValue))};
case "map": return {body: amqp_types.wrap_map(this.encodeMap(testValue))};
case "array": return {body: amqp_types.wrap_array(this.encodeArray(testValue.slice(1), AmqpArrayTypes[testValue[0]]))};
default: throw "Unknown AMQP type: \"" + this.amqpType + "\"";
}
};
// static
this.encodeAmqpType = function(amqpType, testValue) {
switch(amqpType) {
case "null": return this.encodeNull(testValue);
case "boolean": return this.encodeBoolean(testValue);
case "ubyte": return this.encodeUbyte(testValue);
case "ushort": return this.encodeUshort(testValue);
case "uint": return this.encodeUint(testValue);
case "ulong": return this.encodeUlong(testValue);
case "byte": return this.encodeByte(testValue);
case "short": return this.encodeShort(testValue);
case "int": return this.encodeInt(testValue);
case "long": return this.encodeLong(testValue);
case "float": return this.encodeFloat(testValue);
case "double": return this.encodeDouble(testValue);
case "decimal32": return this.encodeDecimal32(testValue);
case "decimal64": return this.encodeDecimal64(testValue);
case "decimal128": return this.encodeDecimal128(testValue);
case "char": return this.encodeChar(testValue);
case "timestamp": return this.encodeTimestamp(testValue);
case "uuid": return this.encodeUuid(testValue);
case "binary": return this.encodeBinary(testValue);
case "string": return this.encodeString(testValue);
case "symbol": return this.encodeSymbol(testValue);
case "list": return this.encodeList(testValue);
case "map": return this.encodeMap(testValue);
case "array": return this.encodeArray(testValue);
default: throw "Unknown AMQP type: \"" + this.amqpType + "\"";
}
};
// These functions encode the incoming JSON string representation
// of the test values into appropriate test values for the message
// bodies.
this.encodeNull = function(testValue) {
if (testValue === "None") return null;
this.handleEncodeError("null", testValue);
};
this.encodeBoolean = function(testValue) {
if (testValue === "True") return true;
if (testValue === "False") return false;
this.handleEncodeError("boolean", testValue);
};
this.encodeUbyte = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("ubyte", testValue); }
};
this.encodeUshort = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("ushort", testValue); }
};
this.encodeUint = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("uint", testValue); }
};
this.encodeUlong = function(testValue) {
try { return new Buffer(this.hexString2ByteArray(testValue.slice(2), 8)); }
catch (err) { this.handleEncodeError("ulong", testValue, err); }
};
this.encodeByte = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("byte", testValue); }
};
this.encodeShort = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("short", testValue); }
};
this.encodeInt = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("int", testValue); }
};
this.encodeLong = function(testValue) {
try {
var negFlag = testValue.charAt(0) === "-";
var ba = this.hexString2ByteArray(testValue.slice(negFlag ? 3 : 2), 8);
if (negFlag) this.byteArray2sCompl(ba);
return new Buffer(ba);
}
catch (err) { this.handleEncodeError("long", testValue); }
};
this.encodeFloat = function(testValue) {
// NOTE: Buffer.readFloatBE() does not support -NaN (ignores sign)
// var buf = new Buffer(testValue.substr(2), "hex");
// return buf.readFloatBE(0);
// This method gets the -NaN with sign onto the wire correctly.
var dv = new DataView(new ArrayBuffer(4));
dv.setUint32(0, parseInt(testValue.substr(2), 16));
return dv.getFloat32(0);
};
this.encodeDouble = function(testValue) {
// NOTE: Buffer.readDoubleBE() does not support -NaN (ignores sign)
// var buf = new Buffer(testValue.substr(2), "hex");
// return buf.readDoubleBE(0);
// This method gets the -NaN with sign onto the wire correctly.
var dv = new DataView(new ArrayBuffer(8));
dv.setUint32(0, parseInt(testValue.substr(2, 8), 16));
dv.setUint32(4, parseInt(testValue.substr(10), 16));
return dv.getFloat64(0);
};
this.encodeDecimal32 = function(testValue) {
try { return new Buffer(this.hexString2ByteArray(testValue.slice(2), 4)); }
catch (err) { this.handleEncodeError("decimal32", testValue, err); }
};
this.encodeDecimal64 = function(testValue) {
try { return new Buffer(this.hexString2ByteArray(testValue.slice(2), 8)); }
catch (err) { this.handleEncodeError("decimal64", testValue, err); }
};
this.encodeDecimal128 = function(testValue) {
try { return new Buffer(this.hexString2ByteArray(testValue.slice(2), 16)); }
catch (err) { this.handleEncodeError("decimal128", testValue, err); }
};
this.encodeChar = function(testValue) {
var val = null;
try {
if (testValue.length === 1) { // Single char format 'a'
val = [0, 0, 0, testValue.charCodeAt(0)];
} else { // Hex format '0xNNNN'
val = this.hexString2ByteArray(testValue.slice(2), 4);
}
}
catch (err) { this.handleEncodeError("char", testValue); }
return new Buffer(val);
};
this.encodeTimestamp = function(testValue) {
try { return parseInt(testValue, 16); }
catch (err) { this.handleEncodeError("timestamp", testValue); }
};
this.encodeUuid = function(testValue) {
var buff = new Buffer(16);
try { uuid.parse(testValue, buff); }
catch (err) { this.handleEncodeError("uuid", testValue); }
return buff;
};
this.encodeBinary = function(testValue) {
return testValue;
};
this.encodeString = function(testValue) {
return testValue;
};
this.encodeSymbol = function(testValue) {
return testValue;
};
this.encodeList = function(testValue) {
return testValue; // TODO: encode list
};
this.encodeMap = function(testValue) {
return testValue; // TODO: encode map
};
// testValue format for arrays: ['type', 'val1', 'val2', ... ]
this.encodeArray = function(testValue) {
return testValue.slice(1); // TODO: encode array
};
this.handleEncodeError = function(amqpType, testValue, err) {
var errStr = "Invalid string value for type " + amqpType + ": \"" + testValue + "\"";
if (err) {
errStr += " (" + err.message + ")";
}
throw new Error(errStr);
};
// NOTE: hexStr must not include '0x' prefix
this.hexString2ByteArray = function(hexStr, len) {
var result = [];
var oddFlag = hexStr.length !== 2*Math.round(hexStr.length/2.0); // odd number of hex digits
for (var i = len - 1; i >= 0; i--) {
if (2 * i < hexStr.length) {
var ssIndex = oddFlag ? 0 : hexStr.length - 2*(i+1);
var ssLen = oddFlag ? 1 : 2;
result.push(parseInt(hexStr.substr(ssIndex, ssLen), 16));
oddFlag = false;
} else {
result.push(0); // leading zeros
}
}
return result;
};
this.byteArray2sCompl = function(ba) {
var carry = 1;
for (var i=ba.length-1; i>=0; i--) {
var val = (ba[i] ^ 0xff) + carry;
ba[i] = val & 0xff;
carry = val >> 8;
}
};
this.container.on('sendable', function (context) {
while (context.sender.sendable() && sender.sent < sender.total) {
for (var i=0; i<sender.testValues.length; ++i) {
context.sender.send(sender.createMessage(sender.testValues[i]));
sender.sent++;
}
}
});
this.container.on('accepted', function (context) {
if (++sender.confirmed === sender.total) {
context.connection.close();
}
});
this.container.on('disconnected', function (context) {
sender.sent = sender.confirmed;
});
}
var colonPos = args[0].indexOf(":");
var sender = new Sender(args[0].slice(0, colonPos), args[0].slice(colonPos+1), args[1], args[2], JSON.parse(args[3]));