/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.io.IOException;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameDecoder.CorruptFrame;
import org.apache.cassandra.net.FrameDecoder.Frame;
import org.apache.cassandra.net.FrameDecoder.FrameProcessor;
import org.apache.cassandra.net.FrameDecoder.IntactFrame;
import org.apache.cassandra.net.Message.Header;
import org.apache.cassandra.net.ResourceLimits.Limit;
import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.cassandra.net.Crc.InvalidCrc;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
/**
* Core logic for handling inbound message deserialization and execution (in tandem with {@link FrameDecoder}).
*
* Handles small and large messages, corruption, flow control, and dispatch of message processing to a suitable
* consumer.
*
* # Interaction with {@link FrameDecoder}
*
* An {@link AbstractMessageHandler} implementation sits on top of a {@link FrameDecoder} in the Netty pipeline,
* and is tightly coupled with it.
*
* {@link FrameDecoder} decodes inbound frames and relies on a supplied {@link FrameProcessor} to act on them.
* {@link AbstractMessageHandler} provides two implementations of that interface:
* - {@link #process(Frame)} is the default, primary processor; it delegates to abstract methods that subclasses implement
* - {@link UpToOneMessageFrameProcessor}, supplied to the decoder when the handler is reactivated after being
* put in waiting mode due to lack of acquirable reserve memory capacity permits
*
* The return value of {@link FrameProcessor#process(Frame)} determines whether the decoder should keep processing
* frames (if {@code true} is returned) or stop until explicitly reactivated (if {@code false} is returned). To reactivate
* the decoder (once notified of available resource permits), {@link FrameDecoder#reactivate()} is invoked.
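*
* To make that contract concrete, here is a hedged sketch (not code from this file; {@code handleFrame} is a
* hypothetical helper): a {@link FrameProcessor} that wants the decoder to pause after every frame could be
* written as
* <pre>{@code
* FrameProcessor oneShot = frame ->
* {
*     handleFrame(frame); // hypothetical per-frame handling
*     return false;       // decoder deactivates; FrameDecoder#reactivate() resumes it
* };
* }</pre>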
*
* # Frames
*
* {@link AbstractMessageHandler} operates on frames of messages, and there are several kinds of them:
* 1. {@link IntactFrame} that are contained. As the name suggests, these contain one or more fully contained
* messages believed to be uncorrupted, and are guaranteed to not contain any part of an incomplete message.
* See {@link #processFrameOfContainedMessages(ShareableBytes, Limit, Limit)}.
* 2. {@link IntactFrame} that are NOT contained. These are uncorrupted parts of a large message that was split
* over multiple frames due to its size. Such a frame can be the first or a subsequent frame of a large message.
* See {@link #processFirstFrameOfLargeMessage(IntactFrame, Limit, Limit)} and
* {@link #processSubsequentFrameOfLargeMessage(Frame)}.
* 3. {@link CorruptFrame} with corrupt header. These are unrecoverable, and force a connection to be dropped.
* 4. {@link CorruptFrame} with a valid header, but corrupt payload. These can be either contained or uncontained.
* - contained frames with corrupt payload can be gracefully dropped without dropping the connection
* - uncontained frames with corrupt payload can be gracefully dropped unless they represent the first
* frame of a new large message, as in that case we don't know how many bytes to skip
* See {@link #processCorruptFrame(CorruptFrame)}.
*
* Fundamental frame invariants:
* 1. A contained frame can only hold fully-encapsulated messages - 1 to n of them - that don't cross frame boundaries
* 2. An uncontained frame can hold a part of one message only. It can NOT, say, contain end of one large message
* and a beginning of another one. All the bytes in an uncontained frame always belong to a single message.
*
* # Small vs large messages
*
* A single handler is equipped to process both small and large messages, potentially interleaved, but the logic
* differs depending on size. Small messages are deserialized in place, and then handed off to an appropriate
* thread pool for processing. For large messages, the handler accumulates frames until the message is complete,
* then hands off the untouched frames to the correct thread pool for the verb, to be deserialized there and immediately processed.
*
* See {@link LargeMessage} and its subclasses in the concrete {@link AbstractMessageHandler} implementations for
* details of the large-message accumulating state machine, and {@link InboundMessageHandler.ProcessMessage} and
* its inheritors for the differences in execution.
*
* # Flow control (backpressure)
*
* To prevent message producers from overwhelming and bringing nodes down with more inbound messages than
* can be processed in a timely manner, {@link AbstractMessageHandler} provides support for implementations to
* supply their own flow control policy.
*
* Before we attempt to process a message fully, we first infer its size from the stream. This inference is
* delegated to implementations, as the encoding of the message size is protocol specific. Having ascertained
* the size of the incoming message, we then attempt to acquire the corresponding number of memory permits.
* If we succeed, we move on to actually process the message. If we fail, the frame decoder deactivates
* until sufficient permits are released for the message to be processed and the handler is activated again.
* Permits are released back once the message has been fully processed - the definition of which is again
* delegated to the concrete implementations.
*
* Every connection has an exclusive number of permits allocated to it. In addition, there is a per-endpoint
* reserve capacity and a global reserve capacity {@link Limit}, shared between all connections from the same host
* and all connections, respectively. So long as the handler stays within its exclusive limit, it doesn't
* need to tap into reserve capacity.
*
* If tapping into reserve capacity is necessary, but the handler fails to acquire capacity from either the
* endpoint or the global reserve (and it needs to acquire from both), the handler and its frame decoder become
* inactive and register with a {@link WaitQueue} of the appropriate type, depending on which of the reserves
* couldn't be tapped into. Once enough messages have finished processing and had their permits released back
* to the reserves, {@link WaitQueue} will reactivate the sleeping handlers and they'll resume processing frames.
*
* The reason we 'split' reserve capacity into two limits - endpoint and global - is to guarantee liveness, and
* to prevent a single endpoint's connections from taking over the whole reserve, starving other connections.
*
* One permit is acquired per byte of serialized message. Once inflated on-heap, each message will necessarily
* occupy more than that, but despite the wide variance, serialized size is a good enough proxy that correlates with on-heap footprint.
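*
* As a worked example of the accounting in {@link #acquireCapacity(Limit, Limit, int)} (the numbers are
* illustrative only): with a 4 MiB exclusive queue of which 3.5 MiB is already in use, a 1 MiB message
* leaves an excess of min(3.5 + 1 - 4, 1) = 0.5 MiB, which must be borrowed from BOTH the endpoint and
* the global reserve; if either refuses, nothing is acquired and the handler parks in the corresponding
* wait queue. Once the message is fully processed, {@link #releaseCapacity(int)} returns the borrowed
* 0.5 MiB to both reserves and signals both wait queues.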
*/
public abstract class AbstractMessageHandler extends ChannelInboundHandlerAdapter implements FrameProcessor
{
private static final Logger logger = LoggerFactory.getLogger(AbstractMessageHandler.class);
protected final FrameDecoder decoder;
protected final Channel channel;
protected final int largeThreshold;
protected LargeMessage<?> largeMessage;
protected final long queueCapacity;
volatile long queueSize = 0L;
private static final AtomicLongFieldUpdater<AbstractMessageHandler> queueSizeUpdater =
AtomicLongFieldUpdater.newUpdater(AbstractMessageHandler.class, "queueSize");
protected final Limit endpointReserveCapacity;
protected final WaitQueue endpointWaitQueue;
protected final Limit globalReserveCapacity;
protected final WaitQueue globalWaitQueue;
protected final OnHandlerClosed onClosed;
// wait queue handle, non-null if we overrun endpoint or global capacity and request to be resumed once it's released
private WaitQueue.Ticket ticket = null;
protected long corruptFramesRecovered, corruptFramesUnrecovered;
protected long receivedCount, receivedBytes;
protected long throttledCount, throttledNanos;
private boolean isClosed;
public AbstractMessageHandler(FrameDecoder decoder,
Channel channel,
int largeThreshold,
long queueCapacity,
Limit endpointReserveCapacity,
Limit globalReserveCapacity,
WaitQueue endpointWaitQueue,
WaitQueue globalWaitQueue,
OnHandlerClosed onClosed)
{
this.decoder = decoder;
this.channel = channel;
this.largeThreshold = largeThreshold;
this.queueCapacity = queueCapacity;
this.endpointReserveCapacity = endpointReserveCapacity;
this.endpointWaitQueue = endpointWaitQueue;
this.globalReserveCapacity = globalReserveCapacity;
this.globalWaitQueue = globalWaitQueue;
this.onClosed = onClosed;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
/*
* InboundMessageHandler works in tandem with FrameDecoder to implement flow control
* and work stashing optimally. We rely on FrameDecoder to invoke the provided
* FrameProcessor rather than on the pipeline and invocations of channelRead().
* process(Frame) is the primary entry point for this class.
*/
throw new IllegalStateException("InboundMessageHandler doesn't expect channelRead() to be invoked");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx)
{
decoder.activate(this); // the frame decoder starts inactive until explicitly activated by the added inbound message handler
}
@Override
public boolean process(Frame frame) throws IOException
{
if (frame instanceof IntactFrame)
return processIntactFrame((IntactFrame) frame, endpointReserveCapacity, globalReserveCapacity);
processCorruptFrame((CorruptFrame) frame);
return true;
}
private boolean processIntactFrame(IntactFrame frame, Limit endpointReserve, Limit globalReserve) throws IOException
{
if (frame.isSelfContained)
return processFrameOfContainedMessages(frame.contents, endpointReserve, globalReserve);
else if (null == largeMessage)
return processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve);
else
return processSubsequentFrameOfLargeMessage(frame);
}
/*
* Handle contained messages (not crossing boundaries of the frame) - both small and large, for the inbound
* definition of large (breaching the size threshold for what we are willing to process on event-loop vs.
* off event-loop).
*/
private boolean processFrameOfContainedMessages(ShareableBytes bytes, Limit endpointReserve, Limit globalReserve) throws IOException
{
while (bytes.hasRemaining())
if (!processOneContainedMessage(bytes, endpointReserve, globalReserve))
return false;
return true;
}
protected abstract boolean processOneContainedMessage(ShareableBytes bytes, Limit endpointReserve, Limit globalReserve) throws IOException;
/*
* Handling of multi-frame large messages
*/
protected abstract boolean processFirstFrameOfLargeMessage(IntactFrame frame, Limit endpointReserve, Limit globalReserve) throws IOException;
protected boolean processSubsequentFrameOfLargeMessage(Frame frame)
{
receivedBytes += frame.frameSize;
if (largeMessage.supply(frame))
{
receivedCount++;
largeMessage = null;
}
return true;
}
/*
* We can handle some corrupt frames gracefully without dropping the connection and losing all the
* queued up messages, but not others.
*
* Corrupt frames that *ARE NOT* safe to skip gracefully and require the connection to be dropped:
* - any frame with corrupt header (!frame.isRecoverable())
* - first corrupt-payload frame of a large message (impossible to infer the message size, and without it
* impossible to skip the message safely)
*
* Corrupt frames that *ARE* safe to skip gracefully, without reconnecting:
* - any self-contained frame with a corrupt payload (but not header): we lose all the messages in the
* frame, but that has no effect on subsequent ones
* - any non-first payload-corrupt frame of a large message: we know the size of the large message in
* flight, so we just skip frames until we've seen all its bytes; we only lose the large message
*/
protected abstract void processCorruptFrame(CorruptFrame frame) throws InvalidCrc;
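/*
* For illustration only - a hedged sketch of how a subclass might implement the policy described above.
* The readCRC/computedCRC accessors on {@link CorruptFrame} are assumptions made for the sake of the
* example, not guaranteed by this file:
*
*     if (!frame.isRecoverable())
*     {
*         corruptFramesUnrecovered++;
*         throw new InvalidCrc(frame.readCRC, frame.computedCRC); // corrupt header: drop the connection
*     }
*     else if (frame.isSelfContained)
*     {
*         receivedBytes += frame.frameSize;
*         corruptFramesRecovered++; // lose only the messages in this frame
*     }
*     else if (null == largeMessage)
*     {
*         corruptFramesUnrecovered++;
*         throw new InvalidCrc(frame.readCRC, frame.computedCRC); // first frame of a large message: can't skip safely
*     }
*     else
*     {
*         processSubsequentFrameOfLargeMessage(frame); // skips the frame; only the in-flight large message is lost
*         corruptFramesRecovered++;
*     }
*/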
private void onEndpointReserveCapacityRegained(Limit endpointReserve, long elapsedNanos)
{
onReserveCapacityRegained(endpointReserve, globalReserveCapacity, elapsedNanos);
}
private void onGlobalReserveCapacityRegained(Limit globalReserve, long elapsedNanos)
{
onReserveCapacityRegained(endpointReserveCapacity, globalReserve, elapsedNanos);
}
private void onReserveCapacityRegained(Limit endpointReserve, Limit globalReserve, long elapsedNanos)
{
if (isClosed)
return;
assert channel.eventLoop().inEventLoop();
ticket = null;
throttledNanos += elapsedNanos;
try
{
/*
* Process up to one message using supplied overridden reserves - one of them pre-allocated,
* and guaranteed to be enough for one message - then, if no obstacles are encountered, reactivate
* the frame decoder using normal reserve capacities.
*/
if (processUpToOneMessage(endpointReserve, globalReserve))
{
decoder.reactivate();
if (decoder.isActive())
ClientMetrics.instance.unpauseConnection();
}
}
catch (Throwable t)
{
fatalExceptionCaught(t);
}
}
protected abstract void fatalExceptionCaught(Throwable t);
// return true if the handler should be reactivated - if no new hurdles were encountered,
// like running out of the other kind of reserve capacity
protected boolean processUpToOneMessage(Limit endpointReserve, Limit globalReserve) throws IOException
{
UpToOneMessageFrameProcessor processor = new UpToOneMessageFrameProcessor(endpointReserve, globalReserve);
decoder.processBacklog(processor);
return processor.isActive;
}
/*
* Process at most one message. Won't always be an entire one (if the message at the head of the line
* is a large one, and there aren't sufficient frames to decode it entirely), but will never be more than one.
*/
private class UpToOneMessageFrameProcessor implements FrameProcessor
{
private final Limit endpointReserve;
private final Limit globalReserve;
boolean isActive = true;
boolean firstFrame = true;
private UpToOneMessageFrameProcessor(Limit endpointReserve, Limit globalReserve)
{
this.endpointReserve = endpointReserve;
this.globalReserve = globalReserve;
}
@Override
public boolean process(Frame frame) throws IOException
{
if (firstFrame)
{
if (!(frame instanceof IntactFrame))
throw new IllegalStateException("First backlog frame must be intact");
firstFrame = false;
return processFirstFrame((IntactFrame) frame);
}
return processSubsequentFrame(frame);
}
private boolean processFirstFrame(IntactFrame frame) throws IOException
{
if (frame.isSelfContained)
{
isActive = processOneContainedMessage(frame.contents, endpointReserve, globalReserve);
return false; // stop after one message
}
else
{
isActive = processFirstFrameOfLargeMessage(frame, endpointReserve, globalReserve);
return isActive; // continue unless we've run out of reserve capacity again
}
}
private boolean processSubsequentFrame(Frame frame) throws IOException
{
if (frame instanceof IntactFrame)
processSubsequentFrameOfLargeMessage(frame);
else
processCorruptFrame((CorruptFrame) frame);
return largeMessage != null; // continue until done with the large message
}
}
/**
* Try to acquire permits for the inbound message. In case of failure, register with the right wait queue to be
* reactivated once permit capacity is regained.
*/
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
protected boolean acquireCapacity(Limit endpointReserve, Limit globalReserve, int bytes, long currentTimeNanos, long expiresAtNanos)
{
ResourceLimits.Outcome outcome = acquireCapacity(endpointReserve, globalReserve, bytes);
if (outcome == ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT)
ticket = endpointWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos);
else if (outcome == ResourceLimits.Outcome.INSUFFICIENT_GLOBAL)
ticket = globalWaitQueue.register(this, bytes, currentTimeNanos, expiresAtNanos);
if (outcome != ResourceLimits.Outcome.SUCCESS)
throttledCount++;
return outcome == ResourceLimits.Outcome.SUCCESS;
}
protected ResourceLimits.Outcome acquireCapacity(Limit endpointReserve, Limit globalReserve, int bytes)
{
long currentQueueSize = queueSize;
/*
* acquireCapacity() is only ever called on the event loop, and as such queueSize is only ever increased
* on the event loop. If there is enough capacity, we can safely addAndGet() and immediately return.
*/
if (currentQueueSize + bytes <= queueCapacity)
{
queueSizeUpdater.addAndGet(this, bytes);
return ResourceLimits.Outcome.SUCCESS;
}
// we know we don't have enough local queue capacity for the entire message, so we need to borrow some from reserve capacity
long allocatedExcess = min(currentQueueSize + bytes - queueCapacity, bytes);
if (!globalReserve.tryAllocate(allocatedExcess))
return ResourceLimits.Outcome.INSUFFICIENT_GLOBAL;
if (!endpointReserve.tryAllocate(allocatedExcess))
{
globalReserve.release(allocatedExcess);
globalWaitQueue.signal();
return ResourceLimits.Outcome.INSUFFICIENT_ENDPOINT;
}
long newQueueSize = queueSizeUpdater.addAndGet(this, bytes);
long actualExcess = max(0, min(newQueueSize - queueCapacity, bytes));
/*
* It's possible that some permits were released at some point after we loaded current queueSize,
* and we can satisfy more of the permits using our exclusive per-connection capacity, needing
* less than previously estimated from the reserves. If that's the case, release the now unneeded
* permit excess back to endpoint/global reserves.
*/
if (actualExcess != allocatedExcess) // actualExcess < allocatedExcess
{
long excess = allocatedExcess - actualExcess;
endpointReserve.release(excess);
globalReserve.release(excess);
endpointWaitQueue.signal();
globalWaitQueue.signal();
}
return ResourceLimits.Outcome.SUCCESS;
}
public void releaseCapacity(int bytes)
{
long oldQueueSize = queueSizeUpdater.getAndAdd(this, -bytes);
if (oldQueueSize > queueCapacity)
{
long excess = min(oldQueueSize - queueCapacity, bytes);
endpointReserveCapacity.release(excess);
globalReserveCapacity.release(excess);
endpointWaitQueue.signal();
globalWaitQueue.signal();
}
}
/**
* Invoked to release capacity for a message that has been fully, successfully processed.
*
* Normally no different from invoking {@link #releaseCapacity(int)}, but is necessary for the verifier
* to be able to delay capacity release for backpressure testing.
*/
@VisibleForTesting
protected void releaseProcessedCapacity(int size, Header header)
{
releaseCapacity(size);
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
{
isClosed = true;
if (null != largeMessage)
largeMessage.abort();
if (null != ticket)
ticket.invalidate();
onClosed.call(this);
}
private EventLoop eventLoop()
{
return channel.eventLoop();
}
protected abstract String id();
/*
* A large-message frame-accumulating state machine.
*
* Collects intact frames until it has all the bytes necessary to deserialize the large message,
* at which point it schedules a task on the appropriate {@link Stage} that deserializes
* the message and immediately invokes the verb handler.
*
* Also handles corrupt frames and potential expiry of the large message during accumulation:
* if the frames are taking too long to arrive, there is no point in holding on to the
* accumulated frames, or in gathering more - so we release the ones we already have, and
* skip any remaining ones, along with returning memory permits early.
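*
* To make the {@link #supply(Frame)} contract concrete (a descriptive note derived from the code
* below): a message larger than the handler's largeThreshold arrives as one first frame plus one or
* more subsequent frames; supply() returns false while received < size, and true exactly once, when
* the accumulated frameSize bytes reach size - at which point onComplete() runs and the caller
* nulls out its largeMessage reference.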
*/
protected abstract class LargeMessage<H>
{
protected final int size;
protected final H header;
protected final List<ShareableBytes> buffers = new ArrayList<>();
protected int received;
protected final long expiresAtNanos;
protected boolean isExpired;
protected boolean isCorrupt;
protected LargeMessage(int size, H header, long expiresAtNanos, boolean isExpired)
{
this.size = size;
this.header = header;
this.expiresAtNanos = expiresAtNanos;
this.isExpired = isExpired;
}
protected LargeMessage(int size, H header, long expiresAtNanos, ShareableBytes bytes)
{
this(size, header, expiresAtNanos, false);
buffers.add(bytes);
}
/**
* Return true if this was the last frame of the large message.
*/
public boolean supply(Frame frame)
{
if (frame instanceof IntactFrame)
onIntactFrame((IntactFrame) frame);
else
onCorruptFrame();
received += frame.frameSize;
if (size == received)
onComplete();
return size == received;
}
private void onIntactFrame(IntactFrame frame)
{
boolean expires = approxTime.isAfter(expiresAtNanos);
if (!isExpired && !isCorrupt)
{
if (!expires)
{
buffers.add(frame.contents.sliceAndConsume(frame.frameSize).share());
return;
}
releaseBuffersAndCapacity(); // release resources once we transition from normal state to expired
}
frame.consume();
isExpired |= expires;
}
private void onCorruptFrame()
{
if (!isExpired && !isCorrupt)
releaseBuffersAndCapacity(); // release resources once we transition from normal state to corrupt
isCorrupt = true;
isExpired |= approxTime.isAfter(expiresAtNanos);
}
protected abstract void onComplete();
protected abstract void abort();
protected void releaseBuffers()
{
buffers.forEach(ShareableBytes::release);
buffers.clear();
}
protected void releaseBuffersAndCapacity()
{
releaseBuffers();
releaseCapacity(size);
}
}
/**
* A special-purpose wait queue to park inbound message handlers that failed to allocate
* reserve capacity for a message. Upon such a failure a handler registers itself with
* a {@link WaitQueue} of the appropriate kind (ENDPOINT or GLOBAL, depending on whether it
* failed to allocate endpoint or global reserve capacity, respectively), stops processing any
* accumulated frames or receiving new ones, and waits until reactivated.
*
* Every time permits are returned to an endpoint or global {@link Limit}, the respective
* queue gets signalled, and if there are any handlers registered in it, we will attempt
* to reactivate as many waiting handlers as the currently available reserve capacity allows -
* immediately, on the {@link #signal()}-calling thread. At most one such attempt
* will be in progress at any given time.
*
* Handlers that can be reactivated will be grouped by their {@link EventLoop} and a single
* {@link ReactivateHandlers} task will be scheduled per event loop, on the corresponding
* event loops.
*
* When run, the {@link ReactivateHandlers} task will ask each handler in its group to first
* process one message using the preallocated reserve capacity, and, if no obstacles are met,
* reactivate the handler, this time using its regular reserves.
*
* See {@link WaitQueue#schedule()}, {@link ReactivateHandlers#run()}, {@link Ticket#reactivateHandler(Limit)}.
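*
* A minimal construction sketch (a hedged example; real callers would size the limits from
* configuration, and might prefer a thread-safe {@link Limit} implementation over the
* {@code ResourceLimits.Basic} used here for brevity):
* <pre>{@code
* Limit endpointReserve = new ResourceLimits.Basic(64 << 20);  // 64 MiB per-endpoint reserve
* Limit globalReserve   = new ResourceLimits.Basic(512 << 20); // 512 MiB shared global reserve
* WaitQueue endpointWaitQueue = WaitQueue.endpoint(endpointReserve);
* WaitQueue globalWaitQueue   = WaitQueue.global(globalReserve);
* }</pre>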
*/
public static final class WaitQueue
{
enum Kind { ENDPOINT, GLOBAL }
private static final int NOT_RUNNING = 0;
@SuppressWarnings("unused")
private static final int RUNNING = 1;
private static final int RUN_AGAIN = 2;
private volatile int scheduled;
private static final AtomicIntegerFieldUpdater<WaitQueue> scheduledUpdater =
AtomicIntegerFieldUpdater.newUpdater(WaitQueue.class, "scheduled");
private final Kind kind;
private final Limit reserveCapacity;
private final ManyToOneConcurrentLinkedQueue<Ticket> queue = new ManyToOneConcurrentLinkedQueue<>();
private WaitQueue(Kind kind, Limit reserveCapacity)
{
this.kind = kind;
this.reserveCapacity = reserveCapacity;
}
public static WaitQueue endpoint(Limit endpointReserveCapacity)
{
return new WaitQueue(Kind.ENDPOINT, endpointReserveCapacity);
}
public static WaitQueue global(Limit globalReserveCapacity)
{
return new WaitQueue(Kind.GLOBAL, globalReserveCapacity);
}
private Ticket register(AbstractMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos)
{
Ticket ticket = new Ticket(this, handler, bytesRequested, registeredAtNanos, expiresAtNanos);
Ticket previous = queue.relaxedPeekLastAndOffer(ticket);
if (null == previous || !previous.isWaiting())
signal(); // only signal the queue if this handler is first to register
return ticket;
}
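/*
* A descriptive note on the 'scheduled' field, derived from the code below: a caller that moves it
* from NOT_RUNNING to RUNNING owns the draining loop in schedule(); concurrent signal() calls merely
* bump the state to RUN_AGAIN (capped there by min()), guaranteeing the owner makes at least one more
* pass before releasing ownership via getAndDecrement(). Thus at most one thread drains the queue at
* a time, and no signal is lost.
*/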
@VisibleForTesting
public void signal()
{
if (queue.relaxedIsEmpty())
return; // we can return early if no handlers have registered with the wait queue
if (NOT_RUNNING == scheduledUpdater.getAndUpdate(this, i -> min(RUN_AGAIN, i + 1)))
{
do
{
schedule();
}
while (RUN_AGAIN == scheduledUpdater.getAndDecrement(this));
}
}
private void schedule()
{
Map<EventLoop, ReactivateHandlers> tasks = null;
long currentTimeNanos = approxTime.now();
Ticket t;
while ((t = queue.peek()) != null)
{
if (!t.call()) // invalidated
{
queue.remove();
continue;
}
boolean isLive = t.isLive(currentTimeNanos);
if (isLive && !reserveCapacity.tryAllocate(t.bytesRequested))
{
if (!t.reset()) // the ticket was invalidated after being called but before now
{
queue.remove();
continue;
}
break; // TODO: traverse the entire queue to unblock handlers that have expired or invalidated tickets
}
if (null == tasks)
tasks = new IdentityHashMap<>();
queue.remove();
tasks.computeIfAbsent(t.handler.eventLoop(), e -> new ReactivateHandlers()).add(t, isLive);
}
if (null != tasks)
tasks.forEach(EventLoop::execute);
}
private class ReactivateHandlers implements Runnable
{
List<Ticket> tickets = new ArrayList<>();
long capacity = 0L;
private void add(Ticket ticket, boolean isLive)
{
tickets.add(ticket);
if (isLive) capacity += ticket.bytesRequested;
}
public void run()
{
Limit limit = new ResourceLimits.Basic(capacity);
try
{
for (Ticket ticket : tickets)
ticket.reactivateHandler(limit);
}
finally
{
/*
* Free up any unused capacity. Will be non-zero if one or more handlers were closed
* when we attempted to run their callback, or used more of their other reserve; or if the first
* message in the unprocessed stream has expired in the narrow time window.
*/
long remaining = limit.remaining();
if (remaining > 0)
{
reserveCapacity.release(remaining);
signal();
}
}
}
}
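/*
* A descriptive note on the Ticket life-cycle, derived from the code below: a ticket starts out
* WAITING; schedule() moves it to CALLED via call() before attempting to allocate reserve capacity
* on its behalf, and returns it to WAITING via reset() if that allocation fails. invalidate() -
* invoked when the owning handler's channel goes inactive - writes INVALIDATED directly, making any
* pending call()/reset() CAS fail, which lets schedule() discard the ticket.
*/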
private static final class Ticket
{
private static final int WAITING = 0;
private static final int CALLED = 1;
private static final int INVALIDATED = 2; // invalidated by a handler that got closed
private volatile int state;
private static final AtomicIntegerFieldUpdater<Ticket> stateUpdater =
AtomicIntegerFieldUpdater.newUpdater(Ticket.class, "state");
private final WaitQueue waitQueue;
private final AbstractMessageHandler handler;
private final int bytesRequested;
private final long registeredAtNanos;
private final long expiresAtNanos;
private Ticket(WaitQueue waitQueue, AbstractMessageHandler handler, int bytesRequested, long registeredAtNanos, long expiresAtNanos)
{
this.waitQueue = waitQueue;
this.handler = handler;
this.bytesRequested = bytesRequested;
this.registeredAtNanos = registeredAtNanos;
this.expiresAtNanos = expiresAtNanos;
}
private void reactivateHandler(Limit capacity)
{
long elapsedNanos = approxTime.now() - registeredAtNanos;
try
{
if (waitQueue.kind == Kind.ENDPOINT)
handler.onEndpointReserveCapacityRegained(capacity, elapsedNanos);
else
handler.onGlobalReserveCapacityRegained(capacity, elapsedNanos);
}
catch (Throwable t)
{
logger.error("{} exception caught while reactivating a handler", handler.id(), t);
}
}
private boolean isWaiting()
{
return state == WAITING;
}
private boolean isLive(long currentTimeNanos)
{
return !approxTime.isAfter(currentTimeNanos, expiresAtNanos);
}
private void invalidate()
{
state = INVALIDATED;
waitQueue.signal();
}
private boolean call()
{
return stateUpdater.compareAndSet(this, WAITING, CALLED);
}
private boolean reset()
{
return stateUpdater.compareAndSet(this, CALLED, WAITING);
}
}
}
public interface OnHandlerClosed
{
void call(AbstractMessageHandler handler);
}
}