blob: e6d40905968f844c96c360ee0be9c4c5b2d7a7d7 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <vector>
#include <iostream>
#include <boost/bind.hpp>
#include <qpid_test_plugin.h>
#include "InProcessBroker.h"
#include "sys/ProducerConsumer.h"
#include "sys/Thread.h"
#include "AMQP_HighestVersion.h"
#include "sys/AtomicCount.h"
using namespace qpid::sys;
using namespace qpid::framing;
using namespace boost;
using namespace std;
/** A counter that notifies a monitor when changed */
class WatchedCounter : public Monitor {
public:
WatchedCounter(int i=0) : count(i) {}
WatchedCounter(const WatchedCounter& c) : Monitor(), count(int(c)) {}
WatchedCounter& operator=(const WatchedCounter& x) {
return *this = int(x);
}
WatchedCounter& operator=(int i) {
Lock l(*this);
count = i;
return *this;
}
int operator++() {
Lock l(*this);
notifyAll();
return ++count;
}
int operator++(int) {
Lock l(*this);
notifyAll();
return count++;
}
bool operator==(int i) const {
Lock l(const_cast<WatchedCounter&>(*this));
return i == count;
}
operator int() const {
Lock l(const_cast<WatchedCounter&>(*this));
return count;
}
bool waitFor(int i, Time timeout=TIME_SEC) {
Lock l(*this);
Time deadline = timeout+now();
while (count != i) {
if (!wait(deadline))
return false;
}
assert(count == i);
return true;
}
private:
typedef Mutex::ScopedLock Lock;
int count;
};
class ProducerConsumerTest : public CppUnit::TestCase
{
CPPUNIT_TEST_SUITE(ProducerConsumerTest);
CPPUNIT_TEST(testProduceConsume);
CPPUNIT_TEST(testTimeout);
CPPUNIT_TEST(testStop);
CPPUNIT_TEST(testCancel);
CPPUNIT_TEST_SUITE_END();
public:
InProcessBrokerClient client;
ProducerConsumer pc;
WatchedCounter stopped;
WatchedCounter timeout;
WatchedCounter consumed;
WatchedCounter produced;
struct ConsumeRunnable : public Runnable {
ProducerConsumerTest& test;
ConsumeRunnable(ProducerConsumerTest& test_) : test(test_) {}
void run() { test.consume(); }
};
struct ConsumeTimeoutRunnable : public Runnable {
ProducerConsumerTest& test;
Time timeout;
ConsumeTimeoutRunnable(ProducerConsumerTest& test_, const Time& t)
: test(test_), timeout(t) {}
void run() { test.consumeTimeout(timeout); }
};
void consumeInternal(ProducerConsumer::ConsumerLock& consumer) {
if (pc.isStopped()) {
++stopped;
return;
}
if (consumer.isTimedOut()) {
++timeout;
return;
}
CPPUNIT_ASSERT(consumer.isOk());
CPPUNIT_ASSERT(pc.available() > 0);
consumer.confirm();
consumed++;
}
void consume() {
ProducerConsumer::ConsumerLock consumer(pc);
consumeInternal(consumer);
};
void consumeTimeout(const Time& timeout) {
ProducerConsumer::ConsumerLock consumer(pc, timeout);
consumeInternal(consumer);
};
void produce() {
ProducerConsumer::ProducerLock producer(pc);
CPPUNIT_ASSERT(producer.isOk());
producer.confirm();
produced++;
}
void join(vector<Thread>& threads) {
for_each(threads.begin(), threads.end(), bind(&Thread::join,_1));
}
vector<Thread> startThreads(size_t n, Runnable& runnable) {
vector<Thread> threads(n);
while (n > 0)
threads[--n] = Thread(runnable);
return threads;
}
public:
ProducerConsumerTest() : client(highestProtocolVersion) {}
void testProduceConsume() {
ConsumeRunnable runMe(*this);
produce();
produce();
CPPUNIT_ASSERT(produced.waitFor(2));
vector<Thread> threads = startThreads(1, runMe);
CPPUNIT_ASSERT(consumed.waitFor(1));
join(threads);
threads = startThreads(1, runMe);
CPPUNIT_ASSERT(consumed.waitFor(2));
join(threads);
threads = startThreads(3, runMe);
produce();
produce();
CPPUNIT_ASSERT(consumed.waitFor(4));
produce();
CPPUNIT_ASSERT(consumed.waitFor(5));
join(threads);
CPPUNIT_ASSERT_EQUAL(0, int(stopped));
}
void testTimeout() {
try {
// 0 timeout no items available throws exception
ProducerConsumer::ConsumerLock consumer(pc, 0);
CPPUNIT_FAIL("Expected exception");
} catch(...){}
produce();
CPPUNIT_ASSERT(produced.waitFor(1));
CPPUNIT_ASSERT_EQUAL(1, int(pc.available()));
{
// 0 timeout succeeds if there's an item available.
ProducerConsumer::ConsumerLock consume(pc, 0);
CPPUNIT_ASSERT(consume.isOk());
consume.confirm();
}
CPPUNIT_ASSERT_EQUAL(0, int(pc.available()));
// Produce an item within the timeout.
ConsumeTimeoutRunnable runMe(*this, 2*TIME_SEC);
vector<Thread> threads = startThreads(1, runMe);
produce();
CPPUNIT_ASSERT(consumed.waitFor(1));
join(threads);
}
void testStop() {
ConsumeRunnable runMe(*this);
vector<Thread> threads = startThreads(2, runMe);
while (pc.consumers() != 2)
Thread::yield();
pc.stop();
CPPUNIT_ASSERT(stopped.waitFor(2));
join(threads);
threads = startThreads(1, runMe); // Should stop immediately.
CPPUNIT_ASSERT(stopped.waitFor(3));
join(threads);
// Produce/consume while stopped should return isStopped and
// throw on confirm.
try {
ProducerConsumer::ProducerLock p(pc);
CPPUNIT_ASSERT(pc.isStopped());
CPPUNIT_FAIL("Expected exception");
}
catch (...) {} // Expected
try {
ProducerConsumer::ConsumerLock c(pc);
CPPUNIT_ASSERT(pc.isStopped());
CPPUNIT_FAIL("Expected exception");
}
catch (...) {} // Expected
}
void testCancel() {
CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
{
ProducerConsumer::ProducerLock p(pc);
CPPUNIT_ASSERT(p.isOk());
p.cancel();
}
// Nothing was produced.
CPPUNIT_ASSERT_EQUAL(size_t(0), pc.available());
{
ProducerConsumer::ConsumerLock c(pc, 0);
CPPUNIT_ASSERT(c.isTimedOut());
}
// Now produce but cancel the consume
{
ProducerConsumer::ProducerLock p(pc);
CPPUNIT_ASSERT(p.isOk());
p.confirm();
}
CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
{
ProducerConsumer::ConsumerLock c(pc);
CPPUNIT_ASSERT(c.isOk());
c.cancel();
}
CPPUNIT_ASSERT_EQUAL(size_t(1), pc.available());
}
};
// Make this test suite a plugin.
CPPUNIT_PLUGIN_IMPLEMENT();
CPPUNIT_TEST_SUITE_REGISTRATION(ProducerConsumerTest);