blob: 5cf597c160aa28e0aaa0b477a369b5f385cfcefe [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.message;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
public class MessageFactoryRegistry
{
/**
* This class logger
*/
private final Logger _logger = LoggerFactory.getLogger(getClass());
private final Map<String, AbstractJMSMessageFactory> _mimeStringToFactoryMap = new HashMap<>();
private final AbstractJMSMessageFactory _default = new JMSBytesMessageFactory();
private final AMQSession<?, ?> _session;
public MessageFactoryRegistry(final AMQSession<?, ?> session)
{
_session = session;
}
/**
* Construct a new registry with the default message factories registered
*
* @return a message factory registry
* @param session
*/
public static MessageFactoryRegistry newDefaultRegistry(final AMQSession<?, ?> session)
{
MessageFactoryRegistry mf = new MessageFactoryRegistry(session);
mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
mf.registerFactory("text/plain", new JMSTextMessageFactory());
mf.registerFactory("text/xml", new JMSTextMessageFactory());
mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory(session.getAMQConnection()));
mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory());
mf.registerFactory(AMQPEncodedListMessage.MIME_TYPE, new AMQPEncodedListMessageFactory());
mf.registerFactory(Encrypted091MessageFactory.ENCRYPTED_0_9_1_CONTENT_TYPE, new Encrypted091MessageFactory(mf));
mf.registerFactory(Encrypted010MessageFactory.ENCRYPTED_0_10_CONTENT_TYPE, new Encrypted010MessageFactory(mf));
mf.registerFactory(null, mf._default);
return mf;
}
public void registerFactory(String mimeType, AbstractJMSMessageFactory mf)
{
_mimeStringToFactoryMap.put(mimeType, mf);
}
/**
* Create a message. This looks up the MIME type from the content header and instantiates the appropriate
* concrete message type.
*
* @param deliveryTag the AMQ message id
* @param redelivered true if redelivered
* @param contentHeader the content header that was received
* @param bodies a list of ContentBody instances @return the message.
* @param queueDestinationCache
* @param topicDestinationCache
* @param addressType
* @throws JMSException
* @throws QpidException
*/
public AbstractJMSMessage createMessage(long deliveryTag,
boolean redelivered,
String exchange,
String routingKey,
ContentHeaderBody contentHeader,
List bodies,
AMQSession_0_8.DestinationCache<AMQQueue> queueDestinationCache,
AMQSession_0_8.DestinationCache<AMQTopic> topicDestinationCache,
final int addressType)
throws QpidException, JMSException
{
BasicContentHeaderProperties properties = contentHeader.getProperties();
// Get the message content type. This may be null for pure AMQP messages, but will always be set for JMS over
// AMQP. When the type is null, it can only be assumed that the message is a byte message.
AMQShortString contentTypeShortString = properties.getContentType();
contentTypeShortString = (contentTypeShortString == null) ? new AMQShortString(JMSBytesMessage.MIME_TYPE) : contentTypeShortString;
AbstractJMSMessageFactory mf = getMessageFactory(AMQShortString.toString(contentTypeShortString));
return mf.createMessage(deliveryTag, redelivered, contentHeader, exchange, routingKey, bodies, queueDestinationCache, topicDestinationCache, addressType);
}
AbstractJMSMessageFactory getMessageFactory(final String contentTypeShortString)
{
AbstractJMSMessageFactory mf = _mimeStringToFactoryMap.get(contentTypeShortString);
if (mf == null)
{
mf = _default;
}
return mf;
}
public AbstractJMSMessage createMessage(MessageTransfer transfer) throws QpidException, JMSException
{
MessageProperties mprop = transfer.getHeader().getMessageProperties();
String messageType = "";
if ( mprop == null || mprop.getContentType() == null)
{
_logger.debug("no message type specified, building a byte message");
messageType = JMSBytesMessage.MIME_TYPE;
}
else
{
messageType = mprop.getContentType();
}
AbstractJMSMessageFactory mf = getMessageFactory(messageType);
boolean redelivered = false;
DeliveryProperties deliverProps;
if((deliverProps = transfer.getHeader().getDeliveryProperties()) != null)
{
redelivered = deliverProps.getRedelivered();
}
return mf.createMessage(transfer.getId(),
redelivered,
mprop == null? new MessageProperties():mprop,
deliverProps == null? new DeliveryProperties():deliverProps,
transfer.getBody());
}
public AMQSession<?, ?> getSession()
{
return _session;
}
AbstractJMSMessageFactory getDefaultFactory()
{
return _default;
}
}