blob: bc2d375dd626b652353f2c4b23fc1ab16f2c778c [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.XASession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.StateWaiter;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.framing.ChannelOpenBody;
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.framing.ConfirmSelectBody;
import org.apache.qpid.framing.ConfirmSelectOkBody;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.TxSelectBody;
import org.apache.qpid.framing.TxSelectOkBody;
import org.apache.qpid.jms.ChannelLimitReachedException;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.jms.Session;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.transport.ConnectionSettings;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.io.IoNetworkTransport;
import org.apache.qpid.transport.network.security.SecurityLayer;
import org.apache.qpid.transport.network.security.SecurityLayerFactory;
public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate
{
/** Logger for this delegate; also used by the nested ReceiverClosedWaiter class. */
private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class);
// deprecated legacy name for the option
private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
/** The connection this delegate acts on; assigned once in the constructor. */
private final AMQConnection _conn;
/**
 * Timeout read from the ClientProperties.QPID_SYNC_OP_TIMEOUT system property, falling back to the
 * legacy "amqj.default_syncwrite_timeout" property and finally to
 * ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT. In this class it is used by makeBrokerConnection
 * as the number of milliseconds to wait for the receiver to be closed after a failed connect.
 */
private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT,
Long.getLong(AMQJ_DEFAULT_SYNCWRITE_TIMEOUT,
ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT));
// The following capability flags (except _addrSyntaxSupported) are assigned in makeBrokerConnection
// once the connection has reached the CONNECTION_OPEN state.
private boolean _messageCompressionSupported;
// Assigned in the constructor from the ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8 system property.
private boolean _addrSyntaxSupported;
private boolean _confirmedPublishSupported;
// Derived from the "publisher_confirms" entry of the broker's "capabilities" server property.
private boolean _confirmedPublishNonTransactionalSupported;
private boolean _virtualhostPropertiesSupported;
private boolean _queueLifetimeSupported;
/**
 * Closes the connection by delegating to the protocol handler of the wrapped connection.
 *
 * @param timeout passed unchanged to the protocol handler's closeConnection call
 * @throws JMSException declared by this method; raised by the delegated call, if at all
 * @throws QpidException declared by this method; raised by the delegated call, if at all
 */
public void closeConnection(long timeout) throws JMSException, QpidException
{
_conn.getProtocolHandler().closeConnection(timeout);
}
public AMQConnectionDelegate_8_0(AMQConnection conn)
{
_conn = conn;
_addrSyntaxSupported =
Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8,
String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT)));
}
/**
 * Tests whether a failure is a {@link ConnectException} or an {@link UnresolvedAddressException}.
 * <p>
 * The direct cause of {@code thrown} is examined when there is one; otherwise {@code thrown}
 * itself is examined. Only one level of cause is inspected.
 *
 * @param thrown the failure to classify
 * @return {@code true} if the examined throwable is one of the two exception types above
 */
protected boolean checkException(Throwable thrown)
{
    final Throwable nested = thrown.getCause();
    final Throwable examined = (nested != null) ? nested : thrown;
    return examined instanceof ConnectException
           || examined instanceof UnresolvedAddressException;
}
/**
 * @return the flag assigned in makeBrokerConnection from the
 * ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED connection-start property;
 * {@code false} until a connection has been opened successfully
 */
public boolean isConfirmedPublishSupported()
{
return _confirmedPublishSupported;
}
/**
 * Opens a network connection to the given broker and waits for the protocol handler to reach
 * either the CONNECTION_OPEN or the CONNECTION_CLOSED state.
 * <p>
 * On CONNECTION_OPEN the connection is marked as connected, the capability flags of this delegate
 * are refreshed from the broker's connection-start properties, and {@code null} is returned.
 * Otherwise the protocol version suggested by the protocol handler is returned.
 * <p>
 * Whenever the connection is not marked as connected when this method exits (normally or
 * exceptionally), it waits up to {@code _timeout} milliseconds for the receiver to be closed.
 *
 * @param brokerDetail the broker to connect to; also supplies the connection settings
 * @return {@code null} when the connection was opened, otherwise the protocol handler's
 * suggested protocol version
 * @throws QpidException rethrown after closing the network connection if set-up fails
 */
public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws QpidException
{
if (_logger.isDebugEnabled())
{
_logger.debug("Connecting to broker:" + brokerDetail);
}
final Set<AMQState> openOrClosedStates =
EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
ConnectionSettings settings = brokerDetail.buildConnectionSettings();
//Check connection-level ssl override setting
String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL);
if(connectionSslOption != null)
{
// The connection URL option, when present, wins over the value built from the broker details.
boolean connUseSsl = Boolean.parseBoolean(connectionSslOption);
boolean brokerlistUseSsl = settings.isUseSSL();
if( connUseSsl != brokerlistUseSsl)
{
settings.setUseSSL(connUseSsl);
if (_logger.isDebugEnabled())
{
_logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl );
}
}
}
SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings);
IoNetworkTransport transport = new IoNetworkTransport();
// Wrap the receiver so that the finally block below can wait for its closed() callback.
ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler()));
NetworkConnection network = transport.connect(settings, monitoringReceiver,
_conn.getProtocolHandler());
try
{
_conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender()));
// The waiter is created before init(settings) is called and awaited afterwards.
StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates);
_conn.getProtocolHandler().init(settings);
// this blocks until the connection has been set up or when an error
// has prevented the connection being set up
AMQState state = waiter.await();
if (state == AMQState.CONNECTION_OPEN)
{
_conn.getFailoverPolicy().attainedConnection();
_conn.setConnected(true);
_conn.logConnected(network.getLocalAddress(), network.getRemoteAddress());
// Refresh the capability flags from the properties the broker sent at connection start.
_messageCompressionSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED);
_virtualhostPropertiesSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED);
_queueLifetimeSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED);
_confirmedPublishSupported =
checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED);
_confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported();
_conn.setConnectionSettings(settings);
// null signals success to the caller
return null;
}
else
{
return _conn.getProtocolHandler().getSuggestedProtocolVersion();
}
}
catch(QpidException | RuntimeException e)
{
// Set-up failed: release the network connection before propagating the failure.
network.close();
throw e;
}
finally
{
// await the receiver to finish its execution (and so the IO threads too)
if (!_conn.isConnected())
{
boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout);
if (!closedWithinTimeout)
{
_logger.warn("Timed-out waiting for receiver for connection to "
+ brokerDetail + " to be closed.");
}
}
}
}
/**
 * Checks the broker's connection-start server properties for a {@code "capabilities"} field
 * table containing a Boolean {@code "publisher_confirms"} entry.
 * <p>
 * RabbitMQ supports confirmed publishing, but only on non transactional sessions
 *
 * @return the value of the {@code "publisher_confirms"} entry, or {@code false} when the server
 * properties, the capabilities table or a Boolean entry of that name is missing
 */
private boolean checkConfirmedPublishNonTransactionalSupported()
{
    final FieldTable startProperties =
            _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();

    if (startProperties == null
        || !startProperties.containsKey("capabilities")
        || !(startProperties.get("capabilities") instanceof FieldTable))
    {
        return false;
    }

    final FieldTable brokerCapabilities = startProperties.getFieldTable("capabilities");
    return brokerCapabilities.containsKey("publisher_confirms")
           && brokerCapabilities.get("publisher_confirms") instanceof Boolean
           && brokerCapabilities.getBoolean("publisher_confirms");
}
/**
 * Creates a session using a single prefetch value, by delegating to the four-argument
 * createSession with {@code prefetch} supplied as both the high and the low value.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode the acknowledge mode passed through to the new session
 * @param prefetch used as both prefetchHigh and prefetchLow
 * @return the session returned by the four-argument overload
 * @throws JMSException if the four-argument overload throws it
 */
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
throws JMSException
{
return createSession(transacted, acknowledgeMode, prefetch, prefetch);
}
/**
 * Not supported by this delegate: always throws.
 *
 * @param prefetchHigh ignored
 * @param prefetchLow ignored
 * @throws UnsupportedOperationException always
 */
public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException
{
throw new UnsupportedOperationException("0_8 version does not provide XA support");
}
/**
 * Not supported by this delegate: always throws.
 *
 * @param ackMode ignored
 * @throws UnsupportedOperationException always
 */
public XASession createXASession(int ackMode) throws JMSException
{
throw new UnsupportedOperationException("0_8 version does not provide XA support");
}
/**
 * Creates a session on a newly allocated channel, running the work through
 * {@code FailoverRetrySupport}.
 * <p>
 * The session object is registered with the connection before the channel-open frame is
 * written and is deregistered again if opening the channel does not complete. When the
 * connection is already started, the new session is started as well.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode the acknowledge mode passed to the new session
 * @param prefetchHigh passed to the session constructor and to {@code setPrefetchLimits}
 * @param prefetchLow passed to the session constructor only; {@code setPrefetchLimits} is
 * called with {@code 0} as its second argument
 * @return the newly created session
 * @throws ChannelLimitReachedException if the connection reports that its channel limit is reached
 * @throws JMSException if opening the channel or starting the session fails with a QpidException
 */
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                 final int prefetchHigh, final int prefetchLow) throws JMSException
{
    _conn.checkNotClosed();
    if (_conn.channelLimitReached())
    {
        throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
    }

    final FailoverProtectedOperation<Session, JMSException> openSessionOperation =
            new FailoverProtectedOperation<Session, JMSException>()
            {
                public Session execute() throws JMSException, FailoverException
                {
                    final int newChannelId = _conn.getNextChannelID();
                    if (_logger.isDebugEnabled())
                    {
                        _logger.debug("Write channel open frame for channel id " + newChannelId);
                    }

                    // Register first, send second: data may arrive on the channel as soon as the
                    // broker has seen the open frame, and the session must already be in place
                    // to receive it.
                    final AMQSession_0_8 newSession =
                            new AMQSession_0_8(_conn, newChannelId, transacted, acknowledgeMode, prefetchHigh,
                                               prefetchLow);
                    _conn.registerSession(newChannelId, newSession);

                    boolean channelOpened = false;
                    try
                    {
                        createChannelOverWire(newChannelId, transacted);
                        newSession.setPrefetchLimits(prefetchHigh, 0);
                        channelOpened = true;
                    }
                    catch (QpidException e)
                    {
                        final JMSException failure = new JMSException("Error creating session: " + e);
                        throw JMSExceptionHelper.chainJMSException(failure, e);
                    }
                    finally
                    {
                        // Covers every exit path on which the channel was not fully opened.
                        if (!channelOpened)
                        {
                            _conn.deregisterSession(newChannelId);
                        }
                    }

                    if (!_conn.started())
                    {
                        return newSession;
                    }

                    try
                    {
                        newSession.start();
                    }
                    catch (QpidException e)
                    {
                        throw JMSExceptionHelper.chainJMSException(new JMSException("Session.start failed"), e);
                    }
                    return newSession;
                }
            };

    return new FailoverRetrySupport<Session, JMSException>(openSessionOperation, _conn).execute();
}
/**
 * Requests an XASession with default prefetch values of:
 * High = MaxPrefetch
 * Low = MaxPrefetch / 2
 * <p>
 * The request is forwarded to {@code createXASession(int, int)}, which in this class throws
 * {@link UnsupportedOperationException}.
 *
 * @return XASession
 * @throws JMSException thrown if there is a problem creating the session.
 */
public XASession createXASession() throws JMSException
{
    final int defaultHigh = (int) _conn.getMaxPrefetch();
    final int defaultLow = (int) _conn.getMaxPrefetch() / 2;
    return createXASession(defaultHigh, defaultLow);
}
/**
 * Opens the given channel on the broker with synchronous writes.
 * <p>
 * Sends ChannelOpen and waits for ChannelOpenOk. For a transacted channel it then sends
 * TxSelect and waits for TxSelectOk. Finally, when publisher confirms are usable for this
 * channel and the connection's sync-publish setting equals {@code "all"}, it sends
 * ConfirmSelect and waits for ConfirmSelectOk.
 *
 * @param channelId the channel to open
 * @param transacted whether the channel is to be put into transactional mode
 * @throws QpidException declared by this method; raised by the synchronous writes, if at all
 * @throws FailoverException declared by this method; raised by the synchronous writes, if at all
 */
private void createChannelOverWire(int channelId, boolean transacted)
        throws QpidException, FailoverException
{
    final ChannelOpenBody openRequest =
            _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
    _conn.getProtocolHandler().syncWrite(openRequest.generateFrame(channelId), ChannelOpenOkBody.class);

    if (transacted)
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Issuing TxSelect for " + channelId);
        }
        final TxSelectBody txSelectRequest =
                _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody();
        _conn.getProtocolHandler().syncWrite(txSelectRequest.generateFrame(channelId), TxSelectOkBody.class);
    }

    // Confirms are usable on any channel when the broker advertises general support, and on
    // non-transacted channels only when it advertises just the non-transactional variant.
    final boolean confirmsUsable =
            _confirmedPublishSupported || (!transacted && _confirmedPublishNonTransactionalSupported);
    if (!confirmsUsable || !"all".equals(_conn.getSyncPublish()))
    {
        return;
    }

    if (_logger.isDebugEnabled())
    {
        _logger.debug("Issuing ConfirmSelect for " + channelId);
    }
    final ConfirmSelectBody confirmSelectRequest = new ConfirmSelectBody(false);
    _conn.getProtocolHandler().syncWrite(confirmSelectRequest.generateFrame(channelId), ConfirmSelectOkBody.class);
}
/**
 * Reopens the channel of every session registered on the connection and resubscribes the
 * session. This is called during failover handling.
 * The caller must hold the failover mutex before calling this method.
 *
 * @throws JMSException declared by this method; raised by the per-session calls, if at all
 * @throws QpidException if reopening a channel fails
 * @throws FailoverException declared by this method; raised by the per-session calls, if at all
 */
public void resubscribeSessions() throws JMSException, QpidException, FailoverException
{
    List<AMQSession> sessions = _conn.getSessions().values();
    _logger.debug("Resubscribing sessions = {} sessions.size = {}", sessions, sessions.size());

    for (AMQSession registered : sessions)
    {
        final AMQSession_0_8 session = (AMQSession_0_8) registered;

        // Reset the flow control flag. On opening a channel the broker sends "flow blocked" only
        // if the virtual host is blocked and sends nothing otherwise, so a stale value from
        // before the failover would never be cleared.
        session.setFlowControl(true);

        reopenChannel(session.getChannelId(),
                      session.getDefaultPrefetchHigh(),
                      session.getDefaultPrefetchLow(),
                      session.isTransacted());
        session.setPrefetchLimits(session.getDefaultPrefetchHigh(), 0);
        session.resubscribe();
    }
}
/**
 * Reopens a channel on the broker after failover. If that fails with a QpidException, the
 * session is deregistered from the connection and the failure is rethrown wrapped in a new
 * QpidException.
 *
 * @param channelId the channel to reopen
 * @param prefetchHigh not used by this method
 * @param prefetchLow not used by this method
 * @param transacted whether the channel is to be put into transactional mode
 * @throws QpidException if the channel could not be reopened
 * @throws FailoverException declared by this method; raised by the channel-open call, if at all
 */
private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
        throws QpidException, FailoverException
{
    try
    {
        createChannelOverWire(channelId, transacted);
    }
    catch (QpidException cause)
    {
        _conn.deregisterSession(channelId);
        final String detail = "Error reopening channel " + channelId + " after failover: " + cause;
        throw new QpidException(detail, cause);
    }
}
/**
 * Runs the given operation while holding the connection's failover mutex, retrying it for as
 * long as it is interrupted by failover.
 * <p>
 * Each attempt first blocks until the connection is not failing over. The operation is retried
 * when it throws a FailoverException, or an IllegalStateException whose message starts with
 * "Fail-over interrupted no-op failover support". Any other IllegalStateException, including one
 * without a message, is rethrown unchanged.
 *
 * @param operation the operation to execute
 * @param <T> the result type of the operation
 * @param <E> the checked exception type the operation may throw
 * @return the result of the operation, or {@code null} if the calling thread is interrupted
 * while waiting for failover to finish (the interrupt status is restored in that case)
 * @throws E if the operation throws it
 */
public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E
{
    while (true)
    {
        try
        {
            _conn.blockUntilNotFailingOver();
        }
        catch (InterruptedException e)
        {
            _logger.debug("Interrupted: " + e, e);
            Thread.currentThread().interrupt();
            return null;
        }

        synchronized (_conn.getFailoverMutex())
        {
            try
            {
                return operation.execute();
            }
            catch (FailoverException e)
            {
                // Failover happened while the operation was running: loop round and retry.
                _logger.debug("Failover exception caught during operation: " + e, e);
            }
            catch (IllegalStateException e)
            {
                // getMessage() may be null; such an exception is not the failover marker, so it
                // must propagate as-is rather than be replaced by a NullPointerException.
                final String message = e.getMessage();
                if (message == null || !message.startsWith("Fail-over interrupted no-op failover support"))
                {
                    throw e;
                }
            }
        }
    }
}
/**
 * @return the channel maximum from the protocol session's connection tune parameters, or
 * {@code AMQProtocolSession.MAX_CHANNEL_MAX} when no tune parameters are available
 */
public int getMaxChannelID()
{
    final ConnectionTuneParameters tuneParameters =
            _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters();
    if (tuneParameters == null)
    {
        return AMQProtocolSession.MAX_CHANNEL_MAX;
    }
    return tuneParameters.getChannelMax();
}
/**
 * @return the constant {@code AMQProtocolSession.MIN_USABLE_CHANNEL_NUM}
 */
public int getMinChannelID()
{
return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM;
}
/**
 * @return always {@code ProtocolVersion.v0_8}; the value does not depend on connection state
 */
public ProtocolVersion getProtocolVersion()
{
return ProtocolVersion.v0_8;
}
/**
 * Performs no verification in this delegate.
 *
 * @return always {@code true}
 */
public boolean verifyClientID() throws JMSException
{
return true;
}
/**
 * Reports whether the named server feature is treated as supported.
 *
 * @param featureName the feature name to test; {@code null} yields {@code false}
 * @return {@code true} only when {@code featureName} equals
 * {@code ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR}
 * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String)
 */
public boolean isSupportedServerFeature(String featureName)
{
// The Apache Qpid Broker-J does not advertise features by the qpid.features property
// for AMQP protocols 0-8..0-9-1 , so for now we just hardcode JMS selectors as supported.
return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName);
}
/**
 * Passes the listener on to the protocol handler of the wrapped connection.
 *
 * @param listener the heartbeat listener to install
 */
@Override
public void setHeartbeatListener(HeartbeatListener listener)
{
_conn.getProtocolHandler().setHeartbeatListener(listener);
}
/**
 * Rough check whether the 'isBound' AMQP extension method is supported, made by trying to
 * determine whether the client is connected to a Qpid broker.
 * <p>
 * As older versions of the Qpid broker did not send properties, the answer is {@code true} when
 * no server properties are found or the 'product' entry is absent. It is {@code false} only when
 * the entry is present and, lower-cased, neither contains "qpid" nor equals "unknown".
 *
 * @return {@code true} if the broker is assumed to support 'isBound'
 */
@Override
public boolean supportsIsBound()
{
    boolean connectedToQpid = true;

    final FieldTable serverProperties =
            _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
    if (serverProperties != null && serverProperties.containsKey(ConnectionStartProperties.PRODUCT))
    {
        // String.valueOf guards against a null value. Locale.ROOT keeps the case folding
        // independent of the JVM's default locale (under a Turkish locale an upper-case 'I'
        // would otherwise fold to a dotless 'ı' and never match "qpid").
        final String product =
                String.valueOf(serverProperties.getString(ConnectionStartProperties.PRODUCT))
                      .toLowerCase(Locale.ROOT);

        // value is "unknown" when the naming properties file hasn't been found, e.g in IDE.
        connectedToQpid = product.contains("qpid") || product.equals("unknown");
    }

    if (_logger.isDebugEnabled())
    {
        _logger.debug("supportsIsBound: " + connectedToQpid);
    }
    return connectedToQpid;
}
/**
 * Reads a connection-start server property as a boolean.
 *
 * @param property the name of the server property to read
 * @return {@code false} when no server properties are available; otherwise the result of
 * {@link Boolean#parseBoolean(String)} applied to the property's string value
 */
private boolean checkBooleanConnectionStartProperty(final String property)
{
    final FieldTable startProperties =
            _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties();
    if (startProperties == null)
    {
        return false;
    }
    return Boolean.parseBoolean(startProperties.getString(property));
}
/**
 * @return the flag assigned in makeBrokerConnection from the
 * ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED connection-start property
 */
public boolean isMessageCompressionSupported()
{
return _messageCompressionSupported;
}
/**
 * @return the flag assigned in makeBrokerConnection from the
 * ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED connection-start property
 */
@Override
public boolean isVirtualHostPropertiesSupported()
{
return _virtualhostPropertiesSupported;
}
/**
 * @return the flag assigned in makeBrokerConnection from the
 * ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED connection-start property
 */
@Override
public boolean isQueueLifetimePolicySupported()
{
return _queueLifetimeSupported;
}
/**
 * @return the flag read in the constructor from the
 * ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8 system property
 */
public boolean isAddrSyntaxSupported()
{
return _addrSyntaxSupported;
}
/**
 * @return the flag assigned in makeBrokerConnection from the result of
 * checkConfirmedPublishNonTransactionalSupported()
 */
public boolean isConfirmedPublishNonTransactionalSupported()
{
return _confirmedPublishNonTransactionalSupported;
}
/**
 * Receiver decorator that forwards every callback to a wrapped receiver and additionally lets a
 * caller wait until {@link #closed()} has been invoked.
 */
private static class ReceiverClosedWaiter implements ExceptionHandlingByteBufferReceiver
{
    /** The receiver all callbacks are forwarded to. */
    private final ExceptionHandlingByteBufferReceiver _receiver;
    /** Counted down exactly when {@link #closed()} has run. */
    private final CountDownLatch _closedWatcher = new CountDownLatch(1);

    public ReceiverClosedWaiter(ExceptionHandlingByteBufferReceiver receiver)
    {
        _receiver = receiver;
    }

    @Override
    public void received(ByteBuffer msg)
    {
        _receiver.received(msg);
    }

    @Override
    public void exception(Throwable t)
    {
        _receiver.exception(t);
    }

    @Override
    public void closed()
    {
        try
        {
            _receiver.closed();
        }
        finally
        {
            // Release waiters even if the wrapped receiver's closed() throws.
            _closedWatcher.countDown();
        }
    }

    /**
     * Waits for {@link #closed()} to be invoked.
     *
     * @param timeout maximum time to wait, in milliseconds
     * @return {@code true} if the receiver was closed within the timeout. If the waiting thread
     * is interrupted, its interrupt status is restored and the result reflects whether the
     * receiver had been closed by then.
     */
    public boolean awaitClose(long timeout)
    {
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Waiting " + timeout + "ms for receiver to be closed");
        }

        boolean receiverClosed;
        try
        {
            receiverClosed = _closedWatcher.await(timeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            receiverClosed = _closedWatcher.getCount() == 0;
        }
        return receiverClosed;
    }
}
}