blob: 6ff2efde1400c1a47e119af66f016ca49e904c96 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AdvisoryConsumer.h"
#include <cms/Topic.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <activemq/commands/ActiveMQMessage.h>
#include <activemq/commands/ProducerInfo.h>
using namespace std;
using namespace activemq;
using namespace activemq::commands;
using namespace activemqcpp;
using namespace activemqcpp::examples;
using namespace activemqcpp::examples::advisories;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
AdvisoryConsumer::AdvisoryConsumer(cms::Session* session) : session(session), consumer(), advisoryConsumer() {
if (session == NULL) {
throw NullPointerException(__FILE__, __LINE__, "Session Object passed was Null.");
}
std::auto_ptr<cms::Topic> destination(session->createTopic("HEART-BEAT-CHANNEL"));
std::auto_ptr<cms::Topic> advisories(
session->createTopic("ActiveMQ.Advisory.Producer.Topic.HEART-BEAT-CHANNEL"));
this->consumer.reset(session->createConsumer(destination.get()));
this->advisoryConsumer.reset(session->createConsumer(advisories.get()));
this->consumer->setMessageListener(this);
this->advisoryConsumer->setMessageListener(this);
}
////////////////////////////////////////////////////////////////////////////////
AdvisoryConsumer::~AdvisoryConsumer() {
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryConsumer::close() {
this->consumer.reset(NULL);
}
////////////////////////////////////////////////////////////////////////////////
void AdvisoryConsumer::onMessage(const cms::Message* message) {
if (message->getCMSType() == "Advisory") {
const ActiveMQMessage* amqMessage = dynamic_cast<const ActiveMQMessage*> (message);
// If you want you can get the ProducerInfo for instance, you could get
// the ConsumerInfo and ConnectionInfo.
if (amqMessage != NULL && amqMessage->getDataStructure() != NULL) {
const ProducerInfo* info = dynamic_cast<const ProducerInfo*> (amqMessage->getDataStructure().get());
std::cout << "Got ProducerInfo for producer: " << info->getProducerId()->toString() << std::endl;
}
if (message->propertyExists("producerCount")) {
std::string producerCount = message->getStringProperty("producerCount");
std::cout << "Number of Producers = " << producerCount << std::endl;
}
} else {
const cms::TextMessage* txtMessage = dynamic_cast<const cms::TextMessage*> (message);
if (txtMessage != NULL) {
std::cout << "Producer Reports Status as: " << txtMessage->getText() << std::endl;
}
}
}