blob: d08379142d9a4063556b2037390e9e9f1dc5f340 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpidit/amqp_types_test/Sender.hpp"
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <json/json.h>
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
namespace qpidit
{
namespace amqp_types_test
{
/**
 * Constructs a sender for the AMQP types test.
 *
 * The base class receives the fixed test name "amqp_types_test::Sender", the
 * broker address, the queue name and testValues.size() as its fourth
 * argument. The AMQP type name and the JSON test values initialise
 * _amqpType and _testValues respectively.
 *
 * @param brokerAddr Broker address, forwarded to AmqpSenderBase.
 * @param queueName  Queue name, forwarded to AmqpSenderBase.
 * @param amqpType   Name of the AMQP type under test.
 * @param testValues JSON collection of test values to send.
 */
Sender::Sender(const std::string& brokerAddr,
               const std::string& queueName,
               const std::string& amqpType,
               const Json::Value& testValues)
    : AmqpSenderBase("amqp_types_test::Sender", brokerAddr, queueName, testValues.size()),
      _amqpType(amqpType),
      _testValues(testValues)
{
}
/** Destructor; performs no work beyond destroying the members. */
Sender::~Sender() = default;
/**
 * Handler for Proton's on_sendable event.
 *
 * If there are no test values at all (_totalMsgs == 0) the connection is
 * closed. Otherwise the test values are sent in iteration order for as long
 * as the sender reports credit. Values that could not be sent because credit
 * ran out are NOT dropped: the next invocation skips the first _msgsSent
 * values and resumes with the first unsent one.
 *
 * @param s The sender on which messages may now be sent.
 */
void Sender::on_sendable(proton::sender &s) {
    if (_totalMsgs == 0) {
        s.connection().close();
        return;
    }

    // Number of leading test values already sent by earlier invocations.
    auto alreadySent = _msgsSent;
    for (Json::Value::const_iterator i = _testValues.begin(); i != _testValues.end(); ++i) {
        if (alreadySent > 0) {
            --alreadySent;
            continue;
        }
        if (!s.credit()) {
            // Out of credit: stop here and continue on the next on_sendable call.
            break;
        }
        proton::message msg;
        s.send(setMessage(msg, *i));
        _msgsSent++;
    }
}
// protected
/**
 * Fills in a message for a single test value.
 *
 * The message id is set to _msgsSent + 1 and the body to the result of
 * converting testValue according to _amqpType.
 *
 * @param msg       Message to populate.
 * @param testValue JSON test value to convert into the message body.
 * @return Reference to msg.
 */
proton::message& Sender::setMessage(proton::message& msg, const Json::Value& testValue) {
    msg.id(_msgsSent + 1);
    proton::value body(convertAmqpValue(_amqpType, testValue));
    msg.body(body);
    return msg;
}
//static
/**
 * Renders a byte buffer as a hexadecimal string.
 *
 * The result starts with "0x" followed by two zero-padded hex digits per
 * byte, in buffer order.
 *
 * @param src Pointer to the first byte.
 * @param len Number of bytes to render; nothing is appended when len <= 0.
 * @return The hex string.
 */
std::string Sender::bytearrayToHexStr(const char* src, int len) {
    std::ostringstream hexStream;
    // The fill character persists across insertions; the width must be reset per byte.
    hexStream << "0x" << std::hex << std::setfill('0');
    int pos = 0;
    while (pos < len) {
        const unsigned int byteVal = static_cast<unsigned char>(src[pos]);
        hexStream << std::setw(2) << byteVal;
        ++pos;
    }
    return hexStream.str();
}
//static
/**
 * Converts a JSON test value into a proton::value of the named AMQP type.
 *
 * Accepted type names and the string formats read from testValue:
 *  - "null": the string "None".
 *  - "boolean": "True" or "False".
 *  - "ubyte", "ushort", "uint", "ulong", "byte", "short", "int", "long":
 *    delegated to integralValue<>() (the last argument is true for the
 *    unsigned types and false for the signed ones).
 *  - "float", "double": a decimal number, or, when the string contains "0x",
 *    a hex representation delegated to floatValue<>().
 *  - "decimal32", "decimal64", "decimal128": hex string; the first two
 *    characters (presumably a "0x" prefix) are skipped.
 *  - "char": a single character, or a hex string of 3 to 10 characters.
 *  - "timestamp": a decimal number, or hex when the string contains "0x".
 *  - "uuid": "00000000-0000-0000-0000-000000000000" (36 characters).
 *  - "binary", "string", "symbol": the string itself.
 *  - "list", "map": handled recursively by processList() / processMap().
 *
 * @param amqpType  Name of the AMQP type.
 * @param testValue JSON value holding the test value.
 * @return The converted value.
 * @throws qpidit::InvalidTestValueError    if a null, boolean, char or uuid
 *         value does not have an accepted format.
 * @throws qpidit::UnsupportedAmqpTypeError for "array".
 * @throws qpidit::UnknownAmqpTypeError     for any other type name.
 */
proton::value Sender::convertAmqpValue(const std::string& amqpType, const Json::Value& testValue) {
    if (amqpType == "null") {
        const std::string testValueStr(testValue.asString());
        if (testValueStr != "None") {
            throw qpidit::InvalidTestValueError(amqpType, testValueStr);
        }
        return proton::value();
    }
    if (amqpType == "boolean") {
        const std::string testValueStr(testValue.asString());
        if (testValueStr == "True") {
            return true;
        }
        if (testValueStr == "False") {
            return false;
        }
        throw qpidit::InvalidTestValueError(amqpType, testValueStr);
    }
    if (amqpType == "ubyte") {
        return integralValue<uint8_t>(amqpType, testValue.asString(), true);
    }
    if (amqpType == "ushort") {
        return integralValue<uint16_t>(amqpType, testValue.asString(), true);
    }
    if (amqpType == "uint") {
        return integralValue<uint32_t>(amqpType, testValue.asString(), true);
    }
    if (amqpType == "ulong") {
        return integralValue<uint64_t>(amqpType, testValue.asString(), true);
    }
    if (amqpType == "byte") {
        return integralValue<int8_t>(amqpType, testValue.asString(), false);
    }
    if (amqpType == "short") {
        return integralValue<int16_t>(amqpType, testValue.asString(), false);
    }
    if (amqpType == "int") {
        return integralValue<int32_t>(amqpType, testValue.asString(), false);
    }
    if (amqpType == "long") {
        return integralValue<int64_t>(amqpType, testValue.asString(), false);
    }
    if (amqpType == "float") {
        const std::string testValueStr = testValue.asString();
        if (testValueStr.find("0x") == std::string::npos) { // regular decimal fraction
            return std::strtof(testValueStr.c_str(), nullptr);
        }
        // hex representation of float
        return floatValue<float, uint32_t>(amqpType, testValueStr);
    }
    if (amqpType == "double") {
        const std::string testValueStr = testValue.asString();
        if (testValueStr.find("0x") == std::string::npos) { // regular decimal fraction
            return std::strtod(testValueStr.c_str(), nullptr);
        }
        // hex representation of double
        return floatValue<double, uint64_t>(amqpType, testValueStr);
    }
    if (amqpType == "decimal32") {
        proton::decimal32 val;
        hexStringToBytearray(val, testValue.asString().substr(2));
        return val;
    }
    if (amqpType == "decimal64") {
        proton::decimal64 val;
        hexStringToBytearray(val, testValue.asString().substr(2));
        return val;
    }
    if (amqpType == "decimal128") {
        proton::decimal128 val;
        hexStringToBytearray(val, testValue.asString().substr(2));
        return val;
    }
    if (amqpType == "char") {
        const std::string charStr = testValue.asString();
        wchar_t val;
        if (charStr.size() == 1) { // Single char "a"
            val = charStr[0];
        } else if (charStr.size() >= 3 && charStr.size() <= 10) { // Format "0xN" through "0xNNNNNNNN"
            val = std::strtoul(charStr.c_str(), nullptr, 16);
        } else {
            // Any other length is not a recognised format; reject it rather
            // than returning an uninitialized character.
            throw qpidit::InvalidTestValueError(amqpType, charStr);
        }
        return val;
    }
    if (amqpType == "timestamp") {
        const std::string testValueStr(testValue.asString());
        const bool hexFlag = testValueStr.find("0x") != std::string::npos;
        // strtoull keeps the full 64-bit range even where unsigned long is 32 bits wide.
        return proton::timestamp(std::strtoull(testValueStr.c_str(), nullptr, hexFlag ? 16 : 10));
    }
    if (amqpType == "uuid") {
        proton::uuid val;
        const std::string uuidStr(testValue.asString());
        // Expected format: "00000000-0000-0000-0000-000000000000"
        //                   ^        ^    ^    ^    ^
        //    start index -> 0        9    14   19   24
        if (uuidStr.size() != 36) {
            throw qpidit::InvalidTestValueError(amqpType, uuidStr);
        }
        hexStringToBytearray(val, uuidStr.substr(0, 8), 0, 4);
        hexStringToBytearray(val, uuidStr.substr(9, 4), 4, 2);
        hexStringToBytearray(val, uuidStr.substr(14, 4), 6, 2);
        hexStringToBytearray(val, uuidStr.substr(19, 4), 8, 2);
        hexStringToBytearray(val, uuidStr.substr(24, 12), 10, 6);
        return val;
    }
    if (amqpType == "binary") {
        return proton::binary(testValue.asString());
    }
    if (amqpType == "string") {
        return std::string(testValue.asString());
    }
    if (amqpType == "symbol") {
        return proton::symbol(testValue.asString());
    }
    if (amqpType == "list") {
        std::vector<proton::value> list;
        processList(list, testValue);
        return list;
    }
    if (amqpType == "map") {
        std::map<proton::value, proton::value> map;
        processMap(map, testValue);
        return map;
    }
    if (amqpType == "array") {
        /*
        std::vector<proton::value> array;
        processArray(array, testValue);
        return proton::as<proton::ARRAY>(array);
        */
        throw qpidit::UnsupportedAmqpTypeError(amqpType);
    }
    throw qpidit::UnknownAmqpTypeError(amqpType);
}
// //static
// Json::Value::ValueType getArrayType(const Json::Value& val) {
// if (val.size() > 0) {
// return val[0].type();
// } else {
// return Json::Value::nullValue; // TODO: find a way to represent empty array
// }
// }
//static
/**
 * Appends the elements of a JSON collection to a vector of proton values.
 *
 * JSON arrays are converted recursively through processArray() and JSON
 * objects through processMap(). Every other element is mapped by its JSON
 * type: null becomes an empty proton::value, and bool, int, unsigned int,
 * double and string elements are assigned directly. An element of any other
 * kind is appended as an empty proton::value.
 *
 * @param array      Destination vector; converted elements are appended.
 * @param testValues JSON collection to convert.
 */
void Sender::processArray(std::vector<proton::value>& array, const Json::Value& testValues) {
    for (Json::Value::const_iterator it = testValues.begin(); it != testValues.end(); ++it) {
        const Json::Value& element = *it;

        if (element.isArray()) {
            std::vector<proton::value> nestedArray;
            processArray(nestedArray, element);
            array.push_back(proton::value(nestedArray));
            continue;
        }

        if (element.isObject()) {
            std::map<proton::value, proton::value> nestedMap;
            processMap(nestedMap, element);
            array.push_back(proton::value(nestedMap));
            continue;
        }

        proton::value scalar;
        if (element.isNull()) {
            // null: leave the value empty
        } else if (element.isBool()) {
            scalar = element.asBool();
        } else if (element.isInt()) {
            scalar = element.asInt();
        } else if (element.isUInt()) {
            scalar = element.asUInt();
        } else if (element.isDouble()) {
            scalar = element.asDouble();
        } else if (element.isString()) {
            scalar = element.asString();
        } else {
            // TODO handle this case
        }
        array.push_back(scalar);
    }
}
//static
/**
 * Converts a single typed element of the form "amqp-type:amqp-str-value".
 *
 * The string is split at its first ':'; the part before it names the AMQP
 * type and the remainder is the value, both handed to convertAmqpValue().
 *
 * @param testValue JSON value whose string form is "type:value".
 * @return The converted value.
 * @throws qpidit::InvalidTestValueError if the string contains no ':'.
 */
proton::value Sender::processElement(const Json::Value& testValue) {
    const std::string typedValue(testValue.asString());

    const std::size_t colonPos = typedValue.find(':');
    if (colonPos == std::string::npos) {
        throw qpidit::InvalidTestValueError(typedValue);
    }

    const std::string typeName = typedValue.substr(0, colonPos);
    const std::string valueText = typedValue.substr(colonPos + 1);
    return convertAmqpValue(typeName, valueText);
}
//static
/**
 * Appends the elements of a JSON collection to an AMQP list.
 *
 * Nested JSON arrays become nested lists (via processList()), JSON objects
 * become maps (via processMap()), and every other element is converted by
 * processElement().
 *
 * @param list       Destination vector; converted elements are appended.
 * @param testValues JSON collection to convert.
 */
void Sender::processList(std::vector<proton::value>& list, const Json::Value& testValues) {
    for (Json::Value::const_iterator it = testValues.begin(); it != testValues.end(); ++it) {
        const Json::Value& element = *it;

        if (element.isArray()) {
            std::vector<proton::value> nestedList;
            processList(nestedList, element);
            list.push_back(proton::value(nestedList));
            continue;
        }

        if (element.isObject()) {
            std::map<proton::value, proton::value> nestedMap;
            processMap(nestedMap, element);
            list.push_back(proton::value(nestedMap));
            continue;
        }

        list.push_back(processElement(element));
    }
}
//static
/**
 * Fills an AMQP map from a JSON object.
 *
 * Each member name is converted to a key by processElement(). The member's
 * value becomes a nested list when it is a JSON array (via processList()), a
 * nested map when it is a JSON object (via processMap()), and is otherwise
 * converted by processElement().
 *
 * @param map        Destination map; entries are inserted or overwritten.
 * @param testValues JSON object to convert.
 */
void Sender::processMap(std::map<proton::value, proton::value>& map, const Json::Value& testValues) {
    const Json::Value::Members memberNames = testValues.getMemberNames();
    for (const std::string& memberName : memberNames) {
        proton::value key = processElement(memberName);
        const Json::Value& member = testValues[memberName];

        if (member.isArray()) {
            std::vector<proton::value> nestedList;
            processList(nestedList, member);
            map[key] = nestedList;
        } else if (member.isObject()) {
            std::map<proton::value, proton::value> nestedMap;
            processMap(nestedMap, member);
            map[key] = nestedMap;
        } else {
            map[key] = processElement(member);
        }
    }
}
//static
/**
 * Copies n bytes from src to dest in reverse order.
 *
 * dest[0] receives src[n - 1], dest[1] receives src[n - 2], and so on.
 * Nothing is copied when n <= 0.
 *
 * @param dest Destination buffer of at least n bytes.
 * @param src  Source buffer of at least n bytes.
 * @param n    Number of bytes to copy.
 */
void Sender::revMemcpy(char* dest, const char* src, int n) {
    int remaining = n;
    while (remaining > 0) {
        --remaining;
        *dest = src[remaining];
        ++dest;
    }
}
//static
/**
 * Writes two 64-bit values into a 16-byte buffer with their bytes reversed.
 *
 * The in-memory bytes of upper are written in reverse order to dest[0..7]
 * and those of lower to dest[8..15]. On a little-endian host this presumably
 * yields a big-endian 128-bit value.
 *
 * @param dest  Destination buffer of at least 16 bytes.
 * @param upper Value written to the first 8 bytes.
 * @param lower Value written to the last 8 bytes.
 */
void Sender::uint64ToChar16(char* dest, uint64_t upper, uint64_t lower) {
    const char* const upperBytes = reinterpret_cast<const char*>(&upper);
    const char* const lowerBytes = reinterpret_cast<const char*>(&lower);
    revMemcpy(dest, upperBytes, sizeof(uint64_t));
    revMemcpy(dest + 8, lowerBytes, sizeof(uint64_t));
}
} /* namespace amqp_types_test */
} /* namespace qpidit */
/*
 * --- main ---
 * Args: 1: Broker address (ip-addr:port)
 *       2: Queue name
 *       3: AMQP type
 *       4: Test value(s) as JSON string
 *
 * Parses the JSON test values, constructs the Sender and runs it in a Proton
 * container. Any std::exception (including a wrong argument count or a JSON
 * parse failure) is reported on stderr and results in exit status 1;
 * otherwise the exit status is 0.
 */
int main(int argc, char** argv) {
    try {
        // TODO: improve arg management a little...
        // Checked inside the try block so that a wrong argument count is
        // reported like every other error instead of escaping uncaught.
        if (argc != 5) {
            throw qpidit::ArgumentError("Incorrect number of arguments");
        }

        Json::Value testValues;
        Json::Reader jsonReader;
        if (not jsonReader.parse(argv[4], testValues, false)) {
            throw qpidit::JsonParserError(jsonReader);
        }

        qpidit::amqp_types_test::Sender sender(argv[1], argv[2], argv[3], testValues);
        proton::container(sender).run();
    } catch (const std::exception& e) {
        std::cerr << "amqp_types_test Sender error: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}