blob: 0b2238ddab8db251e277b8f1116d298c3da9e352 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "qpidit/amqp_large_content_test/Sender.hpp"
#include <iomanip>
#include <iostream>
#include <json/json.h>
#include <proton/container.hpp>
#include <proton/default_container.hpp>
#include <proton/connection.hpp>
#include <proton/message.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
#include <qpidit/QpidItErrors.hpp>
namespace qpidit
{
namespace amqp_large_content_test
{
Sender::Sender(const std::string& brokerAddr,
const std::string& queueName,
const std::string& amqpType,
const Json::Value& testValues) :
AmqpSenderBase("amqp_large_content_test::Sender", brokerAddr, queueName, testValues.size()),
_amqpType(amqpType),
_testValues(testValues)
{}
Sender::~Sender() {}
void Sender::on_sendable(proton::sender &s) {
if (_totalMsgs == 0) {
s.connection().close();
} else if (_msgsSent == 0) {
for (Json::Value::const_iterator i=_testValues.begin(); i!=_testValues.end(); ++i) {
if (s.credit()) {
uint32_t totSizeMb;
Json::Value numElementsList = Json::arrayValue;
if ((*i).isInt()) {
totSizeMb = (*i).asInt();
numElementsList.append(1);
} else if ((*i).isArray()) {
totSizeMb = (*i)[0].asInt();
numElementsList = (*i)[1];
} else {
std::cerr << "on_sendable: Unexpected JSON type: " << (*i).type() << std::endl;
}
for (Json::Value::iterator numElementsAsStrItr=numElementsList.begin();
numElementsAsStrItr!=numElementsList.end();
++numElementsAsStrItr) {
proton::message msg;
setMessage(msg, totSizeMb * 1024 * 1024, (*numElementsAsStrItr).asInt());
s.send(msg);
_msgsSent++;
}
}
}
} else {
// do nothing
}
}
// protected
proton::message& Sender::setMessage(proton::message& msg,
uint32_t totSizeBytes,
uint32_t numElements) {
if (_amqpType.compare("binary") == 0) {
proton::binary val(createTestString(totSizeBytes));
msg.body(val);
} else if (_amqpType.compare("string") == 0) {
msg.body(createTestString(totSizeBytes));
} else if (_amqpType.compare("symbol") == 0) {
proton::symbol val(createTestString(totSizeBytes));
msg.body(val);
} else if (_amqpType.compare("list") == 0) {
std::vector<proton::value> testList;
createTestList(testList, totSizeBytes, numElements);
msg.body(testList);
} else if (_amqpType.compare("map") == 0) {
std::map<std::string, proton::value> testMap;
createTestMap(testMap, totSizeBytes, numElements);
msg.body(testMap);
}
return msg;
}
// static
void Sender::createTestList(std::vector<proton::value>& testList,
uint32_t totSizeBytes,
uint32_t numElements) {
uint32_t sizePerEltBytes = totSizeBytes / numElements;
for (uint32_t i=0; i<numElements; ++i) {
testList.push_back(createTestString(sizePerEltBytes));
}
}
// static
void Sender::createTestMap(std::map<std::string, proton::value>& testMap,
uint32_t totSizeBytes,
uint32_t numElements) {
uint32_t sizePerEltBytes = totSizeBytes / numElements;
for (uint32_t i=0; i<numElements; ++i) {
std::ostringstream oss;
oss << "elt_" << std::setw(6) << std::setfill('0') << i;
testMap[oss.str()] = createTestString(sizePerEltBytes);
}
}
//static
std::string Sender::createTestString(uint32_t msgSizeBytes) {
std::ostringstream oss;
for (uint32_t i=0; i<msgSizeBytes; ++i) {
oss << char('a' + (i%26));
}
return oss.str();
}
} /* namespace amqp_large_content_test */
} /* namespace qpidit */
/*
* --- main ---
* Args: 1: Broker address (ip-addr:port)
* 2: Queue name
* 3: AMQP type
* 4: Test value(s) as JSON string
*/
int main(int argc, char** argv) {
// TODO: improve arg management a little...
if (argc != 5) {
throw qpidit::ArgumentError("Incorrect number of arguments");
}
try {
Json::Value testValues;
Json::Reader jsonReader;
if (not jsonReader.parse(argv[4], testValues, false)) {
throw qpidit::JsonParserError(jsonReader);
}
qpidit::amqp_large_content_test::Sender sender(argv[1], argv[2], argv[3], testValues);
proton::default_container(sender).run();
} catch (const std::exception& e) {
std::cerr << "amqp_large_content_test Sender error: " << e.what() << std::endl;
exit(1);
}
exit(0);
}