| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.protonj2.client.impl; |
| |
| import java.io.IOException; |
| import java.io.UncheckedIOException; |
| import java.security.Principal; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| import java.util.function.BiConsumer; |
| |
| import org.apache.qpid.protonj2.buffer.ProtonBuffer; |
| import org.apache.qpid.protonj2.client.Client; |
| import org.apache.qpid.protonj2.client.Connection; |
| import org.apache.qpid.protonj2.client.ConnectionEvent; |
| import org.apache.qpid.protonj2.client.ConnectionOptions; |
| import org.apache.qpid.protonj2.client.DisconnectionEvent; |
| import org.apache.qpid.protonj2.client.ErrorCondition; |
| import org.apache.qpid.protonj2.client.Message; |
| import org.apache.qpid.protonj2.client.NextReceiverPolicy; |
| import org.apache.qpid.protonj2.client.Receiver; |
| import org.apache.qpid.protonj2.client.ReceiverOptions; |
| import org.apache.qpid.protonj2.client.ReconnectLocation; |
| import org.apache.qpid.protonj2.client.Sender; |
| import org.apache.qpid.protonj2.client.SenderOptions; |
| import org.apache.qpid.protonj2.client.Session; |
| import org.apache.qpid.protonj2.client.SessionOptions; |
| import org.apache.qpid.protonj2.client.StreamReceiver; |
| import org.apache.qpid.protonj2.client.StreamReceiverOptions; |
| import org.apache.qpid.protonj2.client.StreamSender; |
| import org.apache.qpid.protonj2.client.StreamSenderOptions; |
| import org.apache.qpid.protonj2.client.Tracker; |
| import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientConnectionSecurityException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientConnectionSecuritySaslException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientIOException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientIllegalStateException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientOperationTimedOutException; |
| import org.apache.qpid.protonj2.client.exceptions.ClientUnsupportedOperationException; |
| import org.apache.qpid.protonj2.client.futures.ClientFuture; |
| import org.apache.qpid.protonj2.client.futures.ClientFutureFactory; |
| import org.apache.qpid.protonj2.client.transport.NettyIOContext; |
| import org.apache.qpid.protonj2.client.transport.Transport; |
| import org.apache.qpid.protonj2.client.util.ReconnectLocationPool; |
| import org.apache.qpid.protonj2.client.util.TrackableThreadFactory; |
| import org.apache.qpid.protonj2.engine.Engine; |
| import org.apache.qpid.protonj2.engine.EngineFactory; |
| import org.apache.qpid.protonj2.engine.sasl.client.SaslAuthenticator; |
| import org.apache.qpid.protonj2.engine.sasl.client.SaslCredentialsProvider; |
| import org.apache.qpid.protonj2.engine.sasl.client.SaslMechanismSelector; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A {@link Connection} implementation that uses the Proton engine for AMQP protocol support. |
| */ |
| public final class ClientConnection implements Connection { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(ClientConnection.class); |
| |
| private static final int UNLIMITED = -1; |
| private static final int UNDEFINED = -1; |
| |
| // Future tracking of Closing. Closed. Failed state vs just simple boolean is intended here |
| // later on we may decide this is overly optimized. |
| private static final AtomicIntegerFieldUpdater<ClientConnection> CLOSED_UPDATER = |
| AtomicIntegerFieldUpdater.newUpdater(ClientConnection.class, "closed"); |
| private static final AtomicReferenceFieldUpdater<ClientConnection, ClientIOException> FAILURE_CAUSE_UPDATER = |
| AtomicReferenceFieldUpdater.newUpdater(ClientConnection.class, ClientIOException.class, "failureCause"); |
| |
| private final ClientInstance client; |
| private final ConnectionOptions options; |
| private final ClientConnectionCapabilities capabilities = new ClientConnectionCapabilities(); |
| private final ClientFutureFactory futureFactory; |
| private final ClientSessionBuilder sessionBuilder; |
| private final ReconnectLocationPool reconnectPool = new ReconnectLocationPool(); |
| private final NettyIOContext ioContext; |
| private final String connectionId; |
| private final ScheduledExecutorService executor; |
| private final Map<ClientFuture<?>, Object> requests = new ConcurrentHashMap<>(); |
| private final ThreadPoolExecutor notifications; |
| |
| private Engine engine; |
| private org.apache.qpid.protonj2.engine.Connection protonConnection; |
| private ClientSession connectionSession; |
| private ClientSender connectionSender; |
| private Transport transport; |
| private boolean autoFlush = true; |
| private ClientFuture<Connection> openFuture; |
| private ClientFuture<Connection> closeFuture; |
| private volatile int closed; |
| private volatile ClientIOException failureCause; |
| private long totalConnections; |
| private long reconnectAttempts; |
| private long nextReconnectDelay = -1; |
| |
| /** |
| * Create a connection and define the initial configuration used to manage the |
| * connection to the remote. |
| * |
| * @param host |
| * the host that this connection is connecting to. |
| * @param port |
| * the port on the remote host where this connection attaches. |
| * @param client |
| * the {@link Client} that this connection resides within. |
| * @param options |
| * the connection options that configure this {@link Connection} instance. |
| */ |
| ClientConnection(ClientInstance client, String host, int port, ConnectionOptions options) { |
| this.client = client; |
| this.options = options; |
| this.connectionId = client.nextConnectionId(); |
| this.futureFactory = ClientFutureFactory.create(client.options().futureType()); |
| this.openFuture = futureFactory.createFuture(); |
| this.closeFuture = futureFactory.createFuture(); |
| this.sessionBuilder = new ClientSessionBuilder(this); |
| this.ioContext = new NettyIOContext(options.transportOptions(), |
| options.sslOptions(), |
| "ClientConnection :(" + connectionId + "): I/O Thread"); |
| this.executor = ioContext.eventLoop(); |
| |
| // This executor can be used for dispatching asynchronous tasks that might block or result |
| // in reentrant calls to this Connection that could block. |
| notifications = new ThreadPoolExecutor(1, 1, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<Runnable>(), |
| new TrackableThreadFactory("protonj2 Client Connection Executor: " + getId(), true)); |
| notifications.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy()); |
| |
| reconnectPool.add(new ReconnectLocation(host, port)); |
| reconnectPool.addAll(options.reconnectOptions().reconnectLocations()); |
| } |
| |
| @Override |
| public ClientInstance client() { |
| return client; |
| } |
| |
| @Override |
| public Future<Connection> openFuture() { |
| return openFuture; |
| } |
| |
| @Override |
| public void close() { |
| try { |
| doClose(null).get(); |
| } catch (InterruptedException | ExecutionException e) { |
| Thread.interrupted(); |
| } |
| } |
| |
| @Override |
| public void close(ErrorCondition error) { |
| try { |
| doClose(error).get(); |
| } catch (InterruptedException | ExecutionException e) { |
| Thread.interrupted(); |
| } |
| } |
| |
| @Override |
| public Future<Connection> closeAsync() { |
| return doClose(null); |
| } |
| |
| @Override |
| public Future<Connection> closeAsync(ErrorCondition error) { |
| Objects.requireNonNull(error, "Error supplied cannot be null"); |
| |
| return doClose(error); |
| } |
| |
| private Future<Connection> doClose(ErrorCondition error) { |
| if (CLOSED_UPDATER.compareAndSet(this, 0, 1)) { |
| try { |
| // Already closed by failure or shutdown so no need to queue task |
| if (!closeFuture.isDone()) { |
| executor.execute(() -> { |
| LOG.trace("Close requested for connection: {}", this); |
| |
| if (protonConnection.isLocallyOpen()) { |
| protonConnection.setCondition(ClientErrorCondition.asProtonErrorCondition(error)); |
| |
| try { |
| protonConnection.close(); |
| } catch (Throwable ignored) { |
| // Engine error handler will kick in if the write of Close fails |
| } |
| } else { |
| engine.shutdown(); |
| } |
| }); |
| } |
| } catch (RejectedExecutionException rje) { |
| // If the engine already shutdown due to the remote dropping then we |
| // can just ignore that and continue as if everything already was closed. |
| LOG.trace("Close task rejected from the event loop", rje); |
| } |
| } |
| |
| return closeFuture; |
| } |
| |
| @Override |
| public Session defaultSession() throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<Session> defaultSession = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| defaultSession.complete(lazyCreateConnectionSession()); |
| } catch (Throwable error) { |
| defaultSession.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, defaultSession); |
| } |
| |
| @Override |
| public Session openSession() throws ClientException { |
| return openSession(null); |
| } |
| |
| @Override |
| public Session openSession(SessionOptions sessionOptions) throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<Session> createSession = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| createSession.complete(sessionBuilder.session(sessionOptions).open()); |
| } catch (Throwable error) { |
| createSession.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createSession); |
| } |
| |
| @Override |
| public Receiver openReceiver(String address) throws ClientException { |
| return openReceiver(address, null); |
| } |
| |
| @Override |
| public Receiver openReceiver(String address, ReceiverOptions receiverOptions) throws ClientException { |
| checkClosedOrFailed(); |
| Objects.requireNonNull(address, "Cannot create a receiver with a null address"); |
| final ClientFuture<Receiver> createReceiver = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| createReceiver.complete(lazyCreateConnectionSession().internalOpenReceiver(address, receiverOptions)); |
| } catch (Throwable error) { |
| createReceiver.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createReceiver); |
| } |
| |
| @Override |
| public Receiver openDurableReceiver(String address, String subscriptionName) throws ClientException { |
| return openDurableReceiver(address, subscriptionName, null); |
| } |
| |
| @Override |
| public Receiver openDurableReceiver(String address, String subscriptionName, ReceiverOptions receiverOptions) throws ClientException { |
| checkClosedOrFailed(); |
| Objects.requireNonNull(address, "Cannot create a receiver with a null address"); |
| final ClientFuture<Receiver> createReceiver = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| createReceiver.complete(lazyCreateConnectionSession().internalOpenDurableReceiver(address, subscriptionName, receiverOptions)); |
| } catch (Throwable error) { |
| createReceiver.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createReceiver); |
| } |
| |
| @Override |
| public Receiver openDynamicReceiver() throws ClientException { |
| return openDynamicReceiver(null, null); |
| } |
| |
| @Override |
| public Receiver openDynamicReceiver(Map<String, Object> dynamicNodeProperties) throws ClientException { |
| return openDynamicReceiver(dynamicNodeProperties, null); |
| } |
| |
| @Override |
| public Receiver openDynamicReceiver(ReceiverOptions receiverOptions) throws ClientException { |
| return openDynamicReceiver(null, receiverOptions); |
| } |
| |
| @Override |
| public Receiver openDynamicReceiver(Map<String, Object> dynamicNodeProperties, ReceiverOptions receiverOptions) throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<Receiver> createReceiver = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| createReceiver.complete(lazyCreateConnectionSession().internalOpenDynamicReceiver(dynamicNodeProperties, receiverOptions)); |
| } catch (Throwable error) { |
| createReceiver.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createReceiver); |
| } |
| |
| @Override |
| public StreamReceiver openStreamReceiver(String address) throws ClientException { |
| return openStreamReceiver(address, null); |
| } |
| |
| @Override |
| public StreamReceiver openStreamReceiver(String address, StreamReceiverOptions receiverOptions) throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<StreamReceiver> createRequest = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| int sessionCapacity = StreamReceiverOptions.DEFAULT_READ_BUFFER_SIZE; |
| if (receiverOptions != null) { |
| sessionCapacity = receiverOptions.readBufferSize() / 2; |
| } |
| |
| // Session capacity cannot be smaller than one frame size so we adjust to the lower bound |
| sessionCapacity = (int) Math.max(sessionCapacity, protonConnection.getMaxFrameSize()); |
| |
| checkClosedOrFailed(); |
| SessionOptions sessionOptions = new SessionOptions(sessionBuilder.getDefaultSessionOptions()); |
| ClientStreamSession session = (ClientStreamSession) sessionBuilder.streamSession(sessionOptions.incomingCapacity(sessionCapacity)).open(); |
| createRequest.complete(session.internalOpenStreamReceiver(address, receiverOptions)); |
| } catch (Throwable error) { |
| createRequest.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createRequest); |
| } |
| |
| @Override |
| public Sender defaultSender() throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<Sender> defaultSender = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| defaultSender.complete(lazyCreateConnectionSender()); |
| } catch (Throwable error) { |
| defaultSender.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, defaultSender); |
| } |
| |
| @Override |
| public Sender openSender(String address) throws ClientException { |
| return openSender(address, null); |
| } |
| |
| @Override |
| public Sender openSender(String address, SenderOptions senderOptions) throws ClientException { |
| checkClosedOrFailed(); |
| Objects.requireNonNull(address, "Cannot create a sender with a null address"); |
| final ClientFuture<Sender> createSender = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| createSender.complete(lazyCreateConnectionSession().internalOpenSender(address, senderOptions)); |
| } catch (Throwable error) { |
| createSender.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createSender); |
| } |
| |
| @Override |
| public Sender openAnonymousSender() throws ClientException { |
| return openAnonymousSender(null); |
| } |
| |
| @Override |
| public Sender openAnonymousSender(SenderOptions senderOptions) throws ClientException { |
| checkClosedOrFailed(); |
| final ClientFuture<Sender> createRequest = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| createRequest.complete(lazyCreateConnectionSession().internalOpenAnonymousSender(senderOptions)); |
| } catch (Throwable error) { |
| createRequest.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createRequest); |
| } |
| |
| @Override |
| public StreamSender openStreamSender(String address) throws ClientException { |
| return openStreamSender(address, null); |
| } |
| |
| @Override |
| public StreamSender openStreamSender(String address, StreamSenderOptions senderOptions) throws ClientException { |
| checkClosedOrFailed(); |
| Objects.requireNonNull(address, "Cannot create a sender with a null address"); |
| final ClientFuture<StreamSender> createRequest = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| int sessionCapacity = StreamSenderOptions.DEFAULT_PENDING_WRITES_BUFFER_SIZE; |
| if (senderOptions != null) { |
| sessionCapacity = senderOptions.pendingWritesBufferSize(); |
| } |
| |
| // Session capacity cannot be smaller than one frame size so we adjust to the lower bound |
| sessionCapacity = (int) Math.max(sessionCapacity, protonConnection.getMaxFrameSize()); |
| |
| checkClosedOrFailed(); |
| SessionOptions sessionOptions = new SessionOptions(sessionBuilder.getDefaultSessionOptions()); |
| ClientStreamSession session = (ClientStreamSession) sessionBuilder.streamSession(sessionOptions.outgoingCapacity(sessionCapacity)).open(); |
| createRequest.complete(session.internalOpenStreamSender(address, senderOptions)); |
| } catch (Throwable error) { |
| createRequest.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, createRequest); |
| } |
| |
| @Override |
| public Tracker send(Message<?> message) throws ClientException { |
| checkClosedOrFailed(); |
| Objects.requireNonNull(message, "Cannot send a null message"); |
| final ClientFuture<Sender> result = getFutureFactory().createFuture(); |
| |
| executor.execute(() -> { |
| try { |
| checkClosedOrFailed(); |
| result.complete(lazyCreateConnectionSender()); |
| } catch (Throwable error) { |
| result.failed(ClientExceptionSupport.createNonFatalOrPassthrough(error)); |
| } |
| }); |
| |
| return request(this, result).send(message); |
| } |
| |
| @Override |
| public Receiver nextReceiver() throws ClientException { |
| checkClosedOrFailed(); |
| return defaultSession().nextReceiver(); |
| } |
| |
| @Override |
| public Receiver nextReceiver(long timeout, TimeUnit unit) throws ClientException { |
| checkClosedOrFailed(); |
| return defaultSession().nextReceiver(timeout, unit); |
| } |
| |
| @Override |
| public Receiver nextReceiver(NextReceiverPolicy policy) throws ClientException { |
| checkClosedOrFailed(); |
| return defaultSession().nextReceiver(policy); |
| } |
| |
| @Override |
| public Receiver nextReceiver(NextReceiverPolicy policy, long timeout, TimeUnit unit) throws ClientException { |
| checkClosedOrFailed(); |
| return defaultSession().nextReceiver(policy, timeout, unit); |
| } |
| |
| @Override |
| public Map<String, Object> properties() throws ClientException { |
| waitForOpenToComplete(); |
| return ClientConversionSupport.toStringKeyedMap(protonConnection.getRemoteProperties()); |
| } |
| |
| @Override |
| public String[] offeredCapabilities() throws ClientException { |
| waitForOpenToComplete(); |
| return ClientConversionSupport.toStringArray(protonConnection.getRemoteOfferedCapabilities()); |
| } |
| |
| @Override |
| public String[] desiredCapabilities() throws ClientException { |
| waitForOpenToComplete(); |
| return ClientConversionSupport.toStringArray(protonConnection.getRemoteDesiredCapabilities()); |
| } |
| |
| @Override |
| public String toString() { |
| return "ClientConnection:[" + getId() + "]"; |
| } |
| |
| //----- Internal API |
| |
| String getId() { |
| return connectionId; |
| } |
| |
| Engine getEngine() { |
| return engine; |
| } |
| |
| ClientConnection connect() throws ClientException { |
| try { |
| final ReconnectLocation remoteLocation = reconnectPool.getNext(); |
| |
| // Initial configuration validation happens here, if this step fails then the |
| // user most likely configured something incorrect or that violates some constraint |
| // like an invalid SASL mechanism etc. |
| initializeProtonResources(remoteLocation); |
| scheduleReconnect(remoteLocation); |
| |
| return this; |
| } catch (Exception ex) { |
| CLOSED_UPDATER.set(this, 1); |
| FAILURE_CAUSE_UPDATER.compareAndSet(this, null, ClientExceptionSupport.createOrPassthroughFatal(ex)); |
| openFuture.failed(failureCause); |
| closeFuture.complete(this); |
| ioContext.shutdown(); |
| |
| throw failureCause; |
| } |
| } |
| |
| boolean isClosed() { |
| return closed > 0; |
| } |
| |
| ScheduledExecutorService getScheduler() { |
| return executor; |
| } |
| |
| ClientFutureFactory getFutureFactory() { |
| return futureFactory; |
| } |
| |
| ConnectionOptions getOptions() { |
| return options; |
| } |
| |
| ClientConnectionCapabilities getCapabilities() { |
| return capabilities; |
| } |
| |
| org.apache.qpid.protonj2.engine.Connection getProtonConnection() { |
| return protonConnection; |
| } |
| |
| <T> T request(Object requestor, ClientFuture<T> request) throws ClientException { |
| requests.put(request, requestor); |
| |
| try { |
| return request.get(); |
| } catch (Throwable error) { |
| request.cancel(false); |
| throw ClientExceptionSupport.createNonFatalOrPassthrough(error); |
| } finally { |
| requests.remove(request); |
| } |
| } |
| |
| void failAllPendingRequests(Object requestor, ClientException cause) { |
| requests.entrySet().removeIf(entry -> { |
| if (entry.getValue() == requestor) { |
| entry.getKey().failed(cause); |
| return true; |
| } |
| |
| return false; |
| }); |
| } |
| |
| boolean autoFlushOff() { |
| boolean oldState = autoFlush; |
| autoFlush = false; |
| return oldState; |
| } |
| |
| void autoFlushOn() { |
| autoFlush = true; |
| } |
| |
| void flush() { |
| try { |
| transport.flush(); |
| } catch (IOException e) { |
| LOG.debug("Error while flushing engine output to transport: ", e.getMessage()); |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| //----- Private implementation events handlers and utility methods |
| |
| private void handleLocalOpen(org.apache.qpid.protonj2.engine.Connection connection) { |
| connection.tickAuto(getScheduler()); |
| |
| if (options.openTimeout() > 0) { |
| executor.schedule(() -> { |
| if (!openFuture.isDone()) { |
| // Ensure a close write is attempted and then force failure regardless |
| // as we don't expect the remote to respond given it hasn't done so yet. |
| try { |
| connection.close(); |
| } catch (Throwable ignore) {} |
| |
| connection.getEngine().engineFailed(new ClientOperationTimedOutException( |
| "Connection Open timed out waiting for remote to open")); |
| } |
| }, options.openTimeout(), TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void handleLocalClose(org.apache.qpid.protonj2.engine.Connection connection) { |
| if (connection.isRemotelyClosed()) { |
| final ClientException failureCause; |
| |
| if (engine.connection().getRemoteCondition() != null) { |
| failureCause = ClientExceptionSupport.convertToConnectionClosedException(connection.getRemoteCondition()); |
| } else { |
| failureCause = new ClientConnectionRemotelyClosedException("Unknown error led to connection disconnect"); |
| } |
| |
| try { |
| connection.getEngine().engineFailed(failureCause); |
| } catch (Throwable ignore) { |
| } |
| } else if (!engine.isShutdown() || !engine.isFailed()) { |
| // Ensure engine gets shut down and future completed if remote doesn't respond. |
| executor.schedule(() -> { |
| try { |
| connection.getEngine().shutdown(); |
| } catch (Throwable ignore) { |
| } |
| }, options.closeTimeout(), TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void handleRemoteOpen(org.apache.qpid.protonj2.engine.Connection connection) { |
| connectionEstablished(); |
| capabilities.determineCapabilities(connection); |
| |
| if (totalConnections == 1) { |
| LOG.info("Connection {} connected to server: {}:{}", getId(), transport.getHost(), transport.getPort()); |
| submitConnectionEvent(options.connectedHandler(), transport.getHost(), transport.getPort(), null); |
| } else { |
| LOG.info("Connection {} reconnected to server: {}:{}", getId(), transport.getHost(), transport.getPort()); |
| submitConnectionEvent(options.reconnectedHandler(), transport.getHost(), transport.getPort(), null); |
| } |
| |
| openFuture.complete(this); |
| } |
| |
| private void handleRemoteClose(org.apache.qpid.protonj2.engine.Connection connection) { |
| // When the connection is already locally closed this implies the application requested |
| // a close of this connection so this is normal, if not then the remote is closing for |
| // some reason and we should react as if the connection has failed which we will determine |
| // in the local close handler based on state. |
| if (connection.isLocallyClosed()) { |
| try { |
| connection.getEngine().shutdown(); |
| } catch (Throwable ignore) { |
| LOG.debug("Unexpected exception thrown from engine shutdown: ", ignore); |
| } |
| } else { |
| try { |
| connection.close(); |
| } catch (Throwable ignored) { |
| // Engine handlers will ensure we close down if not already locally closed. |
| } |
| } |
| } |
| |
| private void handleEngineOutput(ProtonBuffer output, Runnable ioComplete) { |
| try { |
| if (autoFlush) { |
| transport.writeAndFlush(output, ioComplete); |
| } else { |
| transport.write(output, ioComplete); |
| } |
| } catch (IOException e) { |
| LOG.debug("Error while writing engine output to transport: ", e.getMessage()); |
| throw new UncheckedIOException(e); |
| } |
| } |
| |
| /* |
| * When an engine fails we check if we can reconnect or not and act accordingly. |
| */ |
| private void handleEngineFailure(Engine engine) { |
| final ClientIOException failureCause; |
| |
| if (engine.connection().getRemoteCondition() != null) { |
| failureCause = ClientExceptionSupport.convertToConnectionClosedException(engine.connection().getRemoteCondition()); |
| } else if (engine.failureCause() != null) { |
| failureCause = ClientExceptionSupport.convertToConnectionClosedException(engine.failureCause()); |
| } else { |
| failureCause = new ClientConnectionRemotelyClosedException("Unknown error led to connection disconnect"); |
| } |
| |
| LOG.trace("Engine reports failure with error: {}", failureCause.getMessage()); |
| |
| if (isReconnectAllowed(failureCause)) { |
| // Disconnect the failed engine for this connection's event handling |
| // to prevent cleanup processing of that engine instance from triggering |
| // normal connection shutdown processing. |
| engine.shutdownHandler(null); |
| |
| LOG.info("Connection {} interrupted to server: {}:{}", getId(), transport.getHost(), transport.getPort()); |
| submitDisconnectionEvent(options.interruptedHandler(), transport.getHost(), transport.getPort(), failureCause); |
| |
| // Initial configuration validation happens here, if this step fails then the |
| // user most likely configured something incorrect or that violates some constraint |
| // like an invalid SASL mechanism etc. |
| try { |
| final ReconnectLocation remoteLocation = reconnectPool.getNext(); |
| |
| initializeProtonResources(remoteLocation); |
| scheduleReconnect(remoteLocation); |
| } catch (ClientException initError) { |
| FAILURE_CAUSE_UPDATER.compareAndSet(this, null, ClientExceptionSupport.createOrPassthroughFatal(initError)); |
| this.engine.shutdown(); // Close down the engine created for reconnect |
| } finally { |
| engine.shutdown(); // Failed instance gets cleaned up. |
| } |
| } else { |
| FAILURE_CAUSE_UPDATER.compareAndSet(this, null, failureCause); |
| engine.shutdown(); |
| } |
| } |
| |
| /* |
| * Handle normal engine shutdown which should only happen when the connection is closed |
| * by the user, all other cases should lead to engine failed event first which will deal |
| * with reconnect cases and avoid this event unless reconnect cannot proceed. |
| */ |
| private void handleEngineShutdown(Engine engine) { |
| try { |
| protonConnection.close(); |
| } catch (Exception ignore) {} |
| |
| try { |
| transport.flush(); |
| } catch (Exception ignore) {} |
| |
| try { |
| transport.close(); |
| } catch (Exception ignore) {} |
| |
| ioContext.shutdownAsync(); |
| |
| if (failureCause != null) |
| { |
| openFuture.failed(failureCause); |
| closeFuture.complete(this); |
| |
| LOG.warn("Connection {} has failed due to: {}", getId(), failureCause != null ? |
| failureCause.getClass().getSimpleName() + " -> " + failureCause.getMessage() : "No failure details provided."); |
| |
| submitDisconnectionEvent(options.disconnectedHandler(), transport.getHost(), transport.getPort(), failureCause); |
| } |
| else |
| { |
| openFuture.complete(this); |
| closeFuture.complete(this); |
| } |
| |
| client.unregisterConnection(this); |
| } |
| |
| private void submitConnectionEvent(BiConsumer<Connection, ConnectionEvent> handler, String host, int port, ClientIOException cause) { |
| if (handler != null) { |
| try { |
| notifications.submit(() -> { |
| try { |
| handler.accept(this, new ConnectionEvent(host, port)); |
| } catch (Exception ex) { |
| LOG.trace("User supplied connection life-cycle event handler threw: ", ex); |
| } |
| }); |
| } catch (Exception ex) { |
| LOG.trace("Error thrown while attempting to submit event notification ", ex); |
| } |
| } |
| } |
| |
| private void submitDisconnectionEvent(BiConsumer<Connection, DisconnectionEvent> handler, String host, int port, ClientIOException cause) { |
| if (handler != null) { |
| try { |
| notifications.submit(() -> { |
| try { |
| handler.accept(this, new DisconnectionEvent(host, port, cause)); |
| } catch (Exception ex) { |
| LOG.trace("User supplied disconnection life-cycle event handler threw: ", ex); |
| } |
| }); |
| } catch (Exception ex) { |
| LOG.trace("Error thrown while attempting to submit event notification ", ex); |
| } |
| } |
| } |
| |
| private Engine configureEngineSaslSupport() { |
| if (options.saslOptions().saslEnabled()) { |
| SaslMechanismSelector mechSelector = |
| new SaslMechanismSelector(ClientConversionSupport.toSymbolSet(options.saslOptions().allowedMechanisms())); |
| |
| engine.saslDriver().client().setListener(new SaslAuthenticator(mechSelector, new SaslCredentialsProvider() { |
| |
| @Override |
| public String vhost() { |
| return options.virtualHost(); |
| } |
| |
| @Override |
| public String username() { |
| return options.user(); |
| } |
| |
| @Override |
| public String password() { |
| return options.password(); |
| } |
| |
| @Override |
| public Principal localPrincipal() { |
| return transport.getLocalPrincipal(); |
| } |
| })); |
| } |
| |
| return engine; |
| } |
| |
| private void initializeProtonResources(ReconnectLocation location) throws ClientException { |
| if (options.saslOptions().saslEnabled()) { |
| engine = EngineFactory.PROTON.createEngine(); |
| } else { |
| engine = EngineFactory.PROTON.createNonSaslEngine(); |
| } |
| |
| if (options.traceFrames()) { |
| engine.configuration().setTraceFrames(true); |
| if (!engine.configuration().isTraceFrames()) { |
| LOG.warn("Connection {} frame tracing was enabled but protocol engine does not support it", getId()); |
| } |
| } |
| |
| engine.outputHandler(this::handleEngineOutput) |
| .shutdownHandler(this::handleEngineShutdown) |
| .errorHandler(this::handleEngineFailure); |
| |
| protonConnection = engine.connection(); |
| |
| if (client.containerId() != null) { |
| protonConnection.setContainerId(client.containerId()); |
| } else { |
| protonConnection.setContainerId(connectionId); |
| } |
| |
| protonConnection.setLinkedResource(this); |
| protonConnection.setChannelMax(options.channelMax()); |
| protonConnection.setMaxFrameSize(options.maxFrameSize()); |
| protonConnection.setHostname(location.getHost()); |
| protonConnection.setIdleTimeout((int) options.idleTimeout()); |
| protonConnection.setOfferedCapabilities(ClientConversionSupport.toSymbolArray(options.offeredCapabilities())); |
| protonConnection.setDesiredCapabilities(ClientConversionSupport.toSymbolArray(options.desiredCapabilities())); |
| protonConnection.setProperties(ClientConversionSupport.toSymbolKeyedMap(options.properties())); |
| protonConnection.localOpenHandler(this::handleLocalOpen) |
| .localCloseHandler(this::handleLocalClose) |
| .openHandler(this::handleRemoteOpen) |
| .closeHandler(this::handleRemoteClose); |
| |
| configureEngineSaslSupport(); |
| } |
| |
| private ClientSession lazyCreateConnectionSession() throws ClientException { |
| if (connectionSession == null) { |
| connectionSession = sessionBuilder.session(null).open(); |
| } |
| |
| return connectionSession; |
| } |
| |
| private Sender lazyCreateConnectionSender() throws ClientException { |
| if (connectionSender == null) { |
| if (openFuture.isComplete()) { |
| checkAnonymousRelaySupported(); |
| } |
| |
| connectionSender = lazyCreateConnectionSession().internalOpenAnonymousSender(null); |
| connectionSender.remotelyClosedHandler((sender) -> { |
| try { |
| sender.closeAsync(); |
| } catch (Throwable ignore) {} |
| |
| // Clear the old closed sender, a lazy create needs to construct a new sender. |
| connectionSender = null; |
| }); |
| } |
| |
| return connectionSender; |
| } |
| |
| void checkAnonymousRelaySupported() throws ClientUnsupportedOperationException { |
| if (!capabilities.anonymousRelaySupported()) { |
| throw new ClientUnsupportedOperationException("Anonymous relay support not available from this connection"); |
| } |
| } |
| |
| protected void checkClosedOrFailed() throws ClientException { |
| if (closed > 0) { |
| throw new ClientIllegalStateException("The Connection was explicitly closed", failureCause); |
| } else if (failureCause != null) { |
| throw failureCause; |
| } |
| } |
| |
| private void waitForOpenToComplete() throws ClientException { |
| if (!openFuture.isComplete() || openFuture.isFailed()) { |
| try { |
| openFuture.get(); |
| } catch (ExecutionException | InterruptedException e) { |
| Thread.interrupted(); |
| if (failureCause != null) { |
| throw failureCause; |
| } else { |
| throw ClientExceptionSupport.createNonFatalOrPassthrough(e.getCause()); |
| } |
| } |
| } |
| } |
| |
| //----- Reconnection related internal API |
| |
| private void attemptConnection(ReconnectLocation location) { |
| try { |
| reconnectAttempts++; |
| transport = ioContext.newTransport(); |
| LOG.trace("Connection {} Attempting connection to remote {}:{}", getId(), location.getHost(), location.getPort()); |
| transport.connect(location.getHost(), location.getPort(), new ClientTransportListener(this, engine)); |
| } catch (Throwable error) { |
| engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(error)); |
| } |
| } |
| |
| private void scheduleReconnect(ReconnectLocation location) { |
| // Warn of ongoing connection attempts if configured. |
| int warnInterval = options.reconnectOptions().warnAfterReconnectAttempts(); |
| if (reconnectAttempts > 0 && warnInterval > 0 && (reconnectAttempts % warnInterval) == 0) { |
| LOG.warn("Connection {}: Failed to connect after: {} attempt(s) continuing to retry.", getId(), reconnectAttempts); |
| } |
| |
| // If no connection recovery required then we have never fully connected to a remote |
| // so we proceed down the connect with one immediate connection attempt and then follow |
| // on delayed attempts based on configuration. |
| if (totalConnections == 0) { |
| if (reconnectAttempts == 0) { |
| LOG.trace("Initial connect attempt will be performed immediately"); |
| executor.execute(() -> attemptConnection(location)); |
| } else { |
| long delay = nextReconnectDelay(); |
| LOG.trace("Next connect attempt will be in {} milliseconds", delay); |
| executor.schedule(() -> attemptConnection(location), delay, TimeUnit.MILLISECONDS); |
| } |
| } else if (reconnectAttempts == 0) { |
| LOG.trace("Initial reconnect attempt will be performed immediately"); |
| executor.execute(() -> attemptConnection(location)); |
| } else { |
| long delay = nextReconnectDelay(); |
| LOG.trace("Next reconnect attempt will be in {} milliseconds", delay); |
| executor.schedule(() -> attemptConnection(location), delay, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| private void connectionEstablished() { |
| totalConnections++; |
| nextReconnectDelay = -1; |
| reconnectAttempts = 0; |
| } |
| |
| private boolean isLimitExceeded() { |
| int reconnectLimit = reconnectAttemptLimit(); |
| if (reconnectLimit != UNLIMITED && reconnectAttempts >= reconnectLimit) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private boolean isReconnectAllowed(ClientException cause) { |
| if (options.reconnectOptions().reconnectEnabled() && !isClosed()) { |
| // If a connection attempts fail due to Security errors than we abort |
| // reconnection as there is a configuration issue and we want to avoid |
| // a spinning reconnect cycle that can never complete. |
| if (isStoppageCause(cause)) { |
| return false; |
| } |
| |
| return !isLimitExceeded(); |
| } else { |
| return false; |
| } |
| } |
| |
| private boolean isStoppageCause(ClientException cause) { |
| if (cause instanceof ClientConnectionSecuritySaslException) { |
| ClientConnectionSecuritySaslException saslFailure = (ClientConnectionSecuritySaslException) cause; |
| return !saslFailure.isSysTempFailure(); |
| } else if (cause instanceof ClientConnectionSecurityException ) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| private int reconnectAttemptLimit() { |
| int maxReconnectValue = options.reconnectOptions().maxReconnectAttempts(); |
| if (totalConnections == 0 && options.reconnectOptions().maxInitialConnectionAttempts() != UNDEFINED) { |
| // If this is the first connection attempt and a specific startup retry limit |
| // is configured then use it, otherwise use the main reconnect limit |
| maxReconnectValue = options.reconnectOptions().maxInitialConnectionAttempts(); |
| } |
| |
| return maxReconnectValue; |
| } |
| |
| private long nextReconnectDelay() { |
| if (nextReconnectDelay == UNDEFINED) { |
| nextReconnectDelay = options.reconnectOptions().reconnectDelay(); |
| } |
| |
| if (options.reconnectOptions().useReconnectBackOff() && reconnectAttempts > 1) { |
| // Exponential increment of reconnect delay. |
| nextReconnectDelay *= options.reconnectOptions().reconnectBackOffMultiplier(); |
| if (nextReconnectDelay > options.reconnectOptions().maxReconnectDelay()) { |
| nextReconnectDelay = options.reconnectOptions().maxReconnectDelay(); |
| } |
| } |
| |
| return nextReconnectDelay; |
| } |
| } |