| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.activemq.transport.stomp; |
| |
| import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromFrameToMessage; |
| import static org.apache.activemq.transport.stomp.FrameTranslator.Helper.copyStandardHeadersFromMessageToFrame; |
| |
| import java.io.IOException; |
| import java.io.Serializable; |
| import java.io.StringReader; |
| import java.io.StringWriter; |
| import java.util.HashMap; |
| import java.util.Locale; |
| import java.util.Map; |
| |
| import javax.jms.JMSException; |
| |
| import org.apache.activemq.advisory.AdvisorySupport; |
| import org.apache.activemq.broker.BrokerContext; |
| import org.apache.activemq.broker.BrokerContextAware; |
| import org.apache.activemq.command.ActiveMQMapMessage; |
| import org.apache.activemq.command.ActiveMQMessage; |
| import org.apache.activemq.command.ActiveMQObjectMessage; |
| import org.apache.activemq.command.DataStructure; |
| import org.apache.activemq.transport.stomp.Stomp.Headers; |
| import org.apache.activemq.transport.stomp.Stomp.Responses; |
| import org.apache.activemq.transport.stomp.Stomp.Transformations; |
| import org.apache.activemq.util.XStreamSupport; |
| import org.codehaus.jettison.mapped.Configuration; |
| import org.fusesource.hawtbuf.UTF8Buffer; |
| |
| import com.thoughtworks.xstream.XStream; |
| import com.thoughtworks.xstream.converters.basic.AbstractSingleValueConverter; |
| import com.thoughtworks.xstream.io.HierarchicalStreamReader; |
| import com.thoughtworks.xstream.io.HierarchicalStreamWriter; |
| import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver; |
| import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver; |
| import com.thoughtworks.xstream.io.xml.PrettyPrintWriter; |
| import com.thoughtworks.xstream.io.xml.XppReader; |
| import com.thoughtworks.xstream.io.xml.xppdom.XppFactory; |
| |
| /** |
| * Frame translator implementation that uses XStream to convert messages to and |
| * from XML and JSON |
| */ |
| public class JmsFrameTranslator extends LegacyFrameTranslator implements BrokerContextAware { |
| |
| XStream xStream = null; |
| XStream xStreamAdvisory = null; |
| BrokerContext brokerContext; |
| |
| @Override |
| public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException { |
| Map<String, String> headers = command.getHeaders(); |
| ActiveMQMessage msg; |
| String transformation = headers.get(Headers.TRANSFORMATION); |
| if (headers.containsKey(Headers.CONTENT_LENGTH) || transformation.equals(Transformations.JMS_BYTE.toString())) { |
| msg = super.convertFrame(converter, command); |
| } else { |
| HierarchicalStreamReader in; |
| |
| try { |
| String text = new String(command.getContent(), "UTF-8"); |
| switch (Transformations.getValue(transformation)) { |
| case JMS_OBJECT_XML: |
| in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); |
| msg = createObjectMessage(in); |
| break; |
| case JMS_OBJECT_JSON: |
| in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); |
| msg = createObjectMessage(in); |
| break; |
| case JMS_MAP_XML: |
| in = new XppReader(new StringReader(text), XppFactory.createDefaultParser()); |
| msg = createMapMessage(in); |
| break; |
| case JMS_MAP_JSON: |
| in = new JettisonMappedXmlDriver().createReader(new StringReader(text)); |
| msg = createMapMessage(in); |
| break; |
| default: |
| throw new Exception("Unknown transformation: " + transformation); |
| } |
| } catch (Throwable e) { |
| command.getHeaders().put(Headers.TRANSFORMATION_ERROR, e.getMessage()); |
| msg = super.convertFrame(converter, command); |
| } |
| } |
| |
| copyStandardHeadersFromFrameToMessage(converter, command, msg, this); |
| return msg; |
| } |
| |
| @Override |
| public StompFrame convertMessage(ProtocolConverter converter, ActiveMQMessage message) throws IOException, JMSException { |
| |
| StompFrame command = new StompFrame(); |
| command.setAction(Responses.MESSAGE); |
| Map<String, String> headers = new HashMap<String, String>(25); |
| command.setHeaders(headers); |
| |
| copyStandardHeadersFromMessageToFrame(converter, message, command, this); |
| |
| String transformation = headers.get(Headers.TRANSFORMATION); |
| |
| if (message.getDataStructureType() == ActiveMQObjectMessage.DATA_STRUCTURE_TYPE) { |
| |
| if (Transformations.JMS_XML.equals(transformation)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); |
| } else if (Transformations.JMS_JSON.equals(transformation)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_JSON.toString()); |
| } |
| |
| if (!headers.containsKey(Headers.TRANSFORMATION)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_OBJECT_XML.toString()); |
| } |
| |
| ActiveMQObjectMessage msg = (ActiveMQObjectMessage) message.copy(); |
| command.setContent(marshall(msg.getObject(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); |
| |
| } else if (message.getDataStructureType() == ActiveMQMapMessage.DATA_STRUCTURE_TYPE) { |
| |
| if (Transformations.JMS_XML.equals(transformation)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); |
| } else if (Transformations.JMS_JSON.equals(transformation)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_JSON.toString()); |
| } |
| |
| if (!headers.containsKey(Headers.TRANSFORMATION)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_MAP_XML.toString()); |
| } |
| |
| ActiveMQMapMessage msg = (ActiveMQMapMessage) message.copy(); |
| command.setContent(marshall((Serializable) msg.getContentMap(), headers.get(Headers.TRANSFORMATION)).getBytes("UTF-8")); |
| |
| } else if (message.getDataStructureType() == ActiveMQMessage.DATA_STRUCTURE_TYPE && AdvisorySupport.ADIVSORY_MESSAGE_TYPE.equals(message.getType())) { |
| |
| if (Transformations.JMS_XML.equals(transformation)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_XML.toString()); |
| } else if (Transformations.JMS_JSON.equals(transformation)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); |
| } |
| |
| if (!headers.containsKey(Headers.TRANSFORMATION)) { |
| headers.put(Headers.TRANSFORMATION, Transformations.JMS_ADVISORY_JSON.toString()); |
| } |
| |
| String body = marshallAdvisory(message.getDataStructure(), headers.get(Headers.TRANSFORMATION)); |
| command.setContent(body.getBytes("UTF-8")); |
| |
| } else { |
| command = super.convertMessage(converter, message); |
| } |
| |
| return command; |
| } |
| |
| /** |
| * Marshal the Object to a string using XML or JSON encoding |
| * |
| * @param object |
| * the object to marshal |
| * @param transformation |
| * the transformation to apply to the object. |
| * |
| * @returns the marshaled form of the given object, in JSON or XML. |
| * |
| * @throws JMSException if an error occurs during the marshal operation. |
| */ |
| protected String marshall(Serializable object, String transformation) throws JMSException { |
| StringWriter buffer = new StringWriter(); |
| HierarchicalStreamWriter out; |
| if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) { |
| out = new JettisonMappedXmlDriver(new Configuration(), false).createWriter(buffer); |
| } else { |
| out = new PrettyPrintWriter(buffer); |
| } |
| getXStream().marshal(object, out); |
| return buffer.toString(); |
| } |
| |
| protected ActiveMQObjectMessage createObjectMessage(HierarchicalStreamReader in) throws JMSException { |
| ActiveMQObjectMessage objMsg = new ActiveMQObjectMessage(); |
| Object obj = getXStream().unmarshal(in); |
| objMsg.setObject((Serializable) obj); |
| return objMsg; |
| } |
| |
| @SuppressWarnings("unchecked") |
| protected ActiveMQMapMessage createMapMessage(HierarchicalStreamReader in) throws JMSException { |
| ActiveMQMapMessage mapMsg = new ActiveMQMapMessage(); |
| Map<String, Object> map = (Map<String, Object>) getXStream().unmarshal(in); |
| for (String key : map.keySet()) { |
| mapMsg.setObject(key, map.get(key)); |
| } |
| return mapMsg; |
| } |
| |
| protected String marshallAdvisory(final DataStructure ds, String transformation) { |
| |
| StringWriter buffer = new StringWriter(); |
| HierarchicalStreamWriter out; |
| if (transformation.toLowerCase(Locale.ENGLISH).endsWith("json")) { |
| out = new JettisonMappedXmlDriver().createWriter(buffer); |
| } else { |
| out = new PrettyPrintWriter(buffer); |
| } |
| |
| XStream xstream = getXStreamAdvisory(); |
| xstream.setMode(XStream.NO_REFERENCES); |
| xstream.aliasPackage("", "org.apache.activemq.command"); |
| xstream.marshal(ds, out); |
| return buffer.toString(); |
| } |
| private XStream getXStreamAdvisory() |
| { |
| if (xStreamAdvisory == null) { |
| xStreamAdvisory = createXStream(); |
| } |
| return xStreamAdvisory; |
| } |
| |
| // Properties |
| // ------------------------------------------------------------------------- |
| public XStream getXStream() { |
| if (xStream == null) { |
| xStream = createXStream(); |
| } |
| return xStream; |
| } |
| |
| public void setXStream(XStream xStream) { |
| this.xStream = xStream; |
| } |
| |
| // Implementation methods |
| // ------------------------------------------------------------------------- |
| @SuppressWarnings("unchecked") |
| protected XStream createXStream() { |
| XStream xstream = null; |
| if (brokerContext != null) { |
| Map<String, XStream> beans = brokerContext.getBeansOfType(XStream.class); |
| for (XStream bean : beans.values()) { |
| if (bean != null) { |
| xstream = bean; |
| break; |
| } |
| } |
| } |
| |
| if (xstream == null) { |
| xstream = XStreamSupport.createXStream(); |
| xstream.ignoreUnknownElements(); |
| } |
| |
| // For any object whose elements contains an UTF8Buffer instance instead |
| // of a String type we map it to String both in and out such that we don't |
| // marshal UTF8Buffers out |
| xstream.registerConverter(new AbstractSingleValueConverter() { |
| |
| @Override |
| public Object fromString(String str) { |
| return str; |
| } |
| |
| @SuppressWarnings("rawtypes") |
| @Override |
| public boolean canConvert(Class type) { |
| return type.equals(UTF8Buffer.class); |
| } |
| }); |
| |
| xstream.alias("string", UTF8Buffer.class); |
| |
| return xstream; |
| } |
| |
| @Override |
| public void setBrokerContext(BrokerContext brokerContext) { |
| this.brokerContext = brokerContext; |
| } |
| |
| @Override |
| public BrokerContext getBrokerContext() { |
| return this.brokerContext; |
| } |
| |
| /** |
| * Return an Advisory message as a JSON formatted string |
| * |
| * @param ds |
| * the DataStructure instance that is being marshaled. |
| * |
| * @return the JSON marshaled form of the given DataStructure instance. |
| */ |
| protected String marshallAdvisory(final DataStructure ds) { |
| XStream xstream = new XStream(new JsonHierarchicalStreamDriver()); |
| xstream.setMode(XStream.NO_REFERENCES); |
| xstream.aliasPackage("", "org.apache.activemq.command"); |
| return xstream.toXML(ds); |
| } |
| } |