blob: cec8edd148e3890c955d7ac9ea31ba3014fc8f8d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.transport;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.ResourceLimits;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Throwables;
import static org.apache.cassandra.transport.Message.logger;
public class PreV5Handlers
{
/**
* Wraps an {@link org.apache.cassandra.transport.Dispatcher} so that it can be used as an
* channel inbound handler in pre-V5 pipelines.
*/
public static class LegacyDispatchHandler extends SimpleChannelInboundHandler<Message.Request>
{
private static final Logger logger = LoggerFactory.getLogger(LegacyDispatchHandler.class);
private final Dispatcher dispatcher;
private final ClientResourceLimits.Allocator endpointPayloadTracker;
/**
* Current count of *request* bytes that are live on the channel.
* <p>
* Note: should only be accessed while on the netty event loop.
*/
private long channelPayloadBytesInFlight;
private boolean paused;
LegacyDispatchHandler(Dispatcher dispatcher, ClientResourceLimits.Allocator endpointPayloadTracker)
{
this.dispatcher = dispatcher;
this.endpointPayloadTracker = endpointPayloadTracker;
}
protected void channelRead0(ChannelHandlerContext ctx, Message.Request request) throws Exception
{
// if we decide to handle this message, process it outside of the netty event loop
if (shouldHandleRequest(ctx, request))
dispatcher.dispatch(ctx.channel(), request, this::toFlushItem);
}
// Acts as a Dispatcher.FlushItemConverter
private Flusher.FlushItem.Unframed toFlushItem(Channel channel, Message.Request request, Message.Response response)
{
return new Flusher.FlushItem.Unframed(channel, response, request.getSource(), this::releaseItem);
}
private void releaseItem(Flusher.FlushItem<Message.Response> item)
{
// Note: in contrast to the equivalent for V5 protocol, CQLMessageHandler::release(FlushItem item),
// this does not release the FlushItem's Message.Response. In V4, the buffers for the response's body
// and serialised header are emitted directly down the Netty pipeline from Envelope.Encoder, so
// releasing them is handled by the pipeline itself.
long itemSize = item.request.header.bodySizeInBytes;
item.request.release();
// since the request has been processed, decrement inflight payload at channel, endpoint and global levels
channelPayloadBytesInFlight -= itemSize;
ResourceLimits.Outcome endpointGlobalReleaseOutcome = endpointPayloadTracker.release(itemSize);
// now check to see if we need to reenable the channel's autoRead.
// If the current payload side is zero, we must reenable autoread as
// 1) we allow no other thread/channel to do it, and
// 2) there's no other events following this one (becuase we're at zero bytes in flight),
// so no successive to trigger the other clause in this if-block
//
// note: this path is only relevant when part of a pre-V5 pipeline, as only in this case is
// paused ever set to true. In pipelines configured for V5 or later, backpressure and control
// over the inbound pipeline's autoread status are handled by the FrameDecoder/FrameProcessor.
ChannelConfig config = item.channel.config();
if (paused && (channelPayloadBytesInFlight == 0 || endpointGlobalReleaseOutcome == ResourceLimits.Outcome.BELOW_LIMIT))
{
paused = false;
ClientMetrics.instance.unpauseConnection();
config.setAutoRead(true);
}
}
/**
* This check for inflight payload to potentially discard the request should have been ideally in one of the
* first handlers in the pipeline (Envelope.Decoder::decode()). However, incase of any exception thrown between that
* handler (where inflight payload is incremented) and this handler (Dispatcher::channelRead0) (where inflight
* payload in decremented), inflight payload becomes erroneous. ExceptionHandler is not sufficient for this
* purpose since it does not have the message envelope associated with the exception.
* <p>
* Note: this method should execute on the netty event loop.
*/
private boolean shouldHandleRequest(ChannelHandlerContext ctx, Message.Request request)
{
long requestSize = request.getSource().header.bodySizeInBytes;
// check for overloaded state by trying to allocate the message size from inflight payload trackers
if (endpointPayloadTracker.tryAllocate(requestSize) != ResourceLimits.Outcome.SUCCESS)
{
if (request.connection.isThrowOnOverload())
{
// discard the request and throw an exception
ClientMetrics.instance.markRequestDiscarded();
logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, {}, Request: {}",
requestSize,
channelPayloadBytesInFlight,
endpointPayloadTracker.toString(),
request);
throw ErrorMessage.wrap(new OverloadedException("Server is in overloaded state. Cannot accept more requests at this point"),
request.getSource().header.streamId);
}
else
{
// set backpressure on the channel, and handle the request
endpointPayloadTracker.allocate(requestSize);
ctx.channel().config().setAutoRead(false);
ClientMetrics.instance.pauseConnection();
paused = true;
}
}
channelPayloadBytesInFlight += requestSize;
return true;
}
@Override
public void channelInactive(ChannelHandlerContext ctx)
{
endpointPayloadTracker.release();
if (paused)
{
paused = false;
ClientMetrics.instance.unpauseConnection();
}
ctx.fireChannelInactive();
}
}
/**
* Simple adaptor to allow {@link org.apache.cassandra.transport.Message.Decoder#decodeMessage(Channel, Envelope)}
* to be used as a handler in pre-V5 pipelines
*/
@ChannelHandler.Sharable
public static class ProtocolDecoder extends MessageToMessageDecoder<Envelope>
{
public static final ProtocolDecoder instance = new ProtocolDecoder();
private ProtocolDecoder(){}
public void decode(ChannelHandlerContext ctx, Envelope source, List results)
{
try
{
ProtocolVersion version = getConnectionVersion(ctx);
if (source.header.version != version)
{
throw new ProtocolException(
String.format("Invalid message version. Got %s but previous " +
"messages on this connection had version %s",
source.header.version, version));
}
results.add(Message.Decoder.decodeMessage(ctx.channel(), source));
}
catch (Throwable ex)
{
source.release();
// Remember the streamId
throw ErrorMessage.wrap(ex, source.header.streamId);
}
}
}
/**
* Simple adaptor to plug CQL message encoding into pre-V5 pipelines
*/
@ChannelHandler.Sharable
public static class ProtocolEncoder extends MessageToMessageEncoder<Message>
{
public static final ProtocolEncoder instance = new ProtocolEncoder();
private ProtocolEncoder(){}
public void encode(ChannelHandlerContext ctx, Message source, List results)
{
ProtocolVersion version = getConnectionVersion(ctx);
results.add(source.encode(version));
}
}
/**
* Pre-V5 exception handler which closes the connection if an {@link org.apache.cassandra.transport.ProtocolException}
* is thrown
*/
@ChannelHandler.Sharable
public static final class ExceptionHandler extends ChannelInboundHandlerAdapter
{
public static final ExceptionHandler instance = new ExceptionHandler();
private ExceptionHandler(){}
@Override
public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause)
{
// Provide error message to client in case channel is still open
if (ctx.channel().isOpen())
{
ExceptionHandlers.UnexpectedChannelExceptionHandler handler = new ExceptionHandlers.UnexpectedChannelExceptionHandler(ctx.channel(), false);
ErrorMessage errorMessage = ErrorMessage.fromException(cause, handler);
ChannelFuture future = ctx.writeAndFlush(errorMessage.encode(getConnectionVersion(ctx)));
// On protocol exception, close the channel as soon as the message have been sent.
// Most cases of PE are wrapped so the type check below is expected to fail more often than not.
// At this moment Fatal exceptions are not thrown in v4, but just as a precaustion we check for them here
if (isFatal(cause))
future.addListener((ChannelFutureListener) f -> ctx.close());
}
if (Throwables.anyCauseMatches(cause, t -> t instanceof ProtocolException))
{
// if any ProtocolExceptions is not silent, then handle
if (Throwables.anyCauseMatches(cause, t -> t instanceof ProtocolException && !((ProtocolException) t).isSilent()))
{
ClientMetrics.instance.markProtocolException();
// since protocol exceptions are expected to be client issues, not logging stack trace
// to avoid spamming the logs once a bad client shows up
NoSpamLogger.log(logger, NoSpamLogger.Level.WARN, 1, TimeUnit.MINUTES, "Protocol exception with client networking: " + cause.getMessage());
}
}
else
{
ClientMetrics.instance.markUnknownException();
logger.warn("Unknown exception in client networking", cause);
}
JVMStabilityInspector.inspectThrowable(cause);
}
private static boolean isFatal(Throwable cause)
{
return cause instanceof ProtocolException; // this matches previous versions which didn't annotate exceptions as fatal or not
}
}
private static ProtocolVersion getConnectionVersion(ChannelHandlerContext ctx)
{
Connection connection = ctx.channel().attr(Connection.attributeKey).get();
// The only case the connection can be null is when we send the initial STARTUP message
return connection == null ? ProtocolVersion.CURRENT : connection.getVersion();
}
}