blob: 8608fbe828205e76af68a73e8d559c0aab7ddba4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AdvisoryConsumer.h"
#include <activemq/core/ActiveMQConstants.h>
#include <activemq/util/AdvisorySupport.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/ClassCastException.h>
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::commands;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace core {
class AdvisoryConsumerConfig {
public:
int deliveredCounter;
Pointer<ConsumerInfo> info;
AtomicBoolean closed;
int hashCode;
AdvisoryConsumerConfig() : deliveredCounter(0), info(), closed(false), hashCode() {
}
};
}}
////////////////////////////////////////////////////////////////////////////////
AdvisoryConsumer::AdvisoryConsumer(ActiveMQConnection* connection, Pointer<commands::ConsumerId> consumerId) :
Dispatcher(), config(new AdvisoryConsumerConfig()), connection(connection) {
if (connection == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Parent Connection pointer was NULL");
}
this->config->info.reset(new ConsumerInfo());
this->config->info->setConsumerId(consumerId);
this->config->info->setDestination(
Pointer<ActiveMQDestination>(AdvisorySupport::getTempDestinationCompositeAdvisoryTopic()));
this->config->info->setPrefetchSize(1000);
this->config->info->setNoLocal(true);
this->config->info->setDispatchAsync(true);
this->config->hashCode = consumerId->getHashCode();
try {
this->connection->addDispatcher(this->config->info->getConsumerId(), this);
this->connection->syncRequest(this->config->info);
} catch(...) {
delete this->config;
throw;
}
}
////////////////////////////////////////////////////////////////////////////////
AdvisoryConsumer::~AdvisoryConsumer() {
try {
delete config;
}
AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryConsumer::dispose() {
if (this->config->closed.compareAndSet(false, true)) {
try {
this->connection->oneway(this->config->info->createRemoveCommand());
} catch (cms::CMSException& e) {
}
this->connection->removeDispatcher(this->config->info->getConsumerId());
}
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryConsumer::dispatch(const Pointer<MessageDispatch>& message) {
// Auto ack messages when we reach 75% of the prefetch
this->config->deliveredCounter++;
if (this->config->deliveredCounter > (0.75 * this->config->info->getPrefetchSize())) {
try {
Pointer<MessageAck> ack(new MessageAck());
ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED);
ack->setConsumerId(this->config->info->getConsumerId());
ack->setDestination(message->getDestination());
ack->setMessageCount(this->config->deliveredCounter);
ack->setLastMessageId(message->getMessage()->getMessageId());
this->connection->oneway(ack);
this->config->deliveredCounter = 0;
} catch (Exception& e) {
this->connection->onClientInternalException(e);
}
}
Pointer<DataStructure> object = message->getMessage()->getDataStructure();
if (object != NULL) {
try {
Pointer<DestinationInfo> info = object.dynamicCast<DestinationInfo>();
processDestinationInfo(info);
} catch (ClassCastException& ex) {
}
}
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryConsumer::processDestinationInfo(Pointer<commands::DestinationInfo> info) {
Pointer<ActiveMQDestination> dest = info->getDestination();
if (!dest->isTemporary()) {
return;
}
Pointer<ActiveMQTempDestination> tempDest = dest.dynamicCast<ActiveMQTempDestination>();
if (info->getOperationType() == ActiveMQConstants::DESTINATION_ADD_OPERATION) {
this->connection->addTempDestination(tempDest);
} else if (info->getOperationType() == ActiveMQConstants::DESTINATION_REMOVE_OPERATION) {
this->connection->removeTempDestination(tempDest);
}
}
////////////////////////////////////////////////////////////////////////////////
int AdvisoryConsumer::getHashCode() const {
return this->config->hashCode;
}