blob: 495d9e6071661067606aa7cd77397e5d7f8e7951 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage;
import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame;
import java.io.IOException;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerContextAware;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.transport.stomp.Stomp.Headers;
import org.apache.activemq.transport.stomp.Stomp.Responses;
import org.apache.activemq.transport.stomp.Stomp.Transformations;
import org.apache.activemq.util.XStreamSupport;
import org.codehaus.jettison.mapped.Configuration;
import org.fusesource.hawtbuf.UTF8Buffer;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
/**
* Frame translator implementation that uses XStream to convert messages to and
* from XML and JSON
*/
public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware {

    /** XStream used for object and map message bodies; created on first use unless set via {@link #setXStream(XStream)}. */
    XStream xStream = null;

    /** XStream used for advisory message bodies; created and configured on first use. */
    XStream xStreamAdvisory = null;

    /** Broker context searched by {@link #createXStream()} for an XStream bean; may be null. */
    BrokerContext brokerContext;

    /**
     * Converts an incoming STOMP frame into an ActiveMQ message.
     * <p>
     * Frames that carry no transformation header, that carry a content-length
     * header, or that request {@code Transformations.JMS_BYTE} are handed to the
     * parent translator unchanged. Otherwise the frame content is decoded as
     * UTF-8 text and unmarshalled with XStream into an object or map message
     * according to the requested transformation. If that fails for any reason,
     * the failure is recorded in the {@code Headers.TRANSFORMATION_ERROR} frame
     * header and the parent translator is used instead.
     *
     * @param converter
     *        the protocol converter the frame arrived on.
     * @param command
     *        the STOMP frame to convert; its header map may be modified.
     *
     * @return the converted message.
     *
     * @throws JMSException if the parent translator or header copy fails.
     * @throws ProtocolException if the parent translator or header copy fails.
     */
    @Override
    public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
        Map<String, String> headers = command.getHeaders();
        ActiveMQMessage msg;
        String transformation = headers.get(Headers.TRANSFORMATION);

        // A missing transformation header is treated like an untransformed frame
        // rather than being dereferenced.
        if (transformation == null
                || headers.containsKey(Headers.CONTENT_LENGTH)
                || Transformations.JMS_BYTE.toString().equals(transformation)) {
            msg = super.convertFrame(converter, command);
        } else {
            HierarchicalStreamReader in;
            try {
                String text = new String(command.getContent(), "UTF-8");
                switch (Transformations.getValue(transformation)) {
                case JMS_OBJECT_XML:
                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
                    msg = createObjectMessage(in);
                    break;
                case JMS_OBJECT_JSON:
                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
                    msg = createObjectMessage(in);
                    break;
                case JMS_MAP_XML:
                    in = new XppReader(new StringReader(text), XppFactory.createDefaultParser());
                    msg = createMapMessage(in);
                    break;
                case JMS_MAP_JSON:
                    in = new JettisonMappedXmlDriver().createReader(new StringReader(text));
                    msg = createMapMessage(in);
                    break;
                default:
                    throw new Exception("Unknown transformation: " + transformation);
                }
            } catch (Throwable e) {
                // Best effort: whatever went wrong, report it in a header and fall
                // back to the untransformed conversion. Exceptions without a message
                // are reported by their string form so the header is never null.
                String reason = e.getMessage();
                command.getHeaders().put(Headers.TRANSFORMATION_ERROR, reason != null ? reason : e.toString());
                msg = super.convertFrame(converter, command);
            }
        }

        copyStandardHeadersFromFrameToMessage(converter, command, msg, this);
        return msg;
    }

    /**
     * Converts an outgoing ActiveMQ message into a STOMP MESSAGE frame.
     * <p>
     * Object messages, map messages and advisory messages are marshalled to
     * XML or JSON with XStream and the frame's transformation header is set to
     * the concrete transformation used. Every other message type is delegated
     * to the parent translator.
     *
     * @param converter
     *        the protocol converter the frame will be sent on.
     * @param message
     *        the message to convert.
     *
     * @return the STOMP frame for the message.
     *
     * @throws IOException if the body cannot be encoded or the parent translator fails.
     * @throws JMSException if the message content cannot be read or marshalled.
     */
    @Override
    public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException {
        StompFrame command = new StompFrame();
        command.setAction(Responses.MESSAGE);
        Map<String, String> headers = new HashMap<String, String>(25);
        command.setHeaders(headers);

        copyStandardHeadersFromMessageToFrame(converter, message, command, this);

        String transformation = headers.get(Headers.TRANSFORMATION);

        if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) {
            String effective = resolveTransformation(headers, transformation,
                    Transformations.JMS_OBJECT_XML, Transformations.JMS_OBJECT_JSON, Transformations.JMS_OBJECT_XML);
            ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy();
            command.setContent(marshall(msg.getObject(), effective).getBytes("UTF-8"));
        } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) {
            String effective = resolveTransformation(headers, transformation,
                    Transformations.JMS_MAP_XML, Transformations.JMS_MAP_JSON, Transformations.JMS_MAP_XML);
            ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy();
            command.setContent(marshall((Serializable) msg.getContentMap(), effective).getBytes("UTF-8"));
        } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) {
            // Unlike object and map messages, advisories default to JSON.
            String effective = resolveTransformation(headers, transformation,
                    Transformations.JMS_ADVISORY_XML, Transformations.JMS_ADVISORY_JSON, Transformations.JMS_ADVISORY_JSON);
            String body = marshallAdvisory(message.getDataStructure(), effective);
            command.setContent(body.getBytes("UTF-8"));
        } else {
            command = super.convertMessage(converter, message);
        }

        return command;
    }

    /**
     * Rewrites the transformation header of an outgoing frame to the concrete
     * transformation that will be applied to the body.
     * <p>
     * A requested {@code JMS_XML} becomes {@code xmlType}, a requested
     * {@code JMS_JSON} becomes {@code jsonType}; any other value already present
     * in the header map is left as is, and {@code defaultType} is used only
     * when the map holds no transformation header at all.
     *
     * @param headers
     *        the header map of the outgoing frame; updated in place.
     * @param requested
     *        the transformation currently in the header map, possibly null.
     * @param xmlType
     *        the concrete transformation for a generic XML request.
     * @param jsonType
     *        the concrete transformation for a generic JSON request.
     * @param defaultType
     *        the transformation to use when none was requested.
     *
     * @return the value of the transformation header after the update.
     */
    private static String resolveTransformation(Map<String, String> headers, String requested,
            Transformations xmlType, Transformations jsonType, Transformations defaultType) {
        if (Transformations.JMS_XML.equals(requested)) {
            headers.put(Headers.TRANSFORMATION, xmlType.toString());
        } else if (Transformations.JMS_JSON.equals(requested)) {
            headers.put(Headers.TRANSFORMATION, jsonType.toString());
        }

        if (!headers.containsKey(Headers.TRANSFORMATION)) {
            headers.put(Headers.TRANSFORMATION, defaultType.toString());
        }

        return headers.get(Headers.TRANSFORMATION);
    }

    /**
     * Tells whether a transformation name selects JSON output, i.e. whether it
     * ends with "json" ignoring case.
     *
     * @param transformation
     *        the transformation name; must not be null.
     *
     * @return true if JSON should be produced, false for XML.
     */
    private static boolean isJson(String transformation) {
        return transformation.toLowerCase(Locale.ENGLISH).endsWith("json");
    }

    /**
     * Marshal the Object to a string using XML or JSON encoding
     *
     * @param object
     *        the object to marshal
     * @param transformation
     *        the transformation to apply to the object; names ending in
     *        "json" (ignoring case) produce JSON, everything else XML.
     *
     * @return the marshaled form of the given object, in JSON or XML.
     *
     * @throws JMSException if an error occurs during the marshal operation.
     */
    protected String marshall(Serializable object, String transformation) throws JMSException {
        StringWriter buffer = new StringWriter();
        HierarchicalStreamWriter out;
        if (isJson(transformation)) {
            out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer);
        } else {
            out = new PrettyPrintWriter(buffer);
        }
        getXStream().marshal(object, out);
        return buffer.toString();
    }

    /**
     * Unmarshals a single object from the given reader and wraps it in an
     * object message.
     *
     * @param in
     *        the XML or JSON reader positioned on the serialized object.
     *
     * @return a new object message holding the unmarshalled object.
     *
     * @throws JMSException if the object cannot be stored in the message.
     */
    protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException {
        ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage();
        Object obj = getXStream().unmarshal(in);
        objMsg.setObject((Serializable) obj);
        return objMsg;
    }

    /**
     * Unmarshals a map from the given reader and copies its entries into a
     * new map message.
     *
     * @param in
     *        the XML or JSON reader positioned on the serialized map.
     *
     * @return a new map message holding every entry of the unmarshalled map.
     *
     * @throws JMSException if an entry cannot be stored in the message.
     */
    @SuppressWarnings("unchecked")
    protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException {
        ActiveMQMapMessage mapMsg = new ActiveMQMapMessage();
        Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in);
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            mapMsg.setObject(entry.getKey(), entry.getValue());
        }
        return mapMsg;
    }

    /**
     * Marshals an advisory data structure to XML or JSON.
     *
     * @param ds
     *        the DataStructure instance that is being marshaled.
     * @param transformation
     *        the transformation to apply; names ending in "json" (ignoring
     *        case) produce JSON, everything else XML.
     *
     * @return the marshaled form of the given DataStructure instance.
     */
    protected String marshallAdvisory(final DataStructure ds, String transformation) {
        StringWriter buffer = new StringWriter();
        HierarchicalStreamWriter out;
        if (isJson(transformation)) {
            out = new JettisonMappedXmlDriver().createWriter(buffer);
        } else {
            out = new PrettyPrintWriter(buffer);
        }
        getXStreamAdvisory().marshal(ds, out);
        return buffer.toString();
    }

    /**
     * Returns the XStream used for advisory messages, creating it on first use.
     * <p>
     * The instance is configured once, when it is created, instead of on every
     * marshal call: references are disabled and the
     * {@code org.apache.activemq.command} package is aliased to the empty
     * package name.
     * <p>
     * NOTE(review): when {@link #createXStream()} returns a bean taken from the
     * broker context, this configuration is applied to that bean itself.
     */
    private XStream getXStreamAdvisory() {
        if (xStreamAdvisory == null) {
            XStream xstream = createXStream();
            xstream.setMode(XStream.NO_REFERENCES);
            xstream.aliasPackage("", "org.apache.activemq.command");
            xStreamAdvisory = xstream;
        }
        return xStreamAdvisory;
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * Returns the XStream used for object and map messages, creating it with
     * {@link #createXStream()} on first use if none has been set.
     *
     * @return the XStream instance; never null once this method has returned.
     */
    public XStream getXStream() {
        if (xStream == null) {
            xStream = createXStream();
        }
        return xStream;
    }

    /**
     * Sets the XStream used for object and map messages.
     *
     * @param xStream
     *        the instance to use; passing null causes a new one to be created
     *        on the next call to {@link #getXStream()}.
     */
    public void setXStream(XStream xStream) {
        this.xStream = xStream;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Obtains an XStream instance and prepares it for STOMP translation.
     * <p>
     * If a broker context is available, the first non-null XStream bean it
     * exposes is used; otherwise a new instance is obtained from
     * {@link XStreamSupport#createXStream()} and told to ignore unknown
     * elements. In both cases a converter and an alias for UTF8Buffer are
     * registered on the instance before it is returned.
     *
     * @return the prepared XStream instance.
     */
    @SuppressWarnings("unchecked")
    protected XStream createXStream() {
        XStream xstream = null;
        if (brokerContext != null) {
            Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class);
            for (XStream bean : beans.values()) {
                if (bean != null) {
                    xstream = bean;
                    break;
                }
            }
        }

        if (xstream == null) {
            xstream = XStreamSupport.createXStream();
            xstream.ignoreUnknownElements();
        }

        // For any object whose elements contains an UTF8Buffer instance instead
        // of a String type we map it to String both in and out such that we don't
        // marshal UTF8Buffers out
        xstream.registerConverter(new AbstractSingleValueConverter() {

            @Override
            public Object fromString(String str) {
                return str;
            }

            @SuppressWarnings("rawtypes")
            @Override
            public boolean canConvert(Class type) {
                return type.equals(UTF8Buffer.class);
            }
        });

        xstream.alias("string", UTF8Buffer.class);

        return xstream;
    }

    /**
     * Stores the broker context that {@link #createXStream()} consults for an
     * XStream bean.
     *
     * @param brokerContext
     *        the broker context; may be null.
     */
    @Override
    public void setBrokerContext(BrokerContext brokerContext) {
        this.brokerContext = brokerContext;
    }

    /**
     * Returns the broker context previously set on this translator.
     *
     * @return the broker context, or null if none was set.
     */
    @Override
    public BrokerContext getBrokerContext() {
        return this.brokerContext;
    }

    /**
     * Return an Advisory message as a JSON formatted string
     * <p>
     * A fresh XStream backed by a JSON stream driver is built for each call;
     * the cached advisory instance is not used here.
     *
     * @param ds
     *        the DataStructure instance that is being marshaled.
     *
     * @return the JSON marshaled form of the given DataStructure instance.
     */
    protected String marshallAdvisory(final DataStructure ds) {
        XStream xstream = new XStream(new JsonHierarchicalStreamDriver());
        xstream.setMode(XStream.NO_REFERENCES);
        xstream.aliasPackage("", "org.apache.activemq.command");
        return xstream.toXML(ds);
    }
}