blob: 78c61d3719bc977f8548cbab93a99c2eff2280cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SlowListenerTest.h"
#include <decaf/lang/Thread.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/StlSet.h>
#include <decaf/lang/System.h>
#include <activemq/exceptions/ActiveMQException.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::test;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
namespace activemq{
namespace test{
class SlowListener: public cms::MessageListener {
public:
unsigned int count;
decaf::util::StlSet<long long> threadIds;
SlowListener() { count = 0; }
void onMessage( const cms::Message* message ) {
synchronized( &threadIds ) {
count++;
threadIds.add( Thread::getId() );
}
Thread::sleep( 20 );
}
};
}}
////////////////////////////////////////////////////////////////////////////////
void SlowListenerTest::testSlowListener() {
try {
SlowListener listener;
cms::Session* session = cmsProvider->getSession();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
const unsigned int numConsumers = 5;
cms::MessageConsumer* consumers[numConsumers];
// Create several consumers for the same destination.
for( unsigned int i = 0; i < numConsumers; i++ ) {
consumers[i] = session->createConsumer( cmsProvider->getDestination() );
consumers[i]->setMessageListener( &listener );
}
auto_ptr<cms::BytesMessage> message( session->createBytesMessage() );
unsigned int msgCount = 50;
for( unsigned int i = 0; i < msgCount; i++ ) {
producer->send( message.get() );
}
// Wait no more than 10 seconds for all the messages to come in.
waitForMessages( msgCount * numConsumers, 10000, &listener );
synchronized( &listener.threadIds ) {
// Make sure that the listener was always accessed by the same thread
// and that it received all the messages from all consumers.
CPPUNIT_ASSERT_EQUAL( 1, (int)listener.threadIds.size() );
CPPUNIT_ASSERT_EQUAL( (msgCount * numConsumers), listener.count );
}
for( unsigned int i = 0; i < numConsumers; i++ ) {
delete consumers[i];
}
} catch( ActiveMQException& ex ) {
ex.printStackTrace();
throw ex;
}
}
////////////////////////////////////////////////////////////////////////////////
void SlowListenerTest::waitForMessages(
unsigned int count, long long maxWaitTime, SlowListener* l ) {
long long startTime = System::currentTimeMillis();
synchronized( &( l->threadIds ) ) {
while( l->count < count ) {
long long curTime = System::currentTimeMillis();
if( ( curTime - startTime ) >= maxWaitTime ) {
return;
}
l->threadIds.wait( 500 );
}
}
}