blob: c102ee1a42570d9448d8474cf2e062232083343a [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.metrics.InternodeOutboundMetrics;
import org.apache.cassandra.service.AbstractWriteResponseHandler;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.utils.FBUtilities;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.apache.cassandra.concurrent.Stage.INTERNAL_RESPONSE;
import static org.apache.cassandra.utils.MonotonicClock.preciseTime;
/**
 * An expiring map of request callbacks.
 *
 * Used to match response (id, peer) pairs to their corresponding {@link RequestCallback}s or, if those responses
 * don't arrive in a timely manner (within the verb's timeout), to expire the callbacks.
 *
 * Since the same request id is now reused for multiple messages, the map is keyed by (id, peer) tuples
 * rather than by id alone, as it was before 4.0.
 */
public class RequestCallbacks implements OutboundMessageCallbacks
private static final Logger logger = LoggerFactory.getLogger(RequestCallbacks.class);

// owning messaging service; consulted (latency subscribers) whenever a callback expires
private final MessagingService messagingService;

// live callbacks, keyed by (request id, destination peer)
private final ConcurrentMap<CallbackKey, CallbackInfo> callbacks = new ConcurrentHashMap<>();

// scheduler that runs the periodic expiry sweep set up in the constructor
private final ScheduledExecutorService executor = new DebuggableScheduledThreadPoolExecutor("Callback-Map-Reaper");
/**
 * Creates an empty callback map and schedules the periodic expiry sweep.
 * The sweep runs with a fixed delay of half the minimum RPC timeout, and the first run happens after one such delay.
 *
 * @param messagingService the messaging service that owns this map
 */
RequestCallbacks(MessagingService messagingService)
{
    this.messagingService = messagingService;

    // sweeping at half the shortest RPC timeout reaps expired callbacks reasonably soon after their deadline
    final long sweepPeriodNanos = DatabaseDescriptor.getMinRpcTimeout(NANOSECONDS) / 2;
    executor.scheduleWithFixedDelay(this::expire, sweepPeriodNanos, sweepPeriodNanos, NANOSECONDS);
}
* @return the registered {@link CallbackInfo} for this id and peer, or {@code null} if unset or expired.
CallbackInfo get(long id, InetAddressAndPort peer)
return callbacks.get(key(id, peer));
* Remove and return the {@link CallbackInfo} associated with given id and peer, if known.
CallbackInfo remove(long id, InetAddressAndPort peer)
return callbacks.remove(key(id, peer));
* Register the provided {@link RequestCallback}, inferring expiry and id from the provided {@link Message}.
void addWithExpiration(RequestCallback cb, Message message, InetAddressAndPort to)
// mutations need to call the overload with a ConsistencyLevel
assert message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ && message.verb() != Verb.PAXOS_COMMIT_REQ;
CallbackInfo previous = callbacks.put(key(, to), new CallbackInfo(message, to, cb));
assert previous == null : format("Callback already exists for id %d/%s! (%s)",, to, previous);
// FIXME: shouldn't need a special overload for writes; hinting should be part of AbstractWriteResponseHandler
public void addWithExpiration(AbstractWriteResponseHandler<?> cb,
Message<?> message,
Replica to,
ConsistencyLevel consistencyLevel,
boolean allowHints)
assert message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ;
CallbackInfo previous = callbacks.put(key(, to.endpoint()), new WriteCallbackInfo(message, to, cb, consistencyLevel, allowHints));
assert previous == null : format("Callback already exists for id %d/%s! (%s)",, to.endpoint(), previous);
<In,Out> IVersionedAsymmetricSerializer<In, Out> responseSerializer(long id, InetAddressAndPort peer)
CallbackInfo info = get(id, peer);
return info == null ? null : info.responseVerb.serializer();
public void removeAndRespond(long id, InetAddressAndPort peer, Message message)
CallbackInfo ci = remove(id, peer);
if (null != ci) ci.callback.onResponse(message);
private void removeAndExpire(long id, InetAddressAndPort peer)
CallbackInfo ci = remove(id, peer);
if (null != ci) onExpired(ci);
private void expire()
long start =;
int n = 0;
for (Map.Entry<CallbackKey, CallbackInfo> entry : callbacks.entrySet())
if (entry.getValue().isReadyToDieAt(start))
if (callbacks.remove(entry.getKey(), entry.getValue()))
logger.trace("Expired {} entries", n);
private void forceExpire()
for (Map.Entry<CallbackKey, CallbackInfo> entry : callbacks.entrySet())
if (callbacks.remove(entry.getKey(), entry.getValue()))
private void onExpired(CallbackInfo info)
messagingService.latencySubscribers.maybeAdd(info.callback, info.peer, info.timeout(), NANOSECONDS);
if (info.invokeOnFailure())
INTERNAL_RESPONSE.submit(() -> info.callback.onFailure(info.peer, RequestFailureReason.TIMEOUT));
// FIXME: this has never belonged here, should be part of onFailure() in AbstractWriteResponseHandler
if (info.shouldHint())
WriteCallbackInfo writeCallbackInfo = ((WriteCallbackInfo) info);
Mutation mutation = writeCallbackInfo.mutation();
StorageProxy.submitHint(mutation, writeCallbackInfo.getReplica(), null);
void shutdownNow(boolean expireCallbacks)
if (expireCallbacks)
void shutdownGracefully()
if (!callbacks.isEmpty())
executor.schedule(this::shutdownGracefully, 100L, MILLISECONDS);
void awaitTerminationUntil(long deadlineNanos) throws TimeoutException, InterruptedException
if (!executor.isTerminated())
long wait = deadlineNanos - System.nanoTime();
if (wait <= 0 || !executor.awaitTermination(wait, NANOSECONDS))
throw new TimeoutException();
public void unsafeClear()
private static CallbackKey key(long id, InetAddressAndPort peer)
return new CallbackKey(id, peer);
private static class CallbackKey
final long id;
final InetAddressAndPort peer;
CallbackKey(long id, InetAddressAndPort peer)
{ = id;
this.peer = peer;
public boolean equals(Object o)
if (!(o instanceof CallbackKey))
return false;
CallbackKey that = (CallbackKey) o;
return == && this.peer.equals(that.peer);
public int hashCode()
return Long.hashCode(id) + 31 * peer.hashCode();
public String toString()
return "{id:" + id + ", peer:" + peer + '}';
static class CallbackInfo
final long createdAtNanos;
final long expiresAtNanos;
final InetAddressAndPort peer;
final RequestCallback callback;
@Deprecated // for 3.0 compatibility purposes only
public final Verb responseVerb;
private CallbackInfo(Message message, InetAddressAndPort peer, RequestCallback callback)
this.createdAtNanos = message.createdAtNanos();
this.expiresAtNanos = message.expiresAtNanos();
this.peer = peer;
this.callback = callback;
this.responseVerb = message.verb().responseVerb;
public long timeout()
return expiresAtNanos - createdAtNanos;
boolean isReadyToDieAt(long atNano)
return atNano > expiresAtNanos;
boolean shouldHint()
return false;
boolean invokeOnFailure()
return callback.invokeOnFailure();
public String toString()
return "{peer:" + peer + ", callback:" + callback + ", invokeOnFailure:" + invokeOnFailure() + '}';
// FIXME: shouldn't need a specialized container for write callbacks; hinting should be part of
// AbstractWriteResponseHandler implementation.
static class WriteCallbackInfo extends CallbackInfo
// either a Mutation, or a Paxos Commit (MessageOut)
private final Object mutation;
private final Replica replica;
WriteCallbackInfo(Message message, Replica replica, RequestCallback<?> callback, ConsistencyLevel consistencyLevel, boolean allowHints)
super(message, replica.endpoint(), callback);
this.mutation = shouldHint(allowHints, message, consistencyLevel) ? message.payload : null;
//Local writes shouldn't go through messaging service (
//noinspection AssertWithSideEffects
assert !peer.equals(FBUtilities.getBroadcastAddressAndPort());
this.replica = replica;
public boolean shouldHint()
return mutation != null && StorageProxy.shouldHint(replica);
public Replica getReplica()
return replica;
public Mutation mutation()
return getMutation(mutation);
private static Mutation getMutation(Object object)
assert object instanceof Commit || object instanceof Mutation : object;
return object instanceof Commit ? ((Commit) object).makeMutation()
: (Mutation) object;
private static boolean shouldHint(boolean allowHints, Message sentMessage, ConsistencyLevel consistencyLevel)
return allowHints && sentMessage.verb() != Verb.COUNTER_MUTATION_REQ && consistencyLevel != ConsistencyLevel.ANY;
public void onOverloaded(Message<?> message, InetAddressAndPort peer)
removeAndExpire(message, peer);
public void onExpired(Message<?> message, InetAddressAndPort peer)
removeAndExpire(message, peer);
public void onFailedSerialize(Message<?> message, InetAddressAndPort peer, int messagingVersion, int bytesWrittenToNetwork, Throwable failure)
removeAndExpire(message, peer);
public void onDiscardOnClose(Message<?> message, InetAddressAndPort peer)
removeAndExpire(message, peer);
private void removeAndExpire(Message message, InetAddressAndPort peer)
removeAndExpire(, peer);
/* in case of a write sent to a different DC, also expire all forwarding targets */
ForwardingInfo forwardTo = message.forwardTo();
if (null != forwardTo)