blob: 01f8354c783c0e3b17481e0a107543e87156115b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "StompHelper.h"
#include <activemq/wireformat/stomp/StompWireFormat.h>
#include <activemq/wireformat/stomp/StompCommandConstants.h>
#include <activemq/commands/LocalTransactionId.h>
#include <decaf/util/StringTokenizer.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/Boolean.h>
#include <decaf/lang/Long.h>
using namespace std;
using namespace activemq;
using namespace activemq::commands;
using namespace activemq::wireformat;
using namespace activemq::wireformat::stomp;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
////////////////////////////////////////////////////////////////////////////////
namespace {
std::string doConvertDestination(StompWireFormat* wireFormat, Pointer<ActiveMQDestination> destination) {
switch (destination->getDestinationType()) {
case cms::Destination::TOPIC:
return std::string(wireFormat->getTopicPrefix()) + destination->getPhysicalName();
case cms::Destination::TEMPORARY_TOPIC:
if (destination->getPhysicalName().find("/remote-temp-topic/") == 0) {
return destination->getPhysicalName();
} else {
return std::string(wireFormat->getTempTopicPrefix()) + destination->getPhysicalName();
}
case cms::Destination::TEMPORARY_QUEUE:
if (destination->getPhysicalName().find("/remote-temp-queue/") == 0) {
return destination->getPhysicalName();
} else {
return std::string(wireFormat->getTempQueuePrefix()) + destination->getPhysicalName();
}
default:
return std::string(wireFormat->getQueuePrefix()) + destination->getPhysicalName();
}
}
}
////////////////////////////////////////////////////////////////////////////////
StompHelper::StompHelper(StompWireFormat* wireformat) : messageIdGenerator(), wireFormat(wireformat) {
}
////////////////////////////////////////////////////////////////////////////////
StompHelper::~StompHelper() {
}
////////////////////////////////////////////////////////////////////////////////
void StompHelper::convertProperties(const Pointer<StompFrame>& frame, const Pointer<Message>& message) {
const std::string destination = frame->removeProperty(StompCommandConstants::HEADER_DESTINATION);
message->setDestination(convertDestination(destination));
const std::string messageId = frame->removeProperty(StompCommandConstants::HEADER_MESSAGEID);
message->setMessageId(convertMessageId(messageId));
// the standard JMS headers
if (frame->hasProperty(StompCommandConstants::HEADER_CORRELATIONID)) {
message->setCorrelationId(
frame->removeProperty(StompCommandConstants::HEADER_CORRELATIONID));
}
if (frame->hasProperty(StompCommandConstants::HEADER_EXPIRES)) {
message->setExpiration(Long::parseLong(
frame->removeProperty(StompCommandConstants::HEADER_EXPIRES)));
}
if (frame->hasProperty(StompCommandConstants::HEADER_JMSPRIORITY)) {
message->setPriority((unsigned char) Integer::parseInt(
frame->removeProperty(StompCommandConstants::HEADER_JMSPRIORITY)));
}
if (frame->hasProperty(StompCommandConstants::HEADER_TYPE)) {
message->setType(
frame->removeProperty(StompCommandConstants::HEADER_TYPE));
}
if (frame->hasProperty(StompCommandConstants::HEADER_REPLYTO)) {
message->setReplyTo(convertDestination(
frame->removeProperty(StompCommandConstants::HEADER_REPLYTO)));
}
if (frame->hasProperty(StompCommandConstants::HEADER_PERSISTENT)) {
message->setPersistent(Boolean::parseBoolean(
frame->removeProperty(StompCommandConstants::HEADER_PERSISTENT)));
}
if (frame->hasProperty(StompCommandConstants::HEADER_TRANSACTIONID)) {
std::string transactionId = frame->removeProperty(StompCommandConstants::HEADER_TRANSACTIONID);
message->setTransactionId(convertTransactionId(transactionId));
}
// Handle JMSX Properties.
if (frame->hasProperty("JMSXDeliveryCount")) {
message->setRedeliveryCounter(Integer::parseInt(frame->removeProperty("JMSXDeliveryCount")));
}
if (frame->hasProperty("JMSXGroupID")) {
message->setGroupID(frame->removeProperty("JMSXGroupID"));
}
if (frame->hasProperty("JMSXGroupSeq")) {
message->setGroupSequence(Integer::parseInt(frame->removeProperty("JMSXGroupSeq")));
}
// Copy the general headers over to the Message.
std::vector<std::pair<std::string, std::string> > properties = frame->getProperties().toArray();
std::vector<std::pair<std::string, std::string> >::const_iterator iter = properties.begin();
for (; iter != properties.end(); ++iter) {
message->getMessageProperties().setString(iter->first, iter->second);
}
}
////////////////////////////////////////////////////////////////////////////////
void StompHelper::convertProperties(const Pointer<Message>& message, const Pointer<StompFrame>& frame) {
frame->setProperty(StompCommandConstants::HEADER_DESTINATION, convertDestination(message->getDestination()));
if (message->getCorrelationId() != "") {
frame->setProperty(StompCommandConstants::HEADER_CORRELATIONID, message->getCorrelationId());
}
frame->setProperty(StompCommandConstants::HEADER_EXPIRES, Long::toString(message->getExpiration()));
frame->setProperty(StompCommandConstants::HEADER_PERSISTENT, Boolean::toString(message->isPersistent()));
if (message->getRedeliveryCounter() != 0) {
frame->setProperty(StompCommandConstants::HEADER_REDELIVERED, "true");
}
frame->setProperty(StompCommandConstants::HEADER_JMSPRIORITY, Integer::toString(message->getPriority()));
if (message->getReplyTo() != NULL) {
frame->setProperty(StompCommandConstants::HEADER_REPLYTO, convertDestination(message->getReplyTo()));
}
frame->setProperty(StompCommandConstants::HEADER_TIMESTAMP, Long::toString(message->getTimestamp()));
if (message->getType() != "") {
frame->setProperty(StompCommandConstants::HEADER_TYPE, message->getType());
}
if (message->getTransactionId() != NULL) {
frame->setProperty(StompCommandConstants::HEADER_TRANSACTIONID, convertTransactionId(message->getTransactionId()));
}
// Handle JMSX Properties.
frame->setProperty("JMSXDeliveryCount", Integer::toString(message->getRedeliveryCounter()));
frame->setProperty("JMSXGroupSeq", Integer::toString(message->getGroupSequence()));
if (message->getGroupID() != "") {
frame->setProperty("JMSXGroupID", message->getGroupID());
}
Pointer<Iterator<std::string> > keys(message->getMessageProperties().keySet().iterator());
while (keys->hasNext()) {
std::string key = keys->next();
frame->setProperty(key, message->getMessageProperties().getString(key));
}
}
////////////////////////////////////////////////////////////////////////////////
std::string StompHelper::convertDestination(const Pointer<ActiveMQDestination>& destination) {
std::string result = "";
if (destination != NULL) {
if (destination->isComposite()) {
ArrayList< Pointer<ActiveMQDestination> > destinations = destination->getCompositeDestinations();
Pointer<Iterator< Pointer<ActiveMQDestination> > > destIter(destinations.iterator());
while (destIter->hasNext()) {
Pointer<ActiveMQDestination> composite = destIter->next();
if (!result.empty()) {
result.append(",");
}
result += doConvertDestination(wireFormat, composite);
}
} else {
result += doConvertDestination(wireFormat, destination);
}
}
return result;
}
////////////////////////////////////////////////////////////////////////////////
Pointer<ActiveMQDestination> StompHelper::convertDestination(const std::string& destination) {
if (destination == "") {
return Pointer<ActiveMQDestination>();
}
int type = 0;
std::string dest = "";
if (destination.find(wireFormat->getQueuePrefix()) == 0) {
dest = destination.substr(wireFormat->getQueuePrefix().length());
type = cms::Destination::QUEUE;
} else if (destination.find(wireFormat->getTopicPrefix()) == 0) {
dest = destination.substr(wireFormat->getTopicPrefix().length());
type = cms::Destination::TOPIC;
} else if (destination.find(wireFormat->getTempTopicPrefix()) == 0) {
dest = destination.substr(wireFormat->getTempTopicPrefix().length());
type = cms::Destination::TEMPORARY_TOPIC;
} else if (destination.find(wireFormat->getTempQueuePrefix()) == 0) {
dest = destination.substr(wireFormat->getTempQueuePrefix().length());
type = cms::Destination::TEMPORARY_QUEUE;
} else if (destination.find("/remote-temp-topic/") == 0) {
type = cms::Destination::TEMPORARY_TOPIC;
} else if (destination.find("/remote-temp-queue/") == 0) {
type = cms::Destination::TEMPORARY_QUEUE;
}
return ActiveMQDestination::createDestination(type, dest);
}
////////////////////////////////////////////////////////////////////////////////
std::string StompHelper::convertMessageId(const Pointer<MessageId>& messageId) {
// The Stomp MessageId is always hidden solely in the Producer Id.
std::string result = convertProducerId(messageId->getProducerId());
return result;
}
////////////////////////////////////////////////////////////////////////////////
Pointer<MessageId> StompHelper::convertMessageId(const std::string& messageId) {
if (messageId == "") {
return Pointer<MessageId>();
}
Pointer<MessageId> id(new MessageId());
id->setProducerId(convertProducerId(messageId));
id->setProducerSequenceId(this->messageIdGenerator.getNextSequenceId());
return id;
}
////////////////////////////////////////////////////////////////////////////////
std::string StompHelper::convertConsumerId(const Pointer<ConsumerId>& consumerId) {
return consumerId->getConnectionId() + ":" + Long::toString(consumerId->getSessionId()) + ":" + Long::toString(consumerId->getValue());
}
////////////////////////////////////////////////////////////////////////////////
Pointer<ConsumerId> StompHelper::convertConsumerId(const std::string& consumerId) {
if (consumerId == "") {
return Pointer<ConsumerId>();
}
Pointer<ConsumerId> id(new ConsumerId());
StringTokenizer tokenizer(consumerId, ":");
string connectionId;
connectionId += tokenizer.nextToken();
connectionId += ":";
connectionId += tokenizer.nextToken();
connectionId += ":";
connectionId += tokenizer.nextToken();
id->setConnectionId(connectionId);
while (tokenizer.hasMoreTokens()) {
string text = tokenizer.nextToken();
if (tokenizer.hasMoreTokens()) {
id->setSessionId(Long::parseLong(text));
} else {
id->setValue(Long::parseLong(text));
}
}
return id;
}
////////////////////////////////////////////////////////////////////////////////
std::string StompHelper::convertProducerId(const Pointer<ProducerId>& producerId) {
return producerId->getConnectionId();
}
////////////////////////////////////////////////////////////////////////////////
Pointer<ProducerId> StompHelper::convertProducerId(const std::string& producerId) {
if (producerId == "") {
return Pointer<ProducerId>();
}
Pointer<ProducerId> id(new ProducerId());
id->setConnectionId(producerId);
id->setSessionId(-1);
id->setValue(-1);
return id;
}
////////////////////////////////////////////////////////////////////////////////
std::string StompHelper::convertTransactionId(const Pointer<TransactionId>& transactionId) {
Pointer<LocalTransactionId> id = transactionId.dynamicCast<LocalTransactionId>();
std::string result = id->getConnectionId()->getValue() + ":" + Long::toString(id->getValue());
return result;
}
////////////////////////////////////////////////////////////////////////////////
Pointer<TransactionId> StompHelper::convertTransactionId(const std::string& transactionId) {
if (transactionId == "") {
return Pointer<TransactionId>();
}
Pointer<LocalTransactionId> id(new LocalTransactionId());
StringTokenizer tokenizer(transactionId, ":");
string connectionIdStr;
connectionIdStr += tokenizer.nextToken();
connectionIdStr += ":";
connectionIdStr += tokenizer.nextToken();
Pointer<ConnectionId> connectionId(new ConnectionId());
connectionId->setValue(connectionIdStr);
id->setConnectionId(connectionId);
while (tokenizer.hasMoreTokens()) {
string text = tokenizer.nextToken();
id->setValue(Long::parseLong(text));
}
return id;
}