| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.net; |
| |
| import java.io.IOException; |
| import java.net.ConnectException; |
| import java.net.InetSocketAddress; |
| import java.nio.channels.ClosedChannelException; |
| import java.util.Objects; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| |
| import javax.annotation.Nullable; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelInboundHandlerAdapter; |
| import io.netty.channel.EventLoop; |
| import io.netty.channel.unix.Errors; |
| import io.netty.util.concurrent.Future; |
| import io.netty.util.concurrent.Promise; |
| import io.netty.util.concurrent.PromiseNotifier; |
| import io.netty.util.concurrent.SucceededFuture; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.io.util.DataOutputBufferFixed; |
| import org.apache.cassandra.net.OutboundConnectionInitiator.Result.MessagingSuccess; |
| import org.apache.cassandra.tracing.Tracing; |
| import org.apache.cassandra.utils.FBUtilities; |
| import org.apache.cassandra.utils.JVMStabilityInspector; |
| import org.apache.cassandra.utils.NoSpamLogger; |
| |
| import static java.lang.Math.max; |
| import static java.lang.Math.min; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.cassandra.net.MessagingService.current_version; |
| import static org.apache.cassandra.net.OutboundConnectionInitiator.*; |
| import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD; |
| import static org.apache.cassandra.net.ResourceLimits.*; |
| import static org.apache.cassandra.net.ResourceLimits.Outcome.*; |
| import static org.apache.cassandra.net.SocketFactory.*; |
| import static org.apache.cassandra.utils.FBUtilities.prettyPrintMemory; |
| import static org.apache.cassandra.utils.MonotonicClock.approxTime; |
| import static org.apache.cassandra.utils.Throwables.isCausedBy; |
| |
| /** |
 * Represents a connection type to a peer, and handles the state transitions on the connection and the netty {@link Channel}.
| * The underlying socket is not opened until explicitly requested (by sending a message). |
| * |
| * TODO: complete this description |
| * |
| * Aside from a few administrative methods, the main entry point to sending a message is {@link #enqueue(Message)}. |
| * Any thread may send a message (enqueueing it to {@link #queue}), but only one thread may consume messages from this |
| * queue. There is a single delivery thread - either the event loop, or a companion thread - that has logical ownership |
| * of the queue, but other threads may temporarily take ownership in order to perform book keeping, pruning, etc., |
| * to ensure system stability. |
| * |
| * {@link Delivery#run()} is the main entry point for consuming messages from the queue, and executes either on the event |
| * loop or on a non-dedicated companion thread. This processing is activated via {@link Delivery#execute()}. |
| * |
| * Almost all internal state maintenance on this class occurs on the eventLoop, a single threaded executor which is |
| * assigned in the constructor. Further details are outlined below in the class. Some behaviours require coordination |
| * between the eventLoop and the companion thread (if any). Some minimal set of behaviours are permitted to occur on |
| * producers to ensure the connection remains healthy and does not overcommit resources. |
| * |
| * All methods are safe to invoke from any thread unless otherwise stated. |
| */ |
| @SuppressWarnings({ "WeakerAccess", "FieldMayBeFinal", "NonAtomicOperationOnVolatileField", "SameParameterValue" }) |
| public class OutboundConnection |
| { |
    static final Logger logger = LoggerFactory.getLogger(OutboundConnection.class);
    private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 30L, TimeUnit.SECONDS);

    // field updaters: atomic operations on the volatile counters below without per-instance Atomic objects
    private static final AtomicLongFieldUpdater<OutboundConnection> submittedUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "submittedCount");
    private static final AtomicLongFieldUpdater<OutboundConnection> pendingCountAndBytesUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "pendingCountAndBytes");
    private static final AtomicLongFieldUpdater<OutboundConnection> overloadedCountUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "overloadedCount");
    private static final AtomicLongFieldUpdater<OutboundConnection> overloadedBytesUpdater = AtomicLongFieldUpdater.newUpdater(OutboundConnection.class, "overloadedBytes");
    private static final AtomicReferenceFieldUpdater<OutboundConnection, Future> closingUpdater = AtomicReferenceFieldUpdater.newUpdater(OutboundConnection.class, Future.class, "closing");
    private static final AtomicReferenceFieldUpdater<OutboundConnection, Future> scheduledCloseUpdater = AtomicReferenceFieldUpdater.newUpdater(OutboundConnection.class, Future.class, "scheduledClose");

    /** single-threaded executor on which almost all internal state maintenance occurs (see class javadoc) */
    private final EventLoop eventLoop;
    /** the strategy used to drain {@link #queue} to the network; see {@link Delivery} */
    private final Delivery delivery;

    /** invoked as messages are overloaded, expired, failed or discarded */
    private final OutboundMessageCallbacks callbacks;
    private final OutboundDebugCallbacks debug;
    @VisibleForTesting
    final OutboundMessageQueue queue;
    /** the number of bytes we permit to queue to the network without acquiring any shared resource permits */
    private final long pendingCapacityInBytes;
    /** the number of messages and bytes queued for flush to the network,
     * including those that are being flushed but have not been completed,
     * packed into a long (top 22 bits for count, bottom 42 for bytes; see {@link #pendingByteBits}) */
    private volatile long pendingCountAndBytes = 0;
    /** global shared limits that we use only if our local limits are exhausted;
     * we allocate from here whenever queueSize > queueCapacity */
    private final EndpointAndGlobal reserveCapacityInBytes;

    /** Used in logging statements to lazily build a human-readable number of pending bytes. */
    private final Object readablePendingBytes =
        new Object() { @Override public String toString() { return prettyPrintMemory(pendingBytes()); } };

    /** Used in logging statements to lazily build a human-readable number of reserve endpoint bytes in use. */
    private final Object readableReserveEndpointUsing =
        new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.endpoint.using()); } };

    /** Used in logging statements to lazily build a human-readable number of reserve global bytes in use. */
    private final Object readableReserveGlobalUsing =
        new Object() { @Override public String toString() { return prettyPrintMemory(reserveCapacityInBytes.global.using()); } };

    private volatile long submittedCount = 0; // updated with cas
    private volatile long overloadedCount = 0; // updated with cas
    private volatile long overloadedBytes = 0; // updated with cas
    private long expiredCount = 0; // updated with queue lock held
    private long expiredBytes = 0; // updated with queue lock held
    private long errorCount = 0; // updated only by delivery thread
    private long errorBytes = 0; // updated by delivery thread only
    private long sentCount; // updated by delivery thread only
    private long sentBytes; // updated by delivery thread only
    private long successfulConnections; // updated by event loop only
    private long connectionAttempts; // updated by event loop only
| |
| private static final int pendingByteBits = 42; |
| private static boolean isMaxPendingCount(long pendingCountAndBytes) |
| { |
| return (pendingCountAndBytes & (-1L << pendingByteBits)) == (-1L << pendingByteBits); |
| } |
| |
| private static int pendingCount(long pendingCountAndBytes) |
| { |
| return (int) (pendingCountAndBytes >>> pendingByteBits); |
| } |
| |
| private static long pendingBytes(long pendingCountAndBytes) |
| { |
| return pendingCountAndBytes & (-1L >>> (64 - pendingByteBits)); |
| } |
| |
| private static long pendingCountAndBytes(long pendingCount, long pendingBytes) |
| { |
| return (pendingCount << pendingByteBits) | pendingBytes; |
| } |
| |
    /** which of the peer's connections (e.g. small vs large messages) this instance services */
    private final ConnectionType type;

    /**
     * Contains the base settings for this connection, _including_ any defaults filled in.
     */
    private OutboundConnectionSettings template;
| |
| private static class State |
| { |
| static final State CLOSED = new State(Kind.CLOSED); |
| |
| enum Kind { ESTABLISHED, CONNECTING, DORMANT, CLOSED } |
| |
| final Kind kind; |
| |
| State(Kind kind) |
| { |
| this.kind = kind; |
| } |
| |
| boolean isEstablished() { return kind == Kind.ESTABLISHED; } |
| boolean isConnecting() { return kind == Kind.CONNECTING; } |
| boolean isDisconnected() { return kind == Kind.CONNECTING || kind == Kind.DORMANT; } |
| boolean isClosed() { return kind == Kind.CLOSED; } |
| |
| Established established() { return (Established) this; } |
| Connecting connecting() { return (Connecting) this; } |
| Disconnected disconnected() { return (Disconnected) this; } |
| } |
| |
| /** |
| * We have successfully negotiated a channel, and believe it to still be valid. |
| * |
| * Before using this, we should check isConnected() to check the Channel hasn't |
| * become invalid. |
| */ |
| private static class Established extends State |
| { |
| final int messagingVersion; |
| final Channel channel; |
| final FrameEncoder.PayloadAllocator payloadAllocator; |
| final OutboundConnectionSettings settings; |
| |
| Established(int messagingVersion, Channel channel, FrameEncoder.PayloadAllocator payloadAllocator, OutboundConnectionSettings settings) |
| { |
| super(Kind.ESTABLISHED); |
| this.messagingVersion = messagingVersion; |
| this.channel = channel; |
| this.payloadAllocator = payloadAllocator; |
| this.settings = settings; |
| } |
| |
| boolean isConnected() { return channel.isOpen(); } |
| } |
| |
| private static class Disconnected extends State |
| { |
| /** Periodic message expiry scheduled while we are disconnected; this will be cancelled and cleared each time we connect */ |
| final Future<?> maintenance; |
| Disconnected(Kind kind, Future<?> maintenance) |
| { |
| super(kind); |
| this.maintenance = maintenance; |
| } |
| |
| public static Disconnected dormant(Future<?> maintenance) |
| { |
| return new Disconnected(Kind.DORMANT, maintenance); |
| } |
| } |
| |
| private static class Connecting extends Disconnected |
| { |
| /** |
| * Currently (or scheduled to) (re)connect; this may be cancelled (if closing) or waited on (for delivery) |
| * |
| * - The work managed by this future is partially performed asynchronously, not necessarily on the eventLoop. |
| * - It is only completed on the eventLoop |
| * - It may not be executing, but might be scheduled to be submitted if {@link #scheduled} is not null |
| */ |
| final Future<Result<MessagingSuccess>> attempt; |
| |
| /** |
| * If we are retrying to connect with some delay, this represents the scheduled inititation of another attempt |
| */ |
| @Nullable |
| final Future<?> scheduled; |
| |
| /** |
| * true iff we are retrying to connect after some failure (immediately or following a delay) |
| */ |
| final boolean isFailingToConnect; |
| |
| Connecting(Disconnected previous, Future<Result<MessagingSuccess>> attempt) |
| { |
| this(previous, attempt, null); |
| } |
| |
| Connecting(Disconnected previous, Future<Result<MessagingSuccess>> attempt, Future<?> scheduled) |
| { |
| super(Kind.CONNECTING, previous.maintenance); |
| this.attempt = attempt; |
| this.scheduled = scheduled; |
| this.isFailingToConnect = scheduled != null || (previous.isConnecting() && previous.connecting().isFailingToConnect); |
| } |
| |
| /** |
| * Cancel the connection attempt |
| * |
| * No cleanup is needed here, as {@link #attempt} is only completed on the eventLoop, |
| * so we have either already invoked the callbacks and are no longer in {@link #state}, |
| * or the {@link OutboundConnectionInitiator} will handle our successful cancellation |
| * when it comes to complete, by closing the channel (if we could not cancel it before then) |
| */ |
| void cancel() |
| { |
| if (scheduled != null) |
| scheduled.cancel(true); |
| |
| // we guarantee that attempt is only ever completed by the eventLoop |
| boolean cancelled = attempt.cancel(true); |
| assert cancelled; |
| } |
| } |
| |
    /** the current state of the connection; per the class javadoc, almost all transitions occur on the eventLoop */
    private volatile State state;

    /** The connection is being permanently closed */
    private volatile Future<Void> closing;
    /** The connection is being permanently closed in the near future */
    private volatile Future<Void> scheduledClose;
| |
    OutboundConnection(ConnectionType type, OutboundConnectionSettings settings, EndpointAndGlobal reserveCapacityInBytes)
    {
        // fill in any defaults the caller did not specify before reading individual settings from the template
        this.template = settings.withDefaults(ConnectionCategory.MESSAGING);
        this.type = type;
        this.eventLoop = template.socketFactory.defaultGroup().next();
        this.pendingCapacityInBytes = template.applicationSendQueueCapacityInBytes;
        this.reserveCapacityInBytes = reserveCapacityInBytes;
        this.callbacks = template.callbacks;
        this.debug = template.debug;
        // onExpired is invoked by the queue for each message whose timeout elapses before delivery
        this.queue = new OutboundMessageQueue(approxTime, this::onExpired);
        // large messages are delivered via a companion executor; all other types deliver on the event loop itself
        this.delivery = type == ConnectionType.LARGE_MESSAGES
                        ? new LargeMessageDelivery(template.socketFactory.synchronousWorkExecutor)
                        : new EventLoopDelivery();
        setDisconnected();
    }
| |
| /** |
| * This is the main entry point for enqueuing a message to be sent to the remote peer. |
| */ |
    /**
     * This is the main entry point for enqueuing a message to be sent to the remote peer.
     *
     * Safe to invoke from any thread.
     *
     * @throws ClosedChannelException if the connection is closing, either on entry or due to a concurrent
     *         close; in the latter case the message is first removed from the queue and its capacity
     *         released, so the caller may retry on a replacement connection
     * @throws Message.OversizedMessageException if the message exceeds the configured internode maximum
     */
    public void enqueue(Message message) throws ClosedChannelException
    {
        if (isClosing())
            throw new ClosedChannelException();

        final int canonicalSize = canonicalSize(message);
        if (canonicalSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes())
            throw new Message.OversizedMessageException(canonicalSize);

        submittedUpdater.incrementAndGet(this);
        switch (acquireCapacity(canonicalSize))
        {
            case INSUFFICIENT_ENDPOINT:
                // if we're overloaded to one endpoint, we may be accumulating expirable messages, so
                // attempt an expiry to see if this makes room for our newer message.
                // this is an optimisation only; messages will be expired on ~100ms cycle, and by Delivery when it runs
                if (queue.maybePruneExpired() && SUCCESS == acquireCapacity(canonicalSize))
                    break;
                // deliberate fall-through: pruning did not make room, so treat the message as overloaded
            case INSUFFICIENT_GLOBAL:
                onOverloaded(message);
                return;
        }

        queue.add(message);
        delivery.execute();

        // we might race with the channel closing; if this happens, to ensure this message eventually arrives
        // we need to remove ourselves from the queue and throw a ClosedChannelException, so that another channel
        // can be opened in our place to try and send on.
        if (isClosing() && queue.remove(message))
        {
            releaseCapacity(1, canonicalSize);
            throw new ClosedChannelException();
        }
    }
| |
    /**
     * Try to acquire the necessary resource permits for a number of pending bytes for this connection.
     *
     * Since the owner limit is shared amongst multiple connections, our semantics cannot be super trivial.
     * Were they per-connection, we could simply perform an atomic increment of the queue size, then
     * allocate any excess we need in the reserve, and on release free everything we see from both.
     * Since we are coordinating two independent atomic variables we have to track every byte we allocate in reserve
     * and ensure it is matched by a corresponding released byte. We also need to be sure we do not permit another
     * releasing thread to release reserve bytes we have not yet - and may never - actually reserve.
     *
     * As such, we have to first check if we would need reserve bytes, then allocate them *before* we increment our
     * queue size. We only increment the queue size if the reserve bytes are definitely not needed, or we could first
     * obtain them. If in the process of obtaining any reserve bytes the queue size changes, we have some bytes that are
     * reserved for us, but may be a different number to that we need. So we must continue to track these.
     *
     * In the happy path, this is still efficient as we simply CAS
     *
     * @return SUCCESS if capacity was acquired; otherwise INSUFFICIENT_ENDPOINT or INSUFFICIENT_GLOBAL
     */
    private Outcome acquireCapacity(long bytes)
    {
        // convenience overload for a single message of the given size
        return acquireCapacity(1, bytes);
    }
| |
    /**
     * Implementation of the capacity acquisition described on {@link #acquireCapacity(long)}:
     * CAS-loop over {@link #pendingCountAndBytes}, claiming any needed shared reserve *before*
     * publishing the new queue size, and returning any reserve that ends up unused.
     */
    private Outcome acquireCapacity(long count, long bytes)
    {
        long increment = pendingCountAndBytes(count, bytes);
        // reserve bytes we have successfully claimed, but not yet accounted to a published queue size
        long unusedClaimedReserve = 0;
        Outcome outcome = null;
        loop: while (true)
        {
            long current = pendingCountAndBytes;
            // the packed count field is saturated: we cannot admit another message at all
            if (isMaxPendingCount(current))
            {
                outcome = INSUFFICIENT_ENDPOINT;
                break;
            }

            long next = current + increment;
            // happy path: we fit entirely within our exclusive capacity, so a bare CAS suffices
            if (pendingBytes(next) <= pendingCapacityInBytes)
            {
                if (pendingCountAndBytesUpdater.compareAndSet(this, current, next))
                {
                    outcome = SUCCESS;
                    break;
                }
                continue;
            }

            // do not draw on the shared reserve for an endpoint we are currently failing to connect to
            State state = this.state;
            if (state.isConnecting() && state.connecting().isFailingToConnect)
            {
                outcome = INSUFFICIENT_ENDPOINT;
                break;
            }

            // we need reserve capacity for the portion exceeding our exclusive capacity (at most `bytes`);
            // claim any extra we need from the shared reserve before publishing the new queue size, so a
            // concurrent releasing thread can never release reserve bytes we have not actually claimed
            long requiredReserve = min(bytes, pendingBytes(next) - pendingCapacityInBytes);
            if (unusedClaimedReserve < requiredReserve)
            {
                long extraGlobalReserve = requiredReserve - unusedClaimedReserve;
                switch (outcome = reserveCapacityInBytes.tryAllocate(extraGlobalReserve))
                {
                    case INSUFFICIENT_ENDPOINT:
                    case INSUFFICIENT_GLOBAL:
                        break loop;
                    case SUCCESS:
                        unusedClaimedReserve += extraGlobalReserve;
                }
            }

            if (pendingCountAndBytesUpdater.compareAndSet(this, current, next))
            {
                // new size published: requiredReserve of our claimed reserve is now in use
                unusedClaimedReserve -= requiredReserve;
                break;
            }
        }

        // return any reserve we claimed but did not need (e.g. the queue size changed under us, or we failed)
        if (unusedClaimedReserve > 0)
            reserveCapacityInBytes.release(unusedClaimedReserve);

        return outcome;
    }
| |
| /** |
| * Mark a number of pending bytes as flushed to the network, releasing their capacity for new outbound messages. |
| */ |
| private void releaseCapacity(long count, long bytes) |
| { |
| long decrement = pendingCountAndBytes(count, bytes); |
| long prev = pendingCountAndBytesUpdater.getAndAdd(this, -decrement); |
| if (pendingBytes(prev) > pendingCapacityInBytes) |
| { |
| long excess = min(pendingBytes(prev) - pendingCapacityInBytes, bytes); |
| reserveCapacityInBytes.release(excess); |
| } |
| } |
| |
| private void onOverloaded(Message<?> message) |
| { |
| overloadedCountUpdater.incrementAndGet(this); |
| |
| int canonicalSize = canonicalSize(message); |
| overloadedBytesUpdater.addAndGet(this, canonicalSize); |
| |
| noSpamLogger.warn("{} overloaded; dropping {} message (queue: {} local, {} endpoint, {} global)", |
| this, FBUtilities.prettyPrintMemory(canonicalSize), |
| readablePendingBytes, readableReserveEndpointUsing, readableReserveGlobalUsing); |
| |
| callbacks.onOverloaded(message, template.to); |
| } |
| |
| /** |
| * Take any necessary cleanup action after a message has been selected to be discarded from the queue. |
| * |
| * Only to be invoked while holding OutboundMessageQueue.WithLock |
| */ |
| private boolean onExpired(Message<?> message) |
| { |
| noSpamLogger.warn("{} dropping message of type {} whose timeout expired before reaching the network", id(), message.verb()); |
| releaseCapacity(1, canonicalSize(message)); |
| expiredCount += 1; |
| expiredBytes += canonicalSize(message); |
| callbacks.onExpired(message, template.to); |
| return true; |
| } |
| |
| /** |
| * Take any necessary cleanup action after a message has been selected to be discarded from the queue. |
| * |
| * Only to be invoked by the delivery thread |
| */ |
    /**
     * Take any necessary cleanup action after a message has been selected to be discarded from the queue
     * due to a serialization (or related) failure.
     *
     * Only to be invoked by the delivery thread
     */
    private void onFailedSerialize(Message<?> message, int messagingVersion, int bytesWrittenToNetwork, Throwable t)
    {
        logger.warn("{} dropping message of type {} due to error", id(), message.verb(), t);
        JVMStabilityInspector.inspectThrowable(t);
        // capacity was acquired against canonicalSize, so it must be released with the same measure
        releaseCapacity(1, canonicalSize(message));
        errorCount += 1;
        // NOTE(review): errorBytes is accounted with the version-specific serialized size, whereas the
        // release above uses canonicalSize - confirm this asymmetry is intentional
        errorBytes += message.serializedSize(messagingVersion);
        callbacks.onFailedSerialize(message, template.to, messagingVersion, bytesWrittenToNetwork, t);
    }
| |
| /** |
| * Take any necessary cleanup action after a message has been selected to be discarded from the queue on close. |
| * Note that this is only for messages that were queued prior to closing without graceful flush, OR |
| * for those that are unceremoniously dropped when we decide close has been trying to complete for too long. |
| */ |
| private void onClosed(Message<?> message) |
| { |
| releaseCapacity(1, canonicalSize(message)); |
| callbacks.onDiscardOnClose(message, template.to); |
| } |
| |
| /** |
| * Delivery bundles the following: |
| * |
| * - the work that is necessary to actually deliver messages safely, and handle any exceptional states |
| * - the ability to schedule delivery for some time in the future |
| * - the ability to schedule some non-delivery work to happen some time in the future, that is guaranteed |
| * NOT to coincide with delivery for its duration, including any data that is being flushed (e.g. for closing channels) |
| * - this feature is *not* efficient, and should only be used for infrequent operations |
| */ |
    private abstract class Delivery extends AtomicInteger implements Runnable
    {
        /** executor on which {@link #run} is scheduled: the eventLoop, or a companion executor */
        final ExecutorService executor;

        // the AtomicInteger we extend always contains some combination of these bit flags, representing our current run state

        /** Not running, and will not be scheduled again until transitioned to a new state */
        private static final int STOPPED = 0;
        /** Currently executing (may only be scheduled to execute, or may be about to terminate);
         * will stop at end of this run, without rescheduling */
        private static final int EXECUTING = 1;
        /** Another execution has been requested; a new execution will begin some time after this state is taken */
        private static final int EXECUTE_AGAIN = 2;
        /** We are currently executing and will submit another execution before we terminate */
        private static final int EXECUTING_AGAIN = EXECUTING | EXECUTE_AGAIN;
        /** Will begin a new execution some time after this state is taken, but only once some condition is met.
         * This state will initially be taken in tandem with EXECUTING, but if delivery completes without clearing
         * the state, the condition will be held on its own until {@link #executeAgain} is invoked */
        private static final int WAITING_TO_EXECUTE = 4;

        /**
         * Force all task execution to stop, once any currently in progress work is completed
         */
        private volatile boolean terminated;

        /**
         * Is there asynchronous delivery work in progress.
         *
         * This temporarily prevents any {@link #stopAndRun} work from being performed.
         * Once both inProgress and stopAndRun are set we perform no more delivery work until one is unset,
         * to ensure we eventually run stopAndRun.
         *
         * This should be updated and read only on the Delivery thread.
         */
        private boolean inProgress = false;

        /**
         * Request a task's execution while there is no delivery work in progress.
         *
         * This is to permit cleanly tearing down a connection without interrupting any messages that might be in flight.
         * If stopAndRun is set, we should not enter doRun() until a corresponding setInProgress(false) occurs.
         */
        final AtomicReference<Runnable> stopAndRun = new AtomicReference<>();

        Delivery(ExecutorService executor)
        {
            this.executor = executor;
        }

        /**
         * Ensure that any messages or stopAndRun that were queued prior to this invocation will be seen by at least
         * one future invocation of the delivery task, unless delivery has already been terminated.
         *
         * If we are STOPPED we transition directly to EXECUTING and schedule ourselves; in any other state it
         * suffices to raise the EXECUTE_AGAIN bit and let the active execution reschedule as it exits.
         */
        public void execute()
        {
            if (get() < EXECUTE_AGAIN && STOPPED == getAndUpdate(i -> i == STOPPED ? EXECUTING: i | EXECUTE_AGAIN))
                executor.execute(this);
        }

        /** @return true iff the given state has the EXECUTING bit set */
        private boolean isExecuting(int state)
        {
            return 0 != (state & EXECUTING);
        }

        /**
         * This method is typically invoked after WAITING_TO_EXECUTE is set.
         *
         * However WAITING_TO_EXECUTE does not need to be set; all this method needs to ensure is that
         * delivery unconditionally performs one new execution promptly.
         */
        void executeAgain()
        {
            // if we are already executing, set EXECUTING_AGAIN and leave scheduling to the currently running one.
            // otherwise, set ourselves unconditionally to EXECUTING and schedule ourselves immediately
            if (!isExecuting(getAndUpdate(i -> !isExecuting(i) ? EXECUTING : EXECUTING_AGAIN)))
                executor.execute(this);
        }

        /**
         * Invoke this when we cannot make further progress now, but we guarantee that we will execute later when we can.
         * This simply communicates to {@link #run} that we should not schedule ourselves again, just unset the EXECUTING bit.
         */
        void promiseToExecuteLater()
        {
            set(EXECUTING | WAITING_TO_EXECUTE);
        }

        /**
         * Called when exiting {@link #run} to schedule another run if necessary.
         *
         * If we are currently executing, we only reschedule if the present state is EXECUTING_AGAIN.
         * If this is the case, we clear the EXECUTE_AGAIN bit (setting ourselves to EXECUTING), and reschedule.
         * Otherwise, we clear the EXECUTING bit and terminate, which will set us to either STOPPED or WAITING_TO_EXECUTE
         * (or possibly WAITING_TO_EXECUTE | EXECUTE_AGAIN, which is logically the same as WAITING_TO_EXECUTE)
         */
        private void maybeExecuteAgain()
        {
            if (EXECUTING_AGAIN == getAndUpdate(i -> i == EXECUTING_AGAIN ? EXECUTING : (i & ~EXECUTING)))
                executor.execute(this);
        }

        /**
         * No more tasks or delivery will be executed, once any in progress complete.
         */
        public void terminate()
        {
            terminated = true;
        }

        /**
         * Only to be invoked by the Delivery task.
         *
         * If true, indicates that we have begun asynchronous delivery work, so that
         * we cannot safely stopAndRun until it completes.
         *
         * Once it completes, we ensure any stopAndRun task has a chance to execute
         * by ensuring delivery is scheduled.
         *
         * If stopAndRun is also set, we should not enter doRun() until a corresponding
         * setInProgress(false) occurs.
         */
        void setInProgress(boolean inProgress)
        {
            boolean wasInProgress = this.inProgress;
            this.inProgress = inProgress;
            // on the falling edge (async work just completed) guarantee another run, so stopAndRun can proceed
            if (!inProgress && wasInProgress)
                executeAgain();
        }

        /**
         * Perform some delivery work.
         *
         * Must never be invoked directly, only via {@link #execute()}
         */
        public void run()
        {
            /* do/while handling setup for {@link #doRun()}, and repeat invocations thereof */
            while (true)
            {
                if (terminated)
                    return;

                if (null != stopAndRun.get())
                {
                    // if we have an external request to perform, attempt it - if no async delivery is in progress

                    if (inProgress)
                    {
                        // if we are in progress, we cannot do anything;
                        // so, exit and rely on setInProgress(false) executing us
                        // (which must happen later, since it must happen on this thread)
                        promiseToExecuteLater();
                        break;
                    }

                    stopAndRun.getAndSet(null).run();
                }

                State state = OutboundConnection.this.state;
                if (!state.isEstablished() || !state.established().isConnected())
                {
                    // if we have messages yet to deliver, or a task to run, we need to reconnect and try again
                    // we try to reconnect before running another stopAndRun so that we do not infinite loop in close
                    if (hasPending() || null != stopAndRun.get())
                    {
                        promiseToExecuteLater();
                        requestConnect().addListener(f -> executeAgain());
                    }
                    break;
                }

                if (!doRun(state.established()))
                    break;
            }

            maybeExecuteAgain();
        }

        /**
         * @return true if we should run again immediately;
         * always false for eventLoop executor, as want to service other channels
         */
        abstract boolean doRun(Established established);

        /**
         * Schedule a task to run later on the delivery thread while delivery is not in progress,
         * i.e. there are no bytes in flight to the network buffer.
         *
         * Does not guarantee to run promptly if there is no current connection to the remote host.
         * May wait until a new connection is established, or a connection timeout elapses, before executing.
         *
         * Update the shared atomic property containing work we want to interrupt message processing to perform,
         * then invoke execute() to be certain it gets run.
         */
        void stopAndRun(Runnable run)
        {
            stopAndRun.accumulateAndGet(run, OutboundConnection::andThen);
            execute();
        }

        /**
         * Schedule a task to run on the eventLoop, guaranteeing that delivery will not occur while the task is performed.
         */
        abstract void stopAndRunOnEventLoop(Runnable run);

    }
| |
| /** |
| * Delivery that runs entirely on the eventLoop |
| * |
| * Since this has single threaded access to most of its environment, it can be simple and efficient, however |
| * it must also have bounded run time, and limit its resource consumption to ensure other channels serviced by the |
| * eventLoop can also make progress. |
| * |
| * This operates on modest buffers, no larger than the {@link OutboundConnections#LARGE_MESSAGE_THRESHOLD} and |
| * filling at most one at a time before writing (potentially asynchronously) to the socket. |
| * |
| * We track the number of bytes we have in flight, ensuring no more than a user-defined maximum at any one time. |
| */ |
| class EventLoopDelivery extends Delivery |
| { |
| private int flushingBytes; |
| private boolean isWritable = true; |
| |
        EventLoopDelivery()
        {
            // delivery work for this connection type is performed directly on the connection's event loop
            super(eventLoop);
        }
| |
| /** |
| * {@link Delivery#doRun} |
| * |
| * Since we are on the eventLoop, in order to ensure other channels are serviced |
| * we never return true to request another run immediately. |
| * |
| * If there is more work to be done, we submit ourselves for execution once the eventLoop has time. |
| */ |
| @SuppressWarnings("resource") |
| boolean doRun(Established established) |
| { |
| if (!isWritable) |
| return false; |
| |
| // pendingBytes is updated before queue.size() (which triggers notEmpty, and begins delivery), |
| // so it is safe to use it here to exit delivery |
| // this number is inaccurate for old versions, but we don't mind terribly - we'll send at least one message, |
| // and get round to it eventually (though we could add a fudge factor for some room for older versions) |
| int maxSendBytes = (int) min(pendingBytes() - flushingBytes, LARGE_MESSAGE_THRESHOLD); |
| if (maxSendBytes == 0) |
| return false; |
| |
| OutboundConnectionSettings settings = established.settings; |
| int messagingVersion = established.messagingVersion; |
| |
| FrameEncoder.Payload sending = null; |
| int canonicalSize = 0; // number of bytes we must use for our resource accounting |
| int sendingBytes = 0; |
| int sendingCount = 0; |
| try (OutboundMessageQueue.WithLock withLock = queue.lockOrCallback(approxTime.now(), this::execute)) |
| { |
| if (withLock == null) |
| return false; // we failed to acquire the queue lock, so return; we will be scheduled again when the lock is available |
| |
| sending = established.payloadAllocator.allocate(true, maxSendBytes); |
| DataOutputBufferFixed out = new DataOutputBufferFixed(sending.buffer); |
| |
| Message<?> next; |
| while ( null != (next = withLock.peek()) ) |
| { |
| try |
| { |
| int messageSize = next.serializedSize(messagingVersion); |
| |
| // actual message size for this version is larger than permitted maximum |
| if (messageSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes()) |
| throw new Message.OversizedMessageException(messageSize); |
| |
| if (messageSize > sending.remaining()) |
| { |
| // if we don't have enough room to serialize the next message, we have either |
| // 1) run out of room after writing some messages successfully; this might mean that we are |
| // overflowing our highWaterMark, or that we have just filled our buffer |
| // 2) we have a message that is too large for this connection; this can happen if a message's |
| // size was calculated for the wrong messaging version when enqueued. |
| // In this case we want to write it anyway, so simply allocate a large enough buffer. |
| |
| if (sendingBytes > 0) |
| break; |
| |
| sending.release(); |
| sending = null; // set to null to prevent double-release if we fail to allocate our new buffer |
| sending = established.payloadAllocator.allocate(true, messageSize); |
| //noinspection IOResourceOpenedButNotSafelyClosed |
| out = new DataOutputBufferFixed(sending.buffer); |
| } |
| |
| Tracing.instance.traceOutgoingMessage(next, messageSize, settings.connectTo); |
| Message.serializer.serialize(next, out, messagingVersion); |
| |
| if (sending.length() != sendingBytes + messageSize) |
| throw new InvalidSerializedSizeException(next.verb(), messageSize, sending.length() - sendingBytes); |
| |
| canonicalSize += canonicalSize(next); |
| sendingCount += 1; |
| sendingBytes += messageSize; |
| } |
| catch (Throwable t) |
| { |
| onFailedSerialize(next, messagingVersion, 0, t); |
| |
| assert sending != null; |
| // reset the buffer to ignore the message we failed to serialize |
| sending.trim(sendingBytes); |
| } |
| withLock.removeHead(next); |
| } |
| if (0 == sendingBytes) |
| return false; |
| |
| sending.finish(); |
| debug.onSendSmallFrame(sendingCount, sendingBytes); |
| ChannelFuture flushResult = AsyncChannelPromise.writeAndFlush(established.channel, sending); |
| sending = null; |
| |
| if (flushResult.isSuccess()) |
| { |
| sentCount += sendingCount; |
| sentBytes += sendingBytes; |
| debug.onSentSmallFrame(sendingCount, sendingBytes); |
| } |
| else |
| { |
| flushingBytes += canonicalSize; |
| setInProgress(true); |
| |
| boolean hasOverflowed = flushingBytes >= settings.flushHighWaterMark; |
| if (hasOverflowed) |
| { |
| isWritable = false; |
| promiseToExecuteLater(); |
| } |
| |
| int releaseBytesFinal = canonicalSize; |
| int sendingBytesFinal = sendingBytes; |
| int sendingCountFinal = sendingCount; |
| flushResult.addListener(future -> { |
| |
| releaseCapacity(sendingCountFinal, releaseBytesFinal); |
| flushingBytes -= releaseBytesFinal; |
| if (flushingBytes == 0) |
| setInProgress(false); |
| |
| if (!isWritable && flushingBytes <= settings.flushLowWaterMark) |
| { |
| isWritable = true; |
| executeAgain(); |
| } |
| |
| if (future.isSuccess()) |
| { |
| sentCount += sendingCountFinal; |
| sentBytes += sendingBytesFinal; |
| debug.onSentSmallFrame(sendingCountFinal, sendingBytesFinal); |
| } |
| else |
| { |
| errorCount += sendingCountFinal; |
| errorBytes += sendingBytesFinal; |
| invalidateChannel(established, future.cause()); |
| debug.onFailedSmallFrame(sendingCountFinal, sendingBytesFinal); |
| } |
| }); |
| canonicalSize = 0; |
| } |
| } |
| catch (Throwable t) |
| { |
| errorCount += sendingCount; |
| errorBytes += sendingBytes; |
| invalidateChannel(established, t); |
| } |
| finally |
| { |
| if (canonicalSize > 0) |
| releaseCapacity(sendingCount, canonicalSize); |
| |
| if (sending != null) |
| sending.release(); |
| |
| if (pendingBytes() > flushingBytes && isWritable) |
| execute(); |
| } |
| |
| return false; |
| } |
| |
| void stopAndRunOnEventLoop(Runnable run) |
| { |
| stopAndRun(run); |
| } |
| } |
| |
| /** |
| * Delivery that coordinates between the eventLoop and another (non-dedicated) thread |
| * |
| * This is to service messages that are too large to fully serialize on the eventLoop, as they could block |
| * prompt service of other requests. Since our serializers assume blocking IO, the easiest approach is to |
| * ensure a companion thread performs blocking IO that, under the hood, is serviced by async IO on the eventLoop. |
| * |
| * Most of the work here is handed off to {@link AsyncChannelOutputPlus}, with our main job being coordinating |
| * when and what we should run. |
| * |
| * To avoid allocating a huge number of threads across a cluster, we utilise the shared methods of {@link Delivery} |
| * to ensure that only one run() is actually scheduled to run at a time - this permits us to use any {@link ExecutorService} |
| * as a backing, with the number of threads defined only by the maximum concurrency needed to deliver all large messages. |
| * We use a shared caching {@link java.util.concurrent.ThreadPoolExecutor}, and rename the Threads that service |
| * our connection on entry and exit. |
| */ |
| class LargeMessageDelivery extends Delivery |
| { |
| static final int DEFAULT_BUFFER_SIZE = 32 * 1024; |
| |
| LargeMessageDelivery(ExecutorService executor) |
| { |
| super(executor); |
| } |
| |
| /** |
| * A simple wrapper of {@link Delivery#run} to set the current Thread name for the duration of its execution. |
| */ |
| public void run() |
| { |
| String threadName, priorThreadName = null; |
| try |
| { |
| priorThreadName = Thread.currentThread().getName(); |
| threadName = "Messaging-OUT-" + template.from() + "->" + template.to + '-' + type; |
| Thread.currentThread().setName(threadName); |
| |
| super.run(); |
| } |
| finally |
| { |
| if (priorThreadName != null) |
| Thread.currentThread().setName(priorThreadName); |
| } |
| } |
| |
| @SuppressWarnings({ "resource", "RedundantSuppression" }) // make eclipse warnings go away |
| boolean doRun(Established established) |
| { |
| Message<?> send = queue.tryPoll(approxTime.now(), this::execute); |
| if (send == null) |
| return false; |
| |
| AsyncMessageOutputPlus out = null; |
| try |
| { |
| int messageSize = send.serializedSize(established.messagingVersion); |
| out = new AsyncMessageOutputPlus(established.channel, DEFAULT_BUFFER_SIZE, messageSize, established.payloadAllocator); |
| // actual message size for this version is larger than permitted maximum |
| if (messageSize > DatabaseDescriptor.getInternodeMaxMessageSizeInBytes()) |
| throw new Message.OversizedMessageException(messageSize); |
| |
| Tracing.instance.traceOutgoingMessage(send, messageSize, established.settings.connectTo); |
| Message.serializer.serialize(send, out, established.messagingVersion); |
| |
| if (out.position() != messageSize) |
| throw new InvalidSerializedSizeException(send.verb(), messageSize, out.position()); |
| |
| out.close(); |
| sentCount += 1; |
| sentBytes += messageSize; |
| releaseCapacity(1, canonicalSize(send)); |
| return hasPending(); |
| } |
| catch (Throwable t) |
| { |
| boolean tryAgain = true; |
| |
| if (out != null) |
| { |
| out.discard(); |
| if (out.flushed() > 0 || |
| isCausedBy(t, cause -> isConnectionReset(cause) |
| || cause instanceof Errors.NativeIoException |
| || cause instanceof AsyncChannelOutputPlus.FlushException)) |
| { |
| // close the channel, and wait for eventLoop to execute |
| disconnectNow(established).awaitUninterruptibly(); |
| tryAgain = false; |
| try |
| { |
| // after closing, wait until we are signalled about the in flight writes; |
| // this ensures flushedToNetwork() is correct below |
| out.waitUntilFlushed(0, 0); |
| } |
| catch (Throwable ignore) |
| { |
| // irrelevant |
| } |
| } |
| } |
| |
| onFailedSerialize(send, established.messagingVersion, out == null ? 0 : (int) out.flushedToNetwork(), t); |
| return tryAgain; |
| } |
| } |
| |
| void stopAndRunOnEventLoop(Runnable run) |
| { |
| stopAndRun(() -> { |
| try |
| { |
| runOnEventLoop(run).await(); |
| } |
| catch (InterruptedException e) |
| { |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| } |
| |
| /* |
| * Size used for capacity enforcement purposes. Using current messaging version no matter what the peer's version is. |
| */ |
| private int canonicalSize(Message<?> message) |
| { |
| return message.serializedSize(current_version); |
| } |
| |
/**
 * Tear down the current channel after an error, unless this connection has already moved on from the
 * supplied {@code established} state (in which case it was invalidated by someone else already).
 *
 * Ordinary connection resets are logged at INFO (the peer simply closed); any other failure is logged
 * at ERROR, since the channel may have been left mid-frame and cannot be trusted for further use.
 */
private void invalidateChannel(Established established, Throwable cause)
{
    JVMStabilityInspector.inspectThrowable(cause);

    if (state != established)
        return; // do nothing; channel already invalidated

    if (isCausedByConnectionReset(cause))
        logger.info("{} channel closed by provider", id(), cause);
    else
        logger.error("{} channel in potentially inconsistent state after error; closing", id(), cause);

    disconnectNow(established);
}
| |
| /** |
| * Attempt to open a new channel to the remote endpoint. |
| * |
| * Most of the actual work is performed by OutboundConnectionInitiator, this method just manages |
| * our book keeping on either success or failure. |
| * |
 * This method is only to be invoked by the eventLoop, and the inner class' methods should only be evaluated by the eventLoop
| */ |
Future<?> initiate()
{
    // encapsulates one logical connection attempt, including its retry/backoff state
    class Initiate
    {
        /**
         * If we fail to connect, we want to try and connect again before any messages timeout.
         * However, we update this each time to ensure we do not retry unreasonably often, and settle on a periodicity
         * that might lead to timeouts in some aggressive systems.
         */
        long retryRateMillis = DatabaseDescriptor.getMinRpcTimeout(MILLISECONDS) / 2;

        // our connection settings, possibly updated on retry
        int messagingVersion = template.endpointToVersion().get(template.to);
        OutboundConnectionSettings settings;

        /**
         * If we failed for any reason, try again
         */
        void onFailure(Throwable cause)
        {
            // ConnectException is routine (peer down/unreachable): log at INFO; anything else at ERROR
            if (cause instanceof ConnectException)
                noSpamLogger.info("{} failed to connect", id(), cause);
            else
                noSpamLogger.error("{} failed to connect", id(), cause);

            JVMStabilityInspector.inspectThrowable(cause);

            if (hasPending())
            {
                // messages are waiting: schedule a retry with exponential backoff, capped at 1s
                Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
                state = new Connecting(state.disconnected(), result, eventLoop.schedule(() -> attempt(result), max(100, retryRateMillis), MILLISECONDS));
                retryRateMillis = min(1000, retryRateMillis * 2);
            }
            else
            {
                // this Initiate will be discarded
                state = Disconnected.dormant(state.disconnected().maintenance);
            }
        }

        void onCompletedHandshake(Result<MessagingSuccess> result)
        {
            switch (result.outcome)
            {
                case SUCCESS:
                    // it is expected that close, if successful, has already cancelled us; so we do not need to worry about leaking connections
                    assert !state.isClosed();

                    MessagingSuccess success = result.success();
                    debug.onConnect(success.messagingVersion, settings);
                    state.disconnected().maintenance.cancel(false);

                    FrameEncoder.PayloadAllocator payloadAllocator = success.allocator;
                    Channel channel = success.channel;
                    Established established = new Established(messagingVersion, channel, payloadAllocator, settings);
                    state = established;
                    // watch the channel so a disconnect or error transitions us out of Established promptly
                    channel.pipeline().addLast("handleExceptionalStates", new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelInactive(ChannelHandlerContext ctx)
                        {
                            disconnectNow(established);
                            ctx.fireChannelInactive();
                        }

                        @Override
                        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                        {
                            try
                            {
                                invalidateChannel(established, cause);
                            }
                            catch (Throwable t)
                            {
                                logger.error("Unexpected exception in {}.exceptionCaught", this.getClass().getSimpleName(), t);
                            }
                        }
                    });
                    ++successfulConnections;

                    logger.info("{} successfully connected, version = {}, framing = {}, encryption = {}",
                                id(true),
                                success.messagingVersion,
                                settings.framing,
                                encryptionConnectionSummary(channel));
                    break;

                case RETRY:
                    if (logger.isTraceEnabled())
                        logger.trace("{} incorrect legacy peer version predicted; reconnecting", id());

                    // the messaging version we connected with was incorrect; try again with the one supplied by the remote host
                    messagingVersion = result.retry().withMessagingVersion;
                    settings.endpointToVersion.set(settings.to, messagingVersion);

                    initiate();
                    break;

                case INCOMPATIBLE:
                    // we cannot communicate with this peer given its messaging version; mark this as any other failure, and continue trying
                    Throwable t = new IOException(String.format("Incompatible peer: %s, messaging version: %s",
                                                                settings.to, result.incompatible().maxMessagingVersion));
                    t.fillInStackTrace();
                    onFailure(t);
                    break;

                default:
                    throw new AssertionError();
            }
        }

        /**
         * Initiate all the actions required to establish a working, valid connection. This includes
         * opening the socket, negotiating the internode messaging handshake, and setting up the working
         * Netty {@link Channel}. However, this method will not block for all those actions: it will only
         * kick off the connection attempt, setting the {@link #connecting} future to track its completion.
         *
         * Note: this should only be invoked on the event loop.
         */
        private void attempt(Promise<Result<MessagingSuccess>> result)
        {
            ++connectionAttempts;

            /*
             * Re-evaluate messagingVersion before re-attempting the connection in case
             * endpointToVersion were updated. This happens if the outbound connection
             * is made before the endpointToVersion table is initially constructed or out
             * of date (e.g. if outbound connections are established for gossip
             * as a result of an inbound connection) and can result in the wrong outbound
             * port being selected if configured with enable_legacy_ssl_storage_port=true.
             */
            int knownMessagingVersion = messagingVersion();
            if (knownMessagingVersion != messagingVersion)
            {
                logger.trace("Endpoint version changed from {} to {} since connection initialized, updating.",
                             messagingVersion, knownMessagingVersion);
                messagingVersion = knownMessagingVersion;
            }

            settings = template;
            // never negotiate a version above what we are willing to accept
            if (messagingVersion > settings.acceptVersions.max)
                messagingVersion = settings.acceptVersions.max;

            // ensure we connect to the correct SSL port
            settings = settings.withLegacyPortIfNecessary(messagingVersion);

            initiateMessaging(eventLoop, type, settings, messagingVersion, result)
            .addListener(future -> {
                if (future.isCancelled())
                    return;
                if (future.isSuccess()) //noinspection unchecked
                    onCompletedHandshake((Result<MessagingSuccess>) future.getNow());
                else
                    onFailure(future.cause());
            });
        }

        Future<Result<MessagingSuccess>> initiate()
        {
            Promise<Result<MessagingSuccess>> result = new AsyncPromise<>(eventLoop);
            state = new Connecting(state.disconnected(), result);
            attempt(result);
            return result;
        }
    }

    return new Initiate().initiate();
}
| |
| /** |
| * Returns a future that completes when we are _maybe_ reconnected. |
| * |
| * The connection attempt is guaranteed to have completed (successfully or not) by the time any listeners are invoked, |
| * so if a reconnection attempt is needed, it is already scheduled. |
| */ |
private Future<?> requestConnect()
{
    // we may race with updates to this variable, but this is fine, since we only guarantee that we see a value
    // that did at some point represent an active connection attempt - if it is stale, it will have been completed
    // and the caller can retry (or utilise the successfully established connection)
    {
        State state = this.state;
        if (state.isConnecting())
            return state.connecting().attempt;
    }

    Promise<Object> promise = AsyncPromise.uncancellable(eventLoop);
    // the decision of whether and how to connect must be made on the eventLoop, where state is authoritative
    runOnEventLoop(() -> {
        if (isClosed()) // never going to connect
        {
            promise.tryFailure(new ClosedChannelException());
        }
        else if (state.isEstablished() && state.established().isConnected()) // already connected
        {
            promise.trySuccess(null);
        }
        else
        {
            // the Established state is stale (its channel is no longer connected); retire it first
            if (state.isEstablished())
                setDisconnected();

            if (!state.isConnecting())
            {
                assert eventLoop.inEventLoop();
                assert !isConnected();
                initiate().addListener(new PromiseNotifier<>(promise));
            }
            else
            {
                // someone else just began an attempt; piggyback on its completion
                state.connecting().attempt.addListener(new PromiseNotifier<>(promise));
            }
        }
    });
    return promise;
}
| |
| /** |
| * Change the IP address on which we connect to the peer. We will attempt to connect to the new address if there |
| * was a previous connection, and new incoming messages as well as existing {@link #queue} messages will be sent there. |
| * Any outstanding messages in the existing channel will still be sent to the previous address (we won't/can't move them from |
| * one channel to another). |
| * |
| * Returns null if the connection is closed. |
| */ |
Future<Void> reconnectWith(OutboundConnectionSettings reconnectWith)
{
    OutboundConnectionSettings newTemplate = reconnectWith.withDefaults(ConnectionCategory.MESSAGING);
    // these settings are tied to resources and accounting shared beyond this connection;
    // swapping them via reconnect would corrupt that shared state, so reject any change
    if (newTemplate.socketFactory != template.socketFactory) throw new IllegalArgumentException();
    if (newTemplate.callbacks != template.callbacks) throw new IllegalArgumentException();
    if (!Objects.equals(newTemplate.applicationSendQueueCapacityInBytes, template.applicationSendQueueCapacityInBytes)) throw new IllegalArgumentException();
    if (!Objects.equals(newTemplate.applicationSendQueueReserveEndpointCapacityInBytes, template.applicationSendQueueReserveEndpointCapacityInBytes)) throw new IllegalArgumentException();
    if (newTemplate.applicationSendQueueReserveGlobalCapacityInBytes != template.applicationSendQueueReserveGlobalCapacityInBytes) throw new IllegalArgumentException();

    logger.info("{} updating connection settings", id());

    Promise<Void> done = AsyncPromise.uncancellable(eventLoop);
    delivery.stopAndRunOnEventLoop(() -> {
        template = newTemplate;
        // delivery will immediately continue after this, triggering a reconnect if necessary;
        // this might mean a slight delay for large message delivery, as the connect will be scheduled
        // asynchronously, so we must wait for a second turn on the eventLoop
        if (state.isEstablished())
        {
            disconnectNow(state.established());
        }
        else if (state.isConnecting())
        {
            // cancel any in-flight connection attempt and restart with new template
            state.connecting().cancel();
            initiate();
        }
        done.setSuccess(null);
    });
    return done;
}
| |
| /** |
| * Close any currently open connection, forcing a reconnect if there are messages outstanding |
| * (or leaving it closed for now otherwise) |
| */ |
| public boolean interrupt() |
| { |
| State state = this.state; |
| if (!state.isEstablished()) |
| return false; |
| |
| disconnectGracefully(state.established()); |
| return true; |
| } |
| |
| /** |
| * Schedule a safe close of the provided channel, if it has not already been closed. |
| * |
| * This means ensuring that delivery has stopped so that we do not corrupt or interrupt any |
| * in progress transmissions. |
| * |
| * The actual closing of the channel is performed asynchronously, to simplify our internal state management |
| * and promptly get the connection going again; the close is considered to have succeeded as soon as we |
| * have set our internal state. |
| */ |
| private void disconnectGracefully(Established closeIfIs) |
| { |
| // delivery will immediately continue after this, triggering a reconnect if necessary; |
| // this might mean a slight delay for large message delivery, as the connect will be scheduled |
| // asynchronously, so we must wait for a second turn on the eventLoop |
| delivery.stopAndRunOnEventLoop(() -> disconnectNow(closeIfIs)); |
| } |
| |
| /** |
| * The channel is already known to be invalid, so there's no point waiting for a clean break in delivery. |
| * |
| * Delivery will be executed again as soon as we have logically closed the channel; we do not wait |
| * for the channel to actually be closed. |
| * |
| * The Future returned _does_ wait for the channel to be completely closed, so that callers can wait to be sure |
| * all writes have been completed either successfully or not. |
| */ |
private Future<?> disconnectNow(Established closeIfIs)
{
    return runOnEventLoop(() -> {
        // only act if we are still the active Established state; otherwise it was already invalidated
        if (state == closeIfIs)
        {
            // no need to wait until the channel is closed to set ourselves as disconnected (and potentially open a new channel)
            setDisconnected();
            if (hasPending())
                delivery.execute();
            closeIfIs.channel.close()
                     .addListener(future -> {
                         if (!future.isSuccess())
                             logger.info("Problem closing channel {}", closeIfIs, future.cause());
                     });
        }
    });
}
| |
| /** |
| * Schedules regular cleaning of the connection's state while it is disconnected from its remote endpoint. |
| * |
| * To be run only by the eventLoop or in the constructor |
| */ |
private void setDisconnected()
{
    // only legal on construction (state == null) or when retiring an established channel
    assert state == null || state.isEstablished();
    // prune expired messages every 100ms while we have no channel to deliver them on
    state = Disconnected.dormant(eventLoop.scheduleAtFixedRate(queue::maybePruneExpired, 100L, 100L, TimeUnit.MILLISECONDS));
}
| |
| /** |
| * Schedule this connection to be permanently closed; only one close may be scheduled, |
| * any future scheduled closes are referred to the original triggering one (which may have a different schedule) |
| */ |
Future<Void> scheduleClose(long time, TimeUnit unit, boolean flushQueue)
{
    Promise<Void> scheduledClose = AsyncPromise.uncancellable(eventLoop);
    // only the first caller installs its promise and schedule; later callers get the original
    if (!scheduledCloseUpdater.compareAndSet(this, null, scheduledClose))
        return this.scheduledClose;

    eventLoop.schedule(() -> close(flushQueue).addListener(new PromiseNotifier<>(scheduledClose)), time, unit);
    return scheduledClose;
}
| |
| /** |
| * Permanently close this connection. |
| * |
| * Immediately prevent any new messages from being enqueued - these will throw ClosedChannelException. |
| * The close itself happens asynchronously on the eventLoop, so a Future is returned to help callers |
| * wait for its completion. |
| * |
| * The flushQueue parameter indicates if any outstanding messages should be delivered before closing the connection. |
| * |
| * - If false, any already flushed or in-progress messages are completed, and the remaining messages are cleared |
| * before the connection is promptly torn down. |
| * |
| * - If true, we attempt delivery of all queued messages. If necessary, we will continue to open new connections |
| * to the remote host until they have been delivered. Only if we continue to fail to open a connection for |
| * an extended period of time will we drop any outstanding messages and close the connection. |
| */ |
public Future<Void> close(boolean flushQueue)
{
    // ensure only one close attempt can be in flight
    Promise<Void> closing = AsyncPromise.uncancellable(eventLoop);
    if (!closingUpdater.compareAndSet(this, null, closing))
        return this.closing;

    /*
     * Now define a cleanup closure, that will be deferred until it is safe to do so.
     * Once run it:
     *  - immediately _logically_ closes the channel by updating this object's fields, but defers actually closing
     *  - cancels any in-flight connection attempts
     *  - cancels any maintenance work that might be scheduled
     *  - clears any waiting messages on the queue
     *  - terminates the delivery thread
     *  - finally, schedules any open channel's closure, and propagates its completion to the close promise
     */
    Runnable eventLoopCleanup = () -> {
        Runnable onceNotConnecting = () -> {
            // start by setting ourselves to definitionally closed
            State state = this.state;
            this.state = State.CLOSED;

            try
            {
                // note that we never clear the queue, to ensure that an enqueue has the opportunity to remove itself
                // if it raced with close, to potentially requeue the message on a replacement connection

                // we terminate delivery here, to ensure that any listener to {@link connecting} do not schedule more work
                delivery.terminate();

                // stop periodic cleanup
                if (state.isDisconnected())
                {
                    state.disconnected().maintenance.cancel(true);
                    closing.setSuccess(null);
                }
                else
                {
                    // an Established channel remains: its close completion resolves the close promise
                    assert state.isEstablished();
                    state.established().channel.close()
                         .addListener(new PromiseNotifier<>(closing));
                }
            }
            catch (Throwable t)
            {
                // in case of unexpected exception, signal completion and try to close the channel
                closing.trySuccess(null);
                try
                {
                    if (state.isEstablished())
                        state.established().channel.close();
                }
                catch (Throwable t2)
                {
                    t.addSuppressed(t2);
                    logger.error("Failed to close connection cleanly:", t);
                }
                throw t;
            }
        };

        if (state.isConnecting())
        {
            // stop any in-flight connection attempts; these should be running on the eventLoop, so we should
            // be able to cleanly cancel them, but executing on a listener guarantees correct semantics either way
            Connecting connecting = state.connecting();
            connecting.cancel();
            connecting.attempt.addListener(future -> onceNotConnecting.run());
        }
        else
        {
            onceNotConnecting.run();
        }
    };

    /*
     * If we want to shutdown gracefully, flushing any outstanding messages, we have to do it very carefully.
     * Things to note:
     *
     *  - It is possible flushing messages will require establishing a new connection
     *    (However, if a connection cannot be established, we do not want to keep trying)
     *  - We have to negotiate with a separate thread, so we must be sure it is not in-progress before we stop (like channel close)
     *  - Cleanup must still happen on the eventLoop
     *
     * To achieve all of this, we schedule a recurring operation on the delivery thread, executing while delivery
     * is between messages, that checks if the queue is empty; if it is, it schedules cleanup on the eventLoop.
     */

    // fail all still-queued messages (invoking their callbacks) without delivering them
    Runnable clearQueue = () ->
    {
        CountDownLatch done = new CountDownLatch(1);
        queue.runEventually(withLock -> {
            withLock.consume(this::onClosed);
            done.countDown();
        });
        //noinspection UnstableApiUsage
        Uninterruptibles.awaitUninterruptibly(done);
    };

    if (flushQueue)
    {
        // just keep scheduling on delivery executor a check to see if we're done; there should always be one
        // delivery attempt between each invocation, unless there is a wider problem with delivery scheduling
        class FinishDelivery implements Runnable
        {
            public void run()
            {
                if (!hasPending())
                    delivery.stopAndRunOnEventLoop(eventLoopCleanup);
                else
                    delivery.stopAndRun(() -> {
                        // if we cannot connect at all, there is no hope of flushing: drop what remains
                        if (state.isConnecting() && state.connecting().isFailingToConnect)
                            clearQueue.run();
                        run();
                    });
            }
        }

        delivery.stopAndRun(new FinishDelivery());
    }
    else
    {
        delivery.stopAndRunOnEventLoop(() -> {
            clearQueue.run();
            eventLoopCleanup.run();
        });
    }

    return closing;
}
| |
| /** |
| * Run the task immediately if we are the eventLoop, otherwise queue it for execution on the eventLoop. |
| */ |
| private Future<?> runOnEventLoop(Runnable runnable) |
| { |
| if (!eventLoop.inEventLoop()) |
| return eventLoop.submit(runnable); |
| |
| runnable.run(); |
| return new SucceededFuture<>(eventLoop, null); |
| } |
| |
| public boolean isConnected() |
| { |
| State state = this.state; |
| return state.isEstablished() && state.established().isConnected(); |
| } |
| |
| boolean isClosing() |
| { |
| return closing != null; |
| } |
| |
| boolean isClosed() |
| { |
| return state.isClosed(); |
| } |
| |
| private String id(boolean includeReal) |
| { |
| State state = this.state; |
| if (!includeReal || !state.isEstablished()) |
| return id(); |
| Established established = state.established(); |
| Channel channel = established.channel; |
| OutboundConnectionSettings settings = established.settings; |
| return SocketFactory.channelId(settings.from, (InetSocketAddress) channel.localAddress(), |
| settings.to, (InetSocketAddress) channel.remoteAddress(), |
| type, channel.id().asShortText()); |
| } |
| |
| private String id() |
| { |
| State state = this.state; |
| Channel channel = null; |
| OutboundConnectionSettings settings = template; |
| if (state.isEstablished()) |
| { |
| channel = state.established().channel; |
| settings = state.established().settings; |
| } |
| String channelId = channel != null ? channel.id().asShortText() : "[no-channel]"; |
| return SocketFactory.channelId(settings.from(), settings.to, type, channelId); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return id(); |
| } |
| |
| public boolean hasPending() |
| { |
| return 0 != pendingCountAndBytes; |
| } |
| |
| public int pendingCount() |
| { |
| return pendingCount(pendingCountAndBytes); |
| } |
| |
| public long pendingBytes() |
| { |
| return pendingBytes(pendingCountAndBytes); |
| } |
| |
| public long sentCount() |
| { |
| // not volatile, but shouldn't matter |
| return sentCount; |
| } |
| |
| public long sentBytes() |
| { |
| // not volatile, but shouldn't matter |
| return sentBytes; |
| } |
| |
| public long submittedCount() |
| { |
| // not volatile, but shouldn't matter |
| return submittedCount; |
| } |
| |
| public long dropped() |
| { |
| return overloadedCount + expiredCount; |
| } |
| |
| public long overloadedBytes() |
| { |
| return overloadedBytes; |
| } |
| |
| public long overloadedCount() |
| { |
| return overloadedCount; |
| } |
| |
| public long expiredCount() |
| { |
| return expiredCount; |
| } |
| |
| public long expiredBytes() |
| { |
| return expiredBytes; |
| } |
| |
| public long errorCount() |
| { |
| return errorCount; |
| } |
| |
| public long errorBytes() |
| { |
| return errorBytes; |
| } |
| |
| public long successfulConnections() |
| { |
| return successfulConnections; |
| } |
| |
| public long connectionAttempts() |
| { |
| return connectionAttempts; |
| } |
| |
| private static Runnable andThen(Runnable a, Runnable b) |
| { |
| if (a == null || b == null) |
| return a == null ? b : a; |
| return () -> { a.run(); b.run(); }; |
| } |
| |
| @VisibleForTesting |
| public ConnectionType type() |
| { |
| return type; |
| } |
| |
| @VisibleForTesting |
| OutboundConnectionSettings settings() |
| { |
| State state = this.state; |
| return state.isEstablished() ? state.established().settings : template; |
| } |
| |
| @VisibleForTesting |
| int messagingVersion() |
| { |
| State state = this.state; |
| return state.isEstablished() ? state.established().messagingVersion |
| : template.endpointToVersion().get(template.to); |
| } |
| |
| @VisibleForTesting |
| void unsafeRunOnDelivery(Runnable run) |
| { |
| delivery.stopAndRun(run); |
| } |
| |
| @VisibleForTesting |
| Channel unsafeGetChannel() |
| { |
| State state = this.state; |
| return state.isEstablished() ? state.established().channel : null; |
| } |
| |
| @VisibleForTesting |
| boolean unsafeAcquireCapacity(long amount) |
| { |
| return SUCCESS == acquireCapacity(amount); |
| } |
| |
| @VisibleForTesting |
| boolean unsafeAcquireCapacity(long count, long amount) |
| { |
| return SUCCESS == acquireCapacity(count, amount); |
| } |
| |
| @VisibleForTesting |
| void unsafeReleaseCapacity(long amount) |
| { |
| releaseCapacity(1, amount); |
| } |
| |
| @VisibleForTesting |
| void unsafeReleaseCapacity(long count, long amount) |
| { |
| releaseCapacity(count, amount); |
| } |
| |
| @VisibleForTesting |
| Limit unsafeGetEndpointReserveLimits() |
| { |
| return reserveCapacityInBytes.endpoint; |
| } |
| } |