| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.transport; |
| |
| import java.util.List; |
| |
| import com.google.common.base.Predicate; |
| |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.transport.ClientResourceLimits.Overload; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelConfig; |
| import io.netty.channel.ChannelFuture; |
| import io.netty.channel.ChannelFutureListener; |
| import io.netty.channel.ChannelHandler; |
| import io.netty.channel.ChannelHandlerContext; |
| import io.netty.channel.ChannelInboundHandlerAdapter; |
| import io.netty.channel.SimpleChannelInboundHandler; |
| import io.netty.handler.codec.MessageToMessageDecoder; |
| import io.netty.handler.codec.MessageToMessageEncoder; |
| import org.apache.cassandra.exceptions.OverloadedException; |
| import org.apache.cassandra.metrics.ClientMetrics; |
| import org.apache.cassandra.net.ResourceLimits; |
| import org.apache.cassandra.transport.messages.ErrorMessage; |
| import org.apache.cassandra.utils.JVMStabilityInspector; |
| |
| import static org.apache.cassandra.transport.CQLMessageHandler.RATE_LIMITER_DELAY_UNIT; |
| import static org.apache.cassandra.transport.ClientResourceLimits.GLOBAL_REQUEST_LIMITER; |
| |
| public class PreV5Handlers |
| { |
| /** |
| * Wraps an {@link org.apache.cassandra.transport.Dispatcher} so that it can be used as an |
| * channel inbound handler in pre-V5 pipelines. |
| */ |
| public static class LegacyDispatchHandler extends SimpleChannelInboundHandler<Message.Request> |
| { |
| private static final Logger logger = LoggerFactory.getLogger(LegacyDispatchHandler.class); |
| |
| private final Dispatcher dispatcher; |
| private final ClientResourceLimits.Allocator endpointPayloadTracker; |
| |
| /** |
| * Current count of *request* bytes that are live on the channel. |
| * <p> |
| * Note: should only be accessed while on the netty event loop. |
| */ |
| private long channelPayloadBytesInFlight; |
| |
| /** The cause of the current connection pause, or {@link Overload#NONE} if it is unpaused. */ |
| private Overload backpressure = Overload.NONE; |
| |
| LegacyDispatchHandler(Dispatcher dispatcher, ClientResourceLimits.Allocator endpointPayloadTracker) |
| { |
| this.dispatcher = dispatcher; |
| this.endpointPayloadTracker = endpointPayloadTracker; |
| } |
| |
| protected void channelRead0(ChannelHandlerContext ctx, Message.Request request) |
| { |
| // The only reason we won't process this message is if checkLimits() throws an OverloadedException. |
| // (i.e. Even if backpressure is applied, the current request is allowed to finish.) |
| checkLimits(ctx, request); |
| dispatcher.dispatch(ctx.channel(), request, this::toFlushItem, backpressure); |
| } |
| |
| // Acts as a Dispatcher.FlushItemConverter |
| private Flusher.FlushItem.Unframed toFlushItem(Channel channel, Message.Request request, Message.Response response) |
| { |
| return new Flusher.FlushItem.Unframed(channel, response, request.getSource(), this::releaseItem); |
| } |
| |
| private void releaseItem(Flusher.FlushItem<Message.Response> item) |
| { |
| // Note: in contrast to the equivalent for V5 protocol, CQLMessageHandler::release(FlushItem item), |
| // this does not release the FlushItem's Message.Response. In V4, the buffers for the response's body |
| // and serialised header are emitted directly down the Netty pipeline from Envelope.Encoder, so |
| // releasing them is handled by the pipeline itself. |
| long itemSize = item.request.header.bodySizeInBytes; |
| item.request.release(); |
| |
| // since the request has been processed, decrement inflight payload at channel, endpoint and global levels |
| channelPayloadBytesInFlight -= itemSize; |
| boolean globalInFlightBytesBelowLimit = endpointPayloadTracker.release(itemSize) == ResourceLimits.Outcome.BELOW_LIMIT; |
| |
| // Now check to see if we need to reenable the channel's autoRead. |
| // |
| // If the current payload bytes in flight is zero, we must reenable autoread as |
| // 1) we allow no other thread/channel to do it, and |
| // 2) there are no other events following this one (becuase we're at zero bytes in flight), |
| // so no successive to trigger the other clause in this if-block. |
| // |
| // The only exception to this is if the global request rate limit has been breached, which means |
| // we'll have to wait until a scheduled wakeup task unpauses the connection. |
| // |
| // Note: This path is only relevant when part of a pre-V5 pipeline, as only in this case is |
| // paused ever set to true. In pipelines configured for V5 or later, backpressure and control |
| // over the inbound pipeline's autoread status are handled by the FrameDecoder/FrameProcessor. |
| ChannelConfig config = item.channel.config(); |
| |
| if (backpressure == Overload.BYTES_IN_FLIGHT && (channelPayloadBytesInFlight == 0 || globalInFlightBytesBelowLimit)) |
| { |
| unpauseConnection(config); |
| } |
| } |
| |
| /** |
| * Checks limits on bytes in flight and the request rate limiter (if enabled) to determine whether to drop a |
| * request or trigger backpressure and pause the connection. |
| * <p> |
| * The check for inflight payload to potentially discard the request should have been ideally in one of the |
| * first handlers in the pipeline (Envelope.Decoder::decode()). However, in case of any exception thrown between |
| * that handler (where inflight payload is incremented) and this handler (Dispatcher::channelRead0) (where |
| * inflight payload in decremented), inflight payload becomes erroneous. ExceptionHandler is not sufficient for |
| * this purpose since it does not have the message envelope associated with the exception. |
| * <p> |
| * If the connection is configured to throw {@link OverloadedException}, requests that breach the rate limit are |
| * not counted against that limit. |
| * <p> |
| * Note: this method should execute on the netty event loop. |
| * |
| * @throws ErrorMessage.WrappedException with an {@link OverloadedException} if overload occurs and the |
| * connection is configured to throw on overload |
| */ |
| private void checkLimits(ChannelHandlerContext ctx, Message.Request request) |
| { |
| long requestSize = request.getSource().header.bodySizeInBytes; |
| |
| if (request.connection.isThrowOnOverload()) |
| { |
| if (endpointPayloadTracker.tryAllocate(requestSize) != ResourceLimits.Outcome.SUCCESS) |
| { |
| discardAndThrow(request, requestSize, Overload.BYTES_IN_FLIGHT); |
| } |
| |
| if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled() && !GLOBAL_REQUEST_LIMITER.tryReserve()) |
| { |
| // We've already allocated against the payload tracker here, so release those resources. |
| endpointPayloadTracker.release(requestSize); |
| discardAndThrow(request, requestSize, Overload.REQUESTS); |
| } |
| } |
| else |
| { |
| // Any request that gets here will be processed, so increment the channel bytes in flight. |
| channelPayloadBytesInFlight += requestSize; |
| |
| // Check for overloaded state by trying to allocate the message size from inflight payload trackers |
| if (endpointPayloadTracker.tryAllocate(requestSize) != ResourceLimits.Outcome.SUCCESS) |
| { |
| endpointPayloadTracker.allocate(requestSize); |
| pauseConnection(ctx); |
| backpressure = Overload.BYTES_IN_FLIGHT; |
| } |
| |
| if (DatabaseDescriptor.getNativeTransportRateLimitingEnabled()) |
| { |
| // Reserve a permit even if we've already triggered backpressure on bytes in flight. |
| long delay = GLOBAL_REQUEST_LIMITER.reserveAndGetDelay(RATE_LIMITER_DELAY_UNIT); |
| |
| // If we've already triggered backpressure on bytes in flight, no further action is necessary. |
| if (backpressure == Overload.NONE && delay > 0) |
| { |
| pauseConnection(ctx); |
| |
| // A permit isn't immediately available, so schedule an unpause for when it is. |
| ctx.channel().eventLoop().schedule(() -> unpauseConnection(ctx.channel().config()), delay, RATE_LIMITER_DELAY_UNIT); |
| backpressure = Overload.REQUESTS; |
| } |
| } |
| } |
| } |
| |
| private void pauseConnection(ChannelHandlerContext ctx) |
| { |
| if (ctx.channel().config().isAutoRead()) |
| { |
| ctx.channel().config().setAutoRead(false); |
| ClientMetrics.instance.pauseConnection(); |
| } |
| } |
| |
| private void unpauseConnection(ChannelConfig config) |
| { |
| backpressure = Overload.NONE; |
| |
| if (!config.isAutoRead()) |
| { |
| ClientMetrics.instance.unpauseConnection(); |
| config.setAutoRead(true); |
| } |
| } |
| |
| private void discardAndThrow(Message.Request request, long requestSize, Overload overload) |
| { |
| ClientMetrics.instance.markRequestDiscarded(); |
| |
| logger.trace("Discarded request of size {} with {} bytes in flight on channel. {} " + |
| "Global rate limiter: {} Request: {}", |
| requestSize, channelPayloadBytesInFlight, endpointPayloadTracker, |
| GLOBAL_REQUEST_LIMITER, request); |
| |
| OverloadedException exception = overload == Overload.REQUESTS |
| ? new OverloadedException(String.format("Request breached global limit of %d requests/second. Server is " + |
| "currently in an overloaded state and cannot accept more requests.", |
| GLOBAL_REQUEST_LIMITER.getRate())) |
| : new OverloadedException(String.format("Request breached limit on bytes in flight. (%s)) " + |
| "Server is currently in an overloaded state and cannot accept more requests.", |
| |
| endpointPayloadTracker)); |
| |
| throw ErrorMessage.wrap(exception, request.getSource().header.streamId); |
| } |
| |
| @Override |
| public void channelInactive(ChannelHandlerContext ctx) |
| { |
| endpointPayloadTracker.release(); |
| if (!ctx.channel().config().isAutoRead()) |
| { |
| ClientMetrics.instance.unpauseConnection(); |
| } |
| ctx.fireChannelInactive(); |
| } |
| } |
| |
| /** |
| * Simple adaptor to allow {@link org.apache.cassandra.transport.Message.Decoder#decodeMessage(Channel, Envelope)} |
| * to be used as a handler in pre-V5 pipelines |
| */ |
| @ChannelHandler.Sharable |
| public static class ProtocolDecoder extends MessageToMessageDecoder<Envelope> |
| { |
| public static final ProtocolDecoder instance = new ProtocolDecoder(); |
| private ProtocolDecoder(){} |
| |
| public void decode(ChannelHandlerContext ctx, Envelope source, List<Object> results) |
| { |
| try |
| { |
| ProtocolVersion version = getConnectionVersion(ctx); |
| if (source.header.version != version) |
| { |
| throw new ProtocolException( |
| String.format("Invalid message version. Got %s but previous " + |
| "messages on this connection had version %s", |
| source.header.version, version)); |
| } |
| results.add(Message.Decoder.decodeMessage(ctx.channel(), source)); |
| } |
| catch (Throwable ex) |
| { |
| source.release(); |
| // Remember the streamId |
| throw ErrorMessage.wrap(ex, source.header.streamId); |
| } |
| } |
| } |
| |
| /** |
| * Simple adaptor to plug CQL message encoding into pre-V5 pipelines |
| */ |
| @ChannelHandler.Sharable |
| public static class ProtocolEncoder extends MessageToMessageEncoder<Message> |
| { |
| public static final ProtocolEncoder instance = new ProtocolEncoder(); |
| private ProtocolEncoder(){} |
| public void encode(ChannelHandlerContext ctx, Message source, List<Object> results) |
| { |
| ProtocolVersion version = getConnectionVersion(ctx); |
| results.add(source.encode(version)); |
| } |
| } |
| |
| /** |
| * Pre-V5 exception handler which closes the connection if an {@link org.apache.cassandra.transport.ProtocolException} |
| * is thrown |
| */ |
| @ChannelHandler.Sharable |
| public static final class ExceptionHandler extends ChannelInboundHandlerAdapter |
| { |
| private static final Logger logger = LoggerFactory.getLogger(ExceptionHandler.class); |
| |
| public static final ExceptionHandler instance = new ExceptionHandler(); |
| private ExceptionHandler(){} |
| |
| @Override |
| public void exceptionCaught(final ChannelHandlerContext ctx, Throwable cause) |
| { |
| // Provide error message to client in case channel is still open |
| if (ctx.channel().isOpen()) |
| { |
| Predicate<Throwable> handler = ExceptionHandlers.getUnexpectedExceptionHandler(ctx.channel(), false); |
| ErrorMessage errorMessage = ErrorMessage.fromException(cause, handler); |
| ChannelFuture future = ctx.writeAndFlush(errorMessage.encode(getConnectionVersion(ctx))); |
| // On protocol exception, close the channel as soon as the message have been sent. |
| // Most cases of PE are wrapped so the type check below is expected to fail more often than not. |
| // At this moment Fatal exceptions are not thrown in v4, but just as a precaustion we check for them here |
| if (isFatal(cause)) |
| future.addListener((ChannelFutureListener) f -> ctx.close()); |
| } |
| |
| if (DatabaseDescriptor.getClientErrorReportingExclusions().contains(ctx.channel().remoteAddress())) |
| { |
| // Sometimes it is desirable to ignore exceptions from specific IPs; such as when security scans are |
| // running. To avoid polluting logs and metrics, metrics are not updated when the IP is in the exclude |
| // list. |
| logger.debug("Excluding client exception for {}; address contained in client_error_reporting_exclusions", ctx.channel().remoteAddress(), cause); |
| return; |
| } |
| ExceptionHandlers.logClientNetworkingExceptions(cause); |
| JVMStabilityInspector.inspectThrowable(cause); |
| } |
| |
| private static boolean isFatal(Throwable cause) |
| { |
| return cause instanceof ProtocolException; // this matches previous versions which didn't annotate exceptions as fatal or not |
| } |
| } |
| |
| private static ProtocolVersion getConnectionVersion(ChannelHandlerContext ctx) |
| { |
| Connection connection = ctx.channel().attr(Connection.attributeKey).get(); |
| // The only case the connection can be null is when we send the initial STARTUP message |
| return connection == null ? ProtocolVersion.CURRENT : connection.getVersion(); |
| } |
| |
| } |