/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
import org.apache.cassandra.concurrent.DebuggableTask;
import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.reads.thresholds.CoordinatorWarnings;
import org.apache.cassandra.transport.ClientResourceLimits.Overload;
import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
public class Dispatcher implements CQLMessageHandler.MessageConsumer<Message.Request>
{
private static final Logger logger = LoggerFactory.getLogger(Dispatcher.class);
@VisibleForTesting
static final LocalAwareExecutorPlus requestExecutor = SHARED.newExecutor(DatabaseDescriptor.getNativeTransportMaxThreads(),
DatabaseDescriptor::setNativeTransportMaxThreads,
"transport",
"Native-Transport-Requests");
    /** CASSANDRA-17812: Rate-limit new client connection setup to avoid overwhelming the server during bcrypt hashing.
     *
     * authExecutor is a separate thread pool for handling requests on connections that need to be authenticated.
     * Calls to AUTHENTICATE can be expensive if the number of rounds for bcrypt is configured to a high value,
     * so during a connection storm, checking the password hash would starve existing connected clients for CPU
     * and trigger timeouts if run on the same thread pool as standard requests.
     *
     * Moving authentication requests to a small, separate pool prevents them from starving the handling of all
     * other requests. If the authExecutor pool backs up, it may cause authentication timeouts, but the clients
     * should back off and retry while the rest of the system continues to make progress.
     *
     * Setting the value to less than 1 will service auth requests on the standard {@link Dispatcher#requestExecutor}.
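     *
     * An illustrative cassandra.yaml setting (the value here is an example, not a recommendation):
     * {@code native_transport_max_auth_threads: 4}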
*/
@VisibleForTesting
static final LocalAwareExecutorPlus authExecutor = SHARED.newExecutor(Math.max(1, DatabaseDescriptor.getNativeTransportMaxAuthThreads()),
DatabaseDescriptor::setNativeTransportMaxAuthThreads,
"transport",
"Native-Transport-Auth-Requests");
private static final ConcurrentMap<EventLoop, Flusher> flusherLookup = new ConcurrentHashMap<>();
private final boolean useLegacyFlusher;
/**
* Takes a Channel, Request and the Response produced by processRequest and outputs a FlushItem
* appropriate for the pipeline, which is specific to the protocol version. V5 and above will
* produce FlushItem.Framed instances whereas earlier versions require FlushItem.Unframed.
* The instances of these FlushItem subclasses are specialized to release resources in the
* right way for the specific pipeline that produced them.
*/
// TODO parameterize with FlushItem subclass
interface FlushItemConverter
{
FlushItem<?> toFlushItem(Channel channel, Message.Request request, Message.Response response);
}
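    // An illustrative sketch, not production code: for a V5+ pipeline, a converter could mirror the event-dispatch
    // path at the bottom of this class, assuming a FrameEncoder.PayloadAllocator named `allocator` is in scope and
    // that request.getSource() exposes the originating envelope:
    //
    //   FlushItemConverter converter = (channel, request, response) ->
    //       new FlushItem.Framed(channel,
    //                            response.encode(request.connection().getVersion()),
    //                            request.getSource(),
    //                            allocator,
    //                            f -> f.response.release());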
public Dispatcher(boolean useLegacyFlusher)
{
this.useLegacyFlusher = useLegacyFlusher;
}
@Override
public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
{
if (!request.connection().getTracker().isRunning())
{
            // We cannot respond with custom, transport, or server exceptions since, given the current implementation
            // of clients, they would defunct the connection. Without a protocol version bump that introduces an
            // "I am going away" message, we have to stick to an existing error code.
Message.Response response = ErrorMessage.fromException(new OverloadedException("Server is shutting down"));
response.setStreamId(request.getStreamId());
response.setWarnings(ClientWarn.instance.getWarnings());
response.attach(request.connection);
FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
flush(toFlush);
return;
}
        // If native_transport_max_auth_threads is < 1, don't delegate auth messages to the new pool.
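        // (CREDENTIALS is the protocol v1 counterpart of AUTH_RESPONSE.)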
boolean isAuthQuery = DatabaseDescriptor.getNativeTransportMaxAuthThreads() > 0 &&
(request.type == Message.Type.AUTH_RESPONSE || request.type == Message.Type.CREDENTIALS);
// Importantly, the authExecutor will handle the AUTHENTICATE message which may be CPU intensive.
LocalAwareExecutorPlus executor = isAuthQuery ? authExecutor : requestExecutor;
executor.submit(new RequestProcessor(channel, request, forFlusher, backpressure));
ClientMetrics.instance.markRequestDispatched();
}
public static class RequestTime
{
private final long enqueuedAtNanos;
private final long startedAtNanos;
public RequestTime(long createdAtNanos)
{
this(createdAtNanos, createdAtNanos);
}
public RequestTime(long enqueuedAtNanos, long startedAtNanos)
{
Preconditions.checkArgument(enqueuedAtNanos != -1);
this.enqueuedAtNanos = enqueuedAtNanos;
this.startedAtNanos = startedAtNanos;
}
public static RequestTime forImmediateExecution()
{
return new RequestTime(MonotonicClock.Global.preciseTime.now());
}
public RequestTime withStartedAt(long startedAtNanos)
{
return new RequestTime(enqueuedAtNanos, startedAtNanos);
}
public long startedAtNanos()
{
return startedAtNanos;
}
public long enqueuedAtNanos()
{
return enqueuedAtNanos;
}
/**
         * Base time is used by timeouts, and can be set either to when the request was added to the queue
         * or to when processing started, as controlled by {@link DatabaseDescriptor#getCQLStartTime()}.
         *
         * Since client read/write timeouts are usually aligned with server-side timeouts, it is desirable to use
         * enqueue time as a base. However, since the client removes its handler after `readTimeoutMillis` (which
         * is 12 seconds by default), when the REQUEST option is used the upper bound for any execution on the
         * coordinator is 12 seconds (thanks to CASSANDRA-7392, any replica-side query is capped by the verb
         * timeout). Allowing such long timeouts, however, also implicitly allows queues to grow large, since our
         * queues are currently unbounded.
*
* Latency, however, is _always_ based on request processing time, since the amount of time that request spends
* in the queue is not a representative metric of replica performance.
*/
public long baseTimeNanos()
{
switch (DatabaseDescriptor.getCQLStartTime())
{
case REQUEST:
return startedAtNanos();
case QUEUE:
return enqueuedAtNanos();
default:
throw new IllegalArgumentException("Unknown start time: " + DatabaseDescriptor.getCQLStartTime());
}
}
/**
         * Given the current time and a base timeout for the verb, return a request's expiration deadline:
         * the time at which it becomes eligible for load shedding.
* The two factors to consider are the per-verb and client timeouts. Both are calculated by subtracting the
* time already elapsed during the lifetime of the request from some base value.
*
* When deriving verb timeout, two alternative semantics are available. This timeout may represent either:
* * the total time available for a coordinator to process a client request and return its response
* * a time bound for a coordinator to send internode requests and gather responses from replicas
*
* The point from which elapsed time is measured here is configurable to accommodate these two different
* options. For the former, the clock starts when a client request is received and enqueued by the coordinator.
* For the latter, it starts when the request is dequeued by the coordinator and processing is started.
* See {@link #baseTimeNanos()} for details.
*
* The client timeout represents how long the sender of a request is prepared to wait for a response. By
* implication, after this window has passed any further work done on the server side is wasted effort. Ideally,
         * the base for this timeout would be set on a per-request basis, but as this is not currently supported in the
* protocol, it is configured uniformly for all requests. See {@link DatabaseDescriptor#getNativeTransportTimeout}.
* For this calculation, elapsed time is always measured from the point when a request is received and enqueued.
*
* Where verb timeout is based on queue admission, deadline computation is straightforward. The expiration
* deadline is simply the current time plus the smaller of the verb and client timeouts.
*
* However, if verb timeout is based on when the request is dequeued, the implications are more nuanced.
* In this scenario, while there may still be "headroom" available within the verb timeout, using it could
* exceed the client timeout (which is always based on admission time).
*
* For example:
*
* * Client timeout base is 10 (cb), verb timeout base is 5 (vb)
* * Request is enqueued at t1 (e)
* * Request is dequeued at t8 (d)
* * computeDeadline is called at t9 (n)
*
         * If the verb timeout is based on dequeuing, there would still be some time remaining before a verb-based deadline:
         * elapsed = (n - d) ; 1
         * timeout = (vb - elapsed) ; 4
         * deadline = (n + timeout) ; t13
         * Ostensibly, the coordinator has until t13 to complete processing.
*
         * But as the client timeout is measured from admission time, the request may exceed the maximum wait period
         * for the client sooner:
         * elapsed = (n - e) ; 8
         * timeout = (cb - elapsed) ; 2
         * deadline = (n + timeout) ; t11
         * So the coordinator actually only has until t11 to complete processing, beyond which the client will not
         * accept any response.
*
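         * The example above, restated as the arithmetic performed below (illustrative values only, with e=1, d=8,
         * vb=5, cb=10, and getCQLStartTime() == REQUEST, so baseTimeNanos() == d):
         * <pre>{@code
         * long verbDeadline   = 8 + 5;   // baseTimeNanos() + verbExpiresAfterNanos -> t13
         * long clientDeadline = 1 + 10;  // enqueuedAtNanos() + native transport timeout -> t11
         * long deadline       = Math.min(verbDeadline, clientDeadline); // t11 wins
         * }</pre>
         *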
* @param verbExpiresAfterNanos the base timeout value for the verb being executed
* @return the point in time after which no further processing should occur
*/
public long computeDeadline(long verbExpiresAfterNanos)
{
long clientDeadline = clientDeadline();
long verbDeadline = baseTimeNanos() + verbExpiresAfterNanos;
// Whichever one is closer
return Math.min(verbDeadline, clientDeadline);
}
public long computeTimeout(long now, long verbExpiresAfterNanos)
{
return computeDeadline(verbExpiresAfterNanos) - now;
}
/**
         * No request should survive the native request deadline, but in order to err on the side of caution, we have
         * this switch that allows hints to be submitted to the mutation stage when the cluster is potentially
         * overloaded. Allowing hints to not be bound by the deadline can exacerbate overload, but since there are
         * also correctness implications, this seemed like a reasonable configuration option.
*/
public boolean shouldSendHints()
{
if (!DatabaseDescriptor.getEnforceNativeDeadlineForHints())
return true;
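            // Otherwise, only submit hints while the client deadline has not yet passed.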
long now = MonotonicClock.Global.preciseTime.now();
long clientDeadline = clientDeadline();
return now < clientDeadline;
}
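        /**
         * The latest point at which the client is still expected to accept a response: enqueue time plus the
         * uniformly configured native transport timeout (see {@link DatabaseDescriptor#getNativeTransportTimeout}).
         */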
public long clientDeadline()
{
return enqueuedAtNanos() + DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS);
}
public long timeSpentInQueueNanos()
{
return startedAtNanos - enqueuedAtNanos;
}
}
/**
     * It is important that this class remains an instance of {@link DebuggableTask}, either runnable or callable,
     * since this is the only way we can keep it from being wrapped into a callable on the SEPExecutor submission
     * path. We need this functionality for time-tracking purposes.
*/
public class RequestProcessor implements DebuggableTask.RunnableDebuggableTask
{
private final Channel channel;
private final Message.Request request;
private final FlushItemConverter forFlusher;
private final Overload backpressure;
private volatile long startTimeNanos;
public RequestProcessor(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
{
this.channel = channel;
this.request = request;
this.forFlusher = forFlusher;
this.backpressure = backpressure;
}
@Override
public void run()
{
startTimeNanos = MonotonicClock.Global.preciseTime.now();
processRequest(channel, request, forFlusher, backpressure, new RequestTime(request.createdAtNanos, startTimeNanos));
}
@Override
public long creationTimeNanos()
{
return request.createdAtNanos;
}
@Override
public long startTimeNanos()
{
return startTimeNanos;
}
@Override
public String description()
{
return request.toString();
}
@Override
public String toString()
{
return "RequestProcessor{" +
"request=" + request +
", approxStartTimeNanos=" + startTimeNanos +
'}';
}
}
/**
     * Checks whether the item at the head of the queue has spent more than the allowed time in the queue.
*/
@Override
public boolean hasQueueCapacity()
{
double threshold = DatabaseDescriptor.getNativeTransportQueueMaxItemAgeThreshold();
if (threshold <= 0)
return true;
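        // Otherwise, require the head-of-queue item to be younger than the configured fraction of the native transport timeout.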
return requestExecutor.oldestTaskQueueTime() < (DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS) * threshold);
}
/**
* Note: this method may be executed on the netty event loop, during initial protocol negotiation; the caller is
     * responsible for cleaning up any global or thread-local state (e.g. tracing, client warnings, etc.).
*/
private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure, RequestTime requestTime)
{
long queueTime = requestTime.timeSpentInQueueNanos();
        // If we have already crossed the max timeout for all possible RPCs, we time out the query immediately.
        // We do not differentiate between query types here: if, say, a PREPARE query is stuck behind an EXECUTE
        // query, we would rather time it out and catch up with the backlog, expecting the bursts to be short-lived.
ClientMetrics.instance.queueTime(queueTime, TimeUnit.NANOSECONDS);
if (queueTime > DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.NANOSECONDS))
{
ClientMetrics.instance.markTimedOutBeforeProcessing();
return ErrorMessage.fromException(new OverloadedException("Query timed out before it could start"));
}
if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
ClientWarn.instance.captureWarnings();
        // Even if ClientWarn is disabled, still set up CoordinatorTrackWarnings, as this will populate metrics and
        // emit logs on the server; the warnings will just be ignored and not sent to the client.
if (request.isTrackable())
CoordinatorWarnings.init();
switch (backpressure)
{
case NONE:
break;
case REQUESTS:
{
String message = String.format("Request breached global limit of %d requests/second and triggered backpressure.",
ClientResourceLimits.getNativeTransportMaxRequestsPerSecond());
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
ClientWarn.instance.warn(message);
break;
}
case BYTES_IN_FLIGHT:
{
String message = String.format("Request breached limit(s) on bytes in flight (Endpoint: %d, Global: %d) and triggered backpressure.",
ClientResourceLimits.getEndpointLimit(), ClientResourceLimits.getGlobalLimit());
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
ClientWarn.instance.warn(message);
break;
}
case QUEUE_TIME:
{
String message = String.format("Request has spent over %s time of the maximum timeout %dms in the queue",
DatabaseDescriptor.getNativeTransportQueueMaxItemAgeThreshold(),
DatabaseDescriptor.getNativeTransportTimeout(TimeUnit.MILLISECONDS));
NoSpamLogger.log(logger, NoSpamLogger.Level.INFO, 1, TimeUnit.MINUTES, message);
ClientWarn.instance.warn(message);
break;
}
}
QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion());
Message.logger.trace("Received: {}, v={}", request, connection.getVersion());
connection.requests.inc();
Message.Response response = request.execute(qstate, requestTime);
if (request.isTrackable())
CoordinatorWarnings.done();
response.setStreamId(request.getStreamId());
response.setWarnings(ClientWarn.instance.getWarnings());
response.attach(connection);
connection.applyStateTransition(request.type, response.type);
return response;
}
/**
* Note: this method may be executed on the netty event loop.
*/
static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure, RequestTime requestTime)
{
try
{
return processRequest((ServerConnection) request.connection(), request, backpressure, requestTime);
}
catch (Throwable t)
{
JVMStabilityInspector.inspectThrowable(t);
if (request.isTrackable())
CoordinatorWarnings.done();
Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(channel, true);
ErrorMessage error = ErrorMessage.fromException(t, handler);
error.setStreamId(request.getStreamId());
error.setWarnings(ClientWarn.instance.getWarnings());
return error;
}
finally
{
CoordinatorWarnings.reset();
ClientWarn.instance.resetWarnings();
}
}
/**
* Note: this method is not expected to execute on the netty event loop.
*/
void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure, RequestTime requestTime)
{
Message.Response response = processRequest(channel, request, backpressure, requestTime);
FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion());
flush(toFlush);
}
private void flush(FlushItem<?> item)
{
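        // Flushers are created lazily, one per event loop; putIfAbsent resolves the race when two threads
        // attempt to create a flusher for the same loop concurrently.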
EventLoop loop = item.channel.eventLoop();
Flusher flusher = flusherLookup.get(loop);
if (flusher == null)
{
Flusher created = useLegacyFlusher ? Flusher.legacy(loop) : Flusher.immediate(loop);
Flusher alt = flusherLookup.putIfAbsent(loop, flusher = created);
if (alt != null)
flusher = alt;
}
flusher.enqueue(item);
flusher.start();
}
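    /** @return true when the request executor has no pending or active tasks, i.e. all dispatched requests have completed */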
public boolean isDone()
{
return requestExecutor.getPendingTaskCount() == 0 && requestExecutor.getActiveTaskCount() == 0;
}
public static void shutdown()
{
requestExecutor.shutdown();
authExecutor.shutdown();
}
/**
* Dispatcher for EventMessages. In {@link Server.ConnectionTracker#send(Event)}, the strategy
* for delivering events to registered clients is dependent on protocol version and the configuration
* of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope,
* wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to
* a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload, RequestTime)}.
* It's worth noting that events are not generally fired as a direct response to a client request,
* so this flush item has a null request attribute. The dispatcher itself is created when the
* pipeline is first configured during protocol negotiation and is attached to the channel for
* later retrieval.
*
* Pre-v5 connections simply write the EventMessage directly to the pipeline.
*/
static final AttributeKey<Consumer<EventMessage>> EVENT_DISPATCHER = AttributeKey.valueOf("EVTDISP");
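    // Builds the v5+ event-delivery path described above: encode the event for the negotiated protocol version
    // and hand it to the flusher as a Framed item with no originating request.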
Consumer<EventMessage> eventDispatcher(final Channel channel,
final ProtocolVersion version,
final FrameEncoder.PayloadAllocator allocator)
{
return eventMessage -> flush(new FlushItem.Framed(channel,
eventMessage.encode(version),
null,
allocator,
f -> f.response.release()));
}
}