blob: 6b6f2ba8e1744f3077dc1697cd3c5f6476e2bb86 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpidit/amqp_types_test/Receiver.hpp>
#include <iostream>
#include <json/json.h>
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/error_condition.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/receiver.hpp>
#include <proton/thread_safe.hpp>
#include <proton/transport.hpp>
#include <qpidit/QpidItErrors.hpp>
namespace qpidit
{
namespace amqp_types_test
{
Receiver::Receiver(const std::string& brokerUrl,
const std::string& queueName,
const std::string& amqpType,
uint32_t expected) :
_brokerUrl(brokerUrl),
_queueName(queueName),
_amqpType(amqpType),
_expected(expected),
_received(0UL),
_receivedValueList(Json::arrayValue)
{}
Receiver::~Receiver() {}
Json::Value& Receiver::getReceivedValueList() {
return _receivedValueList;
}
void Receiver::on_container_start(proton::container &c) {
std::ostringstream oss;
oss << _brokerUrl << "/" << _queueName;
c.open_receiver(oss.str());
}
void Receiver::on_message(proton::delivery &d, proton::message &m) {
try {
if (_received < _expected) {
_receivedValueList.append(getValue(_amqpType, m.body()));
}
_received++;
if (_received >= _expected) {
d.receiver().close();
d.connection().close();
}
} catch (const std::exception&) {
d.receiver().close();
d.connection().close();
throw;
}
}
void Receiver::on_connection_error(proton::connection &c) {
std::cerr << "AmqpReceiver::on_connection_error(): " << c.error() << std::endl;
}
void Receiver::on_receiver_error(proton::receiver& r) {
std::cerr << "AmqpReceiver::on_receiver_error(): " << r.error() << std::endl;
}
void Receiver::on_session_error(proton::session &s) {
std::cerr << "AmqpReceiver::on_session_error(): " << s.error() << std::endl;
}
void Receiver::on_transport_error(proton::transport &t) {
std::cerr << "AmqpReceiver::on_transport_error(): " << t.error() << std::endl;
}
void Receiver::on_error(const proton::error_condition &ec) {
std::cerr << "AmqpReceiver::on_error(): " << ec << std::endl;
}
// protected
//static
void Receiver::checkMessageType(const proton::value& val, proton::type_id amqpType) {
if (val.type() != amqpType) {
throw qpidit::IncorrectMessageBodyTypeError(amqpType, val.type());
}
}
//static
std::string Receiver::getAmqpType(const proton::value& val) {
switch(val.type()) {
case proton::NULL_TYPE: return "null";
case proton::BOOLEAN: return "boolean";
case proton::UBYTE: return "ubyte";
case proton::USHORT: return "ushort";
case proton::UINT: return "uint";
case proton::ULONG: return "ulong";
case proton::BYTE: return "byte";
case proton::SHORT: return "short";
case proton::INT: return "int";
case proton::LONG: return "long";
case proton::FLOAT: return "float";
case proton::DOUBLE: return "double";
case proton::DECIMAL32: return "decimal32";
case proton::DECIMAL64: return "decimal64";
case proton::DECIMAL128: return "decimal128";
case proton::CHAR: return "char";
case proton::TIMESTAMP: return "timestamp";
case proton::UUID: return "uuid";
case proton::BINARY: return "binary";
case proton::STRING: return "string";
case proton::SYMBOL: return "symbol";
case proton::LIST: return "list";
case proton::MAP: return "map";
case proton::ARRAY: return "array";
//default: throw qpidit::UnknownAmqpTypeError(val);
}
}
//static
Json::Value& Receiver::getMap(Json::Value& jsonMap, const proton::value& val) {
std::map<proton::value, proton::value> msgMap;
proton::get(val, msgMap);
for (std::map<proton::value, proton::value>::const_iterator i = msgMap.begin(); i != msgMap.end(); ++i) {
// Process key
Json::Value mapKey;
if (i->first.type() == proton::LIST || i->first.type() == proton::MAP || i->first.type() == proton::ARRAY) {
mapKey = getValue(i->first);
} else {
std::ostringstream oss;
oss << getAmqpType(i->first) << ":" << getValue(i->first).asString();
mapKey = oss.str();
}
// Process value
Json::Value mapValue;
if (i->second.type() == proton::LIST || i->second.type() == proton::MAP || i->second.type() == proton::ARRAY) {
mapValue = getValue(i->second);
} else {
std::ostringstream oss;
oss << getAmqpType(i->second) << ":" << getValue(i->second).asString();
mapValue = oss.str();
}
jsonMap[mapKey.asString()] = mapValue;
}
return jsonMap;
}
//static
Json::Value& Receiver::getSequence(Json::Value& jsonList, const proton::value& val) {
std::vector<proton::value> msgList;
proton::get(val, msgList);
for (std::vector<proton::value>::const_iterator i=msgList.begin(); i!=msgList.end(); ++i) {
if (i->type() == proton::LIST || i->type() == proton::MAP || i->type() == proton::ARRAY) {
jsonList.append(getValue(*i));
} else {
std::ostringstream oss;
oss << getAmqpType(*i) << ":" << getValue(*i).asString();
jsonList.append(oss.str());
}
}
return jsonList;
}
//static
Json::Value Receiver::getValue(const proton::value& val) {
return getValue(getAmqpType(val), val);
}
//static
Json::Value Receiver::getValue(const std::string& amqpType, const proton::value& val) {
if (amqpType.compare("null") == 0) {
checkMessageType(val, proton::NULL_TYPE);
return "None";
}
if (amqpType.compare("boolean") == 0) {
checkMessageType(val, proton::BOOLEAN);
return proton::get<bool>(val) ? "True" : "False";
}
if (amqpType.compare("ubyte") == 0) {
checkMessageType(val, proton::UBYTE);
return toHexStr<uint8_t>(proton::get<uint8_t>(val));
}
if (amqpType.compare("ushort") == 0) {
checkMessageType(val, proton::USHORT);
return toHexStr<uint16_t>(proton::get<uint16_t>(val));
}
if (amqpType.compare("uint") == 0) {
checkMessageType(val, proton::UINT);
return toHexStr<uint32_t>(proton::get<uint32_t>(val));
}
if (amqpType.compare("ulong") == 0) {
checkMessageType(val, proton::ULONG);
return toHexStr<uint64_t>(proton::get<uint64_t>(val));
}
if (amqpType.compare("byte") == 0) {
checkMessageType(val, proton::BYTE);
return toHexStr<int8_t>(proton::get<int8_t>(val));
}
if (amqpType.compare("short") == 0) {
checkMessageType(val, proton::SHORT);
return toHexStr<int16_t>(proton::get<int16_t>(val));
}
if (amqpType.compare("int") == 0) {
checkMessageType(val, proton::INT);
return toHexStr<int32_t>(proton::get<int32_t>(val));
}
if (amqpType.compare("long") == 0) {
checkMessageType(val, proton::LONG);
return toHexStr<int64_t>(proton::get<int64_t>(val));
}
if (amqpType.compare("float") == 0) {
checkMessageType(val, proton::FLOAT);
float f = proton::get<float>(val);
return toHexStr<uint32_t>(*((uint32_t*)&f), true);
}
if (amqpType.compare("double") == 0) {
checkMessageType(val, proton::DOUBLE);
double d = proton::get<double>(val);
return toHexStr<uint64_t>(*((uint64_t*)&d), true);
}
if (amqpType.compare("decimal32") == 0) {
checkMessageType(val, proton::DECIMAL32);
return byteArrayToHexStr(proton::get<proton::decimal32>(val));
}
if (amqpType.compare("decimal64") == 0) {
checkMessageType(val, proton::DECIMAL64);
return byteArrayToHexStr(proton::get<proton::decimal64>(val));
}
if (amqpType.compare("decimal128") == 0) {
checkMessageType(val, proton::DECIMAL128);
return byteArrayToHexStr(proton::get<proton::decimal128>(val));
}
if (amqpType.compare("char") == 0) {
checkMessageType(val, proton::CHAR);
wchar_t c = proton::get<wchar_t>(val);
std::stringstream oss;
if (c < 0x7f && std::iswprint(c)) {
oss << (char)c;
} else {
oss << "0x" << std::hex << c;
}
return oss.str();
}
if (amqpType.compare("timestamp") == 0) {
checkMessageType(val, proton::TIMESTAMP);
std::ostringstream oss;
oss << "0x" << std::hex << proton::get<proton::timestamp>(val).milliseconds();
return oss.str();
}
if (amqpType.compare("uuid") == 0) {
checkMessageType(val, proton::UUID);
std::ostringstream oss;
oss << proton::get<proton::uuid>(val);
return oss.str();
}
if (amqpType.compare("binary") == 0) {
checkMessageType(val, proton::BINARY);
return std::string(proton::get<proton::binary>(val));
}
if (amqpType.compare("string") == 0) {
checkMessageType(val, proton::STRING);
return proton::get<std::string>(val);
}
if (amqpType.compare("symbol") == 0) {
checkMessageType(val, proton::SYMBOL);
return proton::get<proton::symbol>(val);
}
if (amqpType.compare("list") == 0) {
checkMessageType(val, proton::LIST);
Json::Value jsonList(Json::arrayValue);
return getSequence(jsonList, val);
}
if (amqpType.compare("map") == 0) {
checkMessageType(val, proton::MAP);
Json::Value jsonMap(Json::objectValue);
return getMap(jsonMap, val);
}
if (amqpType.compare("array") == 0) {
throw qpidit::UnsupportedAmqpTypeError(amqpType);
}
throw qpidit::UnknownAmqpTypeError(amqpType);
}
} /* namespace amqp_types_test */
} /* namespace qpidit */
/*
* --- main ---
* Args: 1: Broker address (ip-addr:port)
* 2: Queue name
* 3: AMQP type
* 4: Expected number of test values to receive
*/
int main(int argc, char** argv) {
// TODO: improve arg management a little...
if (argc != 5) {
throw qpidit::ArgumentError("Incorrect number of arguments");
}
try {
qpidit::amqp_types_test::Receiver receiver(argv[1], argv[2], argv[3], std::strtoul(argv[4], NULL, 0));
proton::container(receiver).run();
std::cout << argv[3] << std::endl;
Json::FastWriter fw;
std::cout << fw.write(receiver.getReceivedValueList());
} catch (const std::exception& e) {
std::cerr << "AmqpReceiver error: " << e.what() << std::endl;
exit(-1);
}
exit(0);
}