| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.qpid.client; |
| |
| import java.net.ConnectException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.UnresolvedAddressException; |
| import java.util.*; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.jms.JMSException; |
| import javax.jms.XASession; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.qpid.QpidException; |
| import org.apache.qpid.client.failover.FailoverException; |
| import org.apache.qpid.client.failover.FailoverProtectedOperation; |
| import org.apache.qpid.client.failover.FailoverRetrySupport; |
| import org.apache.qpid.client.protocol.AMQProtocolSession; |
| import org.apache.qpid.client.state.AMQState; |
| import org.apache.qpid.client.state.StateWaiter; |
| import org.apache.qpid.client.util.JMSExceptionHelper; |
| import org.apache.qpid.common.ServerPropertyNames; |
| import org.apache.qpid.configuration.ClientProperties; |
| import org.apache.qpid.framing.ChannelOpenBody; |
| import org.apache.qpid.framing.ChannelOpenOkBody; |
| import org.apache.qpid.framing.ConfirmSelectBody; |
| import org.apache.qpid.framing.ConfirmSelectOkBody; |
| import org.apache.qpid.framing.FieldTable; |
| import org.apache.qpid.framing.ProtocolVersion; |
| import org.apache.qpid.framing.TxSelectBody; |
| import org.apache.qpid.framing.TxSelectOkBody; |
| import org.apache.qpid.jms.ChannelLimitReachedException; |
| import org.apache.qpid.jms.ConnectionURL; |
| import org.apache.qpid.jms.Session; |
| import org.apache.qpid.properties.ConnectionStartProperties; |
| import org.apache.qpid.transport.ConnectionSettings; |
| import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver; |
| import org.apache.qpid.transport.network.NetworkConnection; |
| import org.apache.qpid.transport.network.io.IoNetworkTransport; |
| import org.apache.qpid.transport.network.security.SecurityLayer; |
| import org.apache.qpid.transport.network.security.SecurityLayerFactory; |
| |
| public class AMQConnectionDelegate_8_0 implements AMQConnectionDelegate |
| { |
| private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_8_0.class); |
| |
| // deprecated legacy name for the option |
| private static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout"; |
| |
| private final AMQConnection _conn; |
| private final long _timeout = Long.getLong(ClientProperties.QPID_SYNC_OP_TIMEOUT, |
| Long.getLong(AMQJ_DEFAULT_SYNCWRITE_TIMEOUT, |
| ClientProperties.DEFAULT_SYNC_OPERATION_TIMEOUT)); |
| private boolean _messageCompressionSupported; |
| private boolean _addrSyntaxSupported; |
| private boolean _confirmedPublishSupported; |
| private boolean _confirmedPublishNonTransactionalSupported; |
| private boolean _virtualhostPropertiesSupported; |
| private boolean _queueLifetimeSupported; |
| |
| public void closeConnection(long timeout) throws JMSException, QpidException |
| { |
| _conn.getProtocolHandler().closeConnection(timeout); |
| } |
| |
| public AMQConnectionDelegate_8_0(AMQConnection conn) |
| { |
| _conn = conn; |
| _addrSyntaxSupported = |
| Boolean.parseBoolean(System.getProperty(ClientProperties.ADDR_SYNTAX_SUPPORTED_IN_0_8, |
| String.valueOf(ClientProperties.DEFAULT_ADDR_SYNTAX_0_8_SUPPORT))); |
| } |
| |
| protected boolean checkException(Throwable thrown) |
| { |
| Throwable cause = thrown.getCause(); |
| |
| if (cause == null) |
| { |
| cause = thrown; |
| } |
| |
| return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException)); |
| } |
| |
| public boolean isConfirmedPublishSupported() |
| { |
| return _confirmedPublishSupported; |
| } |
| |
| public ProtocolVersion makeBrokerConnection(BrokerDetails brokerDetail) throws QpidException |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Connecting to broker:" + brokerDetail); |
| } |
| final Set<AMQState> openOrClosedStates = |
| EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED); |
| |
| ConnectionSettings settings = brokerDetail.buildConnectionSettings(); |
| |
| //Check connection-level ssl override setting |
| String connectionSslOption = _conn.getConnectionURL().getOption(ConnectionURL.OPTIONS_SSL); |
| if(connectionSslOption != null) |
| { |
| boolean connUseSsl = Boolean.parseBoolean(connectionSslOption); |
| boolean brokerlistUseSsl = settings.isUseSSL(); |
| |
| if( connUseSsl != brokerlistUseSsl) |
| { |
| settings.setUseSSL(connUseSsl); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Applied connection ssl option override, setting UseSsl to: " + connUseSsl ); |
| } |
| } |
| } |
| |
| SecurityLayer securityLayer = SecurityLayerFactory.newInstance(settings); |
| |
| IoNetworkTransport transport = new IoNetworkTransport(); |
| |
| ReceiverClosedWaiter monitoringReceiver = new ReceiverClosedWaiter(securityLayer.receiver(_conn.getProtocolHandler())); |
| |
| NetworkConnection network = transport.connect(settings, monitoringReceiver, |
| _conn.getProtocolHandler()); |
| |
| try |
| { |
| _conn.getProtocolHandler().setNetworkConnection(network, securityLayer.sender(network.getSender())); |
| |
| StateWaiter waiter = _conn.getProtocolHandler().createWaiter(openOrClosedStates); |
| _conn.getProtocolHandler().init(settings); |
| |
| // this blocks until the connection has been set up or when an error |
| // has prevented the connection being set up |
| |
| AMQState state = waiter.await(); |
| |
| if (state == AMQState.CONNECTION_OPEN) |
| { |
| _conn.getFailoverPolicy().attainedConnection(); |
| _conn.setConnected(true); |
| _conn.logConnected(network.getLocalAddress(), network.getRemoteAddress()); |
| |
| _messageCompressionSupported = |
| checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_MESSAGE_COMPRESSION_SUPPORTED); |
| |
| _virtualhostPropertiesSupported = |
| checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_VIRTUALHOST_PROPERTIES_SUPPORTED); |
| _queueLifetimeSupported = |
| checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_QUEUE_LIFETIME_SUPPORTED); |
| _confirmedPublishSupported = |
| checkBooleanConnectionStartProperty(ConnectionStartProperties.QPID_CONFIRMED_PUBLISH_SUPPORTED); |
| _confirmedPublishNonTransactionalSupported = checkConfirmedPublishNonTransactionalSupported(); |
| _conn.setConnectionSettings(settings); |
| return null; |
| } |
| else |
| { |
| return _conn.getProtocolHandler().getSuggestedProtocolVersion(); |
| } |
| } |
| catch(QpidException | RuntimeException e) |
| { |
| network.close(); |
| throw e; |
| } |
| finally |
| { |
| // await the receiver to finish its execution (and so the IO threads too) |
| if (!_conn.isConnected()) |
| { |
| boolean closedWithinTimeout = monitoringReceiver.awaitClose(_timeout); |
| if (!closedWithinTimeout) |
| { |
| _logger.warn("Timed-out waiting for receiver for connection to " |
| + brokerDetail + " to be closed."); |
| } |
| } |
| } |
| |
| } |
| |
| // RabbitMQ supports confirmed publishing, but only on non transactional sessions |
| private boolean checkConfirmedPublishNonTransactionalSupported() |
| { |
| FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); |
| if( serverProperties != null |
| && serverProperties.containsKey("capabilities") |
| && serverProperties.get("capabilities") instanceof FieldTable) |
| { |
| FieldTable capabilities = serverProperties.getFieldTable("capabilities"); |
| if(capabilities.containsKey("publisher_confirms") |
| && capabilities.get("publisher_confirms") instanceof Boolean |
| && capabilities.getBoolean("publisher_confirms")) |
| { |
| return true; |
| } |
| else |
| { |
| return false; |
| } |
| } |
| else |
| { |
| return false; |
| } |
| } |
| |
| public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch) |
| throws JMSException |
| { |
| return createSession(transacted, acknowledgeMode, prefetch, prefetch); |
| } |
| |
| |
| public XASession createXASession(int prefetchHigh, int prefetchLow) throws JMSException |
| { |
| throw new UnsupportedOperationException("0_8 version does not provide XA support"); |
| } |
| |
| public XASession createXASession(int ackMode) throws JMSException |
| { |
| throw new UnsupportedOperationException("0_8 version does not provide XA support"); |
| } |
| public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, |
| final int prefetchHigh, final int prefetchLow) throws JMSException |
| { |
| _conn.checkNotClosed(); |
| |
| if (_conn.channelLimitReached()) |
| { |
| throw new ChannelLimitReachedException(_conn.getMaximumChannelCount()); |
| } |
| |
| return new FailoverRetrySupport<Session, JMSException>( |
| new FailoverProtectedOperation<Session, JMSException>() |
| { |
| public Session execute() throws JMSException, FailoverException |
| { |
| int channelId = _conn.getNextChannelID(); |
| |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Write channel open frame for channel id " + channelId); |
| } |
| |
| // We must create the session and register it before actually sending the frame to the server to |
| // open it, so that there is no window where we could receive data on the channel and not be set |
| // up to handle it appropriately. |
| AMQSession_0_8 session = |
| new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh, |
| prefetchLow); |
| _conn.registerSession(channelId, session); |
| |
| boolean success = false; |
| try |
| { |
| createChannelOverWire(channelId, transacted); |
| session.setPrefetchLimits(prefetchHigh, 0); |
| success = true; |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Error creating session: " + e), |
| e); |
| } |
| finally |
| { |
| if (!success) |
| { |
| _conn.deregisterSession(channelId); |
| } |
| } |
| |
| if (_conn.started()) |
| { |
| try |
| { |
| session.start(); |
| } |
| catch (QpidException e) |
| { |
| throw JMSExceptionHelper.chainJMSException(new JMSException("Session.start failed"), e); |
| } |
| } |
| |
| return session; |
| } |
| }, _conn).execute(); |
| } |
| |
| /** |
| * Create an XASession with default prefetch values of: |
| * High = MaxPrefetch |
| * Low = MaxPrefetch / 2 |
| * @return XASession |
| * @throws JMSException thrown if there is a problem creating the session. |
| */ |
| public XASession createXASession() throws JMSException |
| { |
| return createXASession((int) _conn.getMaxPrefetch(), (int) _conn.getMaxPrefetch() / 2); |
| } |
| |
| private void createChannelOverWire(int channelId, boolean transacted) |
| throws QpidException, FailoverException |
| { |
| ChannelOpenBody channelOpenBody = _conn.getProtocolHandler().getMethodRegistry().createChannelOpenBody(null); |
| _conn.getProtocolHandler().syncWrite(channelOpenBody.generateFrame(channelId), ChannelOpenOkBody.class); |
| |
| if (transacted) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Issuing TxSelect for " + channelId); |
| } |
| TxSelectBody body = _conn.getProtocolHandler().getMethodRegistry().createTxSelectBody(); |
| |
| |
| _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), TxSelectOkBody.class); |
| } |
| boolean useConfirms = (_confirmedPublishSupported || (!transacted && _confirmedPublishNonTransactionalSupported)) |
| && "all".equals(_conn.getSyncPublish()); |
| if(useConfirms) |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Issuing ConfirmSelect for " + channelId); |
| } |
| ConfirmSelectBody body = new ConfirmSelectBody(false); |
| |
| _conn.getProtocolHandler().syncWrite(body.generateFrame(channelId), ConfirmSelectOkBody.class); |
| } |
| } |
| |
| /** |
| * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling. |
| * The caller must hold the failover mutex before calling this method. |
| */ |
| public void resubscribeSessions() throws JMSException, QpidException, FailoverException |
| { |
| List<AMQSession> sessions = _conn.getSessions().values(); |
| _logger.debug("Resubscribing sessions = {} sessions.size = {}", sessions, sessions.size()); |
| for (Iterator it = sessions.iterator(); it.hasNext();) |
| { |
| AMQSession_0_8 s = (AMQSession_0_8) it.next(); |
| |
| // reset the flow control flag |
| // on opening channel, broker sends flow blocked if virtual host is blocked |
| // if virtual host is not blocked, then broker does not send flow command |
| // that's why we need to reset the flow control flag |
| s.setFlowControl(true); |
| reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.isTransacted()); |
| s.setPrefetchLimits(s.getDefaultPrefetchHigh(), 0); |
| s.resubscribe(); |
| } |
| } |
| |
| private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) |
| throws QpidException, FailoverException |
| { |
| try |
| { |
| createChannelOverWire(channelId, transacted); |
| } |
| catch (QpidException e) |
| { |
| _conn.deregisterSession(channelId); |
| throw new QpidException("Error reopening channel " + channelId + " after failover: " + e, e); |
| } |
| } |
| |
| public <T, E extends Exception> T executeRetrySupport(FailoverProtectedOperation<T,E> operation) throws E |
| { |
| while (true) |
| { |
| try |
| { |
| _conn.blockUntilNotFailingOver(); |
| } |
| catch (InterruptedException e) |
| { |
| _logger.debug("Interrupted: " + e, e); |
| Thread.currentThread().interrupt(); |
| return null; |
| } |
| |
| synchronized (_conn.getFailoverMutex()) |
| { |
| try |
| { |
| return operation.execute(); |
| } |
| catch (FailoverException e) |
| { |
| _logger.debug("Failover exception caught during operation: " + e, e); |
| } |
| catch (IllegalStateException e) |
| { |
| if (!(e.getMessage().startsWith("Fail-over interrupted no-op failover support"))) |
| { |
| throw e; |
| } |
| } |
| } |
| } |
| } |
| |
| public int getMaxChannelID() |
| { |
| ConnectionTuneParameters params = _conn.getProtocolHandler().getProtocolSession().getConnectionTuneParameters(); |
| |
| return params == null ? AMQProtocolSession.MAX_CHANNEL_MAX : params.getChannelMax(); |
| } |
| |
| public int getMinChannelID() |
| { |
| return AMQProtocolSession.MIN_USABLE_CHANNEL_NUM; |
| } |
| |
| public ProtocolVersion getProtocolVersion() |
| { |
| return ProtocolVersion.v0_8; |
| } |
| |
| public boolean verifyClientID() throws JMSException |
| { |
| return true; |
| } |
| |
| /* |
| * @see org.apache.qpid.client.AMQConnectionDelegate#isSupportedServerFeature(java.lang.String) |
| */ |
| public boolean isSupportedServerFeature(String featureName) |
| { |
| // The Apache Qpid Broker for Java does not advertise features by the qpid.features property |
| // for AMQP protocols 0-8..0-9-1 , so for now we just hardcode JMS selectors as supported. |
| return ServerPropertyNames.FEATURE_QPID_JMS_SELECTOR.equals(featureName); |
| } |
| |
| @Override |
| public void setHeartbeatListener(HeartbeatListener listener) |
| { |
| _conn.getProtocolHandler().setHeartbeatListener(listener); |
| } |
| |
| @Override |
| public boolean supportsIsBound() |
| { |
| //Rough check whether the 'isBound' AMQP extension method is supported, by trying to determine if we are connected to Qpid. |
| //As older versions of the Qpid broker did not send properties, the value will be assumed true if no server properties |
| //are found, or the 'product' entry isn't present, and will only be false if it is present but doesn't match expectation. |
| boolean connectedToQpid = true; |
| |
| FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); |
| if(serverProperties != null) |
| { |
| if(serverProperties.containsKey(ConnectionStartProperties.PRODUCT)) |
| { |
| //String.valueof to ensure it is non-null, then lowercase it |
| String product = String.valueOf(serverProperties.getString(ConnectionStartProperties.PRODUCT)).toLowerCase(); |
| |
| //value is "unknown" when the naming properties file hasn't been found, e.g in IDE. |
| connectedToQpid = product.contains("qpid") || product.equals("unknown"); |
| } |
| } |
| |
| if(_logger.isDebugEnabled()) |
| { |
| _logger.debug("supportsIsBound: " + connectedToQpid); |
| } |
| |
| return connectedToQpid; |
| } |
| |
| private boolean checkBooleanConnectionStartProperty(final String property) |
| { |
| FieldTable serverProperties = _conn.getProtocolHandler().getProtocolSession().getConnectionStartServerProperties(); |
| return serverProperties != null |
| && Boolean.parseBoolean(serverProperties.getString(property)); |
| } |
| |
| public boolean isMessageCompressionSupported() |
| { |
| return _messageCompressionSupported; |
| } |
| |
| @Override |
| public boolean isVirtualHostPropertiesSupported() |
| { |
| return _virtualhostPropertiesSupported; |
| } |
| |
| @Override |
| public boolean isQueueLifetimePolicySupported() |
| { |
| return _queueLifetimeSupported; |
| } |
| |
| public boolean isAddrSyntaxSupported() |
| { |
| return _addrSyntaxSupported; |
| } |
| |
| public boolean isConfirmedPublishNonTransactionalSupported() |
| { |
| return _confirmedPublishNonTransactionalSupported; |
| } |
| |
| |
| private static class ReceiverClosedWaiter implements ExceptionHandlingByteBufferReceiver |
| { |
| private final CountDownLatch _closedWatcher; |
| private final ExceptionHandlingByteBufferReceiver _receiver; |
| |
| public ReceiverClosedWaiter(ExceptionHandlingByteBufferReceiver receiver) |
| { |
| _receiver = receiver; |
| _closedWatcher = new CountDownLatch(1); |
| } |
| |
| @Override |
| public void received(ByteBuffer msg) |
| { |
| _receiver.received(msg); |
| } |
| |
| @Override |
| public void exception(Throwable t) |
| { |
| _receiver.exception(t); |
| } |
| |
| @Override |
| public void closed() |
| { |
| try |
| { |
| _receiver.closed(); |
| } |
| finally |
| { |
| _closedWatcher.countDown(); |
| } |
| } |
| |
| public boolean awaitClose(long timeout) |
| { |
| try |
| { |
| if (_logger.isDebugEnabled()) |
| { |
| _logger.debug("Waiting " + timeout + "ms for receiver to be closed"); |
| } |
| |
| return _closedWatcher.await(timeout, TimeUnit.MILLISECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| Thread.currentThread().interrupt(); |
| return _closedWatcher.getCount() == 0; |
| } |
| } |
| }; |
| } |