blob: bf4de782a5c54bb6225b1db66825bb6aba51edf1 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.io.UnsupportedEncodingException;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL };
protected final Logger _logger = LoggerFactory.getLogger(getClass());
private AMQConnection _connection;
/**
* If true, messages will not get a timestamp.
*/
protected boolean _disableTimestamps;
/**
* Priority of messages created by this producer.
*/
private int _messagePriority = Message.DEFAULT_PRIORITY;
/**
* Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
*/
private long _timeToLive;
/**
* Delivery mode used for this producer.
*/
private int _deliveryMode = DeliveryMode.PERSISTENT;
/**
* The Destination used for this consumer, if specified upon creation.
*/
protected AMQDestination _destination;
/**
* Default encoding used for messages produced by this producer.
*/
private String _encoding;
/**
* Default encoding used for message produced by this producer.
*/
private String _mimeType;
protected AMQProtocolHandler _protocolHandler;
/**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
protected int _channelId;
/**
* This is an id generated by the session and is used to tie individual producers to the session. This means we
* can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers
* to the session so that when an error is propagated to the session it can close the producer (meaning that
* a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
*/
private long _producerId;
/**
* The session used to create this producer
*/
protected AMQSession _session;
private final boolean _immediate;
private final boolean _mandatory;
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
protected PublishMode publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory) throws AMQException
{
_connection = connection;
_destination = destination;
_transacted = transacted;
_protocolHandler = protocolHandler;
_channelId = channelId;
_session = session;
_producerId = producerId;
if (destination != null && !(destination instanceof AMQUndefinedDestination))
{
declareDestination(destination);
}
_immediate = immediate;
_mandatory = mandatory;
_userID = connection.getUsername();
setPublishMode();
}
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
// Will add support for this when we provide a more robust binding URL
String syncPub = _connection.getSyncPublish();
// Support for deprecated option sync_persistence
if (syncPub.equals("persistent") || _connection.getSyncPersistence())
{
publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
}
else if (syncPub.equals("all"))
{
publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
_logger.info("MessageProducer " + toString() + " using publish mode : " + publishMode);
}
void resubscribe() throws AMQException
{
if (_destination != null && !(_destination instanceof AMQUndefinedDestination))
{
declareDestination(_destination);
}
}
abstract void declareDestination(AMQDestination destination) throws AMQException;
public void setDisableMessageID(boolean b) throws JMSException
{
checkPreConditions();
checkNotClosed();
_disableMessageId = b;
}
public boolean getDisableMessageID() throws JMSException
{
checkNotClosed();
return _disableMessageId;
}
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
checkPreConditions();
_disableTimestamps = b;
}
public boolean getDisableMessageTimestamp() throws JMSException
{
checkNotClosed();
return _disableTimestamps;
}
public void setDeliveryMode(int i) throws JMSException
{
checkPreConditions();
if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
+ " is illegal");
}
_deliveryMode = i;
}
public int getDeliveryMode() throws JMSException
{
checkNotClosed();
return _deliveryMode;
}
public void setPriority(int i) throws JMSException
{
checkPreConditions();
if ((i < 0) || (i > 9))
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
}
_messagePriority = i;
}
public int getPriority() throws JMSException
{
checkNotClosed();
return _messagePriority;
}
public void setTimeToLive(long l) throws JMSException
{
checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
}
_timeToLive = l;
}
public long getTimeToLive() throws JMSException
{
checkNotClosed();
return _timeToLive;
}
public Destination getDestination() throws JMSException
{
checkNotClosed();
return _destination;
}
public void close() throws JMSException
{
_closed.set(true);
_session.deregisterProducer(_producerId);
}
public void send(Message message) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
}
}
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate);
}
}
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate);
}
}
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
}
}
public void send(Destination destination, Message message) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
_immediate);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, _immediate);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate);
}
}
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
{
return (AbstractJMSMessage) message;
}
else
{
AbstractJMSMessage newMessage;
if (message instanceof BytesMessage)
{
newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
}
else
{
newMessage = new MessageConverter(_session, message).getConvertedMessage();
}
if (newMessage != null)
{
return newMessage;
}
else
{
throw new JMSException("Unable to send message, due to class conversion error: "
+ message.getClass().getName());
}
}
}
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
throw new JMSException("Unsupported destination class: "
+ ((destination != null) ? destination.getClass() : null));
}
AMQDestination amqDestination = (AMQDestination) destination;
if(!amqDestination.isExchangeExistsChecked())
{
try
{
declareDestination(amqDestination);
}
catch(Exception e)
{
JMSException ex = new JMSException("Error validating destination");
ex.initCause(e);
ex.setLinkedException(e);
throw ex;
}
amqDestination.setExchangeExistsChecked(true);
}
}
/**
* The caller of this method must hold the failover mutex.
*
* @param destination
* @param origMessage
* @param deliveryMode
* @param priority
* @param timeToLive
* @param mandatory
* @param immediate
*
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
boolean mandatory, boolean immediate) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
UUID messageId = null;
if (_disableMessageId)
{
message.setJMSMessageID((UUID)null);
}
else
{
messageId = _messageIdGenerator.generate();
message.setJMSMessageID(messageId);
}
try
{
sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate);
}
catch (TransportException e)
{
throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
}
if (message != origMessage)
{
_logger.debug("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
origMessage.setJMSTimestamp(message.getJMSTimestamp());
_logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
if (_transacted)
{
_session.markDirty();
}
}
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
boolean immediate) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
if (destination instanceof TemporaryDestination)
{
_logger.debug("destination is temporary destination");
TemporaryDestination tempDest = (TemporaryDestination) destination;
if (tempDest.getSession().isClosed())
{
_logger.debug("session is closed");
throw new JMSException("Session for temporary destination has been closed");
}
if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new JMSException("Cannot send to a deleted temporary destination");
}
}
}
public void setMimeType(String mimeType) throws JMSException
{
checkNotClosed();
_mimeType = mimeType;
}
public void setEncoding(String encoding) throws JMSException, UnsupportedEncodingException
{
checkNotClosed();
_encoding = encoding;
}
private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
{
checkNotClosed();
if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
if(_session.getAMQConnection().isClosed())
{
throw new javax.jms.IllegalStateException("Connection closed");
}
}
private void checkInitialDestination()
{
if (_destination == null)
{
throw new UnsupportedOperationException("Destination is null");
}
}
private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
{
if ((_destination != null) && (suppliedDestination != null))
{
throw new UnsupportedOperationException(
"This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
if (suppliedDestination == null)
{
throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
public AMQSession getSession()
{
return _session;
}
public boolean isBound(AMQDestination destination) throws JMSException
{
try
{
return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
}
catch (TransportException e)
{
throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
}
}
}