blob: 0b2d35dba6d01c8654f4a919fd389cace9d1a1b1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Receiver.h"
#include "MessagingTask.h"
#include "decaf/lang/System.h"
#include "ConnectionFactoryMgr.h"
#include <cms/Message.h>
#include <decaf/lang/Exception.h>
#include <decaf/util/concurrent/TimeUnit.h>
using namespace std;
using namespace cms;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace activemq::cmsutil;
using namespace cmstemplate;
////////////////////////////////////////////////////////////////////////////////
ThreadPoolExecutor* Receiver::threadPoolExecutor = NULL;
////////////////////////////////////////////////////////////////////////////////
Receiver::Receiver(const string & url, const string & queueOrTopicName,
bool isTopic, long long receiveTimeout, bool useThreadPool) :
url(url),
mutexForCmsTemplate(),
mutexGeneral(),
closing(false),
ready(1),
messageListener(NULL),
cmsTemplate(NULL),
asyncReceiverThread(NULL),
receiveTimeout(receiveTimeout),
bUseThreadPool(useThreadPool),
cmsTemplateCreateTime(0),
numOfMessagingTasks(0) {
ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
cmsTemplate.reset(new CmsTemplate(connectionFactory));
cmsTemplate->setDefaultDestinationName(queueOrTopicName);
cmsTemplate->setPubSubDomain(isTopic);
cmsTemplate->setReceiveTimeout(receiveTimeout);
}
////////////////////////////////////////////////////////////////////////////////
Receiver::~Receiver() {
try {
closing = true;
// wait until all outstanding messaging tasks are done
while (true) {
long numOfMessagingTasks = getNumOfMessagingTasks();
if (numOfMessagingTasks <= 0) {
break;
}
Thread::sleep(1000);
}
if (asyncReceiverThread.get() != NULL) {
asyncReceiverThread->join();
}
} catch (...) {
}
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::initialize(int reservedThreads, int maxThreads) {
threadPoolExecutor = new ThreadPoolExecutor(reservedThreads, maxThreads, 5, TimeUnit::SECONDS, new LinkedBlockingQueue<Runnable*>());
threadPoolExecutor->prestartCoreThread();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::unInitialize() {
if (threadPoolExecutor != NULL) {
try {
threadPoolExecutor->shutdown();
threadPoolExecutor->awaitTermination(10000, TimeUnit::MILLISECONDS);
} catch (Exception& ie) {
}
delete threadPoolExecutor;
threadPoolExecutor = NULL;
}
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::receiveMessage(std::string& message, ErrorCode& errorCode, bool retryOnError) {
long long stopRetryTime = System::currentTimeMillis() + receiveTimeout;
errorCode = CMS_SUCCESS;
if (receiveTimeout == 0/*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
retryOnError = false;
} else if (receiveTimeout == -1/*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/) {
retryOnError = true;
}
do {
long long timeoutForThisLoop;
if (receiveTimeout <= 0) {
timeoutForThisLoop = receiveTimeout;
} else {
timeoutForThisLoop = stopRetryTime - System::currentTimeMillis();
if (timeoutForThisLoop <= 0) {
errorCode = CMS_ERROR_RECEIVER_TIMEDOUT;
break;
}
}
mutexForCmsTemplate.lock();
if (cmsTemplate.get() != NULL) {
cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
cms::Message* cmsMessage = NULL;
try {
cmsMessage = cmsTemplate->receive();
} catch (cms::CMSException& ex) {
mutexForCmsTemplate.unlock();
errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
break;
}
mutexForCmsTemplate.unlock();
if (cmsMessage == NULL) {
break;
}
if (isMessageExpired(cmsMessage)) {
errorCode = CMS_ERROR_INVALID_MESSAGE;
delete cmsMessage;
continue;
}
wstring text;
cms::TextMessage* txtMessage = dynamic_cast<cms::TextMessage*>(cmsMessage);
if (txtMessage) {
message = txtMessage->getText();
}
delete cmsMessage;
} else {
mutexForCmsTemplate.unlock();
}
} while (errorCode != CMS_SUCCESS && retryOnError && System::currentTimeMillis() < stopRetryTime);
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::waitUntilReady() {
ready.await();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::registerMessageListener(ReceiverListener* messageListener, ErrorCode& errorCode) {
errorCode = CMS_SUCCESS;
mutexGeneral.lock();
if (messageListener == NULL) {
errorCode = CMS_ERROR_INVALID_MESSAGELISTENER;
mutexGeneral.unlock();
return;
}
if (this->messageListener != NULL) {
errorCode = CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY;
mutexGeneral.unlock();
return;
}
this->messageListener = messageListener;
asyncReceiverThread.reset(new Thread(this, "AsyncReceiver"));
asyncReceiverThread->start();
mutexGeneral.unlock();
this->waitUntilReady();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::run() {
ready.countDown();
while (!closing) {
string message = "";
ErrorCode errorCode = CMS_SUCCESS;
receiveMessage(message, errorCode, false);
if (message != "") {
if (bUseThreadPool) {
queueMessagingTask(message);
} else {
try {
executeMessagingTask(message, false);
} catch (...) {
}
}
} else {
if (errorCode == CMS_ERROR_CAUGHT_CMS_EXCEPTION || errorCode == CMS_ERROR_MESSAGE_BROKER_ERROR) {
long long sleepTime = 0;
mutexForCmsTemplate.lock();
sleepTime = cmsTemplate->getReceiveTimeout();
mutexForCmsTemplate.unlock();
Thread::sleep(sleepTime);
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::queueMessagingTask(const string& message) {
if (message != "" && (!closing)) {
MessagingTask* task = new MessagingTask(this, message);
increaseNumOfMessagingTasks();
threadPoolExecutor->execute(task);
}
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::executeMessagingTask(const string& message, bool bDecreaseNumOfMessagingTasks/*=true*/) {
if ((!closing)) {
mutexGeneral.lock();
ReceiverListener* copy = this->messageListener;
mutexGeneral.unlock();
if (copy) {
copy->onMessage(message);
}
}
if (bDecreaseNumOfMessagingTasks) {
decreaseNumOfMessagingTasks();
}
}
////////////////////////////////////////////////////////////////////////////////
bool Receiver::isMessageExpired(cms::Message* message) {
long long expireTime = message->getCMSExpiration();
long long currentTime = System::currentTimeMillis();
if (expireTime > 0 && currentTime > expireTime) {
return true;
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::increaseNumOfMessagingTasks() {
mutexGeneral.lock();
numOfMessagingTasks++;
mutexGeneral.unlock();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::decreaseNumOfMessagingTasks() {
mutexGeneral.lock();
numOfMessagingTasks--;
mutexGeneral.unlock();
}
////////////////////////////////////////////////////////////////////////////////
long Receiver::getNumOfMessagingTasks() {
long numOfMessagingTasks = 0;
mutexGeneral.lock();
this->numOfMessagingTasks = numOfMessagingTasks;
mutexGeneral.unlock();
return numOfMessagingTasks;
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::close() {
closing = true;
}