blob: 7d97ded8b9e5cb0bc99bcf87c9d304d6ca7fb4d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* \file PerfTest.cpp
*/
#include "tests/storePerftools/asyncPerf/MessageConsumer.h"
#include "tests/storePerftools/asyncPerf/MessageProducer.h"
#include "tests/storePerftools/asyncPerf/PerfTest.h"
#include "tests/storePerftools/common/ScopedTimer.h"
#include "tests/storePerftools/common/Thread.h"
#include "tests/storePerftools/version.h"
#include "qpid/Modules.h" // Use with loading store as module
#include "qpid/asyncStore/AsyncStoreImpl.h"
#include "qpid/asyncStore/AsyncStoreOptions.h"
#include "qpid/broker/SimpleQueue.h"
#include <iomanip>
namespace tests {
namespace storePerftools {
namespace asyncPerf {
PerfTest::PerfTest(const TestOptions& to,
const qpid::asyncStore::AsyncStoreOptions& aso) :
m_testOpts(to),
m_storeOpts(aso),
m_testResult(to),
m_msgData(new char[to.m_msgSize]),
m_poller(new qpid::sys::Poller),
m_pollingThread(m_poller.get()),
m_resultQueue(m_poller),
m_store(0)
{
std::memset(m_msgData, 0, (size_t)to.m_msgSize);
}
PerfTest::~PerfTest() {
m_poller->shutdown();
m_pollingThread.join();
m_queueList.clear();
m_queueList.clear();
m_producers.clear();
delete[] m_msgData;
}
void
PerfTest::run() {
if (m_testOpts.m_durable) {
prepareStore();
}
prepareQueues();
// TODO: replace with qpid::sys::Thread
std::deque<boost::shared_ptr<tests::storePerftools::common::Thread> > threads;
{ // --- Start of timed section ---
tests::storePerftools::common::ScopedTimer st(m_testResult);
for (uint16_t q = 0; q < m_testOpts.m_numQueues; q++) {
for (uint16_t t = 0; t < m_testOpts.m_numEnqThreadsPerQueue; t++) { // TODO - replace with qpid threads
boost::shared_ptr<MessageProducer> mp(new MessageProducer(m_testOpts,
m_msgData,
m_store,
m_resultQueue,
m_queueList[q]));
m_producers.push_back(mp);
boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mp->startProducers,
reinterpret_cast<void*>(mp.get())));
threads.push_back(tp);
}
for (uint16_t dt = 0; dt < m_testOpts.m_numDeqThreadsPerQueue; ++dt) { // TODO - replace with qpid threads
boost::shared_ptr<MessageConsumer> mc(new MessageConsumer(m_testOpts,
m_store,
m_resultQueue,
m_queueList[q]));
m_consumers.push_back(mc);
boost::shared_ptr<tests::storePerftools::common::Thread> tp(new tests::storePerftools::common::Thread(mc->startConsumers,
reinterpret_cast<void*>(mc.get())));
threads.push_back(tp);
}
}
while (threads.size()) {
threads.front()->join();
threads.pop_front();
}
} // --- End of timed section ---
destroyQueues();
destroyStore();
}
void
PerfTest::toStream(std::ostream& os) const {
m_testOpts.printVals(os);
os << std::endl;
m_storeOpts.printVals(os);
os << std::endl;
os << m_testResult << std::endl;
}
// private
void
PerfTest::prepareStore() {
qpid::asyncStore::AsyncStoreImpl* s = new qpid::asyncStore::AsyncStoreImpl(m_poller, m_storeOpts);
s->initialize();
m_store = s;
}
// private
void
PerfTest::destroyStore() {
if (m_store) {
delete m_store;
}
}
// private
void
PerfTest::prepareQueues() {
for (uint16_t i = 0; i < m_testOpts.m_numQueues; ++i) {
std::ostringstream qname;
qname << "queue_" << std::setw(4) << std::setfill('0') << i;
boost::shared_ptr<qpid::broker::SimpleQueue> mpq(new qpid::broker::SimpleQueue(qname.str(), m_queueArgs, m_store, m_resultQueue));
mpq->asyncCreate();
m_queueList.push_back(mpq);
}
}
// private
void
PerfTest::destroyQueues() {
while (m_queueList.size() > 0) {
m_queueList.front()->asyncDestroy(m_testOpts.m_destroyQueuesOnCompletion);
m_queueList.pop_front();
}
}
int
runPerfTest(int argc, char** argv) {
// Load async store module
qpid::tryShlib ("asyncStore.so");
qpid::CommonOptions co;
qpid::asyncStore::AsyncStoreOptions aso;
TestOptions to;
qpid::Options opts;
opts.add(co).add(aso).add(to);
try {
opts.parse(argc, argv);
aso.validate();
to.validate();
// Handle options that just print information then exit.
if (co.version) {
std::cout << tests::storePerftools::name() << " v." << tests::storePerftools::version() << std::endl;
return 0;
}
if (co.help) {
std::cout << tests::storePerftools::name() << ": asyncPerf" << std::endl;
std::cout << "Performance test for the async store through the qpid async store interface." << std::endl;
std::cout << "Usage: asyncPerf [options]" << std::endl;
std::cout << opts << std::endl;
return 0;
}
// Create and start test
tests::storePerftools::asyncPerf::PerfTest apt(to, aso);
apt.run();
// Print test result
std::cout << apt << std::endl;
//::sleep(1);
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
return 1;
}
return 0;
}
}}} // namespace tests::storePerftools::asyncPerf
// -----------------------------------------------------------------
int
main(int argc, char** argv) {
return tests::storePerftools::asyncPerf::runPerfTest(argc, argv);
}