blob: c4fe43a543f65f6cd4c9acdb0552dddbaa4f859b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ActiveMQQueueBrowser.h"
#include <cms/MessageListener.h>
#include <activemq/commands/ConsumerId.h>
#include <activemq/commands/ActiveMQDestination.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/PrefetchPolicy.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/util/CMSExceptionSupport.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
using namespace std;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::core::kernels;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
////////////////////////////////////////////////////////////////////////////////
namespace activemq{
namespace core{
/**
 * Consumer kernel used internally by ActiveMQQueueBrowser.  It behaves like
 * its ActiveMQConsumerKernel base except that every dispatch is reported back
 * to the owning browser, and a dispatch that carries no message is recorded
 * on the owning browser as the end of the browse.
 */
class Browser : public ActiveMQConsumerKernel {
public:

    // Back reference to the browser that created this kernel; it is never
    // deleted by this class.
    ActiveMQQueueBrowser* parent;

private:

    // Declared but not defined: instances are not copyable.
    Browser(const Browser&);
    Browser& operator= (const Browser&);

public:

    // Forwards every consumer setting to the base kernel unchanged and stores
    // the back reference to the owning browser.
    Browser(ActiveMQQueueBrowser* parent, ActiveMQSessionKernel* session,
            const Pointer<commands::ConsumerId>& id,
            const Pointer<commands::ActiveMQDestination>& destination,
            const std::string& name, const std::string& selector,
            int prefetch, int maxPendingMessageCount, bool noLocal,
            bool browser, bool dispatchAsync,
            cms::MessageListener* listener) :
        ActiveMQConsumerKernel(session, id, destination, name, selector, prefetch,
                               maxPendingMessageCount, noLocal, browser, dispatchAsync,
                               listener),
        parent(parent) {
    }

    virtual ~Browser() {}

    // Hands real messages to the base kernel, flags the browse as done when
    // the dispatch has no message, and wakes any thread waiting on the parent
    // in both cases.
    virtual void dispatch(const Pointer<MessageDispatch>& dispatched) {
        try {
            const bool endOfBrowse = (dispatched->getMessage() == NULL);

            if (!endOfBrowse) {
                ActiveMQConsumerKernel::dispatch(dispatched);
            } else {
                this->parent->browseDone.set(true);
            }

            this->parent->notifyMessageAvailable();
        }
        AMQ_CATCH_RETHROW(ActiveMQException)
        AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
        AMQ_CATCHALL_THROW(ActiveMQException)
    }
};
}}
////////////////////////////////////////////////////////////////////////////////
// Builds a browser bound to the given session, consumer id and queue
// destination.  No consumer is created here; the browser member starts out
// NULL.  Throws ActiveMQException when the session or consumer id is NULL, or
// when the destination is NULL or is not a queue.
ActiveMQQueueBrowser::ActiveMQQueueBrowser(ActiveMQSessionKernel* session,
                                           const Pointer<commands::ConsumerId>& consumerId,
                                           const Pointer<commands::ActiveMQDestination>& destination,
                                           const std::string& selector,
                                           bool dispatchAsync) :
    cms::QueueBrowser(),
    cms::MessageEnumeration(),
    session(session),
    consumerId(consumerId),
    destination(destination),
    selector(selector),
    dispatchAsync(dispatchAsync),
    queue(NULL),
    closed(false),
    mutex(),
    wait(),
    browseDone(),
    browser(NULL) {

    if (session == NULL) {
        throw ActiveMQException(__FILE__, __LINE__, "Session instance provided was NULL.");
    }

    if (consumerId == NULL) {
        throw ActiveMQException(__FILE__, __LINE__, "ConsumerId instance provided was NULL.");
    }

    // Short-circuits exactly like the NULL-or-not-queue form: isQueue() is
    // only consulted when the destination is non-NULL.
    if (!(destination != NULL && destination->isQueue())) {
        throw ActiveMQException(__FILE__, __LINE__, "Destination instance provided was NULL or not a Queue.");
    }

    // Cache the Queue view of the destination for faster retrieval in getQueue().
    this->queue = destination.dynamicCast<cms::Queue>().get();
}
////////////////////////////////////////////////////////////////////////////////
// Closes the browser on destruction.  Any failure raised by close() is
// swallowed so that the destructor never throws.
ActiveMQQueueBrowser::~ActiveMQQueueBrowser() {
    try {
        close();
    }
    DECAF_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
// Returns the Queue pointer that was cached by the constructor.
const cms::Queue* ActiveMQQueueBrowser::getQueue() const {
    return queue;
}
////////////////////////////////////////////////////////////////////////////////
std::string ActiveMQQueueBrowser::getMessageSelector() const {
return this->selector;
}
////////////////////////////////////////////////////////////////////////////////
// Returns this object as the enumeration over the queue contents, creating
// the underlying browsing consumer on first use.  Fails with a CMSException
// when the browser has already been closed.
cms::MessageEnumeration* ActiveMQQueueBrowser::getEnumeration() {
    try {
        checkClosed();

        const bool consumerMissing = (this->browser == NULL);
        if (consumerMissing) {
            this->browser = createConsumer();
        }

        return this;
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
// Marks the browser closed and tears down its consumer.  Only the call that
// flips the closed flag from false to true performs the teardown; later calls
// return without doing anything.
void ActiveMQQueueBrowser::close() {
    try {
        if (!closed.compareAndSet(false, true)) {
            return;
        }

        synchronized(&mutex) {
            destroyConsumer();
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQQueueBrowser::hasMoreMessages() {
try {
while (true) {
synchronized(&mutex) {
if (this->browser == NULL) {
return false;
}
}
if (this->browser->getMessageAvailableCount() > 0) {
return true;
}
if (browseDone.get() || !this->session->isStarted()) {
destroyConsumer();
return false;
}
waitForMessageAvailable();
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
// Returns the next browsed message, or NULL when none can be obtained.
//
// NULL is returned when there is no consumer, when the non-blocking receive
// raises a CMSException, or when the browse has been flagged as done or the
// session is not started (the consumer is torn down first in that last case).
// Otherwise it waits for a notification (or the wait timeout) and tries
// again.  Other failures are rethrown as CMSException.
cms::Message* ActiveMQQueueBrowser::nextMessage() {
    try {
        while (true) {

            // Take a counted reference while holding the lock.  close() resets
            // the member under this same mutex, so dereferencing the member
            // after releasing the lock could hit a consumer that was just
            // reset; the local reference stays valid for this iteration.
            Pointer<ActiveMQConsumerKernel> consumer;
            synchronized(&mutex) {
                consumer = this->browser;
            }

            if (consumer == NULL) {
                return NULL;
            }

            try {
                cms::Message* answer = consumer->receiveNoWait();
                if (answer != NULL) {
                    return answer;
                }
            } catch (const cms::CMSException&) {
                // A failed receive is deliberately reported as "no message".
                return NULL;
            }

            if (this->browseDone.get() || !this->session->isStarted()) {
                // Tear down under the lock, as close() does, so that two
                // threads cannot both stop/close/reset the same consumer.
                synchronized(&mutex) {
                    destroyConsumer();
                }
                return NULL;
            }

            waitForMessageAvailable();
        }
    }
    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
// Wakes every thread currently blocked in waitForMessageAvailable().
void ActiveMQQueueBrowser::notifyMessageAvailable() {
    synchronized(&wait) {
        this->wait.notifyAll();
    }
}
////////////////////////////////////////////////////////////////////////////////
// Blocks on the wait monitor until notifyMessageAvailable() is called or the
// timeout passed to wait() elapses.
void ActiveMQQueueBrowser::waitForMessageAvailable() {
    // Upper bound on a single wait; presumably milliseconds - confirm against
    // the monitor's wait() contract.
    const int maxWait = 2000;

    synchronized(&wait) {
        this->wait.wait(maxWait);
    }
}
////////////////////////////////////////////////////////////////////////////////
// Creates the browsing consumer kernel, registers it with the session and
// sends its ConsumerInfo through the session's syncRequest.  The browse-done
// flag is cleared first.  If registration or the request fails, the kernel
// is removed from the session again and the exception is rethrown.  The
// kernel is started only when the connection is already started.
Pointer<ActiveMQConsumerKernel> ActiveMQQueueBrowser::createConsumer() {

    this->browseDone.set(false);

    // Prefetch size comes from the connection's policy for queue browsers.
    int browserPrefetch =
        this->session->getConnection()->getPrefetchPolicy()->getQueueBrowserPrefetch();

    // Empty name, no pending-message limit (0), noLocal off, browser mode on,
    // and no message listener.
    Pointer<ActiveMQConsumerKernel> kernel(
        new Browser(this, session, consumerId, destination, "", selector,
                    browserPrefetch, 0, false, true, dispatchAsync, NULL));

    try {
        this->session->addConsumer(kernel);
        this->session->syncRequest(kernel->getConsumerInfo());
    } catch (Exception&) {
        this->session->removeConsumer(kernel);
        throw;
    }

    if (this->session->getConnection()->isStarted()) {
        kernel->start();
    }

    return kernel;
}
////////////////////////////////////////////////////////////////////////////////
// Stops, closes and releases the browsing consumer if one exists.  When the
// session is transacted it is committed first.  Nothing is propagated to the
// caller: any exception raised along the way is swallowed, in which case the
// remaining steps are skipped.
void ActiveMQQueueBrowser::destroyConsumer() {

    if (this->browser != NULL) {
        try {
            if (this->session->isTransacted()) {
                this->session->commit();
            }

            this->browser->stop();
            this->browser->close();
            this->browser.reset(NULL);
        }
        AMQ_CATCHALL_NOTHROW()
    }
}
////////////////////////////////////////////////////////////////////////////////
// Throws ActiveMQException when the browser has been closed; otherwise does
// nothing.
void ActiveMQQueueBrowser::checkClosed() {
    if (!closed.get()) {
        return;
    }

    throw ActiveMQException(__FILE__, __LINE__, "The QueueBrowser is closed.");
}