blob: bf7497a84993ecf32b96a83ffcefd22a6b6c3d7e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <CMS_MessageConsumer.h>
#include <Config.h>
#include <types/CMS_Types.h>
#include <cms/Message.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/StreamMessage.h>
#include <cms/MapMessage.h>
#ifdef HAVE_STDLIB_H
#include <stdlib.h>
#endif
#include <memory>
////////////////////////////////////////////////////////////////////////////////
cms_status createDefaultConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer** consumer) {
cms_status result = CMS_SUCCESS;
std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
try{
if (session == NULL || destination == NULL) {
result = CMS_ERROR;
} else {
wrapper->consumer = session->session->createConsumer(destination->destination);
*consumer = wrapper.release();
}
} catch(...) {
result = CMS_ERROR;
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status createConsumer(CMS_Session* session, CMS_Destination* destination, CMS_MessageConsumer** consumer, const char* selector, int noLocal) {
cms_status result = CMS_SUCCESS;
std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
try{
if (session == NULL || destination == NULL) {
result = CMS_ERROR;
} else {
std::string sel = selector == NULL ? "" : std::string(selector);
wrapper->consumer = session->session->createConsumer(
destination->destination, sel, noLocal > 0 ? true : false);
*consumer = wrapper.release();
}
} catch(...) {
result = CMS_ERROR;
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status createDurableConsumer(CMS_Session* session,
CMS_Destination* destination,
CMS_MessageConsumer** consumer,
const char* subscriptionName,
const char* selector,
int noLocal) {
cms_status result = CMS_SUCCESS;
std::auto_ptr<CMS_MessageConsumer> wrapper( new CMS_MessageConsumer );
try{
if (session == NULL || destination == NULL) {
result = CMS_ERROR;
} else {
if (destination->type != CMS_TOPIC) {
result = CMS_INVALID_DESTINATION;
} else {
cms::Topic* topic = dynamic_cast<cms::Topic*>(destination->destination);
std::string name = subscriptionName == NULL ? "" : std::string(subscriptionName);
std::string sel = selector == NULL ? "" : std::string(selector);
wrapper->consumer = session->session->createDurableConsumer(
topic, name, sel, noLocal > 0 ? true : false);
*consumer = wrapper.release();
}
}
} catch(...) {
result = CMS_ERROR;
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status consumerReceive(CMS_MessageConsumer* consumer, CMS_Message** message) {
cms_status result = CMS_SUCCESS;
if(consumer != NULL) {
try{
std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
cms::Message* msg = consumer->consumer->receive();
if(msg != NULL) {
wrapper->message = msg;
if(dynamic_cast<cms::TextMessage*>(msg) != NULL) {
wrapper->type = CMS_TEXT_MESSAGE;
} else if(dynamic_cast<cms::BytesMessage*>(msg) != NULL) {
wrapper->type = CMS_BYTES_MESSAGE;
} else if(dynamic_cast<cms::MapMessage*>(msg) != NULL) {
wrapper->type = CMS_MAP_MESSAGE;
} else if(dynamic_cast<cms::StreamMessage*>(msg) != NULL) {
wrapper->type = CMS_STREAM_MESSAAGE;
} else {
wrapper->type = CMS_MESSAGE;
}
*message = wrapper.release();
} else {
*message = NULL;
}
} catch(...) {
result = CMS_ERROR;
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status consumerReceiveWithTimeout(CMS_MessageConsumer* consumer, CMS_Message** message, int timeout) {
cms_status result = CMS_SUCCESS;
if(consumer != NULL) {
try{
std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
cms::Message* msg = consumer->consumer->receive(timeout);
if (msg != NULL) {
wrapper->message = msg;
if(dynamic_cast<cms::TextMessage*>(msg) != NULL) {
wrapper->type = CMS_TEXT_MESSAGE;
} else if(dynamic_cast<cms::BytesMessage*>(msg) != NULL) {
wrapper->type = CMS_BYTES_MESSAGE;
} else if(dynamic_cast<cms::MapMessage*>(msg) != NULL) {
wrapper->type = CMS_MAP_MESSAGE;
} else if(dynamic_cast<cms::StreamMessage*>(msg) != NULL) {
wrapper->type = CMS_STREAM_MESSAAGE;
} else {
wrapper->type = CMS_MESSAGE;
}
*message = wrapper.release();
} else {
*message = NULL;
}
} catch(...) {
result = CMS_ERROR;
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status consumerReceiveNoWait(CMS_MessageConsumer* consumer, CMS_Message** message) {
cms_status result = CMS_SUCCESS;
if(consumer != NULL) {
try{
std::auto_ptr<CMS_Message> wrapper( new CMS_Message );
cms::Message* msg = consumer->consumer->receive();
if (msg != NULL) {
wrapper->message = msg;
if(dynamic_cast<cms::TextMessage*>(msg) != NULL) {
wrapper->type = CMS_TEXT_MESSAGE;
} else if(dynamic_cast<cms::BytesMessage*>(msg) != NULL) {
wrapper->type = CMS_BYTES_MESSAGE;
} else if(dynamic_cast<cms::MapMessage*>(msg) != NULL) {
wrapper->type = CMS_MAP_MESSAGE;
} else if(dynamic_cast<cms::StreamMessage*>(msg) != NULL) {
wrapper->type = CMS_STREAM_MESSAAGE;
} else {
wrapper->type = CMS_MESSAGE;
}
*message = wrapper.release();
} else {
*message = NULL;
}
} catch(...) {
result = CMS_ERROR;
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status closeConsumer(CMS_MessageConsumer* consumer) {
cms_status result = CMS_SUCCESS;
if(consumer != NULL) {
try{
consumer->consumer->close();
} catch(...) {
result = CMS_ERROR;
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status getConsumerMessageSelector(CMS_MessageConsumer* consumer, char* dest, int size) {
cms_status result = CMS_SUCCESS;
if(consumer != NULL && dest != NULL && size > 0) {
try{
std::string selector = consumer->consumer->getMessageSelector();
if(!selector.empty()) {
std::size_t pos = 0;
for(; pos < selector.size() && pos < size - 1; ++pos) {
dest[pos] = selector.at(pos);
}
dest[pos] = '\0';
} else {
dest[0] = '\0';
}
} catch(...) {
result = CMS_ERROR;
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
cms_status destroyConsumer(CMS_MessageConsumer* consumer) {
cms_status result = CMS_SUCCESS;
if(consumer != NULL) {
try{
delete consumer->consumer;
delete consumer;
} catch(...) {
result = CMS_ERROR;
}
}
return result;
}