blob: f456b5f9aa3ce8e8bcd87492b43d01602ec77579 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.net.MalformedURLException;
import java.util.Enumeration;
import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.activemq.blob.BlobDownloader;
import org.apache.activemq.command.ActiveMQBlobMessage;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
/**
* A helper class for converting normal JMS interfaces into ActiveMQ specific
* ones.
*
*
*/
public final class ActiveMQMessageTransformation {
private ActiveMQMessageTransformation() {
}
/**
* Creates a an available JMS message from another provider.
*
* @param destination - Destination to be converted into ActiveMQ's
* implementation.
* @return ActiveMQDestination - ActiveMQ's implementation of the
* destination.
* @throws JMSException if an error occurs
*/
public static ActiveMQDestination transformDestination(Destination destination) throws JMSException {
return ActiveMQDestination.transform(destination);
}
/**
* Creates a fast shallow copy of the current ActiveMQMessage or creates a
* whole new message instance from an available JMS message from another
* provider.
*
* @param message - Message to be converted into ActiveMQ's implementation.
* @param connection
* @return ActiveMQMessage - ActiveMQ's implementation object of the
* message.
* @throws JMSException if an error occurs
*/
public static ActiveMQMessage transformMessage(Message message, ActiveMQConnection connection)
throws JMSException {
if (message instanceof ActiveMQMessage) {
return (ActiveMQMessage)message;
} else {
ActiveMQMessage activeMessage = null;
if (message instanceof BytesMessage) {
BytesMessage bytesMsg = (BytesMessage)message;
bytesMsg.reset();
ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
msg.setConnection(connection);
try {
for (;;) {
// Reads a byte from the message stream until the stream
// is empty
msg.writeByte(bytesMsg.readByte());
}
} catch (MessageEOFException e) {
// if an end of message stream as expected
} catch (JMSException e) {
}
activeMessage = msg;
} else if (message instanceof MapMessage) {
MapMessage mapMsg = (MapMessage)message;
ActiveMQMapMessage msg = new ActiveMQMapMessage();
msg.setConnection(connection);
Enumeration iter = mapMsg.getMapNames();
while (iter.hasMoreElements()) {
String name = iter.nextElement().toString();
msg.setObject(name, mapMsg.getObject(name));
}
activeMessage = msg;
} else if (message instanceof ObjectMessage) {
ObjectMessage objMsg = (ObjectMessage)message;
ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
msg.setConnection(connection);
msg.setObject(objMsg.getObject());
msg.storeContent();
activeMessage = msg;
} else if (message instanceof StreamMessage) {
StreamMessage streamMessage = (StreamMessage)message;
streamMessage.reset();
ActiveMQStreamMessage msg = new ActiveMQStreamMessage();
msg.setConnection(connection);
Object obj = null;
try {
while ((obj = streamMessage.readObject()) != null) {
msg.writeObject(obj);
}
} catch (MessageEOFException e) {
// if an end of message stream as expected
} catch (JMSException e) {
}
activeMessage = msg;
} else if (message instanceof TextMessage) {
TextMessage textMsg = (TextMessage)message;
ActiveMQTextMessage msg = new ActiveMQTextMessage();
msg.setConnection(connection);
msg.setText(textMsg.getText());
activeMessage = msg;
} else if (message instanceof BlobMessage) {
BlobMessage blobMessage = (BlobMessage)message;
ActiveMQBlobMessage msg = new ActiveMQBlobMessage();
msg.setConnection(connection);
if (connection != null){
msg.setBlobDownloader(new BlobDownloader(connection.getBlobTransferPolicy()));
}
try {
msg.setURL(blobMessage.getURL());
} catch (MalformedURLException e) {
}
activeMessage = msg;
} else {
activeMessage = new ActiveMQMessage();
activeMessage.setConnection(connection);
}
copyProperties(message, activeMessage);
return activeMessage;
}
}
/**
* Copies the standard JMS and user defined properties from the givem
* message to the specified message
*
* @param fromMessage the message to take the properties from
* @param toMessage the message to add the properties to
* @throws JMSException
*/
public static void copyProperties(Message fromMessage, Message toMessage) throws JMSException {
toMessage.setJMSMessageID(fromMessage.getJMSMessageID());
toMessage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
toMessage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
toMessage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
toMessage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
toMessage.setJMSDeliveryTime(getFromMessageDeliveryTime(fromMessage)); // TODO: AMQ-8500 DeliveryTime support ref: ActiveMQSession#send
toMessage.setJMSRedelivered(fromMessage.getJMSRedelivered());
toMessage.setJMSType(fromMessage.getJMSType());
toMessage.setJMSExpiration(fromMessage.getJMSExpiration());
toMessage.setJMSPriority(fromMessage.getJMSPriority());
toMessage.setJMSTimestamp(fromMessage.getJMSTimestamp());
Enumeration propertyNames = fromMessage.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String name = propertyNames.nextElement().toString();
Object obj = fromMessage.getObjectProperty(name);
toMessage.setObjectProperty(name, obj);
}
}
private static long getFromMessageDeliveryTime(Message fromMessage) throws JMSException {
Method deliveryTimeGetMethod = null;
try {
Class<?> clazz = fromMessage.getClass();
Method method = clazz.getMethod("getJMSDeliveryTime");
if (!Modifier.isAbstract(method.getModifiers())) {
deliveryTimeGetMethod = method;
}
} catch (NoSuchMethodException e) {
// We fallback to JMSTimestamp for jms v1.x
}
if (deliveryTimeGetMethod != null) {
return fromMessage.getJMSDeliveryTime();
}
return fromMessage.getJMSTimestamp();
}
}