blob: 219e613e23bfca1f2449127312d50f4cfff92f32 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.net;
import java.nio.BufferOverflowException;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.carrotsearch.hppc.LongObjectHashMap;
import com.carrotsearch.hppc.predicates.LongObjectPredicate;
import com.carrotsearch.hppc.procedures.LongObjectProcedure;
import com.carrotsearch.hppc.procedures.LongProcedure;
import org.apache.cassandra.net.Verifier.ExpiredMessageEvent.ExpirationType;
import org.apache.cassandra.utils.concurrent.WaitQueue;
import static java.util.concurrent.TimeUnit.*;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.net.MessagingService.VERSION_40;
import static org.apache.cassandra.net.MessagingService.current_version;
import static org.apache.cassandra.net.ConnectionType.LARGE_MESSAGES;
import static org.apache.cassandra.net.OutboundConnection.LargeMessageDelivery.DEFAULT_BUFFER_SIZE;
import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
import static org.apache.cassandra.net.Verifier.EventCategory.OTHER;
import static org.apache.cassandra.net.Verifier.EventCategory.RECEIVE;
import static org.apache.cassandra.net.Verifier.EventCategory.SEND;
import static org.apache.cassandra.net.Verifier.EventType.ARRIVE;
import static org.apache.cassandra.net.Verifier.EventType.CLOSED_BEFORE_ARRIVAL;
import static org.apache.cassandra.net.Verifier.EventType.DESERIALIZE;
import static org.apache.cassandra.net.Verifier.EventType.ENQUEUE;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_CLOSING;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_DESERIALIZE;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_EXPIRED_ON_SEND;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_EXPIRED_ON_RECEIVE;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_FRAME;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_OVERLOADED;
import static org.apache.cassandra.net.Verifier.EventType.FAILED_SERIALIZE;
import static org.apache.cassandra.net.Verifier.EventType.FINISH_SERIALIZE_LARGE;
import static org.apache.cassandra.net.Verifier.EventType.PROCESS;
import static org.apache.cassandra.net.Verifier.EventType.SEND_FRAME;
import static org.apache.cassandra.net.Verifier.EventType.SENT_FRAME;
import static org.apache.cassandra.net.Verifier.EventType.SERIALIZE;
import static org.apache.cassandra.net.Verifier.ExpiredMessageEvent.ExpirationType.ON_SENT;
import static org.apache.cassandra.utils.MonotonicClock.approxTime;
/**
* This class is a single-threaded verifier monitoring a single link, with events supplied by inbound and outbound threads
*
* By making verification single threaded, it is easier to reason about (and complex enough as is), but also permits
* a dedicated thread to monitor timeliness of events, e.g. elapsed time between a given SEND and its corresponding RECEIVE
*
* TODO: timeliness of events
* TODO: periodically stop all activity to/from a given endpoint, until it stops (and verify queues all empty, counters all accurate)
* TODO: integrate with proxy that corrupts frames
* TODO: test _OutboundConnection_ close
*/
@SuppressWarnings("WeakerAccess")
public class Verifier
{
private static final Logger logger = LoggerFactory.getLogger(Verifier.class);
public enum Destiny
{
SUCCEED,
FAIL_TO_SERIALIZE,
FAIL_TO_DESERIALIZE,
}
enum EventCategory
{
SEND, RECEIVE, OTHER
}
enum EventType
{
FAILED_OVERLOADED(SEND),
ENQUEUE(SEND),
FAILED_EXPIRED_ON_SEND(SEND),
SERIALIZE(SEND),
FAILED_SERIALIZE(SEND),
FINISH_SERIALIZE_LARGE(SEND),
SEND_FRAME(SEND),
FAILED_FRAME(SEND),
SENT_FRAME(SEND),
ARRIVE(RECEIVE),
FAILED_EXPIRED_ON_RECEIVE(RECEIVE),
DESERIALIZE(RECEIVE),
CLOSED_BEFORE_ARRIVAL(RECEIVE),
FAILED_DESERIALIZE(RECEIVE),
PROCESS(RECEIVE),
FAILED_CLOSING(SEND),
CONNECT_OUTBOUND(OTHER),
SYNC(OTHER), // the connection will stop sending messages, and promptly process any waiting inbound messages
CONTROLLER_UPDATE(OTHER);
final EventCategory category;
EventType(EventCategory category)
{
this.category = category;
}
}
public static class Event
{
final EventType type;
Event(EventType type)
{
this.type = type;
}
}
static class SimpleEvent extends Event
{
final long at;
SimpleEvent(EventType type, long at)
{
super(type);
this.at = at;
}
public String toString() { return type.toString(); }
}
static class BoundedEvent extends Event
{
final long start;
volatile long end;
BoundedEvent(EventType type, long start)
{
super(type);
this.start = start;
}
public void complete(Verifier verifier)
{
end = verifier.sequenceId.getAndIncrement();
verifier.events.put(end, this);
}
}
static class SimpleMessageEvent extends SimpleEvent
{
final long messageId;
SimpleMessageEvent(EventType type, long at, long messageId)
{
super(type, at);
this.messageId = messageId;
}
}
static class BoundedMessageEvent extends BoundedEvent
{
final long messageId;
BoundedMessageEvent(EventType type, long start, long messageId)
{
super(type, start);
this.messageId = messageId;
}
}
static class EnqueueMessageEvent extends BoundedMessageEvent
{
final Message<?> message;
final Destiny destiny;
EnqueueMessageEvent(EventType type, long start, Message<?> message, Destiny destiny)
{
super(type, start, message.id());
this.message = message;
this.destiny = destiny;
}
public String toString() { return String.format("%s{%s}", type, destiny); }
}
static class SerializeMessageEvent extends SimpleMessageEvent
{
final int messagingVersion;
SerializeMessageEvent(EventType type, long at, long messageId, int messagingVersion)
{
super(type, at, messageId);
this.messagingVersion = messagingVersion;
}
public String toString() { return String.format("%s{ver=%d}", type, messagingVersion); }
}
static class SimpleMessageEventWithSize extends SimpleMessageEvent
{
final int messageSize;
SimpleMessageEventWithSize(EventType type, long at, long messageId, int messageSize)
{
super(type, at, messageId);
this.messageSize = messageSize;
}
public String toString() { return String.format("%s{size=%d}", type, messageSize); }
}
static class FailedSerializeEvent extends SimpleMessageEvent
{
final int bytesWrittenToNetwork;
final Throwable failure;
FailedSerializeEvent(long at, long messageId, int bytesWrittenToNetwork, Throwable failure)
{
super(FAILED_SERIALIZE, at, messageId);
this.bytesWrittenToNetwork = bytesWrittenToNetwork;
this.failure = failure;
}
public String toString() { return String.format("FAILED_SERIALIZE{written=%d, failure=%s}", bytesWrittenToNetwork, failure); }
}
static class ExpiredMessageEvent extends SimpleMessageEvent
{
enum ExpirationType {ON_SENT, ON_ARRIVED, ON_PROCESSED }
final int messageSize;
final long timeElapsed;
final TimeUnit timeUnit;
final ExpirationType expirationType;
ExpiredMessageEvent(long at, long messageId, int messageSize, long timeElapsed, TimeUnit timeUnit, ExpirationType expirationType)
{
super(expirationType == ON_SENT ? FAILED_EXPIRED_ON_SEND : FAILED_EXPIRED_ON_RECEIVE, at, messageId);
this.messageSize = messageSize;
this.timeElapsed = timeElapsed;
this.timeUnit = timeUnit;
this.expirationType = expirationType;
}
public String toString() { return String.format("EXPIRED_%s{size=%d,elapsed=%d,unit=%s}", expirationType, messageSize, timeElapsed, timeUnit); }
}
static class FrameEvent extends SimpleEvent
{
final int messageCount;
final int payloadSizeInBytes;
FrameEvent(EventType type, long at, int messageCount, int payloadSizeInBytes)
{
super(type, at);
this.messageCount = messageCount;
this.payloadSizeInBytes = payloadSizeInBytes;
}
}
static class ProcessMessageEvent extends SimpleMessageEvent
{
final Message<?> message;
ProcessMessageEvent(long at, Message<?> message)
{
super(PROCESS, at, message.id());
this.message = message;
}
}
EnqueueMessageEvent onEnqueue(Message<?> message, Destiny destiny)
{
EnqueueMessageEvent enqueue = new EnqueueMessageEvent(ENQUEUE, nextId(), message, destiny);
events.put(enqueue.start, enqueue);
return enqueue;
}
void onOverloaded(long messageId)
{
long at = nextId();
events.put(at, new SimpleMessageEvent(FAILED_OVERLOADED, at, messageId));
}
void onFailedClosing(long messageId)
{
long at = nextId();
events.put(at, new SimpleMessageEvent(FAILED_CLOSING, at, messageId));
}
void onSerialize(long messageId, int messagingVersion)
{
long at = nextId();
events.put(at, new SerializeMessageEvent(SERIALIZE, at, messageId, messagingVersion));
}
void onFinishSerializeLarge(long messageId)
{
long at = nextId();
events.put(at, new SimpleMessageEvent(FINISH_SERIALIZE_LARGE, at, messageId));
}
void onFailedSerialize(long messageId, int bytesWrittenToNetwork, Throwable failure)
{
long at = nextId();
events.put(at, new FailedSerializeEvent(at, messageId, bytesWrittenToNetwork, failure));
}
void onExpiredBeforeSend(long messageId, int messageSize, long timeElapsed, TimeUnit timeUnit)
{
onExpired(messageId, messageSize, timeElapsed, timeUnit, ON_SENT);
}
void onSendFrame(int messageCount, int payloadSizeInBytes)
{
long at = nextId();
events.put(at, new FrameEvent(SEND_FRAME, at, messageCount, payloadSizeInBytes));
}
void onSentFrame(int messageCount, int payloadSizeInBytes)
{
long at = nextId();
events.put(at, new FrameEvent(SENT_FRAME, at, messageCount, payloadSizeInBytes));
}
void onFailedFrame(int messageCount, int payloadSizeInBytes)
{
long at = nextId();
events.put(at, new FrameEvent(FAILED_FRAME, at, messageCount, payloadSizeInBytes));
}
void onArrived(long messageId, int messageSize)
{
long at = nextId();
events.put(at, new SimpleMessageEventWithSize(ARRIVE, at, messageId, messageSize));
}
void onArrivedExpired(long messageId, int messageSize, boolean wasCorrupt, long timeElapsed, TimeUnit timeUnit)
{
onExpired(messageId, messageSize, timeElapsed, timeUnit, ExpirationType.ON_ARRIVED);
}
void onDeserialize(long messageId, int messagingVersion)
{
long at = nextId();
events.put(at, new SerializeMessageEvent(DESERIALIZE, at, messageId, messagingVersion));
}
void onClosedBeforeArrival(long messageId, int messageSize)
{
long at = nextId();
events.put(at, new SimpleMessageEventWithSize(CLOSED_BEFORE_ARRIVAL, at, messageId, messageSize));
}
void onFailedDeserialize(long messageId, int messageSize)
{
long at = nextId();
events.put(at, new SimpleMessageEventWithSize(FAILED_DESERIALIZE, at, messageId, messageSize));
}
void process(Message<?> message)
{
long at = nextId();
events.put(at, new ProcessMessageEvent(at, message));
}
void onProcessExpired(long messageId, int messageSize, long timeElapsed, TimeUnit timeUnit)
{
onExpired(messageId, messageSize, timeElapsed, timeUnit, ExpirationType.ON_PROCESSED);
}
private void onExpired(long messageId, int messageSize, long timeElapsed, TimeUnit timeUnit, ExpirationType expirationType)
{
long at = nextId();
events.put(at, new ExpiredMessageEvent(at, messageId, messageSize, timeElapsed, timeUnit, expirationType));
}
static class ConnectOutboundEvent extends SimpleEvent
{
final int messagingVersion;
final OutboundConnectionSettings settings;
ConnectOutboundEvent(long at, int messagingVersion, OutboundConnectionSettings settings)
{
super(EventType.CONNECT_OUTBOUND, at);
this.messagingVersion = messagingVersion;
this.settings = settings;
}
}
// TODO: do we need this?
static class ConnectInboundEvent extends SimpleEvent
{
final int messagingVersion;
final InboundMessageHandler handler;
ConnectInboundEvent(long at, int messagingVersion, InboundMessageHandler handler)
{
super(EventType.CONNECT_OUTBOUND, at);
this.messagingVersion = messagingVersion;
this.handler = handler;
}
}
static class SyncEvent extends SimpleEvent
{
final Runnable onCompletion;
SyncEvent(long at, Runnable onCompletion)
{
super(EventType.SYNC, at);
this.onCompletion = onCompletion;
}
}
static class ControllerEvent extends BoundedEvent
{
final long minimumBytesInFlight;
final long maximumBytesInFlight;
ControllerEvent(long start, long minimumBytesInFlight, long maximumBytesInFlight)
{
super(EventType.CONTROLLER_UPDATE, start);
this.minimumBytesInFlight = minimumBytesInFlight;
this.maximumBytesInFlight = maximumBytesInFlight;
}
}
void onSync(Runnable onCompletion)
{
SyncEvent connect = new SyncEvent(nextId(), onCompletion);
events.put(connect.at, connect);
}
void onConnectOutbound(int messagingVersion, OutboundConnectionSettings settings)
{
ConnectOutboundEvent connect = new ConnectOutboundEvent(nextId(), messagingVersion, settings);
events.put(connect.at, connect);
}
void onConnectInbound(int messagingVersion, InboundMessageHandler handler)
{
ConnectInboundEvent connect = new ConnectInboundEvent(nextId(), messagingVersion, handler);
events.put(connect.at, connect);
}
private final BytesInFlightController controller;
private final AtomicLong sequenceId = new AtomicLong();
private final EventSequence events = new EventSequence();
private final InboundMessageHandlers inbound;
private final OutboundConnection outbound;
Verifier(BytesInFlightController controller, OutboundConnection outbound, InboundMessageHandlers inbound)
{
this.controller = controller;
this.inbound = inbound;
this.outbound = outbound;
}
private long nextId()
{
return sequenceId.getAndIncrement();
}
public void logFailure(String message, Object ... params)
{
fail(message, params);
}
private void fail(String message, Object ... params)
{
logger.error("{}", String.format(message, params));
logger.error("Connection: {}", currentConnection);
}
private void fail(String message, Throwable t, Object ... params)
{
logger.error("{}", String.format(message, params), t);
logger.error("Connection: {}", currentConnection);
}
private void failinfo(String message, Object ... params)
{
logger.error("{}", String.format(message, params));
}
private static class MessageState
{
final Message<?> message;
final Destiny destiny;
int messagingVersion;
// set initially to message.expiresAtNanos, but if at serialization time we use
// an older messaging version we may not be able to serialize expiration
long expiresAtNanos;
long enqueueStart, enqueueEnd, serialize, arrive, deserialize;
boolean processOnEventLoop, processOutOfOrder;
Event sendState, receiveState;
long lastUpdateAt;
long lastUpdateNanos;
ConnectionState sentOn;
boolean doneSend, doneReceive;
int messageSize()
{
return message.serializedSize(messagingVersion);
}
MessageState(Message<?> message, Destiny destiny, long enqueueStart)
{
this.message = message;
this.destiny = destiny;
this.enqueueStart = enqueueStart;
this.expiresAtNanos = message.expiresAtNanos();
}
void update(SimpleEvent event, long now)
{
update(event, event.at, now);
}
void update(Event event, long at, long now)
{
lastUpdateAt = at;
lastUpdateNanos = now;
switch (event.type.category)
{
case SEND:
sendState = event;
break;
case RECEIVE:
receiveState = event;
break;
default: throw new IllegalStateException();
}
}
boolean is(EventType type)
{
switch (type.category)
{
case SEND: return sendState != null && sendState.type == type;
case RECEIVE: return receiveState != null && receiveState.type == type;
default: return false;
}
}
boolean is(EventType type1, EventType type2)
{
return is(type1) || is(type2);
}
boolean is(EventType type1, EventType type2, EventType type3)
{
return is(type1) || is(type2) || is(type3);
}
void require(EventType event, Verifier verifier, EventType type)
{
if (!is(type))
verifier.fail("Invalid state at %s for %s: expected %s", event, this, type);
}
void require(EventType event, Verifier verifier, EventType type1, EventType type2)
{
if (!is(type1) && !is(type2))
verifier.fail("Invalid state at %s for %s: expected %s or %s", event, this, type1, type2);
}
void require(EventType event, Verifier verifier, EventType type1, EventType type2, EventType type3)
{
if (!is(type1) && !is(type2) && !is(type3))
verifier.fail("Invalid state %s for %s: expected %s, %s or %s", event, this, type1, type2, type3);
}
public String toString()
{
return String.format("{id:%d, state:[%s,%s], upd:%d, ver:%d, enqueue:[%d,%d], ser:%d, arr:%d, deser:%d, expires:%d, sentOn: %d}",
message.id(), sendState, receiveState, lastUpdateAt, messagingVersion, enqueueStart, enqueueEnd, serialize, arrive, deserialize, approxTime.translate().toMillisSinceEpoch(expiresAtNanos), sentOn == null ? -1 : sentOn.connectionId);
}
}
private final LongObjectHashMap<MessageState> messages = new LongObjectHashMap<>();
// messages start here, but may enter in a haphazard (non-sequential) fashion;
// ENQUEUE_START, ENQUEUE_END both take place here, with the latter imposing bounds on the out-of-order appearance of messages.
// note that ENQUEUE_END - being concurrent - may not appear before the message's lifespan has completely ended.
private final Queue<MessageState> enqueueing = new Queue<>();
static final class ConnectionState
{
final long connectionId;
final int messagingVersion;
// Strict message order will then be determined at serialization time, since this happens on a single thread.
// The order in which messages arrive here determines the order they will arrive on the other node.
// must follow either ENQUEUE_START or ENQUEUE_END
final Queue<MessageState> serializing = new Queue<>();
// Messages sent on the small connection will all be sent in frames; this is a concurrent operation,
// so only the sendingFrame MUST be encountered before any future events -
// large connections skip this step and goes straight to arriving
// we consult the queues in reverse order in arriving, as it is acceptable to find our frame in any of these queues
final FramesInFlight framesInFlight = new FramesInFlight(); // unknown if the messages will arrive, accept either
// for large messages OR < VERSION_40, arriving can occur BEFORE serializing completes successfully
// OR a frame is fully serialized
final Queue<MessageState> arriving = new Queue<>();
final Queue<MessageState> deserializingOnEventLoop = new Queue<>(),
deserializingOffEventLoop = new Queue<>();
InboundMessageHandler inbound;
// TODO
long sentCount, sentBytes;
long receivedCount, receivedBytes;
ConnectionState(long connectionId, int messagingVersion)
{
this.connectionId = connectionId;
this.messagingVersion = messagingVersion;
}
public String toString()
{
return String.format("{id: %d, ver: %d, ser: %d, inFlight: %s, arriving: %d, deserOn: %d, deserOff: %d}",
connectionId, messagingVersion, serializing.size(), framesInFlight, arriving.size(), deserializingOnEventLoop.size(), deserializingOffEventLoop.size());
}
}
private final Queue<MessageState> processingOutOfOrder = new Queue<>();
private SyncEvent sync;
private long nextMessageId = 0;
private long now;
private long connectionCounter;
private ConnectionState currentConnection = new ConnectionState(connectionCounter++, current_version);
private long outboundSentCount, outboundSentBytes;
private long outboundSubmittedCount;
private long outboundOverloadedCount, outboundOverloadedBytes;
private long outboundExpiredCount, outboundExpiredBytes;
private long outboundErrorCount, outboundErrorBytes;
public void run(Runnable onFailure, long deadlineNanos)
{
try
{
long lastEventAt = approxTime.now();
while ((now = approxTime.now()) < deadlineNanos)
{
Event next = events.await(nextMessageId, 100L, MILLISECONDS);
if (next == null)
{
// decide if we have any messages waiting too long to proceed
while (!processingOutOfOrder.isEmpty())
{
MessageState m = processingOutOfOrder.get(0);
if (now - m.lastUpdateNanos > SECONDS.toNanos(10L))
{
fail("Unreasonably long period spent waiting for out-of-order deser/delivery of received message %d", m.message.id());
MessageState v = maybeRemove(m.message.id(), PROCESS);
controller.fail(v.message.serializedSize(v.messagingVersion == 0 ? current_version : v.messagingVersion));
processingOutOfOrder.remove(0);
}
else break;
}
if (sync != null)
{
// if we have waited 100ms since beginning a sync, with no events, and ANY of our queues are
// non-empty, something is probably wrong; however, let's give ourselves a little bit longer
boolean done =
currentConnection.serializing.isEmpty()
&& currentConnection.arriving.isEmpty()
&& currentConnection.deserializingOnEventLoop.isEmpty()
&& currentConnection.deserializingOffEventLoop.isEmpty()
&& currentConnection.framesInFlight.isEmpty()
&& enqueueing.isEmpty()
&& processingOutOfOrder.isEmpty()
&& messages.isEmpty()
&& controller.inFlight() == 0;
//outbound.pendingCount() > 0 ? 5L : 2L
if (!done && now - lastEventAt > SECONDS.toNanos(5L))
{
// TODO: even 2s or 5s are unreasonable periods of time without _any_ movement on a message waiting to arrive
// this seems to happen regularly on MacOS, but we should confirm this does not happen on Linux
fail("Unreasonably long period spent waiting for sync (%dms)", NANOSECONDS.toMillis(now - lastEventAt));
messages.<LongObjectProcedure<MessageState>>forEach((k, v) -> {
failinfo("%s", v);
controller.fail(v.message.serializedSize(v.messagingVersion == 0 ? current_version : v.messagingVersion));
});
currentConnection.serializing.clear();
currentConnection.arriving.clear();
currentConnection.deserializingOnEventLoop.clear();
currentConnection.deserializingOffEventLoop.clear();
enqueueing.clear();
processingOutOfOrder.clear();
messages.clear();
while (!currentConnection.framesInFlight.isEmpty())
currentConnection.framesInFlight.poll();
done = true;
}
if (done)
{
ConnectionUtils.check(outbound)
.pending(0, 0)
.error(outboundErrorCount, outboundErrorBytes)
.submitted(outboundSubmittedCount)
.expired(outboundExpiredCount, outboundExpiredBytes)
.overload(outboundOverloadedCount, outboundOverloadedBytes)
.sent(outboundSentCount, outboundSentBytes)
.check((message, expect, actual) -> fail("%s: expect %d, actual %d", message, expect, actual));
sync.onCompletion.run();
sync = null;
}
}
continue;
}
events.clear(nextMessageId); // TODO: simplify collection if we end up using it exclusively as a queue, as we are now
lastEventAt = now;
switch (next.type)
{
case ENQUEUE:
{
MessageState m;
EnqueueMessageEvent e = (EnqueueMessageEvent) next;
assert nextMessageId == e.start || nextMessageId == e.end;
assert e.message != null;
if (nextMessageId == e.start)
{
if (sync != null)
fail("Sync in progress - there should be no messages beginning to enqueue");
m = new MessageState(e.message, e.destiny, e.start);
messages.put(e.messageId, m);
enqueueing.add(m);
m.update(e, e.start, now);
}
else
{
// warning: enqueueEnd can occur at any time in the future, since it's a different thread;
// it could be arbitrarily paused, long enough even for the messsage to be fully processed
m = messages.get(e.messageId);
if (m != null)
m.enqueueEnd = e.end;
outboundSubmittedCount += 1;
}
break;
}
case FAILED_OVERLOADED:
{
// TODO: verify that we could have exceeded our memory limits
SimpleMessageEvent e = (SimpleMessageEvent) next;
assert nextMessageId == e.at;
MessageState m = remove(e.messageId, enqueueing, messages);
m.require(FAILED_OVERLOADED, this, ENQUEUE);
outboundOverloadedBytes += m.message.serializedSize(current_version);
outboundOverloadedCount += 1;
break;
}
case FAILED_CLOSING:
{
// TODO: verify if this is acceptable due to e.g. inbound refusing to process for long enough
SimpleMessageEvent e = (SimpleMessageEvent) next;
assert nextMessageId == e.at;
MessageState m = messages.remove(e.messageId); // definitely cannot have been sent (in theory)
enqueueing.remove(m);
m.require(FAILED_CLOSING, this, ENQUEUE);
fail("Invalid discard of %d: connection was closing for too long", m.message.id());
break;
}
case SERIALIZE:
{
// serialize happens serially, so we can compress the asynchronicity of the above enqueue
// into a linear sequence of events we expect to occur on arrival
SerializeMessageEvent e = (SerializeMessageEvent) next;
assert nextMessageId == e.at;
MessageState m = get(e);
assert m.is(ENQUEUE);
m.serialize = e.at;
m.messagingVersion = e.messagingVersion;
if (current_version != e.messagingVersion)
controller.adjust(m.message.serializedSize(current_version), m.message.serializedSize(e.messagingVersion));
m.processOnEventLoop = willProcessOnEventLoop(outbound.type(), m.message, e.messagingVersion);
m.expiresAtNanos = expiresAtNanos(m.message, e.messagingVersion);
int mi = enqueueing.indexOf(m);
for (int i = 0 ; i < mi ; ++i)
{
MessageState pm = enqueueing.get(i);
if (pm.enqueueEnd != 0 && pm.enqueueEnd < m.enqueueStart)
{
fail("Invalid order of events: %s enqueued strictly before %s, but serialized after",
pm, m);
}
}
enqueueing.remove(mi);
m.sentOn = currentConnection;
currentConnection.serializing.add(m);
m.update(e, now);
break;
}
case FINISH_SERIALIZE_LARGE:
{
// serialize happens serially, so we can compress the asynchronicity of the above enqueue
// into a linear sequence of events we expect to occur on arrival
SimpleMessageEvent e = (SimpleMessageEvent) next;
assert nextMessageId == e.at;
MessageState m = maybeRemove(e);
outboundSentBytes += m.messageSize();
outboundSentCount += 1;
m.sentOn.serializing.remove(m);
m.update(e, now);
break;
}
case FAILED_SERIALIZE:
{
FailedSerializeEvent e = (FailedSerializeEvent) next;
assert nextMessageId == e.at;
MessageState m = maybeRemove(e);
if (outbound.type() == LARGE_MESSAGES)
assert e.failure instanceof InvalidSerializedSizeException || e.failure instanceof Connection.IntentionalIOException || e.failure instanceof Connection.IntentionalRuntimeException;
else
assert e.failure instanceof InvalidSerializedSizeException || e.failure instanceof Connection.IntentionalIOException || e.failure instanceof Connection.IntentionalRuntimeException || e.failure instanceof BufferOverflowException;
if (e.bytesWrittenToNetwork == 0) // TODO: use header size
messages.remove(m.message.id());
InvalidSerializedSizeException ex;
if (outbound.type() != LARGE_MESSAGES
|| !(e.failure instanceof InvalidSerializedSizeException)
|| ((ex = (InvalidSerializedSizeException) e.failure).expectedSize <= DEFAULT_BUFFER_SIZE && ex.actualSizeAtLeast <= DEFAULT_BUFFER_SIZE)
|| (ex.expectedSize > DEFAULT_BUFFER_SIZE && ex.actualSizeAtLeast < DEFAULT_BUFFER_SIZE))
{
assert e.bytesWrittenToNetwork == 0;
}
m.require(FAILED_SERIALIZE, this, SERIALIZE);
m.sentOn.serializing.remove(m);
if (m.destiny != Destiny.FAIL_TO_SERIALIZE)
fail("%s failed to serialize, but its destiny was to %s", m, m.destiny);
outboundErrorBytes += m.messageSize();
outboundErrorCount += 1;
m.update(e, now);
break;
}
case SEND_FRAME:
{
FrameEvent e = (FrameEvent) next;
assert nextMessageId == e.at;
int size = 0;
Frame frame = new Frame();
MessageState first = currentConnection.serializing.get(0);
int messagingVersion = first.messagingVersion;
for (int i = 0 ; i < e.messageCount ; ++i)
{
MessageState m = currentConnection.serializing.get(i);
size += m.message.serializedSize(m.messagingVersion);
if (m.messagingVersion != messagingVersion)
{
fail("Invalid sequence of events: %s encoded to same frame as %s",
m, first);
}
frame.add(m);
m.update(e, now);
assert !m.doneSend;
m.doneSend = true;
if (m.doneReceive)
messages.remove(m.message.id());
}
frame.payloadSizeInBytes = e.payloadSizeInBytes;
frame.messageCount = e.messageCount;
frame.messagingVersion = messagingVersion;
currentConnection.framesInFlight.add(frame);
currentConnection.serializing.removeFirst(e.messageCount);
if (e.payloadSizeInBytes != size)
fail("Invalid frame payload size with %s: expected %d, actual %d", first, size, e.payloadSizeInBytes);
break;
}
case SENT_FRAME:
{
Frame frame = currentConnection.framesInFlight.supplySendStatus(Frame.Status.SUCCESS);
frame.forEach(m -> m.update((SimpleEvent) next, now));
outboundSentBytes += frame.payloadSizeInBytes;
outboundSentCount += frame.messageCount;
break;
}
case FAILED_FRAME:
{
// TODO: is it possible for this to be signalled AFTER our reconnect event? probably, in which case this will be wrong
// TODO: verify that this was expected
Frame frame = currentConnection.framesInFlight.supplySendStatus(Frame.Status.FAILED);
frame.forEach(m -> m.update((SimpleEvent) next, now));
if (frame.messagingVersion >= VERSION_40)
{
// the contents cannot be delivered without the whole frame arriving, so clear the contents now
clear(frame, messages);
currentConnection.framesInFlight.remove(frame);
}
outboundErrorBytes += frame.payloadSizeInBytes;
outboundErrorCount += frame.messageCount;
break;
}
case ARRIVE:
{
SimpleMessageEventWithSize e = (SimpleMessageEventWithSize) next;
assert nextMessageId == e.at;
MessageState m = get(e);
m.arrive = e.at;
if (e.messageSize != m.messageSize())
fail("onArrived with invalid size for %s: %d vs %d", m, e.messageSize, m.messageSize());
if (outbound.type() == LARGE_MESSAGES)
{
m.require(ARRIVE, this, SERIALIZE, FAILED_SERIALIZE, FINISH_SERIALIZE_LARGE);
}
else
{
if (!m.is(SEND_FRAME, SENT_FRAME))
{
fail("Invalid order of events: %s arrived before being sent in a frame", m);
break;
}
int fi = -1, mi = -1;
while (fi + 1 < m.sentOn.framesInFlight.size() && mi < 0)
mi = m.sentOn.framesInFlight.get(++fi).indexOf(m);
if (fi == m.sentOn.framesInFlight.size())
{
fail("Invalid state: %s, but no frame in flight was found to contain it", m);
break;
}
if (fi > 0)
{
// we have skipped over some frames, meaning these have either failed (and we know it)
// or we have not yet heard about them and they have presumably failed, or something
// has gone wrong
fail("BEGIN: Successfully sent frames were not delivered");
for (int i = 0 ; i < fi ; ++i)
{
Frame skip = m.sentOn.framesInFlight.get(i);
skip.receiveStatus = Frame.Status.FAILED;
if (skip.sendStatus == Frame.Status.SUCCESS)
{
failinfo("Frame %s", skip);
for (int j = 0 ; j < skip.size() ; ++j)
failinfo("Containing: %s", skip.get(j));
}
clear(skip, messages);
}
m.sentOn.framesInFlight.removeFirst(fi);
failinfo("END: Successfully sent frames were not delivered");
}
Frame frame = m.sentOn.framesInFlight.get(0);
for (int i = 0; i < mi; ++i)
fail("Invalid order of events: %s serialized strictly before %s, but arrived after", frame.get(i), m);
frame.remove(mi);
if (frame.isEmpty())
m.sentOn.framesInFlight.poll();
}
m.sentOn.arriving.add(m);
m.update(e, now);
break;
}
case DESERIALIZE:
{
// deserialize may happen in parallel for large messages, but in sequence for small messages
// we currently require that this event be issued before any possible error is thrown
SimpleMessageEvent e = (SimpleMessageEvent) next;
assert nextMessageId == e.at;
MessageState m = get(e);
m.require(DESERIALIZE, this, ARRIVE);
m.deserialize = e.at;
// deserialize may be off-loaded, so we can only impose meaningful ordering constraints
// on those messages we know to have been processed on the event loop
int mi = m.sentOn.arriving.indexOf(m);
if (m.processOnEventLoop)
{
for (int i = 0 ; i < mi ; ++i)
{
MessageState pm = m.sentOn.arriving.get(i);
if (pm.processOnEventLoop)
{
fail("Invalid order of events: %d (%d, %d) arrived strictly before %d (%d, %d), but deserialized after",
pm.message.id(), pm.arrive, pm.deserialize, m.message.id(), m.arrive, m.deserialize);
}
}
m.sentOn.deserializingOnEventLoop.add(m);
}
else
{
m.sentOn.deserializingOffEventLoop.add(m);
}
m.sentOn.arriving.remove(mi);
m.update(e, now);
break;
}
case CLOSED_BEFORE_ARRIVAL:
{
SimpleMessageEventWithSize e = (SimpleMessageEventWithSize) next;
assert nextMessageId == e.at;
MessageState m = maybeRemove(e);
if (e.messageSize != m.messageSize())
fail("onClosedBeforeArrival has invalid size for %s: %d vs %d", m, e.messageSize, m.messageSize());
m.sentOn.deserializingOffEventLoop.remove(m);
if (m.destiny == Destiny.FAIL_TO_SERIALIZE && outbound.type() == LARGE_MESSAGES)
break;
fail("%s closed before arrival, but its destiny was to %s", m, m.destiny);
break;
}
case FAILED_DESERIALIZE:
{
SimpleMessageEventWithSize e = (SimpleMessageEventWithSize) next;
assert nextMessageId == e.at;
MessageState m = maybeRemove(e);
if (e.messageSize != m.messageSize())
fail("onFailedDeserialize has invalid size for %s: %d vs %d", m, e.messageSize, m.messageSize());
m.require(FAILED_DESERIALIZE, this, ARRIVE, DESERIALIZE);
(m.processOnEventLoop ? m.sentOn.deserializingOnEventLoop : m.sentOn.deserializingOffEventLoop).remove(m);
switch (m.destiny)
{
case FAIL_TO_DESERIALIZE:
break;
case FAIL_TO_SERIALIZE:
if (outbound.type() == LARGE_MESSAGES)
break;
default:
fail("%s failed to deserialize, but its destiny was to %s", m, m.destiny);
}
break;
}
case PROCESS:
{
ProcessMessageEvent e = (ProcessMessageEvent) next;
assert nextMessageId == e.at;
MessageState m = maybeRemove(e);
m.require(PROCESS, this, DESERIALIZE);
if (!Arrays.equals((byte[]) e.message.payload, (byte[]) m.message.payload))
{
fail("Invalid message payload for %d: %s supplied by processor, but %s implied by original message and messaging version",
e.messageId, Arrays.toString((byte[]) e.message.payload), Arrays.toString((byte[]) m.message.payload));
}
if (m.processOutOfOrder)
{
assert !m.processOnEventLoop; // will have already been reported small (processOnEventLoop) messages
processingOutOfOrder.remove(m);
}
else if (m.processOnEventLoop)
{
// we can expect that processing happens sequentially in this case, more specifically
// we can actually expect that this event will occur _immediately_ after the deserialize event
// so that we have exactly one mess
// c
int mi = m.sentOn.deserializingOnEventLoop.indexOf(m);
for (int i = 0 ; i < mi ; ++i)
{
MessageState pm = m.sentOn.deserializingOnEventLoop.get(i);
fail("Invalid order of events: %s deserialized strictly before %s, but processed after",
pm, m);
}
clearFirst(mi, m.sentOn.deserializingOnEventLoop, messages);
m.sentOn.deserializingOnEventLoop.poll();
}
else
{
int mi = m.sentOn.deserializingOffEventLoop.indexOf(m);
// process may be off-loaded, so we can only impose meaningful ordering constraints
// on those messages we know to have been processed on the event loop
for (int i = 0 ; i < mi ; ++i)
{
MessageState pm = m.sentOn.deserializingOffEventLoop.get(i);
pm.processOutOfOrder = true;
processingOutOfOrder.add(pm);
}
m.sentOn.deserializingOffEventLoop.removeFirst(mi + 1);
}
// this message has been fully validated
break;
}
case FAILED_EXPIRED_ON_SEND:
case FAILED_EXPIRED_ON_RECEIVE:
{
ExpiredMessageEvent e = (ExpiredMessageEvent) next;
assert nextMessageId == e.at;
MessageState m;
switch (e.expirationType)
{
case ON_SENT:
{
m = messages.remove(e.messageId);
m.require(e.type, this, ENQUEUE);
outboundExpiredBytes += m.message.serializedSize(current_version);
outboundExpiredCount += 1;
messages.remove(m.message.id());
break;
}
case ON_ARRIVED:
m = maybeRemove(e);
if (!m.is(ARRIVE))
{
if (outbound.type() != LARGE_MESSAGES) m.require(e.type, this, SEND_FRAME, SENT_FRAME, FAILED_FRAME);
else m.require(e.type, this, SERIALIZE, FAILED_SERIALIZE, FINISH_SERIALIZE_LARGE);
}
break;
case ON_PROCESSED:
m = maybeRemove(e);
m.require(e.type, this, DESERIALIZE);
break;
default:
throw new IllegalStateException();
}
now = System.nanoTime();
if (m.expiresAtNanos > now)
{
// we fix the conversion AlmostSameTime for an entire run, which should suffice to guarantee these comparisons
fail("Invalid expiry of %d: expiry should occur in %dms; event believes %dms have elapsed, and %dms have actually elapsed", m.message.id(),
NANOSECONDS.toMillis(m.expiresAtNanos - m.message.createdAtNanos()),
e.timeUnit.toMillis(e.timeElapsed),
NANOSECONDS.toMillis(now - m.message.createdAtNanos()));
}
switch (e.expirationType)
{
case ON_SENT:
enqueueing.remove(m);
break;
case ON_ARRIVED:
if (m.is(ARRIVE))
m.sentOn.arriving.remove(m);
switch (m.sendState.type)
{
case SEND_FRAME:
case SENT_FRAME:
case FAILED_FRAME:
// TODO: this should be robust to re-ordering; should perhaps extract a common method
m.sentOn.framesInFlight.get(0).remove(m);
if (m.sentOn.framesInFlight.get(0).isEmpty())
m.sentOn.framesInFlight.poll();
break;
}
break;
case ON_PROCESSED:
(m.processOnEventLoop ? m.sentOn.deserializingOnEventLoop : m.sentOn.deserializingOffEventLoop).remove(m);
break;
}
if (m.messagingVersion != 0 && e.messageSize != m.messageSize())
fail("onExpired %s with invalid size for %s: %d vs %d", e.expirationType, m, e.messageSize, m.messageSize());
break;
}
case CONTROLLER_UPDATE:
{
break;
}
case CONNECT_OUTBOUND:
{
ConnectOutboundEvent e = (ConnectOutboundEvent) next;
currentConnection = new ConnectionState(connectionCounter++, e.messagingVersion);
break;
}
case SYNC:
{
sync = (SyncEvent) next;
break;
}
default:
throw new IllegalStateException();
}
++nextMessageId;
}
}
catch (InterruptedException e)
{
}
catch (Throwable t)
{
logger.error("Unexpected error:", t);
onFailure.run();
}
}
private MessageState get(SimpleMessageEvent onEvent)
{
MessageState m = messages.get(onEvent.messageId);
if (m == null)
throw new IllegalStateException("Missing " + onEvent + ": " + onEvent.messageId);
return m;
}
private MessageState maybeRemove(SimpleMessageEvent onEvent)
{
return maybeRemove(onEvent.messageId, onEvent.type, onEvent);
}
private MessageState maybeRemove(long messageId, EventType onEvent)
{
return maybeRemove(messageId, onEvent, onEvent);
}
private MessageState maybeRemove(long messageId, EventType onEvent, Object id)
{
MessageState m = messages.get(messageId);
if (m == null)
throw new IllegalStateException("Missing " + id + ": " + messageId);
switch (onEvent.category)
{
case SEND:
if (m.doneSend)
fail("%s already doneSend %s", onEvent, m);
m.doneSend = true;
if (m.doneReceive) messages.remove(messageId);
break;
case RECEIVE:
if (m.doneReceive)
fail("%s already doneReceive %s", onEvent, m);
m.doneReceive = true;
if (m.doneSend) messages.remove(messageId);
}
return m;
}
private static class Frame extends Queue<MessageState>
{
enum Status { SUCCESS, FAILED, UNKNOWN }
Status sendStatus = Status.UNKNOWN, receiveStatus = Status.UNKNOWN;
int messagingVersion;
int messageCount;
int payloadSizeInBytes;
public String toString()
{
return String.format("{count:%d, size:%d, version:%d, send:%s, receive:%s}",
messageCount, payloadSizeInBytes, messagingVersion, sendStatus, receiveStatus);
}
}
private static MessageState remove(long messageId, Queue<MessageState> queue, LongObjectHashMap<MessageState> lookup)
{
MessageState m = lookup.remove(messageId);
queue.remove(m);
return m;
}
private static void clearFirst(int count, Queue<MessageState> queue, LongObjectHashMap<MessageState> lookup)
{
if (count > 0)
{
for (int i = 0 ; i < count ; ++i)
lookup.remove(queue.get(i).message.id());
queue.removeFirst(count);
}
}
private static void clear(Queue<MessageState> queue, LongObjectHashMap<MessageState> lookup)
{
if (!queue.isEmpty())
clearFirst(queue.size(), queue, lookup);
}
private static class EventSequence
{
static final int CHUNK_SIZE = 1 << 10;
static class Chunk extends AtomicReferenceArray<Event>
{
final long sequenceId;
int removed = 0;
Chunk(long sequenceId)
{
super(CHUNK_SIZE);
this.sequenceId = sequenceId;
}
Event get(long sequenceId)
{
return get((int)(sequenceId - this.sequenceId));
}
void set(long sequenceId, Event event)
{
lazySet((int)(sequenceId - this.sequenceId), event);
}
}
// we use a concurrent skip list to permit efficient searching, even if we always append
final ConcurrentSkipListMap<Long, Chunk> chunkList = new ConcurrentSkipListMap<>();
final WaitQueue writerWaiting = new WaitQueue();
volatile Chunk writerChunk = new Chunk(0);
Chunk readerChunk = writerChunk;
long readerWaitingFor;
volatile Thread readerWaiting;
EventSequence()
{
chunkList.put(0L, writerChunk);
}
public void put(long sequenceId, Event event)
{
long chunkSequenceId = sequenceId & -CHUNK_SIZE;
Chunk chunk = writerChunk;
if (chunk.sequenceId != chunkSequenceId)
{
try
{
chunk = ensureChunk(chunkSequenceId);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}
chunk.set(sequenceId, event);
Thread wake = readerWaiting;
long wakeIf = readerWaitingFor; // we are guarded by the above volatile read
if (wake != null && wakeIf == sequenceId)
LockSupport.unpark(wake);
}
Chunk ensureChunk(long chunkSequenceId) throws InterruptedException
{
Chunk chunk = chunkList.get(chunkSequenceId);
if (chunk == null)
{
Map.Entry<Long, Chunk> e;
while ( null != (e = chunkList.firstEntry()) && chunkSequenceId - e.getKey() > 1 << 12)
{
WaitQueue.Signal signal = writerWaiting.register();
if (null != (e = chunkList.firstEntry()) && chunkSequenceId - e.getKey() > 1 << 12)
signal.await();
else
signal.cancel();
}
chunk = chunkList.get(chunkSequenceId);
if (chunk == null)
{
synchronized (this)
{
chunk = chunkList.get(chunkSequenceId);
if (chunk == null)
chunkList.put(chunkSequenceId, chunk = new Chunk(chunkSequenceId));
}
}
}
return chunk;
}
Chunk readerChunk(long readerId) throws InterruptedException
{
long chunkSequenceId = readerId & -CHUNK_SIZE;
if (readerChunk.sequenceId != chunkSequenceId)
readerChunk = ensureChunk(chunkSequenceId);
return readerChunk;
}
public Event await(long id, long timeout, TimeUnit unit) throws InterruptedException
{
return await(id, System.nanoTime() + unit.toNanos(timeout));
}
public Event await(long id, long deadlineNanos) throws InterruptedException
{
Chunk chunk = readerChunk(id);
Event result = chunk.get(id);
if (result != null)
return result;
readerWaitingFor = id;
readerWaiting = Thread.currentThread();
while (null == (result = chunk.get(id)))
{
long waitNanos = deadlineNanos - System.nanoTime();
if (waitNanos <= 0)
return null;
LockSupport.parkNanos(waitNanos);
if (Thread.interrupted())
throw new InterruptedException();
}
readerWaitingFor = -1;
readerWaiting = null;
return result;
}
public Event find(long sequenceId)
{
long chunkSequenceId = sequenceId & -CHUNK_SIZE;
Chunk chunk = readerChunk;
if (chunk.sequenceId != chunkSequenceId)
{
chunk = writerChunk;
if (chunk.sequenceId != chunkSequenceId)
chunk = chunkList.get(chunkSequenceId);
}
return chunk.get(sequenceId);
}
public void clear(long sequenceId)
{
long chunkSequenceId = sequenceId & -CHUNK_SIZE;
Chunk chunk = chunkList.get(chunkSequenceId);
chunk.set(sequenceId, null);
if (++chunk.removed == CHUNK_SIZE)
{
chunkList.remove(chunkSequenceId);
writerWaiting.signalAll();
}
}
}
static class Queue<T>
{
private Object[] items = new Object[10];
private int begin, end;
int size()
{
return end - begin;
}
T get(int i)
{
//noinspection unchecked
return (T) items[i + begin];
}
int indexOf(T item)
{
for (int i = begin ; i < end ; ++i)
{
if (item == items[i])
return i - begin;
}
return -1;
}
void remove(T item)
{
int i = indexOf(item);
if (i >= 0)
remove(i);
}
void remove(int i)
{
i += begin;
assert i < end;
if (i == begin || i + 1 == end)
{
items[i] = null;
if (begin + 1 == end) begin = end = 0;
else if (i == begin) ++begin;
else --end;
}
else if (i - begin < end - i)
{
System.arraycopy(items, begin, items, begin + 1, i - begin);
items[begin++] = null;
}
else
{
System.arraycopy(items, i + 1, items, i, (end - 1) - i);
items[--end] = null;
}
}
void add(T item)
{
if (end == items.length)
{
Object[] src = items;
Object[] trg;
if (end - begin < src.length / 2) trg = src;
else trg = new Object[src.length * 2];
System.arraycopy(src, begin, trg, 0, end - begin);
end -= begin;
begin = 0;
items = trg;
}
items[end++] = item;
}
void clear()
{
Arrays.fill(items, begin, end, null);
begin = end = 0;
}
void removeFirst(int count)
{
Arrays.fill(items, begin, begin + count, null);
begin += count;
if (begin == end)
begin = end = 0;
}
T poll()
{
if (begin == end)
return null;
//noinspection unchecked
T result = (T) items[begin];
items[begin++] = null;
if (begin == end)
begin = end = 0;
return result;
}
void forEach(Consumer<T> consumer)
{
for (int i = 0 ; i < size() ; ++i)
consumer.accept(get(i));
}
boolean isEmpty()
{
return begin == end;
}
public String toString()
{
StringBuilder result = new StringBuilder();
result.append('[');
toString(result);
result.append(']');
return result.toString();
}
void toString(StringBuilder out)
{
for (int i = 0 ; i < size() ; ++i)
{
if (i > 0) out.append(", ");
out.append(get(i));
}
}
}
static class FramesInFlight
{
// this may be negative, indicating we have processed a frame whose status we did not know at the time
// TODO: we should verify the status of these frames by logging the inferred status and verifying it matches
final Queue<Frame> inFlight = new Queue<>();
final Queue<Frame> retiredWithoutStatus = new Queue<>();
private int withStatus;
Frame supplySendStatus(Frame.Status status)
{
Frame frame;
if (withStatus >= 0) frame = inFlight.get(withStatus);
else frame = retiredWithoutStatus.poll();
assert frame.sendStatus == Frame.Status.UNKNOWN;
frame.sendStatus = status;
++withStatus;
return frame;
}
boolean isEmpty()
{
return inFlight.isEmpty();
}
int size()
{
return inFlight.size();
}
Frame get(int i)
{
return inFlight.get(i);
}
void add(Frame frame)
{
assert frame.sendStatus == Frame.Status.UNKNOWN;
inFlight.add(frame);
}
void remove(Frame frame)
{
int i = inFlight.indexOf(frame);
if (i > 0) throw new IllegalStateException();
if (i == 0) poll();
}
void removeFirst(int count)
{
while (count-- > 0)
poll();
}
Frame poll()
{
Frame frame = inFlight.poll();
if (--withStatus < 0)
{
assert frame.sendStatus == Frame.Status.UNKNOWN;
retiredWithoutStatus.add(frame);
}
else
assert frame.sendStatus != Frame.Status.UNKNOWN;
return frame;
}
public String toString()
{
StringBuilder result = new StringBuilder();
result.append("[withStatus=");
result.append(withStatus);
result.append("; ");
inFlight.toString(result);
result.append("; ");
retiredWithoutStatus.toString(result);
result.append(']');
return result.toString();
}
}
private static boolean willProcessOnEventLoop(ConnectionType type, Message<?> message, int messagingVersion)
{
int size = message.serializedSize(messagingVersion);
if (type == ConnectionType.SMALL_MESSAGES && messagingVersion >= VERSION_40)
return size <= LARGE_MESSAGE_THRESHOLD;
else if (messagingVersion >= VERSION_40)
return size <= DEFAULT_BUFFER_SIZE;
else
return size <= LARGE_MESSAGE_THRESHOLD;
}
private static long expiresAtNanos(Message<?> message, int messagingVersion)
{
return messagingVersion < VERSION_40 ? message.verb().expiresAtNanos(message.createdAtNanos())
: message.expiresAtNanos();
}
}