// blob: ebbdfe4e7db94c027fcfb1aa0b4bd75d1bc8f169
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpidit/jms_messages_test/Receiver.hpp"

#include <cstring>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <typeinfo>

#include <json/json.h>
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/message.hpp>
#include <proton/thread_safe.hpp>
#include <proton/transport.hpp>
#include <qpidit/QpidItErrors.hpp>
namespace qpidit
{
namespace jms_messages_test
{
/**
 * Constructs a receiver for one JMS message test run.
 *
 * @param brokerUrl      Address handed to open_receiver() when the container starts.
 * @param jmsMessageType Name of the JMS message type under test (for example
 *                       "JMS_MAPMESSAGE_TYPE"); the receiveJms*Message() handlers
 *                       compare the incoming message kind against it.
 * @param testNumberMap  JSON object whose member names are the sub-types and whose
 *                       values are the counts expected for each sub-type.
 *
 * The list of sub-types is taken from the member names of testNumberMap, and the
 * total number of expected messages is the sum of all its values.
 */
Receiver::Receiver(const std::string& brokerUrl,
const std::string& jmsMessageType,
const Json::Value& testNumberMap):
_brokerUrl(brokerUrl),
_jmsMessageType(jmsMessageType),
_testNumberMap(testNumberMap),
_subTypeList(testNumberMap.getMemberNames()),
_subTypeIndex(0),
_expected(getTotalNumExpectedMsgs(testNumberMap)),
_received(0UL),
_receivedSubTypeList(Json::arrayValue),
_receivedValueMap(Json::objectValue)
{}
// Destructor: the body is empty, no explicit clean-up is performed here.
Receiver::~Receiver() {}
/**
 * Gives access to the values collected so far.
 *
 * @return Mutable reference to the JSON object that on_message() fills with one
 *         array of received values per completed sub-type.
 */
Json::Value& Receiver::getReceivedValueMap()
{
    return this->_receivedValueMap;
}
/**
 * Container start-up callback: opens a receiver on the broker URL that was
 * supplied to the constructor.
 *
 * @param c Container on which the receiver is opened.
 */
void Receiver::on_container_start(proton::container &c)
{
    const std::string& address = _brokerUrl;
    c.open_receiver(address);
}
/**
 * Handles one incoming message.
 *
 * While fewer messages than expected have been received, the message is
 * dispatched on its "x-opt-jms-msg-type" annotation to the matching
 * receiveJms*Message() handler, which appends the decoded value(s) to the
 * list for the current sub-type. Once that list holds at least as many values
 * as the test map requests for the sub-type, it is stored in the received
 * value map and the next sub-type becomes current. When the expected total
 * has been reached the receiver and the connection are closed.
 *
 * Messages arriving after the expected total has been reached are ignored.
 *
 * On any std::exception the receiver and connection are closed and the
 * exception is rethrown.
 *
 * @param d Delivery of the message; used to reach the receiver and connection.
 * @param m The received message.
 * @throws std::runtime_error if the annotation holds no known JMS message type,
 *         or if a message arrives when every sub-type has already been completed.
 */
void Receiver::on_message(proton::delivery &d, proton::message &m) {
    try {
        if (_received >= _expected) {
            return;
        }

        // The handlers below index _subTypeList with _subTypeIndex; refuse to
        // run them with an index past the end instead of reading out of bounds.
        if (_subTypeIndex >= _subTypeList.size()) {
            throw std::runtime_error("JmsReceiver::on_message(): Message received after all sub-types were completed");
        }

        int8_t t = qpidit::JMS_MESSAGE_TYPE; // qpidit::JMS_MESSAGE_TYPE has value 0
        try {
            t = proton::get<int8_t>(m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")));
        } catch (const proton::conversion_error& e) {
            std::cout << "JmsReceiver::on_message(): Error converting value for annotation \"x-opt-jms-msg-type\": " << e.what() << std::endl;
            throw;
        } catch (const std::exception& e) {
            std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl;
            throw;
        }

        switch (t) {
        case qpidit::JMS_MESSAGE_TYPE:
            receiveJmsMessage(m);
            break;
        case qpidit::JMS_OBJECTMESSAGE_TYPE:
            receiveJmsObjectMessage(m);
            break;
        case qpidit::JMS_MAPMESSAGE_TYPE:
            receiveJmsMapMessage(m);
            break;
        case qpidit::JMS_BYTESMESSAGE_TYPE:
            receiveJmsBytesMessage(m);
            break;
        case qpidit::JMS_STREAMMESSAGE_TYPE:
            receiveJmsStreamMessage(m);
            break;
        case qpidit::JMS_TEXTMESSAGE_TYPE:
            receiveJmsTextMessage(m);
            break;
        default: {
            // Previously an unknown type was silently counted as a received
            // message; report it through the same path as other errors.
            std::ostringstream oss;
            oss << "JmsReceiver::on_message(): Unknown JMS message type "
                << static_cast<int>(t) << " in annotation \"x-opt-jms-msg-type\"";
            throw std::runtime_error(oss.str());
        }
        }

        const std::string subType(_subTypeList[_subTypeIndex]);
        // Move on to the next sub-type once the required number of values has
        // been collected. asUInt() keeps the comparison unsigned on both sides;
        // the index bound was already verified above.
        if (_receivedSubTypeList.size() >= _testNumberMap[subType].asUInt()) {
            _receivedValueMap[subType] = _receivedSubTypeList;
            _receivedSubTypeList.clear();
            ++_subTypeIndex;
        }

        ++_received;
        if (_received >= _expected) {
            d.receiver().close();
            d.connection().close();
        }
    } catch (const std::exception&) {
        d.receiver().close();
        d.connection().close();
        throw;
    }
}
// static
/**
 * Adds up every value in the test number map.
 *
 * @param testNumberMap JSON value whose elements are each read with asUInt().
 *                      It is passed by value, so the map is copied on each call.
 * @return Sum of all element values.
 */
uint32_t Receiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) {
    uint32_t sum = 0;
    Json::Value::const_iterator it = testNumberMap.begin();
    while (it != testNumberMap.end()) {
        sum += (*it).asUInt();
        ++it;
    }
    return sum;
}
// protected
/**
 * Records the arrival of a plain JMS message by appending a
 * default-constructed Json::Value to the list for the current sub-type.
 *
 * @param msg The received message; its content is not inspected.
 */
void Receiver::receiveJmsMessage(const proton::message& msg) {
    const Json::Value placeholder;
    _receivedSubTypeList.append(placeholder);
}
// Handler for JMS ObjectMessage. Not implemented yet: the body is empty, so
// the message is neither validated nor decoded and nothing is appended to
// the received value list.
void Receiver::receiveJmsObjectMessage(const proton::message& msg) {
// TODO
}
/**
 * Decodes the body of a JMS MapMessage and appends every entry's value to the
 * list for the current sub-type.
 *
 * Each key must consist of the current sub-type name followed by exactly
 * three more characters. Values are rendered as strings: booleans as
 * "True"/"False", numeric types through toHexStr(), floating point types as
 * the hex form of their bit pattern.
 *
 * @param msg The received message; its body is read as a string-keyed map.
 * @throws qpidit::IncorrectMessageBodyTypeError if the test is not for JMS_MAPMESSAGE_TYPE.
 * @throws qpidit::IncorrectJmsMapKeyPrefixError if a key does not start with the sub-type name.
 * @throws qpidit::UnknownJmsMessageSubTypeError if the current sub-type is not handled here.
 */
void Receiver::receiveJmsMapMessage(const proton::message& msg) {
    if (_jmsMessageType != "JMS_MAPMESSAGE_TYPE") {
        throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_MAPMESSAGE_TYPE");
    }
    const std::string subType(_subTypeList[_subTypeIndex]);
    std::map<std::string, proton::value> m;
    proton::get(msg.body(), m);
    for (std::map<std::string, proton::value>::const_iterator i = m.begin(); i != m.end(); ++i) {
        const std::string& key = i->first;
        // Reject keys shorter than the three-character suffix explicitly;
        // key.size()-3 would otherwise wrap around.
        if (key.size() < 3 || subType.compare(key.substr(0, key.size() - 3)) != 0) {
            throw qpidit::IncorrectJmsMapKeyPrefixError(subType, key);
        }
        const proton::value& val = i->second;
        if (subType == "boolean") {
            _receivedSubTypeList.append(proton::get<bool>(val) ? Json::Value("True") : Json::Value("False"));
        } else if (subType == "byte") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(proton::get<int8_t>(val))));
        } else if (subType == "bytes") {
            _receivedSubTypeList.append(Json::Value(std::string(proton::get<proton::binary>(val))));
        } else if (subType == "char") {
            std::ostringstream oss;
            oss << static_cast<char>(proton::get<wchar_t>(val));
            _receivedSubTypeList.append(Json::Value(oss.str()));
        } else if (subType == "double") {
            const double d = proton::get<double>(val);
            // Copy the bit pattern instead of casting the pointer, which
            // would violate strict aliasing.
            int64_t bits = 0;
            std::memcpy(&bits, &d, sizeof(bits));
            _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(bits, true, false)));
        } else if (subType == "float") {
            const float f = proton::get<float>(val);
            int32_t bits = 0;
            std::memcpy(&bits, &f, sizeof(bits));
            _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(bits, true, false)));
        } else if (subType == "int") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(proton::get<int32_t>(val))));
        } else if (subType == "long") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(proton::get<int64_t>(val))));
        } else if (subType == "short") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(proton::get<int16_t>(val))));
        } else if (subType == "string") {
            _receivedSubTypeList.append(Json::Value(proton::get<std::string>(val)));
        } else {
            throw qpidit::UnknownJmsMessageSubTypeError(subType);
        }
    }
}
void Receiver::receiveJmsBytesMessage(const proton::message& msg) {
if(_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") != 0) {
throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_BYTESMESSAGE_TYPE");
}
std::string subType(_subTypeList[_subTypeIndex]);
proton::binary body = proton::get<proton::binary>(msg.body());
if (subType.compare("boolean") == 0) {
if (body.size() != 1) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=boolean", 1, body.size());
_receivedSubTypeList.append(body[0] ? Json::Value("True") : Json::Value("False"));
} else if (subType.compare("byte") == 0) {
if (body.size() != sizeof(int8_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=byte", sizeof(int8_t), body.size());
int8_t val = *((int8_t*)body.data());
_receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val)));
} else if (subType.compare("bytes") == 0) {
_receivedSubTypeList.append(Json::Value(std::string(body)));
} else if (subType.compare("char") == 0) {
if (body.size() != sizeof(uint16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=char", sizeof(uint16_t), body.size());
// TODO: This is ugly: ignoring first byte - handle UTF-16 correctly
char c = body[1];
std::ostringstream oss;
oss << c;
_receivedSubTypeList.append(Json::Value(oss.str()));
} else if (subType.compare("double") == 0) {
if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=double", sizeof(int64_t), body.size());
int64_t val = be64toh(*((int64_t*)body.data()));
_receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val, true, false)));
} else if (subType.compare("float") == 0) {
if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=float", sizeof(int32_t), body.size());
int32_t val = be32toh(*((int32_t*)body.data()));
_receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val, true, false)));
} else if (subType.compare("long") == 0) {
if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=long", sizeof(int64_t), body.size());
int64_t val = be64toh(*((int64_t*)body.data()));
_receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val)));
} else if (subType.compare("int") == 0) {
if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=int", sizeof(int32_t), body.size());
int32_t val = be32toh(*((int32_t*)body.data()));
_receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val)));
} else if (subType.compare("short") == 0) {
if (body.size() != sizeof(int16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=short", sizeof(int16_t), body.size());
int16_t val = be16toh(*((int16_t*)body.data()));
_receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val)));
} else if (subType.compare("string") == 0) {
// TODO: decode string size in first two bytes and check string size
_receivedSubTypeList.append(Json::Value(std::string(body).substr(2)));
} else {
throw qpidit::UnknownJmsMessageSubTypeError(subType);
}
}
/**
 * Decodes the body of a JMS StreamMessage and appends every element's value
 * to the list for the current sub-type.
 *
 * Values are rendered as strings: booleans as "True"/"False", numeric types
 * through toHexStr(), floating point types as the hex form of their bit
 * pattern.
 *
 * @param msg The received message; its body is read as a list of values.
 * @throws qpidit::IncorrectMessageBodyTypeError if the test is not for JMS_STREAMMESSAGE_TYPE.
 * @throws qpidit::UnknownJmsMessageSubTypeError if the current sub-type is not handled here.
 */
void Receiver::receiveJmsStreamMessage(const proton::message& msg) {
    if (_jmsMessageType != "JMS_STREAMMESSAGE_TYPE") {
        throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_STREAMMESSAGE_TYPE");
    }
    const std::string subType(_subTypeList[_subTypeIndex]);
    std::vector<proton::value> l;
    proton::get(msg.body(), l);
    for (std::vector<proton::value>::const_iterator i = l.begin(); i != l.end(); ++i) {
        const proton::value& val = *i;
        if (subType == "boolean") {
            _receivedSubTypeList.append(proton::get<bool>(val) ? Json::Value("True") : Json::Value("False"));
        } else if (subType == "byte") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(proton::get<int8_t>(val))));
        } else if (subType == "bytes") {
            _receivedSubTypeList.append(Json::Value(std::string(proton::get<proton::binary>(val))));
        } else if (subType == "char") {
            std::ostringstream oss;
            oss << static_cast<char>(proton::get<wchar_t>(val));
            _receivedSubTypeList.append(Json::Value(oss.str()));
        } else if (subType == "double") {
            const double d = proton::get<double>(val);
            // Copy the bit pattern instead of casting the pointer, which
            // would violate strict aliasing.
            int64_t bits = 0;
            std::memcpy(&bits, &d, sizeof(bits));
            _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(bits, true, false)));
        } else if (subType == "float") {
            const float f = proton::get<float>(val);
            int32_t bits = 0;
            std::memcpy(&bits, &f, sizeof(bits));
            _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(bits, true, false)));
        } else if (subType == "int") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(proton::get<int32_t>(val))));
        } else if (subType == "long") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(proton::get<int64_t>(val))));
        } else if (subType == "short") {
            _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(proton::get<int16_t>(val))));
        } else if (subType == "string") {
            _receivedSubTypeList.append(Json::Value(proton::get<std::string>(val)));
        } else {
            throw qpidit::UnknownJmsMessageSubTypeError(subType);
        }
    }
}
/**
 * Reads the body of a JMS TextMessage as a string and appends it to the list
 * for the current sub-type.
 *
 * @param msg The received message.
 * @throws qpidit::IncorrectMessageBodyTypeError if the test is not for JMS_TEXTMESSAGE_TYPE.
 */
void Receiver::receiveJmsTextMessage(const proton::message& msg) {
    const bool isTextTest = (_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") == 0);
    if (!isTextTest) {
        throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_TEXTMESSAGE_TYPE");
    }
    const std::string text = proton::get<std::string>(msg.body());
    _receivedSubTypeList.append(Json::Value(text));
}
} /* namespace jms_messages_test */
} /* namespace qpidit */
/* --- main ---
 * Args: 1: Broker address (ip-addr:port)
 *       2: Queue name
 *       3: JMS message type
 *       4: JSON test parameters: an object mapping each sub-type name to the
 *          number of messages expected for that sub-type
 *
 * Runs a Receiver against <broker address>/<queue name> until it has closed
 * its connection, then prints the JMS message type on one line followed by
 * the received value map as compact JSON.
 *
 * Any std::exception is reported on stdout as "JmsReceiver error: <what>".
 *
 * Returns 0 on success and 1 if an error was reported.
 */
int main(int argc, char** argv) {
    try {
        // TODO: improve arg management a little...
        if (argc != 5) {
            throw qpidit::ArgumentError("Incorrect number of arguments (expected 4):\n\t1. Broker TCP address(ip-addr:port)\n\t2. Queue name\n\t3. JMS message type\n\t4. JSON data string\n");
        }

        std::ostringstream oss;
        oss << argv[1] << "/" << argv[2];

        Json::Value testParams;
        Json::Reader jsonReader;
        if (!jsonReader.parse(argv[4], testParams, false)) {
            throw qpidit::JsonParserError(jsonReader);
        }

        qpidit::jms_messages_test::Receiver receiver(oss.str(), argv[3], testParams);
        proton::container(receiver).run();

        Json::FastWriter fw;
        std::cout << argv[3] << std::endl;
        std::cout << fw.write(receiver.getReceivedValueMap());
        return 0;
    } catch (const std::exception& e) {
        std::cout << "JmsReceiver error: " << e.what() << std::endl;
    }
    // Reached only after an error was caught: signal failure to the caller
    // instead of exiting with status 0.
    return 1;
}