blob: c04327dd8a859c39cf5c51b99318ccc5c5f388bd [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include "qpidit/jms_hdrs_props_test/Sender.hpp"
#include <cerrno>
#include <iomanip>
#include <iostream>
#include <json/json.h>
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/thread_safe.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>
#include <stdio.h>
namespace qpidit
namespace jms_hdrs_props_test
Sender::Sender(const std::string& brokerUrl,
const std::string& jmsMessageType,
const Json::Value& testParams) :
if (_testValueMap.type() != Json::objectValue) {
throw qpidit::InvalidJsonRootNodeError(Json::objectValue, _testValueMap.type());
Sender::~Sender() {}
void Sender::on_container_start(proton::container &c) {
void Sender::on_sendable(proton::sender &s) {
if (_totalMsgs == 0) {
} else if (_msgsSent == 0) {
Json::Value::Members subTypes = _testValueMap.getMemberNames();
std::sort(subTypes.begin(), subTypes.end());
for (std::vector<std::string>::const_iterator i=subTypes.begin(); i!=subTypes.end(); ++i) {
sendMessages(s, *i, _testValueMap[*i]);
void Sender::on_tracker_accept(proton::tracker &t) {
if (_msgsConfirmed == _totalMsgs) {
void Sender::on_transport_close(proton::transport &t) {
_msgsSent = _msgsConfirmed;
// protected
void Sender::sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValues) {
uint32_t valueNumber = 0;
for (Json::Value::const_iterator i=testValues.begin(); i!=testValues.end(); ++i) {
if ( {
proton::message msg;
if ("JMS_MESSAGE_TYPE") == 0) {
setMessage(msg, subType, (*i).asString());
} else if ("JMS_BYTESMESSAGE_TYPE") == 0) {
setBytesMessage(msg, subType, (*i).asString());
} else if ("JMS_MAPMESSAGE_TYPE") == 0) {
setMapMessage(msg, subType, (*i).asString(), valueNumber);
} else if ("JMS_OBJECTMESSAGE_TYPE") == 0) {
setObjectMessage(msg, subType, *i);
} else if ("JMS_STREAMMESSAGE_TYPE") == 0) {
setStreamMessage(msg, subType, (*i).asString());
} else if ("JMS_TEXTMESSAGE_TYPE") == 0) {
setTextMessage(msg, *i);
} else {
throw qpidit::UnknownJmsMessageTypeError(_jmsMessageType);
_msgsSent += 1;
valueNumber += 1;
proton::message& Sender::setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) {
if ("none") != 0) {
throw qpidit::UnknownJmsMessageSubTypeError(subType);
if (testValueStr.size() != 0) {
throw InvalidTestValueError(subType, testValueStr);
msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MESSAGE_TYPE"]);
return msg;
proton::message& Sender::setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) {
proton::binary bin;
if ("boolean") == 0) {
if ("False") == 0) bin.push_back(char(0));
else if ("True") == 0) bin.push_back(char(1));
else throw InvalidTestValueError(subType, testValueStr);
} else if ("byte") == 0) {
uint8_t val = getIntegralValue<int8_t>(testValueStr);
} else if ("bytes") == 0) {
bin.assign(testValueStr.begin(), testValueStr.end());
} else if ("char") == 0) {
if (testValueStr[0] == '\\') { // Format: '\xNN'
} else { // Format: 'c'
} else if ("double") == 0) {
uint64_t val;
try {
val = htobe64(std::strtoul(, NULL, 16));
} catch (const std::exception& e) { throw qpidit::InvalidTestValueError("double", testValueStr); }
numToBinary(val, bin);
//for (int i=0; i<sizeof(val); ++i) {
// bin.push_back(* ((char*)&val + i));
// }
} else if ("float") == 0) {
uint32_t val;
try {
val = htobe32((uint32_t)std::strtoul(, NULL, 16));
} catch (const std::exception& e) { throw qpidit::InvalidTestValueError("float", testValueStr); }
numToBinary(val, bin);
//for (int i=0; i<sizeof(val); ++i) {
// bin.push_back(* ((char*)&val + i));
} else if ("long") == 0) {
uint64_t val = htobe64(getIntegralValue<uint64_t>(testValueStr));
numToBinary(val, bin);
//bin.assign(sizeof(val), val);
} else if ("int") == 0) {
uint32_t val = htobe32(getIntegralValue<uint32_t>(testValueStr));
numToBinary(val, bin);
//bin.assign(sizeof(val), val);
} else if ("short") == 0) {
uint16_t val = htobe16(getIntegralValue<int16_t>(testValueStr));
numToBinary(val, bin);
//bin.assign(sizeof(val), val);
} else if ("string") == 0) {
std::ostringstream oss;
uint16_t strlen = htobe16((uint16_t)testValueStr.size());
oss.write((char*)&strlen, sizeof(strlen));
oss << testValueStr;
std::string os = oss.str();
bin.assign(os.begin(), os.end());
} else {
throw qpidit::UnknownJmsMessageSubTypeError(subType);
msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_BYTESMESSAGE_TYPE"]);
return msg;
proton::message& Sender::setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber) {
std::ostringstream oss;
oss << subType << std::setw(3) << std::setfill('0') << valueNumber;
std::string mapKey(oss.str());
std::map<std::string, proton::value> m;
if ("boolean") == 0) {
if ("False") == 0) m[mapKey] = false;
else if ("True") == 0) m[mapKey] = true;
else throw InvalidTestValueError(subType, testValueStr);
} else if ("byte") == 0) {
m[mapKey] = int8_t(getIntegralValue<int8_t>(testValueStr));
} else if ("bytes") == 0) {
m[mapKey] = proton::binary(testValueStr);
} else if ("char") == 0) {
wchar_t val;
if (testValueStr[0] == '\\') { // Format: '\xNN'
val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2));
} else { // Format: 'c'
val = testValueStr[0];
m[mapKey] = val;
} else if ("double") == 0) {
m[mapKey] = getFloatValue<double, uint64_t>(testValueStr);
} else if ("float") == 0) {
m[mapKey] = getFloatValue<float, uint32_t>(testValueStr);
} else if ("int") == 0) {
m[mapKey] = getIntegralValue<int32_t>(testValueStr);
} else if ("long") == 0) {
m[mapKey] = getIntegralValue<int64_t>(testValueStr);
} else if ("short") == 0) {
m[mapKey] = getIntegralValue<int16_t>(testValueStr);
} else if ("string") == 0) {
m[mapKey] = testValueStr;
} else {
throw qpidit::UnknownJmsMessageSubTypeError(subType);
msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MAPMESSAGE_TYPE"]);
return msg;
proton::message& Sender::setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue) {
msg.body(getJavaObjectBinary(subType, testValue.asString()));
msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_OBJECTMESSAGE_TYPE"]);
return msg;
proton::message& Sender::setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) {
std::vector<proton::value> l;
if ("boolean") == 0) {
if ("False") == 0) l.push_back(false);
else if ("True") == 0) l.push_back(true);
else throw InvalidTestValueError(subType, testValueStr);
} else if ("byte") == 0) {
} else if ("bytes") == 0) {
} else if ("char") == 0) {
wchar_t val;
if (testValueStr[0] == '\\') { // Format: '\xNN'
val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2));
} else { // Format: 'c'
val = testValueStr[0];
} else if ("double") == 0) {
l.push_back(getFloatValue<double, uint64_t>(testValueStr));
} else if ("float") == 0) {
l.push_back(getFloatValue<float, uint32_t>(testValueStr));
} else if ("int") == 0) {
} else if ("long") == 0) {
} else if ("short") == 0) {
} else if ("string") == 0) {
} else {
throw qpidit::UnknownJmsMessageSubTypeError(subType);
msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_STREAMMESSAGE_TYPE"]);
return msg;
proton::message& Sender::setTextMessage(proton::message& msg, const Json::Value& testValue) {
msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_TEXTMESSAGE_TYPE"]);
return msg;
proton::message& Sender::addMessageHeaders(proton::message& msg) {
Json::Value::Members headerNames = _testHeadersMap.getMemberNames();
for (std::vector<std::string>::const_iterator i=headerNames.begin(); i!=headerNames.end(); ++i) {
const Json::Value _subMap = _testHeadersMap[*i];
const std::string headerValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map
std::string val = _subMap[headerValueType].asString();
if (i->compare("JMS_TYPE_HEADER") == 0) {
setJmsTypeHeader(msg, val);
} else if (i->compare("JMS_CORRELATIONID_HEADER") == 0) {
if ("bytes") == 0) {
setJmsCorrelationId(msg, proton::binary(val));
} else {
setJmsCorrelationId(msg, val);
} else if (i->compare("JMS_REPLYTO_HEADER") == 0) {
setJmsReplyTo(msg, headerValueType, val);
} else {
throw qpidit::UnknownJmsHeaderTypeError(*i);
return msg;
proton::message& Sender::setJmsTypeHeader(proton::message& msg, const std::string& t) {
return msg;
proton::message& Sender::setJmsCorrelationId(proton::message& msg, const std::string& cid) {
proton::message_id mid(cid);
msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true);
return msg;
proton::message& Sender::setJmsCorrelationId(proton::message& msg, const proton::binary cid) {
proton::message_id mid(cid);
msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true);
return msg;
proton::message& Sender::setJmsReplyTo(proton::message& msg, const std::string& dts, const std::string& d) {
if ("queue") == 0) {
msg.reply_to(/*std::string("queue://") + */d);
msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_QUEUE));
} else if ("temp_queue") == 0) {
msg.reply_to(/*std::string("queue://") + */d);
msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_TMEP_QUEUE));
} else if ("topic") == 0) {
msg.reply_to(/*std::string("topic://") + */d);
msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_TOPIC));
} else if ("temp_topic") == 0) {
msg.reply_to(/*std::string("topic://") + */d);
msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_TEMP_TOPIC));
} else {
throw qpidit::UnknownJmsDestinationTypeError(dts);
return msg;
proton::message& Sender::addMessageProperties(proton::message& msg) {
Json::Value::Members propertyNames = _testPropertiesMap.getMemberNames();
for (std::vector<std::string>::const_iterator i=propertyNames.begin(); i!=propertyNames.end(); ++i) {
const Json::Value _subMap = _testPropertiesMap[*i];
const std::string propertyValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map
std::string val = _subMap[propertyValueType].asString();
if ("boolean") == 0) {
if ("False") == 0) setMessageProperty(msg, *i, false);
else if ("True") == 0) setMessageProperty(msg, *i, true);
else throw InvalidTestValueError(propertyValueType, val);
} else if ("byte") == 0) {
setMessageProperty(msg, *i, getIntegralValue<int8_t>(val));
} else if ("double") == 0) {
setMessageProperty(msg, *i, getFloatValue<double, uint64_t>(val));
} else if ("float") == 0) {
setMessageProperty(msg, *i, getFloatValue<float, uint64_t>(val));
} else if ("int") == 0) {
setMessageProperty(msg, *i, getIntegralValue<int32_t>(val));
} else if ("long") == 0) {
setMessageProperty(msg, *i, getIntegralValue<int64_t>(val));
} else if ("short") == 0) {
setMessageProperty(msg, *i, getIntegralValue<int16_t>(val));
} else if ("string") == 0) {
setMessageProperty(msg, *i, val);
} else {
throw qpidit::UnknownJmsPropertyTypeError(propertyValueType);
return msg;
proton::binary Sender::getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString) {
proton::binary javaObjectBinary;
char buf[1024];
int bytesRead;
FILE* fp = ::popen("java -cp target/JavaObjUtils.jar org.apache.qpid.interop_test.obj_util.JavaObjToBytes javaClassStr", "rb");
if (fp == NULL) { throw qpidit::PopenError(errno); }
do {
bytesRead = ::fread(buf, 1, sizeof(buf), fp);
javaObjectBinary.insert(javaObjectBinary.end(), &buf[0], &buf[bytesRead-1]);
} while (bytesRead == sizeof(buf));
int status = ::pclose(fp);
if (status == -1) {
throw qpidit::PcloseError(errno);
return javaObjectBinary;
// static
uint32_t Sender::getTotalNumMessages(const Json::Value& testValueMap) {
uint32_t tot = 0;
for (Json::Value::const_iterator i = testValueMap.begin(); i != testValueMap.end(); ++i) {
tot += (*i).size();
return tot;
} /* namespace jms_hdrs_props_test */
} /* namespace qpidit */
* --- main ---
* Args: 1: Broker address (ip-addr:port)
* 2: Queue name
* 3: AMQP type
* 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap]
int main(int argc, char** argv) {
// TODO: improve arg management a little...
if (argc != 5) {
throw qpidit::ArgumentError("Incorrect number of arguments");
std::ostringstream oss;
oss << argv[1] << "/" << argv[2];
try {
Json::Value testParams;
Json::Reader jsonReader;
if (not jsonReader.parse(argv[4], testParams, false)) {
throw qpidit::JsonParserError(jsonReader);
qpidit::jms_hdrs_props_test::Sender sender(oss.str(), argv[3], testParams);
} catch (const std::exception& e) {
std::cout << "Sender error: " << e.what() << std::endl;