blob: 17724c20fe6ea45e0c5e9d288a77035bb9049610 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.proton.jms;
import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.CompositeWritableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.apache.qpid.proton.codec.DroppingWritableBuffer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.*;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import org.apache.qpid.proton.message.impl.MessageImpl;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
public class JMSMappingOutboundTransformer extends OutboundTransformer {
String prefixDeliveryAnnotations = "DA_";
String prefixMessageAnnotations= "MA_";
String prefixFooter = "FT_";
public JMSMappingOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@Override
public EncodedMessage transform(Message msg) throws Exception {
if( msg == null )
return null;
try {
if( msg.getBooleanProperty(prefixVendor + "NATIVE") ) {
return null;
}
} catch (MessageFormatException e) {
return null;
}
return transform(this, msg);
}
static EncodedMessage transform(JMSMappingOutboundTransformer options, Message msg) throws JMSException, UnsupportedEncodingException {
final JMSVendor vendor = options.vendor;
final String messageFormatKey = options.prefixVendor + "MESSAGE_FORMAT";
final String nativeKey = options.prefixVendor + "NATIVE";
final String firstAcquirerKey = options.prefixVendor + "FirstAcquirer";
final String prefixDeliveryAnnotationsKey = options.prefixVendor + options.prefixDeliveryAnnotations;
final String prefixMessageAnnotationsKey = options.prefixVendor + options.prefixMessageAnnotations;
final String subjectKey = options.prefixVendor +"Subject";
final String contentTypeKey = options.prefixVendor +"ContentType";
final String contentEncodingKey = options.prefixVendor +"ContentEncoding";
final String replyToGroupIDKey = options.prefixVendor +"ReplyToGroupID";
final String prefixFooterKey = options.prefixVendor + options.prefixFooter;
long messageFormat;
try {
messageFormat = msg.getLongProperty(messageFormatKey);
} catch (MessageFormatException e) {
return null;
}
Header header = new Header();
Properties props=new Properties();
HashMap daMap = null;
HashMap maMap = null;
HashMap apMap = null;
Section body=null;
HashMap footerMap = null;
if( msg instanceof BytesMessage ) {
BytesMessage m = (BytesMessage)msg;
byte data[] = new byte[(int) m.getBodyLength()];
m.readBytes(data);
body = new Data(new Binary(data));
} if( msg instanceof TextMessage ) {
body = new AmqpValue(((TextMessage) msg).getText());
} if( msg instanceof MapMessage ) {
final HashMap map = new HashMap();
final MapMessage m = (MapMessage) msg;
final Enumeration names = m.getMapNames();
while (names.hasMoreElements()) {
String key = (String) names.nextElement();
map.put(key, m.getObject(key));
}
body = new AmqpValue(map);
} if( msg instanceof StreamMessage ) {
ArrayList list = new ArrayList();
final StreamMessage m = (StreamMessage) msg;
try {
while(true) {
list.add(m.readObject());
}
} catch(MessageEOFException e){}
body = new AmqpSequence(list);
} if( msg instanceof ObjectMessage ) {
body = new AmqpValue(((ObjectMessage) msg).getObject());
}
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
if( msg.getJMSExpiration() != 0 ) {
header.setTtl(new UnsignedInteger((int) msg.getJMSExpiration()));
}
if( msg.getJMSType()!=null ) {
if( maMap==null ) maMap = new HashMap();
maMap.put("x-opt-jms-type", msg.getJMSType());
}
if( msg.getJMSMessageID()!=null ) {
props.setMessageId(msg.getJMSMessageID());
}
if( msg.getJMSDestination()!=null ) {
props.setTo(vendor.toAddress(msg.getJMSDestination()));
if( maMap==null ) maMap = new HashMap();
maMap.put("x-opt-to-type", destinationAttributes(msg.getJMSDestination()));
}
if( msg.getJMSReplyTo()!=null ) {
props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
if( maMap==null ) maMap = new HashMap();
maMap.put("x-opt-reply-type", destinationAttributes(msg.getJMSReplyTo()));
}
if( msg.getJMSCorrelationID()!=null ) {
props.setCorrelationId(msg.getJMSCorrelationID());
}
if( msg.getJMSExpiration() != 0 ) {
props.setAbsoluteExpiryTime(new Date(msg.getJMSExpiration()));
}
if( msg.getJMSTimestamp()!= 0 ) {
props.setCreationTime(new Date(msg.getJMSTimestamp()));
}
final Enumeration keys = msg.getPropertyNames();
while (keys.hasMoreElements()) {
String key = (String) keys.nextElement();
if( key.equals(messageFormatKey) || key.equals(nativeKey)) {
// skip..
} else if( key.equals(firstAcquirerKey) ) {
header.setFirstAcquirer(msg.getBooleanProperty(key));
} else if( key.startsWith("JMSXDeliveryCount") ) {
header.setDeliveryCount(new UnsignedInteger(msg.getIntProperty(key)));
} else if( key.startsWith("JMSXUserID") ) {
String value = msg.getStringProperty(key);
props.setUserId(new Binary(value.getBytes("UTF-8")));
} else if( key.startsWith("JMSXGroupID") ) {
String value = msg.getStringProperty(key);
props.setGroupId(value);
if( apMap==null ) apMap = new HashMap();
apMap.put(key, value);
} else if( key.startsWith("JMSXGroupSeq") ) {
UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key));
props.setGroupSequence(value);
if( apMap==null ) apMap = new HashMap();
apMap.put(key, value);
} else if( key.startsWith(prefixDeliveryAnnotationsKey) ) {
if( daMap == null ) daMap = new HashMap();
String name = key.substring(prefixDeliveryAnnotationsKey.length());
daMap.put(name, msg.getObjectProperty(key));
} else if( key.startsWith(prefixMessageAnnotationsKey) ) {
if( maMap==null ) maMap = new HashMap();
String name = key.substring(prefixMessageAnnotationsKey.length());
maMap.put(name, msg.getObjectProperty(key));
} else if( key.equals(subjectKey) ) {
props.setSubject(msg.getStringProperty(key));
} else if( key.equals(contentTypeKey) ) {
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
} else if( key.equals(contentEncodingKey) ) {
props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key)));
} else if( key.equals(replyToGroupIDKey) ) {
props.setReplyToGroupId(msg.getStringProperty(key));
} else if( key.startsWith(prefixFooterKey) ) {
if( footerMap==null ) footerMap = new HashMap();
String name = key.substring(prefixFooterKey.length());
footerMap.put(name, msg.getObjectProperty(key));
} else {
if( apMap==null ) apMap = new HashMap();
apMap.put(key, msg.getObjectProperty(key));
}
}
MessageAnnotations ma=null;
if( maMap!=null ) ma = new MessageAnnotations(maMap);
DeliveryAnnotations da=null;
if( daMap!=null ) da = new DeliveryAnnotations(daMap);
ApplicationProperties ap=null;
if( apMap!=null ) ap = new ApplicationProperties(apMap);
Footer footer=null;
if( footerMap!=null ) footer = new Footer(footerMap);
MessageImpl amqp = new MessageImpl(header, da, ma, props, ap, body, footer);
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
if( overflow.position() > 0 ) {
buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
}
return new EncodedMessage(messageFormat, buffer.array(), 0, c);
}
private static String destinationAttributes(Destination destination) {
if( destination instanceof Queue ) {
if( destination instanceof TemporaryQueue ) {
return "temporary,queue";
} else {
return "queue";
}
}
if( destination instanceof Topic ) {
if( destination instanceof TemporaryTopic ) {
return "temporary,topic";
} else {
return "topic";
}
}
return "";
}
}