| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.cassandra.net; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.IdentityHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelInboundHandlerAdapter; |
| import io.netty.channel.EventLoop; |
| import org.apache.cassandra.metrics.ClientMetrics; |
| import org.apache.cassandra.net.FrameDecoder.CorruptFrame; |
| import org.apache.cassandra.net.FrameDecoder.Frame; |
| import org.apache.cassandra.net.FrameDecoder.FrameProcessor; |
| import org.apache.cassandra.net.FrameDecoder.IntactFrame; |
| import org.apache.cassandra.net.Message.Header; |
| import org.apache.cassandra.net.ResourceLimits.Limit; |
| |
| import static java.lang.Math.max; |
| import static java.lang.Math.min; |
| import static org.apache.cassandra.net.Crc.InvalidCrc; |
| import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime; |
| |
| /** |
| * Core logic for handling inbound message deserialization and execution (in tandem with {@link FrameDecoder}). |
| * |
| * Handles small and large messages, corruption, flow control, dispatch of message processing to a suitable |
| * consumer. |
| * |
| * # Interaction with {@link FrameDecoder} |
| * |
| * An {@link AbstractMessageHandler} implementation sits on top of a {@link FrameDecoder} in the Netty pipeline, |
| * and is tightly coupled with it. |
| * |
| * {@link FrameDecoder} decodes inbound frames and relies on a supplied {@link FrameProcessor} to act on them. |
| * {@link AbstractMessageHandler} provides two implementations of that interface: |
| * - {@link #process(Frame)} is the default, primary processor, and is expected to be implemented by subclasses |
| * - {@link UpToOneMessageFrameProcessor}, supplied to the decoder when the handler is reactivated after being |
| * put in waiting mode due to lack of acquirable reserve memory capacity permits |
| * |
| * Return value of {@link FrameProcessor#process(Frame)} determines whether the decoder should keep processing |
| * frames (if {@code true} is returned) or stop until explicitly reactivated (if {@code false} is). To reactivate |
| * the decoder (once notified of available resource permits), {@link FrameDecoder#reactivate()} is invoked. |
| * |
| * # Frames |
| * |
| * {@link AbstractMessageHandler} operates on frames of messages, and there are several kinds of them: |
| * 1. {@link IntactFrame} that are contained. As names suggest, these contain one or multiple fully contained |
 *       messages believed to be uncorrupted. Guaranteed to not contain a part of an incomplete message.
| * See {@link #processFrameOfContainedMessages(ShareableBytes, Limit, Limit)}. |
| * 2. {@link IntactFrame} that are NOT contained. These are uncorrupted parts of a large message split over multiple |
| * parts due to their size. Can represent first or subsequent frame of a large message. |
| * See {@link #processFirstFrameOfLargeMessage(IntactFrame, Limit, Limit)} and |
| * {@link #processSubsequentFrameOfLargeMessage(Frame)}. |
| * 3. {@link CorruptFrame} with corrupt header. These are unrecoverable, and force a connection to be dropped. |
| * 4. {@link CorruptFrame} with a valid header, but corrupt payload. These can be either contained or uncontained. |
| * - contained frames with corrupt payload can be gracefully dropped without dropping the connection |
| * - uncontained frames with corrupt payload can be gracefully dropped unless they represent the first |
| * frame of a new large message, as in that case we don't know how many bytes to skip |
| * See {@link #processCorruptFrame(CorruptFrame)}. |
| * |
| * Fundamental frame invariants: |
| * 1. A contained frame can only have fully-encapsulated messages - 1 to n, that don't cross frame boundaries |
| * 2. An uncontained frame can hold a part of one message only. It can NOT, say, contain end of one large message |
| * and a beginning of another one. All the bytes in an uncontained frame always belong to a single message. |
| * |
| * # Small vs large messages |
| * |
| * A single handler is equipped to process both small and large messages, potentially interleaved, but the logic |
| * differs depending on size. Small messages are deserialized in place, and then handed off to an appropriate |
| * thread pool for processing. Large messages accumulate frames until completion of a message, then hand off |
| * the untouched frames to the correct thread pool for the verb to be deserialized there and immediately processed. |
| * |
| * See {@link LargeMessage} and subclasses for concrete {@link AbstractMessageHandler} implementations for details |
| * of the large-message accumulating state-machine, and {@link InboundMessageHandler.ProcessMessage} and its inheritors |
| * for the differences in execution. |
| * |
| * # Flow control (backpressure) |
| * |
 * To prevent message producers from overwhelming and bringing nodes down with more inbound messages than
 * can be processed in a timely manner, {@link AbstractMessageHandler} provides support for implementations to
| * provide their own flow control policy. |
| * |
| * Before we attempt to process a message fully, we first infer its size from the stream. This inference is |
 * delegated to implementations as the encoding of the message size is protocol specific. Having ascertained
| * the size of the incoming message, we then attempt to acquire the corresponding number of memory permits. |
 * If we succeed, then we move on to actually process the message. If we fail, the frame decoder deactivates
| * until sufficient permits are released for the message to be processed and the handler is activated again. |
| * Permits are released back once the message has been fully processed - the definition of which is again |
| * delegated to the concrete implementations. |
| * |
| * Every connection has an exclusive number of permits allocated to it. In addition to it, there is a per-endpoint |
| * reserve capacity and a global reserve capacity {@link Limit}, shared between all connections from the same host |
 * and all connections, respectively. So long as the handler stays within its exclusive limit, it doesn't
| * need to tap into reserve capacity. |
| * |
| * If tapping into reserve capacity is necessary, but the handler fails to acquire capacity from either |
 * endpoint or global reserve (and it needs to acquire from both), the handler and its frame decoder become
| * inactive and register with a {@link WaitQueue} of the appropriate type, depending on which of the reserves |
| * couldn't be tapped into. Once enough messages have finished processing and had their permits released back |
| * to the reserves, {@link WaitQueue} will reactivate the sleeping handlers and they'll resume processing frames. |
| * |
 * The reason we 'split' reserve capacity into two limits - endpoint and global - is to guarantee liveness, and
| * prevent single endpoint's connections from taking over the whole reserve, starving other connections. |
| * |
| * One permit per byte of serialized message gets acquired. When inflated on-heap, each message will occupy more |
| * than that, necessarily, but despite wide variance, it's a good enough proxy that correlates with on-heap footprint. |
| */ |
| public abstract class AbstractMessageHandler extends ChannelInboundHandlerAdapter implements FrameProcessor |
| { |
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessageHandler.class);

    // the frame decoder this handler drives; activated in handlerAdded(), reactivated after throttling
    protected final FrameDecoder decoder;

    // Netty channel the handler is installed on; all processing happens on its event loop
    protected final Channel channel;

    // size threshold (in bytes) at which a message takes the multi-frame large-message path
    protected final int largeThreshold;
    // non-null while a multi-frame large message is being accumulated; null otherwise
    protected LargeMessage<?> largeMessage;

    // exclusive per-connection permit capacity, in bytes of serialized messages
    protected final long queueCapacity;
    // bytes of permits currently held by this connection; mutated via queueSizeUpdater
    volatile long queueSize = 0L;
    private static final AtomicLongFieldUpdater<AbstractMessageHandler> queueSizeUpdater =
        AtomicLongFieldUpdater.newUpdater(AbstractMessageHandler.class, "queueSize");

    // reserve capacity shared by all connections from the same endpoint, plus its wait queue
    protected final Limit endpointReserveCapacity;
    protected final WaitQueue endpointWaitQueue;

    // reserve capacity shared by all connections, plus its wait queue
    protected final Limit globalReserveCapacity;
    protected final WaitQueue globalWaitQueue;

    // callback invoked from channelInactive() when the handler is closed
    protected final OnHandlerClosed onClosed;

    // wait queue handle, non-null if we overrun endpoint or global capacity and request to be resumed once it's released
    private WaitQueue.Ticket ticket = null;

    // metrics counters; only mutated on the event loop
    protected long corruptFramesRecovered, corruptFramesUnrecovered;
    protected long receivedCount, receivedBytes;
    protected long throttledCount, throttledNanos;

    // set in channelInactive(); prevents reactivation of a closed handler
    private boolean isClosed;
| |
| public AbstractMessageHandler(FrameDecoder decoder, |
| |
| Channel channel, |
| int largeThreshold, |
| |
| long queueCapacity, |
| Limit endpointReserveCapacity, |
| Limit globalReserveCapacity, |
| WaitQueue endpointWaitQueue, |
| WaitQueue globalWaitQueue, |
| |
| OnHandlerClosed onClosed) |
| { |
| this.decoder = decoder; |
| |
| this.channel = channel; |
| this.largeThreshold = largeThreshold; |
| |
| this.queueCapacity = queueCapacity; |
| this.endpointReserveCapacity = endpointReserveCapacity; |
| this.endpointWaitQueue = endpointWaitQueue; |
| this.globalReserveCapacity = globalReserveCapacity; |
| this.globalWaitQueue = globalWaitQueue; |
| |
| this.onClosed = onClosed; |
| } |
| |
    /**
     * Always throws: inbound bytes reach this handler as decoded frames via {@link #process(Frame)},
     * never through the pipeline's channelRead() path.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
    {
        /*
         * This handler works in tandem with FrameDecoder to implement flow control
         * and work stashing optimally. We rely on FrameDecoder to invoke the provided
         * FrameProcessor rather than on the pipeline and invocations of channelRead().
         * process(Frame) is the primary entry point for this class.
         *
         * NOTE(review): the exception message below still says 'InboundMessageHandler',
         * presumably a leftover from before this class was extracted - confirm before renaming.
         */
        throw new IllegalStateException("InboundMessageHandler doesn't expect channelRead() to be invoked");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx)
    {
        decoder.activate(this); // the frame decoder starts inactive until explicitly activated by the added inbound message handler
    }
| |
| @Override |
| public boolean process(Frame frame) throws IOException |
| { |
| if (frame instanceof IntactFrame) |
| return processIntactFrame((IntactFrame) frame, endpointReserveCapacity, globalReserveCapacity); |
| |
| processCorruptFrame((CorruptFrame) frame); |
| return true; |
| } |
| |
| private boolean processIntactFrame(IntactFrame frame, Limit endpointReserve, Limit globalReserve) throws IOException |
| { |
| if (frame.isSelfContained) |
| return processFrameOfContainedMessages(frame.contents, endpointReserve, globalReserve); |
| else if (null == largeMessage) |
| return processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve); |
| else |
| return processSubsequentFrameOfLargeMessage(frame); |
| } |
| |
| /* |
| * Handle contained messages (not crossing boundaries of the frame) - both small and large, for the inbound |
| * definition of large (breaching the size threshold for what we are willing to process on event-loop vs. |
| * off event-loop). |
| */ |
| private boolean processFrameOfContainedMessages(ShareableBytes bytes, Limit endpointReserve, Limit globalReserve) throws IOException |
| { |
| while (bytes.hasRemaining()) |
| if (!processOneContainedMessage(bytes, endpointReserve, globalReserve)) |
| return false; |
| return true; |
| } |
| |
    /**
     * Deserialize and dispatch (or skip) a single message fully contained in {@code bytes},
     * consuming its bytes from the buffer.
     *
     * @return false if processing must stop and the decoder should deactivate, true to continue
     */
    protected abstract boolean processOneContainedMessage(ShareableBytes bytes, Limit endpointReserve, Limit globalReserve) throws IOException;


    /*
     * Handling of multi-frame large messages
     */

    /**
     * Handle the first frame of a multi-frame large message; implementations are expected to infer
     * the message size from the stream and set up {@link #largeMessage} to accumulate the remainder.
     *
     * @return false if processing must stop and the decoder should deactivate, true to continue
     */
    protected abstract boolean processFirstFrameOfLargeMessage(IntactFrame frame, Limit endpointReserve, Limit globalReserve) throws IOException;
| |
| protected boolean processSubsequentFrameOfLargeMessage(Frame frame) |
| { |
| receivedBytes += frame.frameSize; |
| if (largeMessage.supply(frame)) |
| { |
| receivedCount++; |
| largeMessage = null; |
| } |
| return true; |
| } |
| |
    /*
     * We can handle some corrupt frames gracefully without dropping the connection and losing all the
     * queued up messages, but not others.
     *
     * Corrupt frames that *ARE NOT* safe to skip gracefully and require the connection to be dropped:
     *  - any frame with corrupt header (!frame.isRecoverable())
     *  - first corrupt-payload frame of a large message (impossible to infer message size, and without it
     *    impossible to skip the message safely)
     *
     * Corrupt frames that *ARE* safe to skip gracefully, without reconnecting:
     *  - any self-contained frame with a corrupt payload (but not header): we lose all the messages in the
     *    frame, but that has no effect on subsequent ones
     *  - any non-first payload-corrupt frame of a large message: we know the size of the large message in
     *    flight, so we just skip frames until we've seen all its bytes; we only lose the large message
     */
    protected abstract void processCorruptFrame(CorruptFrame frame) throws InvalidCrc;

    // Ticket callbacks, run on the event loop once reserve capacity has been regained. The regained
    // reserve is substituted by a pre-allocated Limit; the other reserve is passed through unchanged.
    private void onEndpointReserveCapacityRegained(Limit endpointReserve, long elapsedNanos)
    {
        onReserveCapacityRegained(endpointReserve, globalReserveCapacity, elapsedNanos);
    }

    private void onGlobalReserveCapacityRegained(Limit globalReserve, long elapsedNanos)
    {
        onReserveCapacityRegained(endpointReserveCapacity, globalReserve, elapsedNanos);
    }
| |
    /**
     * Resume a handler that was parked waiting for reserve capacity: clear the wait-queue ticket,
     * account the time spent throttled, process up to one message using the supplied (partly
     * pre-allocated) reserves, and reactivate the frame decoder if nothing else got in the way.
     */
    private void onReserveCapacityRegained(Limit endpointReserve, Limit globalReserve, long elapsedNanos)
    {
        if (isClosed)
            return; // handler was closed while parked; nothing to resume

        assert channel.eventLoop().inEventLoop();

        ticket = null;
        throttledNanos += elapsedNanos;

        try
        {
            /*
             * Process up to one message using supplied overridden reserves - one of them pre-allocated,
             * and guaranteed to be enough for one message - then, if no obstacles encountered, reactivate
             * the frame decoder using normal reserve capacities.
             */
            if (processUpToOneMessage(endpointReserve, globalReserve))
            {
                decoder.reactivate();

                // NOTE(review): presumably the connection is 'paused' for client metrics while the
                // decoder is inactive; unpause only if reactivation left the decoder truly active - confirm
                if (decoder.isActive())
                    ClientMetrics.instance.unpauseConnection();
            }
        }
        catch (Throwable t)
        {
            fatalExceptionCaught(t);
        }
    }

    /** Invoked on any unrecoverable error during processing; implementations decide how to tear down. */
    protected abstract void fatalExceptionCaught(Throwable t);

    // return true if the handler should be reactivated - if no new hurdles were encountered,
    // like running out of the other kind of reserve capacity
    protected boolean processUpToOneMessage(Limit endpointReserve, Limit globalReserve) throws IOException
    {
        UpToOneMessageFrameProcessor processor = new UpToOneMessageFrameProcessor(endpointReserve, globalReserve);
        decoder.processBacklog(processor);
        return processor.isActive;
    }
| |
    /*
     * Process at most one message. Won't always be an entire one (if the message in the head of line
     * is a large one, and there aren't sufficient frames to decode it entirely), but will never be more than one.
     */
    private class UpToOneMessageFrameProcessor implements FrameProcessor
    {
        // overridden reserves supplied by the wait queue; one of them is pre-allocated capacity
        private final Limit endpointReserve;
        private final Limit globalReserve;

        // whether the handler should be reactivated with normal reserves after the backlog run
        boolean isActive = true;
        // true until the first frame of the backlog has been seen
        boolean firstFrame = true;

        private UpToOneMessageFrameProcessor(Limit endpointReserve, Limit globalReserve)
        {
            this.endpointReserve = endpointReserve;
            this.globalReserve = globalReserve;
        }

        @Override
        public boolean process(Frame frame) throws IOException
        {
            if (firstFrame)
            {
                // the backlog must start at a message boundary, so a corrupt first frame is a logic error
                if (!(frame instanceof IntactFrame))
                    throw new IllegalStateException("First backlog frame must be intact");
                firstFrame = false;
                return processFirstFrame((IntactFrame) frame);
            }

            return processSubsequentFrame(frame);
        }

        private boolean processFirstFrame(IntactFrame frame) throws IOException
        {
            if (frame.isSelfContained)
            {
                isActive = processOneContainedMessage(frame.contents, endpointReserve, globalReserve);
                return false; // stop after one message
            }
            else
            {
                isActive = processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve);
                return isActive; // continue accumulating the large message unless we ran out of reserve capacity again
            }
        }

        private boolean processSubsequentFrame(Frame frame) throws IOException
        {
            if (frame instanceof IntactFrame)
                processSubsequentFrameOfLargeMessage(frame);
            else
                processCorruptFrame((CorruptFrame) frame);

            return largeMessage != null; // continue until done with the large message
        }
    }
| |
| /** |
| * Try to acquire permits for the inbound message. In case of failure, register with the right wait queue to be |
| * reactivated once permit capacity is regained. |
| */ |
| @SuppressWarnings("BooleanMethodIsAlwaysInverted") |
| protected boolean acquireCapacity(Limit endpointReserve, Limit globalReserve, int bytes, long currentTimeNanos, long expiresAtNanos) |
| { |
| ResourceLimits.Outcome outcome = acquireCapacity(endpointReserve, globalReserve, bytes); |
| |
| if (outcome == ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT) |
| ticket = endpointWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos); |
| else if (outcome == ResourceLimits.Outcome.INSUFFICIENT_GLOBAL) |
| ticket = globalWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos); |
| |
| if (outcome != ResourceLimits.Outcome.SUCCESS) |
| throttledCount++; |
| |
| return outcome == ResourceLimits.Outcome.SUCCESS; |
| } |
| |
    /**
     * Attempt to acquire {@code bytes} permits, first from the exclusive per-connection capacity,
     * then - for any excess - from both the global and the endpoint reserves.
     *
     * Only ever invoked on the event loop, which is what makes the optimistic queueSize read safe.
     *
     * @return SUCCESS, or INSUFFICIENT_GLOBAL / INSUFFICIENT_ENDPOINT naming the reserve that ran out
     */
    protected ResourceLimits.Outcome acquireCapacity(Limit endpointReserve, Limit globalReserve, int bytes)
    {
        long currentQueueSize = queueSize;

        /*
         * acquireCapacity() is only ever called on the event loop, and as such queueSize is only ever increased
         * on the event loop. If there is enough capacity, we can safely addAndGet() and immediately return.
         */
        if (currentQueueSize + bytes <= queueCapacity)
        {
            queueSizeUpdater.addAndGet(this, bytes);
            return ResourceLimits.Outcome.SUCCESS;
        }

        // we know we don't have enough local queue capacity for the entire message, so we need to borrow some from reserve capacity
        long allocatedExcess = min(currentQueueSize + bytes - queueCapacity, bytes);

        // acquire from the global reserve first; on endpoint-reserve failure the global allocation is rolled back
        if (!globalReserve.tryAllocate(allocatedExcess))
            return ResourceLimits.Outcome.INSUFFICIENT_GLOBAL;

        if (!endpointReserve.tryAllocate(allocatedExcess))
        {
            globalReserve.release(allocatedExcess);
            globalWaitQueue.signal();
            return ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
        }

        long newQueueSize = queueSizeUpdater.addAndGet(this, bytes);
        long actualExcess = max(0, min(newQueueSize - queueCapacity, bytes));

        /*
         * It's possible that some permits were released at some point after we loaded current queueSize,
         * and we can satisfy more of the permits using our exclusive per-connection capacity, needing
         * less than previously estimated from the reserves. If that's the case, release the now unneeded
         * permit excess back to endpoint/global reserves.
         */
        if (actualExcess != allocatedExcess) // actualExcess < allocatedExcess
        {
            long excess = allocatedExcess - actualExcess;

            endpointReserve.release(excess);
            globalReserve.release(excess);

            endpointWaitQueue.signal();
            globalWaitQueue.signal();
        }

        return ResourceLimits.Outcome.SUCCESS;
    }
| |
    /**
     * Return {@code bytes} permits. Any portion that had been borrowed from the endpoint/global
     * reserves (i.e. the part by which the old queue size exceeded the exclusive capacity) is
     * released back to both reserves, and their wait queues are signalled so parked handlers
     * can attempt to resume.
     */
    public void releaseCapacity(int bytes)
    {
        long oldQueueSize = queueSizeUpdater.getAndAdd(this, -bytes);
        if (oldQueueSize > queueCapacity)
        {
            long excess = min(oldQueueSize - queueCapacity, bytes);

            endpointReserveCapacity.release(excess);
            globalReserveCapacity.release(excess);

            endpointWaitQueue.signal();
            globalWaitQueue.signal();
        }
    }

    /**
     * Invoked to release capacity for a message that has been fully, successfully processed.
     *
     * Normally no different from invoking {@link #releaseCapacity(int)}, but is necessary for the verifier
     * to be able to delay capacity release for backpressure testing.
     */
    @VisibleForTesting
    protected void releaseProcessedCapacity(int size, Header header)
    {
        releaseCapacity(size);
    }
| |
| |
    /**
     * Close-path cleanup: mark the handler closed (so wait-queue callbacks become no-ops), abort any
     * in-flight large message, invalidate any outstanding wait-queue ticket, and notify the owner.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx)
    {
        isClosed = true;

        if (null != largeMessage)
            largeMessage.abort();

        if (null != ticket)
            ticket.invalidate();

        onClosed.call(this);
    }

    // event loop of the underlying channel; used by WaitQueue to group reactivation tasks
    private EventLoop eventLoop()
    {
        return channel.eventLoop();
    }

    /** Human-readable identifier of this handler/connection, used in log messages. */
    protected abstract String id();
| |
    /*
     * A large-message frame-accumulating state machine.
     *
     * Collects intact frames until it has all the bytes necessary to deserialize the large message,
     * at which point it schedules a task on the appropriate {@link Stage},
     * a task that deserializes the message and immediately invokes the verb handler.
     *
     * Also handles corrupt frames and potential expiry of the large message during accumulation:
     * if it's taking the frames too long to arrive, there is no point in holding on to the
     * accumulated frames, or in gathering more - so we release the ones we already have, and
     * skip any remaining ones, alongside with returning memory permits early.
     */
    protected abstract class LargeMessage<H>
    {
        // total serialized size in bytes; completion is detected when 'received' reaches it
        protected final int size;
        // protocol-specific message header, parsed from the first frame
        protected final H header;

        // accumulated frame payloads; emptied early on expiry/corruption
        protected final List<ShareableBytes> buffers = new ArrayList<>();
        // bytes accumulated (or skipped) so far
        protected int received;

        protected final long expiresAtNanos;

        // once either flag is set, remaining frames are consumed and discarded rather than kept
        protected boolean isExpired;
        protected boolean isCorrupt;

        protected LargeMessage(int size, H header, long expiresAtNanos, boolean isExpired)
        {
            this.size = size;
            this.header = header;
            this.expiresAtNanos = expiresAtNanos;
            this.isExpired = isExpired;
        }

        protected LargeMessage(int size, H header, long expiresAtNanos, ShareableBytes bytes)
        {
            this(size, header, expiresAtNanos, false);
            buffers.add(bytes);
        }

        /**
         * Return true if this was the last frame of the large message.
         */
        public boolean supply(Frame frame)
        {
            if (frame instanceof IntactFrame)
                onIntactFrame((IntactFrame) frame);
            else
                onCorruptFrame();

            received += frame.frameSize;
            if (size == received)
                onComplete();
            return size == received;
        }

        // keep the frame's bytes if we are still in the normal state and not expired;
        // otherwise consume and discard them, transitioning to the expired state if necessary
        private void onIntactFrame(IntactFrame frame)
        {
            boolean expires = approxTime.isAfter(expiresAtNanos);
            if (!isExpired && !isCorrupt)
            {
                if (!expires)
                {
                    buffers.add(frame.contents.sliceAndConsume(frame.frameSize).share());
                    return;
                }
                releaseBuffersAndCapacity(); // release resources once we transition from normal state to expired
            }
            frame.consume();
            isExpired |= expires;
        }

        private void onCorruptFrame()
        {
            if (!isExpired && !isCorrupt)
                releaseBuffersAndCapacity(); // release resources once we transition from normal state to corrupt
            isCorrupt = true;
            isExpired |= approxTime.isAfter(expiresAtNanos);
        }

        /** Invoked exactly once, when all {@link #size} bytes have been supplied (kept or skipped). */
        protected abstract void onComplete();

        /** Invoked from {@link #channelInactive} if the connection closes mid-accumulation. */
        protected abstract void abort();

        protected void releaseBuffers()
        {
            buffers.forEach(ShareableBytes::release); buffers.clear();
        }

        protected void releaseBuffersAndCapacity()
        {
            releaseBuffers(); releaseCapacity(size);
        }
    }
| |
    /**
     * A special-purpose wait queue to park inbound message handlers that failed to allocate
     * reserve capacity for an inbound message. Upon such failure a handler registers itself with
     * a {@link WaitQueue} of the appropriate kind (either ENDPOINT or GLOBAL - if failed
     * to allocate endpoint or global reserve capacity, respectively), stops processing any
     * accumulated frames or receiving new ones, and waits - until reactivated.
     *
     * Every time permits are returned to an endpoint or global {@link Limit}, the respective
     * queue gets signalled, and if there are any handlers registered in it, we will attempt
     * to reactivate as many waiting handlers as current available reserve capacity allows
     * us to - immediately, on the {@link #signal()}-calling thread. At most one such attempt
     * will be in progress at any given time.
     *
     * Handlers that can be reactivated will be grouped by their {@link EventLoop} and a single
     * {@link ReactivateHandlers} task will be scheduled per event loop, on the corresponding
     * event loops.
     *
     * When run, the {@link ReactivateHandlers} task will ask each handler in its group to first
     * process one message - using preallocated reserve capacity - and if no obstacles were met -
     * reactivate the handlers, this time using their regular reserves.
     *
     * See {@link WaitQueue#schedule()}, {@link ReactivateHandlers#run()}, {@link Ticket#reactivateHandler(Limit)}.
     */
    public static final class WaitQueue
    {
        enum Kind { ENDPOINT, GLOBAL }

        // states for the 'scheduled' run-loop guard: signal() transitions NOT_RUNNING -> RUNNING and
        // keeps scheduling while concurrent signals bump the state to RUN_AGAIN (capped there)
        private static final int NOT_RUNNING = 0;
        @SuppressWarnings("unused")
        private static final int RUNNING = 1;
        private static final int RUN_AGAIN = 2;

        private volatile int scheduled;
        private static final AtomicIntegerFieldUpdater<WaitQueue> scheduledUpdater =
            AtomicIntegerFieldUpdater.newUpdater(WaitQueue.class, "scheduled");

        private final Kind kind;
        // the reserve this queue guards; capacity is pre-allocated from it in schedule()
        private final Limit reserveCapacity;

        private final ManyToOneConcurrentLinkedQueue<Ticket> queue = new ManyToOneConcurrentLinkedQueue<>();

        private WaitQueue(Kind kind, Limit reserveCapacity)
        {
            this.kind = kind;
            this.reserveCapacity = reserveCapacity;
        }
| |
        public static WaitQueue endpoint(Limit endpointReserveCapacity)
        {
            return new WaitQueue(Kind.ENDPOINT, endpointReserveCapacity);
        }

        public static WaitQueue global(Limit globalReserveCapacity)
        {
            return new WaitQueue(Kind.GLOBAL, globalReserveCapacity);
        }

        /**
         * Enqueue a handler that failed to allocate {@code bytesRequested} permits.
         * Returns the ticket the handler keeps so it can invalidate itself on close.
         */
        private Ticket register(AbstractMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos)
        {
            Ticket ticket = new Ticket(this, handler, bytesRequested, registeredAtNanos, expiresAtNanos);
            Ticket previous = queue.relaxedPeekLastAndOffer(ticket);
            if (null == previous || !previous.isWaiting())
                signal(); // only signal the queue if this handler is first to register
            return ticket;
        }

        /**
         * Attempt to reactivate waiting handlers, on the calling thread. The 'scheduled' CAS loop
         * guarantees at most one thread runs schedule() at a time; signals arriving mid-run are
         * coalesced into a single re-run via the RUN_AGAIN state.
         */
        @VisibleForTesting
        public void signal()
        {
            if (queue.relaxedIsEmpty())
                return; // we can return early if no handlers have registered with the wait queue

            if (NOT_RUNNING == scheduledUpdater.getAndUpdate(this, i -> min(RUN_AGAIN, i + 1)))
            {
                do
                {
                    schedule();
                }
                while (RUN_AGAIN == scheduledUpdater.getAndDecrement(this));
            }
        }
| |
        /**
         * Drain as many tickets as the reserve's remaining capacity allows, pre-allocating each live
         * ticket's requested bytes, and schedule one {@link ReactivateHandlers} task per event loop
         * for the handlers being resumed. Stops at the first live ticket that cannot be satisfied.
         */
        private void schedule()
        {
            Map<EventLoop, ReactivateHandlers> tasks = null;

            long currentTimeNanos = approxTime.now();

            Ticket t;
            while ((t = queue.peek()) != null)
            {
                if (!t.call()) // invalidated
                {
                    queue.remove();
                    continue;
                }

                // expired tickets are still drained and handed to their handler (with no capacity
                // pre-allocated), so the handler can run its callback and clean up
                boolean isLive = t.isLive(currentTimeNanos);
                if (isLive && !reserveCapacity.tryAllocate(t.bytesRequested))
                {
                    if (!t.reset()) // the ticket was invalidated after being called but before now
                    {
                        queue.remove();
                        continue;
                    }
                    break; // TODO: traverse the entire queue to unblock handlers that have expired or invalidated tickets
                }

                if (null == tasks)
                    tasks = new IdentityHashMap<>();

                queue.remove();
                tasks.computeIfAbsent(t.handler.eventLoop(), e -> new ReactivateHandlers()).add(t, isLive);
            }

            if (null != tasks)
                tasks.forEach(EventLoop::execute);
        }
| |
        /**
         * Per-event-loop batch of tickets to resume. Carries the total capacity pre-allocated in
         * schedule() for the batch's live tickets, wrapped as a shared {@link Limit} at run time.
         */
        private class ReactivateHandlers implements Runnable
        {
            List<Ticket> tickets = new ArrayList<>();
            long capacity = 0L;

            private void add(Ticket ticket, boolean isLive)
            {
                tickets.add(ticket);
                if (isLive) capacity += ticket.bytesRequested; // expired tickets contribute no capacity
            }

            public void run()
            {
                Limit limit = new ResourceLimits.Basic(capacity);
                try
                {
                    for (Ticket ticket : tickets)
                        ticket.reactivateHandler(limit);
                }
                finally
                {
                    /*
                     * Free up any unused capacity, if any. Will be non-zero if one or more handlers were closed
                     * when we attempted to run their callback, or used more of their other reserve; or if the first
                     * message in the unprocessed stream has expired in the narrow time window.
                     */
                    long remaining = limit.remaining();
                    if (remaining > 0)
                    {
                        reserveCapacity.release(remaining);
                        signal();
                    }
                }
            }
        }
| |
| private static final class Ticket |
| { |
| private static final int WAITING = 0; |
| private static final int CALLED = 1; |
| private static final int INVALIDATED = 2; // invalidated by a handler that got closed |
| |
| private volatile int state; |
| private static final AtomicIntegerFieldUpdater<Ticket> stateUpdater = |
| AtomicIntegerFieldUpdater.newUpdater(Ticket.class, "state"); |
| |
| private final WaitQueue waitQueue; |
| private final AbstractMessageHandler handler; |
| private final int bytesRequested; |
| private final long reigsteredAtNanos; |
| private final long expiresAtNanos; |
| |
| private Ticket(WaitQueue waitQueue, AbstractMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos) |
| { |
| this.waitQueue = waitQueue; |
| this.handler = handler; |
| this.bytesRequested = bytesRequested; |
| this.reigsteredAtNanos = registeredAtNanos; |
| this.expiresAtNanos = expiresAtNanos; |
| } |
| |
| private void reactivateHandler(Limit capacity) |
| { |
| long elapsedNanos = approxTime.now() - reigsteredAtNanos; |
| try |
| { |
| if (waitQueue.kind == Kind.ENDPOINT) |
| handler.onEndpointReserveCapacityRegained(capacity, elapsedNanos); |
| else |
| handler.onGlobalReserveCapacityRegained(capacity, elapsedNanos); |
| } |
| catch (Throwable t) |
| { |
| logger.error("{} exception caught while reactivating a handler", handler.id(), t); |
| } |
| } |
| |
| private boolean isWaiting() |
| { |
| return state == WAITING; |
| } |
| |
| private boolean isLive(long currentTimeNanos) |
| { |
| return !approxTime.isAfter(currentTimeNanos, expiresAtNanos); |
| } |
| |
| private void invalidate() |
| { |
| state = INVALIDATED; |
| waitQueue.signal(); |
| } |
| |
| private boolean call() |
| { |
| return stateUpdater.compareAndSet(this, WAITING, CALLED); |
| } |
| |
| private boolean reset() |
| { |
| return stateUpdater.compareAndSet(this, CALLED, WAITING); |
| } |
| } |
| } |
| |
    /** Callback invoked exactly once, from {@link #channelInactive}, when a handler is closed. */
    public interface OnHandlerClosed
    {
        void call(AbstractMessageHandler handler);
    }
| } |