blob: 6814af87ccdccc4a613cd56c65576561e5cc2567 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Receiver.h"
#include "MessagingTask.h"
#include "ConnectionFactoryMgr.h"
#include "BrokerMonitor.h"
#include "CmsMessageCreator.h"
#include <cms/Message.h>
#include <activemq/cmsutil/MessageCreator.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Exception.h>
#include <decaf/util/concurrent/TimeUnit.h>
#include <stdio.h>
using namespace decaf::lang;
using namespace decaf::util::concurrent;
using namespace activemq::cmsutil;
using namespace cms;
using namespace cms::stress;
////////////////////////////////////////////////////////////////////////////////
Receiver::Receiver(const std::string& url, const std::string& queueOrTopicName,
bool isTopic, BrokerMonitor* monitor, CountDownLatch* quit,
long long receiveTimeout, bool useThreadPool) :
url(url),
mutexForCmsTemplate(),
mutexGeneral(),
closing(false),
brokerOnline(true),
ready(1),
quit(quit),
messageListener(NULL),
cmsTemplate(NULL),
asyncReceiverThread(NULL),
receiveTimeout(receiveTimeout),
cmsTemplateCreateTime(System::currentTimeMillis()),
useThreadPool(useThreadPool),
numOfMessagingTasks(0),
monitor(monitor),
selector() {
ConnectionFactory* connectionFactory = ConnectionFactoryMgr::getConnectionFactory(url);
cmsTemplateCreateTime = System::currentTimeMillis();
cmsTemplate = new CmsTemplate(connectionFactory);
cmsTemplate->setDefaultDestinationName(queueOrTopicName);
cmsTemplate->setPubSubDomain(isTopic);
cmsTemplate->setReceiveTimeout(receiveTimeout);
}
////////////////////////////////////////////////////////////////////////////////
Receiver::~Receiver() {
closing = true;
//delete cmsTemplate
mutexForCmsTemplate.lock();
if (cmsTemplate) {
delete cmsTemplate;
cmsTemplate = NULL;
}
mutexForCmsTemplate.unlock();
//wait until all outstanding messaging tasks are done
while (getNumOfMessagingTasks() > 0) {
Thread::sleep(100);
}
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::receiveMessage(std::string& message, ErrorCode& errorCode,
const std::string &selector, bool retryOnError) {
long long stopRetryTime = System::currentTimeMillis() + receiveTimeout;
errorCode = CMS_SUCCESS;
if (receiveTimeout == 0 /*CmsTemplate::RECEIVE_TIMEOUT_NO_WAIT*/) {
retryOnError = false;
} else if (receiveTimeout == -1 /*CmsTemplate::RECEIVE_TIMEOUT_INDEFINITE_WAIT*/) {
retryOnError = true;
}
if (monitor != NULL) {
if (monitor->isBrokerOk()) {
if (!brokerOnline) {
mutexForCmsTemplate.lock();
if (cmsTemplate) {
cmsTemplateCreateTime = System::currentTimeMillis();
CmsTemplate* cmsTemplate = new CmsTemplate(cmsTemplate->getConnectionFactory());
cmsTemplate->setDefaultDestinationName(cmsTemplate->getDefaultDestinationName());
cmsTemplate->setPubSubDomain(cmsTemplate->isPubSubDomain());
cmsTemplate->setReceiveTimeout(cmsTemplate->getReceiveTimeout());
delete cmsTemplate;
}
mutexForCmsTemplate.unlock();
brokerOnline = true;
}
} else {
brokerOnline = false;
errorCode = CMS_ERROR_MESSAGE_BROKER_ERROR;
return;
}
}
do {
long long timeoutForThisLoop;
if (receiveTimeout <= 0) {
timeoutForThisLoop = receiveTimeout;
} else {
timeoutForThisLoop = stopRetryTime - System::currentTimeMillis();
if (timeoutForThisLoop <= 0) {
errorCode = CMS_ERROR_RECEIVER_TIMEDOUT;
break;
}
}
mutexForCmsTemplate.lock();
if (cmsTemplate) {
cmsTemplate->setReceiveTimeout(timeoutForThisLoop);
cms::Message* cmsMessage = NULL;
try {
if (selector != "") {
cmsMessage = cmsTemplate->receiveSelected(selector);
} else {
cmsMessage = cmsTemplate->receive();
}
} catch (cms::CMSException& ex) {
mutexForCmsTemplate.unlock();
errorCode = CMS_ERROR_CAUGHT_CMS_EXCEPTION;
break;
}
mutexForCmsTemplate.unlock();
if (cmsMessage == NULL) {
break;
}
if (isMessageExpired(cmsMessage)) {
errorCode = CMS_ERROR_INVALID_MESSAGE;
delete cmsMessage;
continue;
}
wstring text;
cms::TextMessage* txtMessage = dynamic_cast<cms::TextMessage*>(cmsMessage);
if (txtMessage) {
message = txtMessage->getText();
}
delete cmsMessage;
} else {
mutexForCmsTemplate.unlock();
}
} while (errorCode != CMS_SUCCESS && retryOnError && System::currentTimeMillis() < stopRetryTime);
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::waitUntilReady() {
ready.await();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::registerMessageListener(ReceiverListener* messageListener, ErrorCode& errorCode,
const std::string& selector, int id) {
errorCode = CMS_SUCCESS;
char buffer[512];
if (id != 0) {
sprintf(buffer, "TestListener-%d", id);
} else {
sprintf(buffer, "TestAsyncListener");
}
mutexGeneral.lock();
if (messageListener == NULL) {
errorCode = CMS_ERROR_INVALID_MESSAGELISTENER;
mutexGeneral.unlock();
return;
}
if (messageListener != NULL) {
errorCode = CMS_ERROR_A_MESSAGELISTENER_HAS_BEEN_REGISTERED_ALREADY;
mutexGeneral.unlock();
return;
}
this->messageListener = messageListener;
this->selector = selector;
asyncReceiverThread = new Thread(this, buffer);
asyncReceiverThread->start();
mutexGeneral.unlock();
this->waitUntilReady();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::run() {
ready.countDown();
while (!closing) {
std::string message = "";
ErrorCode errorCode = CMS_SUCCESS;
Receiver::receiveMessage(message, errorCode, selector, false);
if (quit->getCount() == 0) {
closing = true;
}
if ((message != "") && (!closing)) {
if (useThreadPool) {
MessagingTask* task = new MessagingTask(this, message);
increaseNumOfMessagingTasks();
task->queue();
} else {
try {
executeMessagingTask(message, false);
} catch (...) {
}
}
} else if (!closing) {
if (errorCode == CMS_ERROR_CAUGHT_CMS_EXCEPTION || errorCode == CMS_ERROR_MESSAGE_BROKER_ERROR) {
long long sleepTime = 0;
mutexForCmsTemplate.lock();
sleepTime = cmsTemplate->getReceiveTimeout();
mutexForCmsTemplate.unlock();
if (quit->await(sleepTime)) {
closing = true;
}
}
}
}
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::executeMessagingTask(const std::string& message, bool isDecreaseNumOfMessagingTasks) {
if (!closing) {
mutexGeneral.lock();
ReceiverListener* copy = messageListener;
mutexGeneral.unlock();
if (copy) {
copy->onMessage(message);
}
}
if (isDecreaseNumOfMessagingTasks) {
decreaseNumOfMessagingTasks();
}
}
////////////////////////////////////////////////////////////////////////////////
bool Receiver::isMessageExpired(cms::Message* message) {
long long expireTime = message->getCMSExpiration();
long long currentTime = System::currentTimeMillis();
if (expireTime > 0 && currentTime > expireTime) {
return true;
}
return false;
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::increaseNumOfMessagingTasks() {
mutexGeneral.lock();
numOfMessagingTasks++;
mutexGeneral.unlock();
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::decreaseNumOfMessagingTasks() {
mutexGeneral.lock();
numOfMessagingTasks--;
mutexGeneral.unlock();
}
////////////////////////////////////////////////////////////////////////////////
long Receiver::getNumOfMessagingTasks() {
long result = 0;
mutexGeneral.lock();
result = numOfMessagingTasks;
mutexGeneral.unlock();
return result;
}
////////////////////////////////////////////////////////////////////////////////
void Receiver::close() {
closing = true;
if (asyncReceiverThread) {
asyncReceiverThread->join();
delete asyncReceiverThread;
asyncReceiverThread = NULL;
}
}