blob: 557163c3772acdf8ea66cf5865203ff002ccb100 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.services.messaging.adapters;
import flex.messaging.MessageException;
import flex.messaging.config.ConfigurationException;
import flex.messaging.log.Log;
import flex.messaging.messages.MessagePerformanceUtils;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.naming.NamingException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;
/**
* A JMSProxy subclass for <code>javax.jms.MessageProducer</code> instances.
*
*
*/
public abstract class JMSProducer extends JMSProxy
{
/* JMS related variables */
protected MessageProducer producer;
protected int deliveryMode;
protected int messagePriority;
protected String messageType;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Create a new JMSProducer with default delivery mode of <code>javax.jms.Message.DEFAULT_DELIVERY_MODE</code>
* and default message priority of <code>javax.jms.Message.DEFAULT_PRIORITY</code>.
*/
public JMSProducer()
{
super();
deliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
messagePriority = javax.jms.Message.DEFAULT_PRIORITY;
}
//--------------------------------------------------------------------------
//
// Initialize, validate, start, and stop methods.
//
//--------------------------------------------------------------------------
/**
* Initialize with settings from the JMS adapter.
*
* @param settings JMS settings to use for initialization.
*/
public void initialize(JMSSettings settings)
{
super.initialize(settings);
String deliveryString = settings.getDeliveryMode();
if (deliveryString.equals(JMSConfigConstants.DEFAULT_DELIVERY_MODE))
deliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
else if (deliveryString.equals(JMSConfigConstants.PERSISTENT))
deliveryMode = javax.jms.DeliveryMode.PERSISTENT;
else if (deliveryString.equals(JMSConfigConstants.NON_PERSISTENT))
deliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;
messagePriority = settings.getMessagePriority();
messageType = settings.getMessageType();
}
/**
* Verifies that the <code>JMSProducer</code> is in valid state before
* it is started. For <code>JMSProducer</code> to be in valid state, it needs
* to have a message type assigned.
*/
protected void validate()
{
super.validate();
if (messageType == null || !(messageType.equals(JMSConfigConstants.TEXT_MESSAGE)
|| messageType.equals(JMSConfigConstants.OBJECT_MESSAGE)
|| messageType.equals(JMSConfigConstants.MAP_MESSAGE)) )
{
// Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage, javax.jms.ObjectMessage, javax.jms.MapMessage.
ConfigurationException ce = new ConfigurationException();
ce.setMessage(JMSConfigConstants.INVALID_JMS_MESSAGE_TYPE, new Object[] {messageType});
throw ce;
}
}
/**
* Starts the <code>JMSProducer</code>. Subclasses should call <code>super.start</code>.
*/
public void start() throws NamingException, JMSException
{
super.start();
if (Log.isInfo())
Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS producer for JMS destination '"
+ destinationJndiName +"' is starting.");
}
/**
* Stops the <code>JMSProducer</code> by closing its underlying
* <code>MessageProducer</code>. It then calls <code>JMSProxy.close</code>
* for session and connection closure.
*/
public void stop()
{
if (Log.isInfo())
Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS producer for JMS destination '" +
destinationJndiName + "' is stopping.");
try
{
if (producer != null)
producer.close();
}
catch (JMSException e)
{
if (Log.isWarn())
Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS producer for JMS destination '" +
destinationJndiName + "' received an error while closing"
+ " its underlying MessageProducer: " + e.getMessage());
}
super.stop();
}
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Returns the delivery mode used by the <code>JMSProducer</code>.
*
* @return The delivery mode used by the <code>JMSProducer</code>.
*/
public int getDeliveryMode()
{
return deliveryMode;
}
/**
* Sets the delivery mode used by the <code>JMSProducer</code>. Valid values
* javax.jms.DeliveryMode.PERSISTENT and javax.jms.DeliveryMode.NON_PERSISTENT.
* This propery is optional and defaults to javax.jms.Message.DEFAULT_DELIVERY_MODE.
*
* @param deliveryMode
*/
public void setDeliveryMode(int deliveryMode)
{
if (deliveryMode == javax.jms.Message.DEFAULT_DELIVERY_MODE
|| deliveryMode == javax.jms.DeliveryMode.NON_PERSISTENT
|| deliveryMode == javax.jms.DeliveryMode.PERSISTENT)
this.deliveryMode = deliveryMode;
}
/**
* Returns the message priority used by the <code>JMSProducer</code>.
*
* @return an int specifying the message priority.
*/
public int getMessagePriority()
{
return messagePriority;
}
/**
* Sets the message priority used by the <code>JMSProducer</code>.
* This property is optional and defaults to <code>javax.jms.Message.DEFAULT_PRIORITY</code>.
*
* @param messagePriority an int specifying the message priority.
*/
public void setMessagePriority(int messagePriority)
{
this.messagePriority = messagePriority;
}
/**
* Returns the message type used by the <code>JMSProducer</code>.
*
* @return The message type used by the <code>JMSProducer</code>.
*/
public String getMessageType()
{
return messageType;
}
/**
* Sets the message type used by the <code>JMSProducer</code>. Supported
* types are <code>javax.jms.TextMessage</code> and <code>javax.jms.ObjectMessage</code>.
* This property is mandatory.
*
* @param messageType String representing the message type used.
*/
public void setMessageType(String messageType)
{
this.messageType = messageType;
}
//--------------------------------------------------------------------------
//
// Protected and Private Methods
//
//--------------------------------------------------------------------------
protected void copyHeadersToProperties(Map properties, javax.jms.Message message) throws JMSException
{
// Generic Flex headers become JMS properties, named Flex headers become JMS headers
for (Iterator iter = properties.keySet().iterator(); iter.hasNext();)
{
String propName = (String)iter.next();
Object propValue = properties.get(propName);
// For now, only named property is TTL.
if (!propName.equals(JMSConfigConstants.TIME_TO_LIVE))
{
// MPI header contains a MessagePerformaceInfo object that cannot
// be set as a JMS header property. Instead, it is broken down
// to its primitive types and each primitive is individually
// set as a JMS header property.
if (propName.equals(MessagePerformanceUtils.MPI_HEADER_IN))
{
Field[] fields = propValue.getClass().getFields();
for (int i = 0; i < fields.length; i++)
{
Field field = fields[i];
// Use MPI_HEADER_IN as prefix to the property name so that
// they can be distinguished later when the MessagePerformanceInfo
// object gets built from the headers.
String mpiPropertyName = MessagePerformanceUtils.MPI_HEADER_IN + field.getName();
Object mpiPropertyValue = null;
try
{
mpiPropertyValue = field.get(propValue);
message.setObjectProperty(mpiPropertyName, mpiPropertyValue);
}
catch (Exception e)
{
if (Log.isWarn())
Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMSProducer could not retrieve the value of MessagePerformanceUtils property '"
+ propValue + "' from the Flex message, therefore it will not be set on the JMS message.");
}
}
}
else if (propValue != null)
{
message.setObjectProperty(propName, propValue);
}
}
}
}
protected long getTimeToLive(Map properties) throws JMSException
{
long timeToLive = producer.getTimeToLive();
if (properties.containsKey(JMSConfigConstants.TIME_TO_LIVE))
{
long l = ((Long)properties.get(JMSConfigConstants.TIME_TO_LIVE)).longValue();
if (l != 0)
{
// Don't let Flex default override JMS implementation default,
// only explicit ActionScript TTL usage overrides JMS TTL.
timeToLive = l;
}
}
return timeToLive;
}
void sendMessage(flex.messaging.messages.Message flexMessage) throws JMSException
{
MessagePerformanceUtils.markServerPreAdapterExternalTime(flexMessage);
if (JMSConfigConstants.TEXT_MESSAGE.equals(messageType))
{
sendTextMessage(flexMessage.getBody().toString(), flexMessage.getHeaders());
}
else if (JMSConfigConstants.OBJECT_MESSAGE.equals(messageType))
{
try
{
sendObjectMessage((Serializable)flexMessage.getBody(), flexMessage.getHeaders());
}
catch (ClassCastException ce)
{
// The body of the Flex Message could not be converted to a Serializable Java Object.
MessageException me = new MessageException();
me.setMessage(JMSConfigConstants.NONSERIALIZABLE_MESSAGE_BODY);
throw me;
}
}
else if (JMSConfigConstants.MAP_MESSAGE.equals(messageType))
{
try
{
sendMapMessage((Map<String,?>)flexMessage.getBody(), flexMessage.getHeaders());
}
catch (ClassCastException ce)
{
// 10812=The body of the Flex message could not be converted to a Java Map object.
MessageException me = new MessageException();
me.setMessage(JMSConfigConstants.NONMAP_MESSAGE_BODY);
throw me;
}
}
}
abstract void sendObjectMessage(Serializable obj, Map properties) throws JMSException;
abstract void sendTextMessage(String text, Map properties) throws JMSException;
abstract void sendMapMessage(Map<String,?> map, Map properties) throws JMSException;
}