blob: 369c8a6e9d3a01e752eb0fa73b29637270b75365 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.ReturnMessage;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicAckBody;
import org.apache.qpid.framing.BasicConsumeBody;
import org.apache.qpid.framing.BasicConsumeOkBody;
import org.apache.qpid.framing.BasicQosBody;
import org.apache.qpid.framing.BasicQosOkBody;
import org.apache.qpid.framing.BasicRecoverBody;
import org.apache.qpid.framing.BasicRecoverOkBody;
import org.apache.qpid.framing.BasicRecoverSyncBody;
import org.apache.qpid.framing.BasicRecoverSyncOkBody;
import org.apache.qpid.framing.BasicRejectBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ChannelFlowOkBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
import org.apache.qpid.framing.ExchangeDeclareOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.QueueBindOkBody;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
import org.apache.qpid.framing.TxCommitOkBody;
import org.apache.qpid.framing.TxRollbackBody;
import org.apache.qpid.framing.TxRollbackOkBody;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.transport.TransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
/**
 * Used for debugging.
 * Note that the logger is obtained for {@code AMQSession.class} (the superclass), so messages logged from
 * this class appear under that logger name.
 */
private static final Logger _logger = LoggerFactory.getLogger(AMQSession.class);
/**
 * Creates a new session on a connection. Every argument is handed unchanged to the superclass constructor.
 *
 * @param con                     The connection on which to create the session.
 * @param channelId               The unique identifier for the session.
 * @param transacted              Indicates whether or not the session is transactional.
 * @param acknowledgeMode         The acknowledgement mode for the session.
 * @param messageFactoryRegistry  The message factory registry for the session.
 * @param defaultPrefetchHighMark The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLowMark  The number of prefetched messages at which to resume the session.
 */
AMQSession_0_8(AMQConnection con,
               int channelId,
               boolean transacted,
               int acknowledgeMode,
               MessageFactoryRegistry messageFactoryRegistry,
               int defaultPrefetchHighMark,
               int defaultPrefetchLowMark)
{
    super(con,
          channelId,
          transacted,
          acknowledgeMode,
          messageFactoryRegistry,
          defaultPrefetchHighMark,
          defaultPrefetchLowMark);
}
/**
 * Creates a new session on a connection, using the registry returned by
 * {@code MessageFactoryRegistry.newDefaultRegistry()} as the message factory registry.
 *
 * @param con                 The connection on which to create the session.
 * @param channelId           The unique identifier for the session.
 * @param transacted          Indicates whether or not the session is transactional.
 * @param acknowledgeMode     The acknowledgement mode for the session.
 * @param defaultPrefetchHigh The maximum number of messages to prefetch before suspending the session.
 * @param defaultPrefetchLow  The number of prefetched messages at which to resume the session.
 */
AMQSession_0_8(AMQConnection con,
               int channelId,
               boolean transacted,
               int acknowledgeMode,
               int defaultPrefetchHigh,
               int defaultPrefetchLow)
{
    this(con,
         channelId,
         transacted,
         acknowledgeMode,
         MessageFactoryRegistry.newDefaultRegistry(),
         defaultPrefetchHigh,
         defaultPrefetchLow);
}
/**
 * Looks up the protocol version reported by this session's protocol handler.
 *
 * @return the version returned by the protocol handler.
 */
private ProtocolVersion getProtocolVersion()
{
    final AMQProtocolHandler handler = getProtocolHandler();
    return handler.getProtocolVersion();
}
/**
 * Polls {@code _unacknowledgedMessageTags} until {@code poll()} returns {@code null}, sending an individual
 * (non-multiple) acknowledgement for every tag removed.
 */
protected void acknowledgeImpl()
{
    for (Long pending = _unacknowledgedMessageTags.poll();
         pending != null;
         pending = _unacknowledgedMessageTags.poll())
    {
        acknowledgeMessage(pending, false);
    }
}
/**
 * Writes a Basic.Ack frame for the given delivery tag on this session's channel and then removes the tag
 * from {@code _unacknowledgedMessageTags}.
 *
 * @param deliveryTag the delivery tag to acknowledge.
 * @param multiple    passed through to the Basic.Ack body as its "multiple" flag.
 */
public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
    final AMQFrame frame = getMethodRegistry().createBasicAckBody(deliveryTag, multiple).generateFrame(_channelId);

    if (_logger.isDebugEnabled())
    {
        _logger.debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId);
    }

    // The frame is written without waiting for any reply.
    getProtocolHandler().writeFrame(frame);
    _unacknowledgedMessageTags.remove(deliveryTag);
}
/**
 * Sends a Queue.Bind for the given queue, exchange and routing key and waits for the Queue.BindOk reply.
 * The {@code dest} and {@code nowait} parameters are not used by this implementation; the bind body is
 * always created with {@code false} in the position following the routing key.
 *
 * @param queueName    the queue to bind.
 * @param routingKey   the routing key for the binding.
 * @param arguments    additional binding arguments.
 * @param exchangeName the exchange to bind the queue to.
 * @param dest         unused here.
 * @param nowait       unused here.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendQueueBind(final AMQShortString queueName, final AMQShortString routingKey, final FieldTable arguments,
                          final AMQShortString exchangeName, final AMQDestination dest,
                          final boolean nowait) throws AMQException, FailoverException
{
    final AMQProtocolHandler writer = getProtocolHandler();
    final AMQFrame bindFrame = getProtocolHandler().getMethodRegistry()
            .createQueueBindBody(getTicket(), queueName, exchangeName, routingKey, false, arguments)
            .generateFrame(_channelId);
    writer.syncWrite(bindFrame, QueueBindOkBody.class);
}
/**
 * Closes this session's channel on the broker, unless the connection is already closed or closing.
 * <p>
 * The state manager is consulted (rather than only the connection) because, for 0-8/0-9, the connection
 * variable may not have been updated in time by the error receiving thread.
 *
 * @param timeout the timeout passed to the synchronous write that waits for Channel.CloseOk.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendClose(long timeout) throws AMQException, FailoverException
{
    final AMQProtocolHandler handler = getProtocolHandler();

    // Take a single snapshot of the state. Reading it once per comparison would allow a transition from
    // CONNECTION_CLOSING to CONNECTION_CLOSED between the two reads to make both comparisons fail, and the
    // session would then try to close a channel on a connection that is already gone.
    final AMQState state = handler.getStateManager().getCurrentState();

    // We can't close the session if we are already in the process of closing/closed the connection.
    if (state.equals(AMQState.CONNECTION_CLOSED) || state.equals(AMQState.CONNECTION_CLOSING))
    {
        return;
    }

    handler.closeSession(this);
    handler.syncWrite(handler.getMethodRegistry().createChannelCloseBody(AMQConstant.REPLY_SUCCESS.getCode(),
                                                                         new AMQShortString("JMS client closing channel"), 0, 0)
                             .generateFrame(_channelId),
                      ChannelCloseOkBody.class, timeout);
    // When control resumes at this point, a reply will have been received that
    // indicates the broker has closed the channel successfully.
}
/**
 * Acknowledges every tag polled from {@code _deliveredMessageTags} (one non-multiple ack per tag) and then
 * sends Tx.Commit, waiting for the Tx.CommitOk reply.
 *
 * @throws AMQException       propagated from the synchronous write.
 * @throws FailoverException  propagated from the synchronous write.
 * @throws TransportException declared by this method; see the protocol handler for when it is raised.
 */
public void commitImpl() throws AMQException, FailoverException, TransportException
{
    // Acknowledge all delivered messages
    Long delivered;
    while ((delivered = _deliveredMessageTags.poll()) != null)
    {
        acknowledgeMessage(delivered, false);
    }

    final AMQProtocolHandler protocolHandler = getProtocolHandler();
    final AMQFrame commitFrame =
            getProtocolHandler().getMethodRegistry().createTxCommitBody().generateFrame(_channelId);
    protocolHandler.syncWrite(commitFrame, TxCommitOkBody.class);
}
/**
 * Sends a Queue.Declare for the named queue and waits for Queue.DeclareOk.
 * <p>
 * A non-empty {@code arguments} map is copied entry by entry into a new {@link FieldTable}; a {@code null}
 * or empty map results in {@code null} being passed as the declare arguments.
 *
 * @param name       the queue name.
 * @param autoDelete auto-delete flag for the declare.
 * @param durable    durable flag for the declare.
 * @param exclusive  exclusive flag for the declare.
 * @param arguments  optional declare arguments; may be {@code null}.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
                            final boolean exclusive, final Map<String, Object> arguments)
        throws AMQException, FailoverException
{
    final boolean hasArguments = (arguments != null) && !arguments.isEmpty();

    FieldTable declareArgs = null;
    if (hasArguments)
    {
        declareArgs = new FieldTable();
        for (Map.Entry<String, Object> argument : arguments.entrySet())
        {
            declareArgs.setObject(argument.getKey(), argument.getValue());
        }
    }

    final QueueDeclareBody declare = getMethodRegistry().createQueueDeclareBody(
            getTicket(), name, false, durable, exclusive, autoDelete, false, declareArgs);
    final AMQFrame declareFrame = declare.generateFrame(_channelId);
    getProtocolHandler().syncWrite(declareFrame, QueueDeclareOkBody.class);
}
/**
 * Clears the unacknowledged tags and asks the broker to recover the session.
 * <ul>
 * <li>With strict AMQP, a Basic.Recover is written without waiting for a reply and a warning is logged.</li>
 * <li>Otherwise, for protocol 8-0 a Basic.Recover is sent and Basic.RecoverOk awaited; for 0-9 and 0-91 a
 * Basic.RecoverSync is sent and Basic.RecoverSyncOk awaited.</li>
 * </ul>
 *
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 * @throws RuntimeException  if the protocol version is none of 8-0, 0-9 or 0-91.
 */
public void sendRecover() throws AMQException, FailoverException
{
    _unacknowledgedMessageTags.clear();

    if (isStrictAMQP())
    {
        // We can't use the BasicRecoverBody-OK method as it isn't part of the spec.
        BasicRecoverBody recover = getMethodRegistry().createBasicRecoverBody(false);
        _connection.getProtocolHandler().writeFrame(recover.generateFrame(_channelId));
        _logger.warn("Session Recover cannot be guaranteed with STRICT_AMQP. Messages may arrive out of order.");
        return;
    }

    // in Qpid the 0-8 spec was hacked to have a recover-ok method... this is bad
    // in 0-9 we used the cleaner addition of a new sync recover method with its own ok
    if (getProtocolHandler().getProtocolVersion().equals(ProtocolVersion.v8_0))
    {
        BasicRecoverBody recover = getMethodRegistry().createBasicRecoverBody(false);
        _connection.getProtocolHandler().syncWrite(recover.generateFrame(_channelId), BasicRecoverOkBody.class);
        return;
    }

    if (getProtocolVersion().equals(ProtocolVersion.v0_9))
    {
        BasicRecoverSyncBody recoverSync =
                ((MethodRegistry_0_9) getMethodRegistry()).createBasicRecoverSyncBody(false);
        _connection.getProtocolHandler().syncWrite(recoverSync.generateFrame(_channelId),
                                                   BasicRecoverSyncOkBody.class);
        return;
    }

    if (getProtocolVersion().equals(ProtocolVersion.v0_91))
    {
        BasicRecoverSyncBody recoverSync =
                ((MethodRegistry_0_91) getMethodRegistry()).createBasicRecoverSyncBody(false);
        _connection.getProtocolHandler().syncWrite(recoverSync.generateFrame(_channelId),
                                                   BasicRecoverSyncOkBody.class);
        return;
    }

    throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
}
/**
 * Polls {@code _deliveredMessageTags} until it yields {@code null}, rejecting each tag with requeue set.
 */
public void releaseForRollback()
{
    // Reject all the messages that have been received in this session and
    // have not yet been acknowledged. Should look to remove
    // _deliveredMessageTags and use _txRangeSet as used by 0-10.
    // Otherwise messages will be able to arrive out of order to a second
    // consumer on the queue. Whilst this is within the JMS spec it is not
    // user friendly and avoidable.
    for (Long delivered = _deliveredMessageTags.poll();
         delivered != null;
         delivered = _deliveredMessageTags.poll())
    {
        rejectMessage(delivered, true);
    }
}
/**
 * Writes a Basic.Reject frame for the given delivery tag, but only when the session's acknowledge mode is
 * {@code CLIENT_ACKNOWLEDGE} or {@code SESSION_TRANSACTED}; in any other mode the call does nothing.
 *
 * @param deliveryTag the delivery tag to reject.
 * @param requeue     passed through to the Basic.Reject body as its requeue flag.
 */
public void rejectMessage(long deliveryTag, boolean requeue)
{
    final boolean rejectable =
            (_acknowledgeMode == CLIENT_ACKNOWLEDGE) || (_acknowledgeMode == SESSION_TRANSACTED);
    if (!rejectable)
    {
        return;
    }

    if (_logger.isDebugEnabled())
    {
        _logger.debug("Rejecting delivery tag:" + deliveryTag + ":SessionHC:" + this.hashCode());
    }

    final BasicRejectBody reject = getMethodRegistry().createBasicRejectBody(deliveryTag, requeue);
    final AMQFrame rejectFrame = reject.generateFrame(_channelId);
    _connection.getProtocolHandler().writeFrame(rejectFrame);
}
/**
 * Queries whether the destination's queue is bound to the destination's exchange.
 * <p>
 * NOTE(review): the destination's queue name is supplied both as the queue name and as the routing key of
 * the query; confirm that this is intended rather than the destination's own routing key.
 *
 * @param destination the destination whose exchange and queue name are used for the query.
 * @return the result of the three-argument {@code isQueueBound} query.
 * @throws JMSException if the underlying query fails.
 */
public boolean isQueueBound(final AMQDestination destination) throws JMSException
{
    final AMQShortString exchange = destination.getExchangeName();
    final AMQShortString queue = destination.getAMQQueueName();
    final AMQShortString routingKey = destination.getAMQQueueName();
    return isQueueBound(exchange, queue, routingKey);
}
/**
 * Sends an Exchange.Bound query under failover protection and reports whether the reply code of the
 * Exchange.BoundOk response is zero.
 *
 * @param exchangeName the exchange to query.
 * @param queueName    the queue to query.
 * @param routingKey   the routing key to query.
 * @return {@code true} if the reply code of the response equals 0, {@code false} otherwise.
 * @throws JMSException wrapping any {@link AMQException} raised by the query.
 */
public boolean isQueueBound(final AMQShortString exchangeName, final AMQShortString queueName, final AMQShortString routingKey)
        throws JMSException
{
    final FailoverProtectedOperation<AMQMethodEvent, AMQException> boundQuery =
            new FailoverProtectedOperation<AMQMethodEvent, AMQException>()
            {
                public AMQMethodEvent execute() throws AMQException, FailoverException
                {
                    AMQFrame query = getProtocolHandler().getMethodRegistry()
                            .createExchangeBoundBody(exchangeName, routingKey, queueName)
                            .generateFrame(_channelId);
                    return getProtocolHandler().syncWrite(query, ExchangeBoundOkBody.class);
                }
            };

    try
    {
        final AMQMethodEvent reply =
                new FailoverRetrySupport<AMQMethodEvent, AMQException>(boundQuery, _connection).execute();

        // Extract and return the response code from the query.
        final ExchangeBoundOkBody boundOk = (ExchangeBoundOkBody) reply.getMethod();
        return boundOk.getReplyCode() == 0;
    }
    catch (AMQException e)
    {
        throw new JMSAMQException("Queue bound query failed: " + e.getMessage(), e);
    }
}
/**
 * Sends a Basic.Consume for the given consumer and queue.
 * <p>
 * The consume arguments carry the JMS selector (when one is given and non-empty) plus the auto-close and
 * no-consume filters when the consumer reports them. With {@code nowait} the frame is simply written;
 * otherwise the call waits for Basic.ConsumeOk.
 *
 * @param consumer        the consumer being registered.
 * @param queueName       the queue to consume from.
 * @param protocolHandler the handler used to send the frame.
 * @param nowait          whether to skip waiting for Basic.ConsumeOk.
 * @param messageSelector optional JMS selector; {@code null} or empty means no selector.
 * @param tag             numeric consumer tag, sent as its decimal string form.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
@Override
public void sendConsume(BasicMessageConsumer_0_8 consumer,
                        AMQShortString queueName,
                        AMQProtocolHandler protocolHandler,
                        boolean nowait,
                        String messageSelector,
                        int tag) throws AMQException, FailoverException
{
    final FieldTable consumeArgs = FieldTableFactory.newFieldTable();

    final boolean hasSelector = (messageSelector != null) && (messageSelector.length() != 0);
    if (hasSelector)
    {
        consumeArgs.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), messageSelector);
    }

    if (consumer.isAutoClose())
    {
        consumeArgs.put(AMQPFilterTypes.AUTO_CLOSE.getValue(), Boolean.TRUE);
    }

    if (consumer.isNoConsume())
    {
        consumeArgs.put(AMQPFilterTypes.NO_CONSUME.getValue(), Boolean.TRUE);
    }

    final BasicConsumeBody consume = getMethodRegistry().createBasicConsumeBody(
            getTicket(),
            queueName,
            new AMQShortString(String.valueOf(tag)),
            consumer.isNoLocal(),
            consumer.getAcknowledgeMode() == Session.NO_ACKNOWLEDGE,
            consumer.isExclusive(),
            nowait,
            consumeArgs);
    final AMQFrame consumeFrame = consume.generateFrame(_channelId);

    if (!nowait)
    {
        protocolHandler.syncWrite(consumeFrame, BasicConsumeOkBody.class);
        return;
    }
    protocolHandler.writeFrame(consumeFrame);
}
/**
 * Sends an Exchange.Declare for the named exchange and waits for Exchange.DeclareOk.
 * <p>
 * The first boolean argument of the declare body is {@code true} exactly when the exchange name starts
 * with {@code "amq."}; all remaining flags are {@code false} and no arguments table is sent. The
 * {@code nowait} parameter is not used by this implementation.
 *
 * @param name            the exchange name.
 * @param type            the exchange type.
 * @param protocolHandler the handler used to send the frame.
 * @param nowait          unused here.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendExchangeDeclare(final AMQShortString name, final AMQShortString type, final AMQProtocolHandler protocolHandler,
                                final boolean nowait) throws AMQException, FailoverException
{
    final boolean amqPrefixed = name.toString().startsWith("amq.");
    final ExchangeDeclareBody declare = getMethodRegistry().createExchangeDeclareBody(
            getTicket(), name, type, amqPrefixed, false, false, false, false, null);
    final AMQFrame declareFrame = declare.generateFrame(_channelId);
    protocolHandler.syncWrite(declareFrame, ExchangeDeclareOkBody.class);
}
/**
 * Sends a Queue.Declare built from the destination's queue name and its durable, exclusive and
 * auto-delete settings, then waits for Queue.DeclareOk. The {@code nowait} parameter is not used by this
 * implementation.
 *
 * @param amqd            the destination describing the queue.
 * @param protocolHandler the handler used to send the frame.
 * @param nowait          unused here.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendQueueDeclare(final AMQDestination amqd, final AMQProtocolHandler protocolHandler,
                             final boolean nowait) throws AMQException, FailoverException
{
    final QueueDeclareBody declare = getMethodRegistry().createQueueDeclareBody(
            getTicket(),
            amqd.getAMQQueueName(),
            false,
            amqd.isDurable(),
            amqd.isExclusive(),
            amqd.isAutoDelete(),
            false,
            null);
    final AMQFrame declareFrame = declare.generateFrame(_channelId);
    protocolHandler.syncWrite(declareFrame, QueueDeclareOkBody.class);
}
/**
 * Sends a Queue.Delete for the named queue and waits for Queue.DeleteOk.
 * <p>
 * NOTE(review): the last argument to {@code createQueueDeleteBody} is {@code true}; if that position is
 * the AMQP no-wait flag, a broker honouring it would not reply while this method waits for a reply.
 * Confirm against the method registry.
 *
 * @param queueName the queue to delete.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendQueueDelete(final AMQShortString queueName) throws AMQException, FailoverException
{
    final QueueDeleteBody delete =
            getMethodRegistry().createQueueDeleteBody(getTicket(), queueName, false, false, true);
    final AMQFrame deleteFrame = delete.generateFrame(_channelId);
    getProtocolHandler().syncWrite(deleteFrame, QueueDeleteOkBody.class);
}
/**
 * Sends a Channel.Flow whose flag is the negation of {@code suspend} and waits for Channel.FlowOk.
 *
 * @param suspend {@code true} to suspend the channel, {@code false} to resume it.
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendSuspendChannel(boolean suspend) throws AMQException, FailoverException
{
    final boolean active = !suspend;
    final ChannelFlowBody flow = getMethodRegistry().createChannelFlowBody(active);
    final AMQFrame flowFrame = flow.generateFrame(_channelId);
    _connection.getProtocolHandler().syncWrite(flowFrame, ChannelFlowOkBody.class);
}
/**
 * Builds a {@link BasicMessageConsumer_0_8} for this session from the given settings, together with the
 * session's channel id, connection, message factory registry, protocol handler and acknowledge mode.
 *
 * @param destination     the destination to consume from.
 * @param prefetchHigh    the high prefetch mark handed to the consumer.
 * @param prefetchLow     the low prefetch mark handed to the consumer.
 * @param noLocal         the consumer's no-local setting.
 * @param exclusive       the consumer's exclusive setting.
 * @param messageSelector the consumer's message selector.
 * @param arguments       additional arguments handed to the consumer.
 * @param noConsume       the consumer's no-consume setting.
 * @param autoClose       the consumer's auto-close setting.
 * @return the newly constructed consumer.
 * @throws JMSException if the consumer's constructor raises it.
 */
public BasicMessageConsumer_0_8 createMessageConsumer(final AMQDestination destination, final int prefetchHigh,
        final int prefetchLow, final boolean noLocal, final boolean exclusive, String messageSelector,
        final FieldTable arguments, final boolean noConsume, final boolean autoClose) throws JMSException
{
    final AMQProtocolHandler handler = getProtocolHandler();
    final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(
            _channelId, _connection, destination, messageSelector, noLocal,
            _messageFactoryRegistry, this, handler, arguments, prefetchHigh, prefetchLow,
            exclusive, _acknowledgeMode, noConsume, autoClose);
    return consumer;
}
/**
 * Builds a {@link BasicMessageProducer_0_8} for this session. The destination is cast to
 * {@link AMQDestination}.
 *
 * @param destination the destination to produce to.
 * @param mandatory   the producer's mandatory setting.
 * @param immediate   the producer's immediate setting.
 * @param producerId  the identifier assigned to the producer.
 * @return the newly constructed producer.
 * @throws JMSException with message "Error creating producer" if construction raises an
 *                      {@link AMQException}; that exception is set as both cause and linked exception.
 */
public BasicMessageProducer_0_8 createMessageProducer(final Destination destination, final boolean mandatory,
        final boolean immediate, long producerId) throws JMSException
{
    try
    {
        final BasicMessageProducer_0_8 producer = new BasicMessageProducer_0_8(
                _connection, (AMQDestination) destination, _transacted, _channelId,
                this, getProtocolHandler(), producerId, immediate, mandatory);
        return producer;
    }
    catch (AMQException cause)
    {
        final JMSException failure = new JMSException("Error creating producer");
        failure.initCause(cause);
        failure.setLinkedException(cause);
        throw failure;
    }
}
/**
 * Routes an incoming message: a {@link ReturnMessage} is treated as a bounced message, and anything else
 * is passed to the superclass.
 *
 * @param message the message received from the broker.
 */
@Override
public void messageReceived(UnprocessedMessage message)
{
    if (!(message instanceof ReturnMessage))
    {
        super.messageReceived(message);
        return;
    }

    // Return of the bounced message.
    returnBouncedMessage((ReturnMessage) message);
}
/**
 * Turns a returned (bounced) message into an exception and reports it to the connection.
 * <p>
 * The work is submitted via {@code _connection.performConnectionTask}. The reply code selects the
 * exception type: {@code NO_CONSUMERS} gives {@link AMQNoConsumersException}, {@code NO_ROUTE} gives
 * {@link AMQNoRouteException}, and anything else gives {@link AMQUndeliveredException}. Any exception
 * raised while doing so is logged and otherwise ignored.
 *
 * @param msg the returned message as received from the broker.
 */
private void returnBouncedMessage(final ReturnMessage msg)
{
    _connection.performConnectionTask(new Runnable()
    {
        public void run()
        {
            try
            {
                // Bounced message is processed here, away from the mina thread
                AbstractJMSMessage bouncedMessage =
                        _messageFactoryRegistry.createMessage(0, false, msg.getExchange(),
                                                              msg.getRoutingKey(), msg.getContentHeader(), msg.getBodies());
                AMQConstant errorCode = AMQConstant.getConstant(msg.getReplyCode());
                AMQShortString reason = msg.getReplyText();

                // Guarded like the other debug statements in this class, so the message is only built
                // when debug logging is actually enabled.
                if (_logger.isDebugEnabled())
                {
                    _logger.debug("Message returned with error code " + errorCode + " (" + reason + ")");
                }

                // @TODO should this be moved to an exception handler of sorts. Somewhere errors are converted to correct exceptions.
                if (errorCode == AMQConstant.NO_CONSUMERS)
                {
                    _connection.exceptionReceived(new AMQNoConsumersException("Error: " + reason, bouncedMessage, null));
                }
                else if (errorCode == AMQConstant.NO_ROUTE)
                {
                    _connection.exceptionReceived(new AMQNoRouteException("Error: " + reason, bouncedMessage, null));
                }
                else
                {
                    _connection.exceptionReceived(
                            new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage, null));
                }
            }
            catch (Exception e)
            {
                // Deliberately best-effort: a failure to raise the exception must not propagate further.
                _logger.error(
                        "Caught exception trying to raise undelivered message exception (dump follows) - ignoring...",
                        e);
            }
        }
    });
}
/**
 * Sends Tx.Rollback on this session's channel and waits for Tx.RollbackOk.
 *
 * @throws AMQException      propagated from the synchronous write.
 * @throws FailoverException propagated from the synchronous write.
 */
public void sendRollback() throws AMQException, FailoverException
{
    final TxRollbackBody rollback = getMethodRegistry().createTxRollbackBody();
    final AMQFrame rollbackFrame = rollback.generateFrame(getChannelId());
    getProtocolHandler().syncWrite(rollbackFrame, TxRollbackOkBody.class);
}
/**
 * Sends Basic.Qos with the given limits under failover protection and waits for Basic.QosOk.
 *
 * @param messagePrefetch the message-count prefetch limit.
 * @param sizePrefetch    the size prefetch limit.
 * @throws AMQException if the failover-protected operation raises it.
 */
public void setPrefetchLimits(final int messagePrefetch, final long sizePrefetch) throws AMQException
{
    final FailoverProtectedOperation<Object, AMQException> qosOperation =
            new FailoverProtectedOperation<Object, AMQException>()
            {
                public Object execute() throws AMQException, FailoverException
                {
                    BasicQosBody qos = getProtocolHandler().getMethodRegistry()
                            .createBasicQosBody(sizePrefetch, messagePrefetch, false);

                    // todo send low water mark when protocol allows.
                    // todo Be aware of possible changes to parameter order as versions change.
                    getProtocolHandler().syncWrite(qos.generateFrame(getChannelId()), BasicQosOkBody.class);
                    return null;
                }
            };

    new FailoverRetrySupport<Object, AMQException>(qosOperation, _connection).execute();
}
/**
 * Frame listener for Queue.DeclareOk on the enclosing session's channel. When a frame is accepted by the
 * superclass, the message and consumer counts it carries are recorded.
 */
class QueueDeclareOkHandler extends SpecificMethodFrameListener
{
    /** Message count taken from the most recent matching Queue.DeclareOk. */
    private long _messageCount;
    /** Consumer count taken from the most recent matching Queue.DeclareOk. */
    private long _consumerCount;

    public QueueDeclareOkHandler()
    {
        super(getChannelId(), QueueDeclareOkBody.class);
    }

    /**
     * Delegates the match decision to the superclass and, on a match, captures the counts from the frame.
     *
     * @param channelId the channel the frame arrived on.
     * @param frame     the method body received.
     * @return whatever the superclass returned for this frame.
     */
    public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
    {
        if (!super.processMethod(channelId, frame))
        {
            return false;
        }

        final QueueDeclareOkBody ok = (QueueDeclareOkBody) frame;
        _messageCount = ok.getMessageCount();
        _consumerCount = ok.getConsumerCount();
        return true;
    }
}
/**
 * Obtains the message count of the destination's queue by sending a Queue.Declare (with {@code true} as
 * the first boolean argument) and reading the count captured from the Queue.DeclareOk reply.
 *
 * @param amqd the destination whose queue is queried.
 * @return the message count recorded by the reply handler.
 * @throws AMQException      propagated from the protocol handler.
 * @throws FailoverException propagated from the protocol handler.
 */
protected Long requestQueueDepth(AMQDestination amqd) throws AMQException, FailoverException
{
    final QueueDeclareBody declare = getMethodRegistry().createQueueDeclareBody(
            getTicket(),
            amqd.getAMQQueueName(),
            true,
            amqd.isDurable(),
            amqd.isExclusive(),
            amqd.isAutoDelete(),
            false,
            null);
    final AMQFrame declareFrame = declare.generateFrame(_channelId);

    final QueueDeclareOkHandler replyHandler = new QueueDeclareOkHandler();
    getProtocolHandler().writeCommandFrameAndWaitForReply(declareFrame, replyHandler);
    return replyHandler._messageCount;
}
/**
 * Compares two delivery tags numerically.
 *
 * @param tag1 the first tag.
 * @param tag2 the second tag.
 * @return {@code true} when {@code tag1} is less than or equal to {@code tag2}.
 */
protected final boolean tagLE(long tag1, long tag2)
{
    return tag2 >= tag1;
}
/**
 * Always returns {@code false}; neither argument is examined by this implementation.
 *
 * @param currentMark ignored.
 * @param deliveryTag ignored.
 * @return {@code false}, unconditionally.
 */
protected final boolean updateRollbackMark(long currentMark, long deliveryTag)
{
return false;
}
/**
 * Returns the message delegate factory used by this session.
 *
 * @return always {@code AMQMessageDelegateFactory.FACTORY_0_8}.
 */
public AMQMessageDelegateFactory getMessageDelegateFactory()
{
return AMQMessageDelegateFactory.FACTORY_0_8;
}
/**
 * Implemented by declaring the {@code "amq.direct"} exchange of type {@code "direct"} through
 * {@code declareExchange}, with {@code false} as its final argument.
 *
 * @throws AMQException if the exchange declaration raises it.
 */
public void sync() throws AMQException
{
    final AMQShortString exchangeName = new AMQShortString("amq.direct");
    final AMQShortString exchangeType = new AMQShortString("direct");
    declareExchange(exchangeName, exchangeType, false);
}
/**
 * Not supported by this session class; every call fails.
 *
 * @param dest       ignored.
 * @param isConsumer ignored.
 * @param noWait     ignored.
 * @throws UnsupportedOperationException always.
 */
public void handleAddressBasedDestination(AMQDestination dest,
                                          boolean isConsumer,
                                          boolean noWait) throws AMQException
{
    final String reason = "The new addressing based sytanx is "
                          + "not supported for AMQP 0-8/0-9 versions";
    throw new UnsupportedOperationException(reason);
}
/**
 * No-op: this implementation has an empty body.
 */
protected void flushAcknowledgments()
{
}
/**
 * Does nothing in this implementation; the temporary destination is not explicitly deleted here.
 *
 * @param amqQueue the temporary destination; unused.
 * @throws JMSException declared by the signature; the empty body never raises it.
 */
@Override
protected void deleteTemporaryDestination(final TemporaryDestination amqQueue)
throws JMSException
{
// Currently TemporaryDestination is set to be auto-delete which, for 0-8..0-9-1, means that the queue will be deleted
// by the server when there are no more subscriptions to that queue/topic (rather than when the client disconnects).
// This is not quite right for JMS compliance, as the queue/topic should remain until the connection closes or the
// client explicitly deletes it.
/* intentional no-op */
}
/**
 * String-based variant of the bound query: each non-null name is converted to an {@link AMQShortString}
 * (a {@code null} name stays {@code null}) and the three-argument query is invoked. The {@code args} map
 * is not used by this implementation.
 *
 * @param exchangeName the exchange to query; may be {@code null}.
 * @param queueName    the queue to query; may be {@code null}.
 * @param bindingKey   the binding key to query; may be {@code null}.
 * @param args         unused here.
 * @return the result of the three-argument {@code isQueueBound} query.
 * @throws JMSException if the underlying query fails.
 */
public boolean isQueueBound(String exchangeName, String queueName,
                            String bindingKey, Map<String, Object> args) throws JMSException
{
    AMQShortString exchange = null;
    if (exchangeName != null)
    {
        exchange = new AMQShortString(exchangeName);
    }

    AMQShortString queue = null;
    if (queueName != null)
    {
        queue = new AMQShortString(queueName);
    }

    AMQShortString routingKey = null;
    if (bindingKey != null)
    {
        routingKey = new AMQShortString(bindingKey);
    }

    return isQueueBound(exchange, queue, routingKey);
}
/**
 * Returns the last exception recorded by the state manager, but only once the connection has reached the
 * {@code CONNECTION_CLOSED} state.
 * <p>
 * An {@link AMQException} is returned as is. Any other exception is wrapped in a new {@link AMQException}
 * with the internal-error constant, the original message, and the original exception as its cause, so
 * that its type and stack trace are not lost.
 *
 * @return the recorded exception (possibly wrapped), or {@code null} if the connection is not closed or no
 *         exception has been recorded.
 */
public AMQException getLastException()
{
    // if the Connection has closed then we should throw any exception that
    // has occurred that we were not waiting for
    final AMQStateManager manager = _connection.getProtocolHandler().getStateManager();
    final Exception e = manager.getLastException();

    if (!manager.getCurrentState().equals(AMQState.CONNECTION_CLOSED) || e == null)
    {
        return null;
    }

    if (e instanceof AMQException)
    {
        return (AMQException) e;
    }

    // Chain the exception itself, not e.getCause(): using only the nested cause discards the outer
    // exception, and leaves no cause at all when the outer exception has none.
    return new AMQException(AMQConstant.getConstant(AMQConstant.INTERNAL_ERROR.getCode()),
                            e.getMessage(), e);
}
}