blob: 12fa91fd8a13777a1864a574987adf486498bf69 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.slf4j.Logger;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageConverter;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.util.UUIDGen;
import org.apache.qpid.util.UUIDs;
public abstract class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
enum PublishMode { ASYNC_PUBLISH_ALL, SYNC_PUBLISH_PERSISTENT, SYNC_PUBLISH_ALL }
private final Logger _logger ;
private AMQConnection _connection;
private boolean _disableTimestamps;
/**
* Priority of messages created by this producer.
*/
private int _messagePriority = Message.DEFAULT_PRIORITY;
/**
* Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
*/
private long _timeToLive;
/**
* Delivery mode used for this producer.
*/
private int _deliveryMode = DeliveryMode.PERSISTENT;
private AMQDestination _destination;
/**
* True if this producer was created from a transacted session
*/
private boolean _transacted;
private int _channelId;
/**
* This is an id generated by the session and is used to tie individual producers to the session. This means we
* can deregister a producer with the session when the producer is clsoed. We need to be able to tie producers
* to the session so that when an error is propagated to the session it can close the producer (meaning that
* a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
*/
private long _producerId;
private AMQSession _session;
private final boolean _immediate;
private final Boolean _mandatory;
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
private String _userID; // ref user id used in the connection.
private long _deliveryDelay;
/**
* The default value for immediate flag used this producer is false. That is, a consumer does
* not need to be attached to a queue.
*/
private final boolean _defaultImmediateValue = Boolean.parseBoolean(System.getProperty("qpid.default_immediate", "false"));
/**
* The default value for mandatory flag used by this producer is true. That is, server will not
* silently drop messages where no queue is connected to the exchange for the message.
*/
private final boolean _defaultMandatoryValue = Boolean.parseBoolean(System.getProperty("qpid.default_mandatory", "true"));
/**
* The default value for mandatory flag used by this producer when publishing to a Topic is false. That is, server
* will silently drop messages where no queue is connected to the exchange for the message.
*/
private final boolean _defaultMandatoryTopicValue =
Boolean.parseBoolean(System.getProperty("qpid.default_mandatory_topic",
System.getProperties().containsKey("qpid.default_mandatory")
? System.getProperty("qpid.default_mandatory")
: "false"));
private PublishMode _publishMode = PublishMode.ASYNC_PUBLISH_ALL;
protected BasicMessageProducer(Logger logger,AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
AMQSession session, long producerId, Boolean immediate, Boolean mandatory) throws
QpidException
{
_logger = logger;
_connection = connection;
_destination = destination;
_transacted = transacted;
_channelId = channelId;
_session = session;
_producerId = producerId;
if (destination != null && !(destination.neverDeclare()))
{
declareDestination(destination);
}
_immediate = immediate == null ? _defaultImmediateValue : immediate;
_mandatory = mandatory == null
? destination == null ? null
: destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: mandatory;
_userID = connection.isPopulateUserId() ? connection.getUsername() : null;
if(destination != null && destination.getDeliveryDelay() != 0L)
{
_deliveryDelay = destination.getDeliveryDelay();
}
setPublishMode();
}
protected AMQConnection getConnection()
{
return _connection;
}
void setPublishMode()
{
// Publish mode could be configured at destination level as well.
// Will add support for this when we provide a more robust binding URL
String syncPub = _connection.getSyncPublish();
// Support for deprecated option sync_persistence
if (syncPub.equals("persistent") || _connection.getSyncPersistence())
{
_publishMode = PublishMode.SYNC_PUBLISH_PERSISTENT;
}
else if (syncPub.equals("all"))
{
_publishMode = PublishMode.SYNC_PUBLISH_ALL;
}
if (_logger.isDebugEnabled())
{
_logger.debug("MessageProducer " + toString() + " using publish mode : " + _publishMode);
}
}
void resubscribe() throws QpidException
{
if (_destination != null && !_destination.neverDeclare())
{
declareDestination(_destination);
}
}
abstract void declareDestination(AMQDestination destination) throws QpidException;
public void setDisableMessageID(boolean b) throws JMSException
{
checkPreConditions();
checkNotClosed();
_disableMessageId = b;
}
public boolean getDisableMessageID() throws JMSException
{
checkNotClosed();
return _disableMessageId;
}
public void setDisableMessageTimestamp(boolean b) throws JMSException
{
checkPreConditions();
_disableTimestamps = b;
}
public boolean getDisableMessageTimestamp() throws JMSException
{
checkNotClosed();
return _disableTimestamps;
}
public void setDeliveryMode(int i) throws JMSException
{
checkPreConditions();
if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
{
throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
+ " is illegal");
}
_deliveryMode = i;
}
public int getDeliveryMode() throws JMSException
{
checkNotClosed();
return _deliveryMode;
}
public void setPriority(int i) throws JMSException
{
checkPreConditions();
if ((i < 0) || (i > 9))
{
throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
}
_messagePriority = i;
}
public int getPriority() throws JMSException
{
checkNotClosed();
return _messagePriority;
}
public void setTimeToLive(long l) throws JMSException
{
checkPreConditions();
if (l < 0)
{
throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
}
_timeToLive = l;
}
public long getTimeToLive() throws JMSException
{
checkNotClosed();
return _timeToLive;
}
protected AMQDestination getAMQDestination()
{
return _destination;
}
/**
* The Destination used for this consumer, if specified upon creation.
*/
public Destination getDestination() throws JMSException
{
checkNotClosed();
return _destination;
}
public void close() throws JMSException
{
setClosed();
_session.deregisterProducer(_producerId);
AMQDestination dest = getAMQDestination();
AMQSession ssn = getSession();
if (!ssn.isClosed() && dest != null && dest.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
{
try
{
if (dest.getDelete() == AMQDestination.AddressOption.ALWAYS ||
dest.getDelete() == AMQDestination.AddressOption.SENDER )
{
ssn.handleNodeDelete(dest);
}
ssn.handleLinkDelete(dest);
}
catch(TransportException e)
{
throw getSession().toJMSException("Exception while closing producer:" + e.getMessage(), e);
}
catch (QpidException e)
{
throw JMSExceptionHelper.chainJMSException(new JMSException("Exception while closing producer:"
+ e.getMessage()), e);
}
}
}
public void send(Message message) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
_deliveryDelay);
}
}
public void send(Message message, int deliveryMode) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, _immediate,
_deliveryDelay);
}
}
public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive, _mandatory, immediate,
_deliveryDelay);
}
}
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
{
checkPreConditions();
checkInitialDestination();
synchronized (_connection.getFailoverMutex())
{
sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory, _immediate, _deliveryDelay);
}
}
public void send(Destination destination, Message message) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
AMQDestination amqDestination = (AMQDestination) destination;
sendImpl(amqDestination, message, _deliveryMode, _messagePriority, _timeToLive,
_mandatory == null
? destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: _mandatory,
_immediate,
amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
AMQDestination amqDestination = (AMQDestination) destination;
sendImpl(amqDestination, message, deliveryMode, priority, timeToLive,
_mandatory == null
? destination instanceof Topic
? _defaultMandatoryTopicValue
: _defaultMandatoryValue
: _mandatory,
_immediate,
amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean mandatory) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
AMQDestination amqDestination = (AMQDestination) destination;
sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, _immediate,
amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
boolean mandatory, boolean immediate) throws JMSException
{
checkPreConditions();
checkDestination(destination);
synchronized (_connection.getFailoverMutex())
{
validateDestination(destination);
AMQDestination amqDestination = (AMQDestination) destination;
sendImpl(amqDestination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
amqDestination.getDeliveryDelay() != 0L ? amqDestination.getDeliveryDelay() : _deliveryDelay);
}
}
private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
{
if (message instanceof AbstractJMSMessage)
{
return (AbstractJMSMessage) message;
}
else
{
AbstractJMSMessage newMessage;
if (message instanceof BytesMessage)
{
newMessage = new MessageConverter(_session, (BytesMessage) message).getConvertedMessage();
}
else if (message instanceof MapMessage)
{
newMessage = new MessageConverter(_session, (MapMessage) message).getConvertedMessage();
}
else if (message instanceof ObjectMessage)
{
newMessage = new MessageConverter(_session, (ObjectMessage) message).getConvertedMessage();
}
else if (message instanceof TextMessage)
{
newMessage = new MessageConverter(_session, (TextMessage) message).getConvertedMessage();
}
else if (message instanceof StreamMessage)
{
newMessage = new MessageConverter(_session, (StreamMessage) message).getConvertedMessage();
}
else
{
newMessage = new MessageConverter(_session, message).getConvertedMessage();
}
if (newMessage != null)
{
return newMessage;
}
else
{
throw new JMSException("Unable to send message, due to class conversion error: "
+ message.getClass().getName());
}
}
}
private void validateDestination(Destination destination) throws JMSException
{
if (!(destination instanceof AMQDestination))
{
throw new InvalidDestinationException("Unsupported destination class: "
+ ((destination != null) ? destination.getClass() : null));
}
AMQDestination amqDestination = (AMQDestination) destination;
if (_session.isResolved(amqDestination) || amqDestination.neverDeclare())
{
return;
}
try
{
declareDestination(amqDestination);
}
catch(Exception e)
{
throw JMSExceptionHelper.chainJMSException(new InvalidDestinationException(
"Error validating destination"), e);
}
}
/**
* The caller of this method must hold the failover mutex.
*
* @param destination
* @param origMessage
* @param deliveryMode
* @param priority
* @param timeToLive
* @param mandatory
* @param immediate
*
* @param deliveryDelay
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination,
Message origMessage,
int deliveryMode,
int priority,
long timeToLive,
boolean mandatory,
boolean immediate,
long deliveryDelay) throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
AbstractJMSMessage message = convertToNativeMessage(origMessage);
UUID messageId = null;
if (_disableMessageId)
{
message.setJMSMessageID((UUID)null);
}
else
{
messageId = _messageIdGenerator.generate();
message.setJMSMessageID(messageId);
}
try
{
sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate,
deliveryDelay);
}
catch (TransportException e)
{
throw getSession().toJMSException("Exception whilst sending:" + e.getMessage(), e);
}
if (message != origMessage)
{
_logger.debug("Updating original message");
origMessage.setJMSPriority(message.getJMSPriority());
origMessage.setJMSTimestamp(message.getJMSTimestamp());
if (_logger.isDebugEnabled())
{
_logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
}
origMessage.setJMSExpiration(message.getJMSExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
}
if (_transacted)
{
_session.markDirty();
}
}
abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
boolean immediate, final long deliveryDelay) throws JMSException;
private void checkTemporaryDestination(AMQDestination destination) throws InvalidDestinationException
{
if (destination instanceof TemporaryDestination)
{
_logger.debug("destination is temporary destination");
TemporaryDestination tempDest = (TemporaryDestination) destination;
if (tempDest.getSession().isClosed())
{
_logger.debug("session is closed");
throw new InvalidDestinationException("Session for temporary destination has been closed");
}
if (tempDest.isDeleted())
{
_logger.debug("destination is deleted");
throw new InvalidDestinationException("Cannot send to a deleted temporary destination");
}
}
}
private void checkPreConditions() throws JMSException
{
checkNotClosed();
if ((_session == null) || _session.isClosed())
{
throw new javax.jms.IllegalStateException("Invalid Session");
}
if(_session.getAMQConnection().isClosed())
{
throw new javax.jms.IllegalStateException("Connection closed");
}
}
private void checkInitialDestination() throws JMSException
{
if (_destination == null)
{
throw new UnsupportedOperationException("Destination is null");
}
checkValidQueue();
}
private void checkDestination(Destination suppliedDestination) throws JMSException
{
if ((_destination != null) && (suppliedDestination != null))
{
throw new UnsupportedOperationException(
"This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
}
if(suppliedDestination instanceof AMQQueue)
{
AMQQueue destination = (AMQQueue) suppliedDestination;
checkValidQueue(destination);
}
if (suppliedDestination == null)
{
throw new InvalidDestinationException("Supplied Destination was invalid");
}
}
private void checkValidQueue() throws JMSException
{
if(_destination instanceof AMQQueue)
{
checkValidQueue((AMQQueue) _destination);
}
}
private void checkValidQueue(AMQQueue destination) throws JMSException
{
if (!destination.isCheckedForQueueBinding() && validateQueueOnSend())
{
if (getSession().isStrictAMQP())
{
getLogger().warn("AMQP does not support destination validation before publish");
destination.setCheckedForQueueBinding(true);
}
else
{
if (isBound(destination))
{
destination.setCheckedForQueueBinding(true);
}
else
{
throw new InvalidDestinationException("Queue: " + destination.getQueueName()
+ " is not a valid destination (no binding on server)");
}
}
}
}
private boolean validateQueueOnSend()
{
return _connection.validateQueueOnSend();
}
/**
* The session used to create this producer
*/
public AMQSession getSession()
{
return _session;
}
public boolean isBound(AMQDestination destination) throws JMSException
{
try
{
return _session.isQueueBound(destination.getExchangeName(), null, destination.getRoutingKey());
}
catch (TransportException e)
{
throw getSession().toJMSException("Exception whilst checking destination binding:" + e.getMessage(), e);
}
}
/**
* If true, messages will not get a timestamp.
*/
protected boolean isDisableTimestamps()
{
return _disableTimestamps;
}
protected void setDisableTimestamps(boolean disableTimestamps)
{
_disableTimestamps = disableTimestamps;
}
protected void setDestination(AMQDestination destination)
{
_destination = destination;
}
protected int getChannelId()
{
return _channelId;
}
protected void setChannelId(int channelId)
{
_channelId = channelId;
}
protected void setSession(AMQSession session)
{
_session = session;
}
protected String getUserID()
{
return _userID;
}
protected void setUserID(String userID)
{
_userID = userID;
}
protected PublishMode getPublishMode()
{
return _publishMode;
}
protected void setPublishMode(PublishMode publishMode)
{
_publishMode = publishMode;
}
public long getDeliveryDelay()
{
return _deliveryDelay;
}
@Override
public void setDeliveryDelay(final long deliveryDelay)
{
_deliveryDelay = deliveryDelay;
}
Logger getLogger()
{
return _logger;
}
}