| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.cassandra.transport; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.concurrent.TimeUnit; |
| |
| import com.google.common.primitives.Ints; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| import io.netty.channel.Channel; |
| import org.apache.cassandra.config.DatabaseDescriptor; |
| import org.apache.cassandra.exceptions.OverloadedException; |
| import org.apache.cassandra.metrics.ClientMetrics; |
| import org.apache.cassandra.metrics.ClientMessageSizeMetrics; |
| import org.apache.cassandra.net.AbstractMessageHandler; |
| import org.apache.cassandra.net.FrameDecoder; |
| import org.apache.cassandra.net.FrameDecoder.IntactFrame; |
| import org.apache.cassandra.net.FrameEncoder; |
| import org.apache.cassandra.net.ResourceLimits; |
| import org.apache.cassandra.net.ResourceLimits.Limit; |
| import org.apache.cassandra.net.ShareableBytes; |
| import org.apache.cassandra.transport.Flusher.FlushItem.Framed; |
| import org.apache.cassandra.transport.messages.ErrorMessage; |
| import org.apache.cassandra.utils.JVMStabilityInspector; |
| import org.apache.cassandra.utils.NoSpamLogger; |
| |
| import static org.apache.cassandra.utils.MonotonicClock.approxTime; |
| |
| /** |
| * Implementation of {@link AbstractMessageHandler} for processing CQL messages which comprise a {@link Message} wrapped |
| * in an {@link Envelope}. This class is parameterized by a {@link Message} subtype, expected to be either |
| * {@link Message.Request} or {@link Message.Response}. Most commonly, an instance for handling {@link Message.Request} |
| * is created for each inbound CQL client connection. |
| * |
| * # Small vs large messages |
| * Small messages are deserialized in place, and then handed off to a consumer for processing. |
| * Large messages accumulate frames until all bytes for the envelope are received, then concatenate and deserialize the |
| * frames on the event loop thread and pass them on to the same consumer. |
| * |
| * # Flow control (backpressure) |
| * The size of an incoming message is explicit in the {@link Envelope.Header}. |
| * |
 * By default, every connection has 1MiB of exclusive permits available before needing to access the per-endpoint
| * and global reserves. By default, those reserves are sized proportionally to the heap - 2.5% of heap per-endpoint |
 * and 10% for the global reserve.
| * |
| * Permits are held while CQL messages are processed and released after the response has been encoded into the |
| * buffers of the response frame. |
| * |
| * A connection level option (THROW_ON_OVERLOAD) allows clients to choose the backpressure strategy when a connection |
| * has exceeded the maximum number of allowed permits. The choices are to either pause reads from the incoming socket |
 * and allow TCP backpressure to do the work, or to throw an explicit exception and rely on the client to back off.
| */ |
| public class CQLMessageHandler<M extends Message> extends AbstractMessageHandler |
| { |
| private static final Logger logger = LoggerFactory.getLogger(CQLMessageHandler.class); |
| private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 1L, TimeUnit.SECONDS); |
| |
| public static final int LARGE_MESSAGE_THRESHOLD = FrameEncoder.Payload.MAX_SIZE - 1; |
| |
| private final Envelope.Decoder envelopeDecoder; |
| private final Message.Decoder<M> messageDecoder; |
| private final FrameEncoder.PayloadAllocator payloadAllocator; |
| private final MessageConsumer<M> dispatcher; |
| private final ErrorHandler errorHandler; |
| private final boolean throwOnOverload; |
| private final ProtocolVersion version; |
| |
| long channelPayloadBytesInFlight; |
| private int consecutiveMessageErrors = 0; |
| |
    /**
     * Consumer of successfully decoded CQL messages. Implementations dispatch the
     * message for execution; the supplied converter produces the flush item used to
     * return acquired capacity once the response has been flushed (see {@code toFlushItem}).
     */
    interface MessageConsumer<M extends Message>
    {
        void accept(Channel channel, M message, Dispatcher.FlushItemConverter toFlushItem);
    }
| |
    /**
     * Sink for errors encountered while decoding or processing messages. Where a stream
     * id is known, errors are wrapped so it can be propagated back to the client
     * (see {@code handleError(Throwable, int)}).
     */
    interface ErrorHandler
    {
        void accept(Throwable error);
    }
| |
    /**
     * @param channel          netty channel for this client connection
     * @param version          negotiated protocol version; subsequent envelopes must match it
     * @param decoder          frame decoder feeding this handler
     * @param envelopeDecoder  extracts {@link Envelope.Header}s from raw frame bytes
     * @param messageDecoder   decodes CQL messages from assembled envelopes
     * @param dispatcher       consumer which executes decoded messages
     * @param payloadAllocator allocates buffers for outgoing response frames
     * @param queueCapacity    inbound queue capacity, passed to the superclass
     * @param resources        provider of per-endpoint and global permit limits and wait queues
     * @param onClosed         callback invoked when this handler is closed
     * @param errorHandler     sink for decoding/processing errors
     * @param throwOnOverload  client-selected backpressure strategy (see class javadoc)
     */
    CQLMessageHandler(Channel channel,
                      ProtocolVersion version,
                      FrameDecoder decoder,
                      Envelope.Decoder envelopeDecoder,
                      Message.Decoder<M> messageDecoder,
                      MessageConsumer<M> dispatcher,
                      FrameEncoder.PayloadAllocator payloadAllocator,
                      int queueCapacity,
                      ClientResourceLimits.ResourceProvider resources,
                      OnHandlerClosed onClosed,
                      ErrorHandler errorHandler,
                      boolean throwOnOverload)
    {
        super(decoder,
              channel,
              LARGE_MESSAGE_THRESHOLD,
              queueCapacity,
              resources.endpointLimit(),
              resources.globalLimit(),
              resources.endpointWaitQueue(),
              resources.globalWaitQueue(),
              onClosed);
        this.envelopeDecoder = envelopeDecoder;
        this.messageDecoder = messageDecoder;
        this.payloadAllocator = payloadAllocator;
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.throwOnOverload = throwOnOverload;
        this.version = version;
    }
| |
    /**
     * Resets the consecutive-error counter before delegating frame processing to the
     * superclass: errors are only considered "consecutive" within a single frame.
     */
    @Override
    public boolean process(FrameDecoder.Frame frame) throws IOException
    {
        // new frame, clean slate for processing errors
        consecutiveMessageErrors = 0;
        return super.process(frame);
    }
| |
| protected boolean processOneContainedMessage(ShareableBytes bytes, Limit endpointReserve, Limit globalReserve) |
| { |
| ByteBuffer buf = bytes.get(); |
| Envelope.Decoder.HeaderExtractionResult extracted = envelopeDecoder.extractHeader(buf); |
| if (!extracted.isSuccess()) |
| return handleProtocolException(extracted.error(), buf, extracted.streamId(), extracted.bodyLength()); |
| |
| Envelope.Header header = extracted.header(); |
| if (header.version != version) |
| { |
| ProtocolException error = new ProtocolException(String.format("Invalid message version. Got %s but previous" + |
| "messages on this connection had version %s", |
| header.version, version)); |
| return handleProtocolException(error, buf, header.streamId, header.bodySizeInBytes); |
| } |
| |
| // max CQL message size defaults to 256mb, so should be safe to downcast |
| int messageSize = Ints.checkedCast(header.bodySizeInBytes); |
| if (throwOnOverload) |
| { |
| if (!acquireCapacity(header, endpointReserve, globalReserve)) |
| { |
| // discard the request and throw an exception |
| ClientMetrics.instance.markRequestDiscarded(); |
| logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, " + |
| "InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Header: {}", |
| messageSize, |
| channelPayloadBytesInFlight, |
| endpointReserve.using(), |
| globalReserve.using(), |
| header); |
| |
| handleError(new OverloadedException("Server is in overloaded state. " + |
| "Cannot accept more requests at this point"), header); |
| |
| // Don't stop processing incoming messages, rely on the client to apply |
| // backpressure when it receives OverloadedException |
| // but discard this message as we're responding with the overloaded error |
| incrementReceivedMessageMetrics(messageSize); |
| buf.position(buf.position() + Envelope.Header.LENGTH + messageSize); |
| return true; |
| } |
| } |
| else if (!acquireCapacityAndQueueOnFailure(header, endpointReserve, globalReserve)) |
| { |
| // set backpressure on the channel, queuing the request until we have capacity |
| ClientMetrics.instance.pauseConnection(); |
| return false; |
| } |
| |
| channelPayloadBytesInFlight += messageSize; |
| incrementReceivedMessageMetrics(messageSize); |
| return processRequest(composeRequest(header, bytes)); |
| } |
| |
| private boolean handleProtocolException(ProtocolException exception, |
| ByteBuffer buf, |
| int streamId, |
| long expectedMessageLength) |
| { |
| // hard fail if either : |
| // * the expectedMessageLength is < 0 as we're unable to skip the remainder |
| // of the Envelope and attempt to read the next one |
| // * we hit a run of errors in the same frame. Some errors are recoverable |
| // as they have no effect on subsequent Envelopes, in which case we attempt |
| // to continue processing. If we start seeing consecutive errors we assume |
| // that this is not the case and that the entire remaining frame is garbage. |
| // It's possible here that we fail hard when we could potentially not do |
| // (e.g. every Envelope has an invalid opcode, but is otherwise semantically |
| // intact), but this is a trade off. |
| if (expectedMessageLength < 0 || ++consecutiveMessageErrors > DatabaseDescriptor.getConsecutiveMessageErrorsThreshold()) |
| { |
| // transform the exception to a fatal one so the exception handler closes the channel |
| if (!exception.isFatal()) |
| exception = ProtocolException.toFatalException(exception); |
| handleError(exception, streamId); |
| return false; |
| } |
| else |
| { |
| // exception should not be a fatal error or the exception handler will close the channel |
| handleError(exception, streamId); |
| // skip body |
| buf.position(Math.min(buf.limit(), buf.position() + Envelope.Header.LENGTH + Ints.checkedCast(expectedMessageLength))); |
| // continue processing frame |
| return true; |
| } |
| } |
| |
| private void incrementReceivedMessageMetrics(int messageSize) |
| { |
| receivedCount++; |
| receivedBytes += messageSize + Envelope.Header.LENGTH; |
| ClientMessageSizeMetrics.bytesReceived.inc(messageSize + Envelope.Header.LENGTH); |
| ClientMessageSizeMetrics.bytesReceivedPerRequest.update(messageSize + Envelope.Header.LENGTH); |
| } |
| |
    /**
     * Wraps the body of a contained message in an {@link Envelope} without copying.
     * The envelope's body is a retained, netty-wrapped view over the frame buffer,
     * and the source buffer's position is advanced past the whole envelope so the
     * next message can be read.
     */
    private Envelope composeRequest(Envelope.Header header, ShareableBytes bytes)
    {
        // extract body
        ByteBuffer buf = bytes.get();
        int idx = buf.position() + Envelope.Header.LENGTH;
        final int end = idx + Ints.checkedCast(header.bodySizeInBytes);
        // wrap the rest of the buffer, then skip over the header bytes via the reader index
        ByteBuf body = Unpooled.wrappedBuffer(buf.slice());
        body.readerIndex(Envelope.Header.LENGTH);
        // retained here; released after processing (see processRequest / release(FlushItem))
        body.retain();
        buf.position(end);
        return new Envelope(header, body);
    }
| |
    /**
     * Decodes a CQL message from the supplied envelope and hands it to the dispatcher.
     * On failure, reports the error on the request's stream (releasing acquired
     * capacity) and decides whether the rest of the frame can still be processed.
     *
     * @return true to continue consuming the current frame, false to stop and close
     */
    protected boolean processRequest(Envelope request)
    {
        M message = null;
        try
        {
            message = messageDecoder.decode(channel, request);
            dispatcher.accept(channel, message, this::toFlushItem);
            // successfully delivered a CQL message to the execution
            // stage, so reset the counter of consecutive errors
            consecutiveMessageErrors = 0;
            return true;
        }
        catch (Exception e)
        {
            // only release when decoding succeeded; presumably a failed decode does not
            // transfer buffer ownership — NOTE(review): confirm decode's failure-path semantics
            if (message != null)
                request.release();

            boolean continueProcessing = true;

            // Indicate that an error was encountered. Initially, we can continue to
            // process the current frame, but if we keep catching errors, we assume that
            // the whole frame payload is no good, stop processing and close the connection.
            if(++consecutiveMessageErrors > DatabaseDescriptor.getConsecutiveMessageErrorsThreshold())
            {
                if (!(e instanceof ProtocolException))
                {
                    logger.debug("Error decoding CQL message", e);
                    e = new ProtocolException("Error encountered decoding CQL message: " + e.getMessage());
                }
                // fatal exceptions cause the error handler to close the channel
                e = ProtocolException.toFatalException((ProtocolException) e);
                continueProcessing = false;
            }
            handleErrorAndRelease(e, request.header);
            return continueProcessing;
        }
    }
| |
    /**
     * For "expected" errors this ensures we pass a WrappedException,
     * which contains a streamId, to the error handler. This makes
     * sure that wherever possible, the streamId is propagated back
     * to the client.
     * This also releases the capacity acquired for processing, as
     * indicated by the supplied header.
     * @param t      the error to report
     * @param header header of the message whose capacity should be released
     */
    private void handleErrorAndRelease(Throwable t, Envelope.Header header)
    {
        release(header);
        handleError(t, header);
    }
| |
    /**
     * For "expected" errors this ensures we pass a WrappedException,
     * which contains a streamId, to the error handler. This makes
     * sure that wherever possible, the streamId is propagated back
     * to the client.
     * This variant doesn't call release as it is intended for use
     * when an error occurs without any capacity being acquired.
     * Typically, this would be the result of an acquisition failure
     * if the THROW_ON_OVERLOAD option has been specified by the client.
     * @param t      the error to report
     * @param header header supplying the streamId to report the error on
     */
    private void handleError(Throwable t, Envelope.Header header)
    {
        handleError(t, header.streamId);
    }
| |
    /**
     * For "expected" errors this ensures we pass a WrappedException,
     * which contains a streamId, to the error handler. This makes
     * sure that wherever possible, the streamId is propagated back
     * to the client.
     * This variant doesn't call release as it is intended for use
     * when an error occurs without any capacity being acquired.
     * Typically, this would be the result of an acquisition failure
     * if the THROW_ON_OVERLOAD option has been specified by the client.
     * @param t        the error to report
     * @param streamId stream on which the client will receive the error
     */
    private void handleError(Throwable t, int streamId)
    {
        errorHandler.accept(ErrorMessage.wrap(t, streamId));
    }
| |
    /**
     * For use in the case where the error can't be mapped to a specific stream id,
     * such as a corrupted frame, or when extracting a CQL message from the frame's
     * payload fails. This does not attempt to release any resources, as these errors
     * should only occur before any capacity acquisition is attempted (e.g. on receipt
     * of a corrupt frame, or failure to extract a CQL message from the envelope).
     *
     * @param t the error to pass, unwrapped, to the error handler
     */
    private void handleError(Throwable t)
    {
        errorHandler.accept(t);
    }
| |
    /**
     * Acts as a Dispatcher.FlushItemConverter: encodes the response and wraps it,
     * together with the originating request, into a {@link Framed} flush item.
     * Also updates the bytes-sent metrics for the encoded response.
     */
    private Framed toFlushItem(Channel channel, Message.Request request, Message.Response response)
    {
        // Returns a FlushItem.Framed instance which wraps a Consumer<FlushItem> that performs
        // the work of returning the capacity allocated for processing the request.
        // The Dispatcher will call this to obtain the FlushItem to enqueue with its Flusher once
        // a dispatched request has been processed.

        Envelope responseFrame = response.encode(request.getSource().header.version);
        int responseSize = envelopeSize(responseFrame.header);
        ClientMessageSizeMetrics.bytesSent.inc(responseSize);
        ClientMessageSizeMetrics.bytesSentPerResponse.update(responseSize);

        return new Framed(channel,
                          responseFrame,
                          request.getSource(),
                          payloadAllocator,
                          this::release);
    }
| |
    /**
     * Release action passed into {@link Framed} flush items (see {@code toFlushItem}):
     * returns the capacity acquired for the request and releases both the request
     * and response envelope buffers.
     */
    private void release(Flusher.FlushItem<Envelope> flushItem)
    {
        release(flushItem.request.header);
        flushItem.request.release();
        flushItem.response.release();
    }
| |
    /**
     * Returns the capacity acquired for a message (its body size) to the reserves and
     * decrements this channel's in-flight payload byte count.
     */
    private void release(Envelope.Header header)
    {
        releaseCapacity(Ints.checkedCast(header.bodySizeInBytes));
        channelPayloadBytesInFlight -= header.bodySizeInBytes;
    }
| |
| /* |
| * Handling of multi-frame large messages |
| */ |
    /**
     * Handles the first frame of a multi-frame (large) CQL message: extracts the
     * envelope header, attempts to acquire capacity and sets up the {@link LargeMessage}
     * accumulator which subsequent frames will be supplied to.
     *
     * @return true to continue reading frames, false only on an unrecoverable header error
     * @throws IOException if an unexpected error occurs while decoding the header
     */
    protected boolean processFirstFrameOfLargeMessage(IntactFrame frame, Limit endpointReserve, Limit globalReserve) throws IOException
    {
        ShareableBytes bytes = frame.contents;
        ByteBuffer buf = bytes.get();
        try
        {
            Envelope.Decoder.HeaderExtractionResult extracted = envelopeDecoder.extractHeader(buf);
            if (!extracted.isSuccess())
            {
                // Hard fail on any decoding error as we can't trust the subsequent frames of
                // the large message
                handleError(ProtocolException.toFatalException(extracted.error()));
                return false;
            }

            Envelope.Header header = extracted.header();
            // max CQL message size defaults to 256mb, so should be safe to downcast
            int messageSize = Ints.checkedCast(header.bodySizeInBytes);
            receivedBytes += buf.remaining();

            LargeMessage largeMessage = new LargeMessage(header);
            if (!acquireCapacity(header, endpointReserve, globalReserve))
            {
                // In the case of large messages, never stop processing incoming frames
                // as this will halt the client meaning no further frames will be sent,
                // leading to starvation.
                // If the throwOnOverload option is set, don't process the message once
                // read, return an error response to notify the client that resource
                // limits have been exceeded. If the option isn't set, the only thing we
                // can do is to consume the subsequent frames and process the message.
                // Large and small messages are never interleaved for a single client, so
                // we know that this client will finish sending the large message before
                // anything else. Other clients sending small messages concurrently will
                // be backpressured by the global resource limits. The server is still
                // vulnerable to overload by multiple clients sending large messages
                // concurrently.
                if (throwOnOverload)
                {
                    // discard the request and throw an exception
                    ClientMetrics.instance.markRequestDiscarded();
                    logger.trace("Discarded request of size: {}. InflightChannelRequestPayload: {}, " +
                                 "InflightEndpointRequestPayload: {}, InflightOverallRequestPayload: {}, Header: {}",
                                 messageSize,
                                 channelPayloadBytesInFlight,
                                 endpointReserve.using(),
                                 globalReserve.using(),
                                 header);

                    // mark as overloaded so that the message is discarded
                    // after consuming any subsequent frames
                    largeMessage.markOverloaded();
                }
            }
            this.largeMessage = largeMessage;
            largeMessage.supply(frame);
            return true;
        }
        catch (Exception e)
        {
            throw new IOException("Error decoding CQL Message", e);
        }
    }
| |
    /** Short channel id, used to identify this connection in log messages. */
    protected String id()
    {
        return channel.id().asShortText();
    }
| |
| @SuppressWarnings("BooleanMethodIsAlwaysInverted") |
| private boolean acquireCapacityAndQueueOnFailure(Envelope.Header header, Limit endpointReserve, Limit globalReserve) |
| { |
| int bytesRequired = Ints.checkedCast(header.bodySizeInBytes); |
| long currentTimeNanos = approxTime.now(); |
| return acquireCapacity(endpointReserve, globalReserve, bytesRequired, currentTimeNanos, Long.MAX_VALUE); |
| } |
| |
| @SuppressWarnings("BooleanMethodIsAlwaysInverted") |
| private boolean acquireCapacity(Envelope.Header header, Limit endpointReserve, Limit globalReserve) |
| { |
| int bytesRequired = Ints.checkedCast(header.bodySizeInBytes); |
| return acquireCapacity(endpointReserve, globalReserve, bytesRequired) == ResourceLimits.Outcome.SUCCESS; |
| } |
| |
| /* |
| * Although it would be possible to recover when certain types of corrupt frame are encountered, |
| * this could cause problems for clients as the payload may contain CQL messages from multiple |
| * streams. Simply dropping the corrupt frame or returning an error response would not give the |
| * client enough information to map back to inflight requests, leading to timeouts. |
| * Instead, we need to fail fast, possibly dropping the connection whenever a corrupt frame is |
| * encountered. Consequently, we terminate the connection (via a ProtocolException) whenever a |
| * corrupt frame is encountered, regardless of its type. |
| */ |
    /**
     * Terminates the connection (via a fatal ProtocolException) whenever a corrupt
     * frame is encountered, regardless of whether the corruption is recoverable —
     * see the comment above for the rationale.
     */
    protected void processCorruptFrame(FrameDecoder.CorruptFrame frame)
    {
        corruptFramesUnrecovered++;
        // a recoverable frame means only the body CRC failed; otherwise the header itself is suspect
        String error = String.format("%s invalid, unrecoverable CRC mismatch detected in frame %s. Read %d, Computed %d",
                                     id(), frame.isRecoverable() ? "body" : "header", frame.readCRC, frame.computedCRC);

        noSpamLogger.error(error);

        // If this is part of a multi-frame message, process it before passing control to the error handler.
        // This is so we can take care of any housekeeping associated with large messages.
        if (!frame.isSelfContained)
        {
            if (null == largeMessage) // first frame of a large message
                receivedBytes += frame.frameSize;
            else // subsequent frame of a large message
                processSubsequentFrameOfLargeMessage(frame);
        }

        handleError(ProtocolException.toFatalException(new ProtocolException(error)));
    }
| |
    /**
     * Invoked when an unrecoverable error escapes the processing pipeline: discards any
     * partially read frame state and closes the client connection.
     */
    protected void fatalExceptionCaught(Throwable cause)
    {
        decoder.discard();
        logger.warn("Unrecoverable exception caught in CQL message processing pipeline, closing the connection", cause);
        channel.close();
    }
| |
    /** Total wire size of an envelope: the fixed header length plus the body size it declares. */
    static int envelopeSize(Envelope.Header header)
    {
        return Envelope.Header.LENGTH + Ints.checkedCast(header.bodySizeInBytes);
    }
| |
    /**
     * Accumulates the frames of a multi-frame CQL message. Once all bytes have been
     * supplied, the buffers are stitched into a single {@link Envelope} and processed;
     * if the message was marked overloaded on arrival of its first frame, it is
     * discarded and an {@link OverloadedException} returned to the client instead.
     */
    private class LargeMessage extends AbstractMessageHandler.LargeMessage<Envelope.Header>
    {
        // large messages are never expired while being accumulated
        private static final long EXPIRES_AT = Long.MAX_VALUE;

        // set via markOverloaded() when capacity could not be acquired and THROW_ON_OVERLOAD is set
        private boolean overloaded = false;

        private LargeMessage(Envelope.Header header)
        {
            super(envelopeSize(header), header, EXPIRES_AT, false);
        }

        /**
         * Concatenates the accumulated frame buffers into a single envelope without
         * copying; the reader index skips the envelope header contained in the first buffer.
         */
        private Envelope assembleFrame()
        {
            ByteBuf body = Unpooled.wrappedBuffer(buffers.stream()
                                                         .map(ShareableBytes::get)
                                                         .toArray(ByteBuffer[]::new));

            body.readerIndex(Envelope.Header.LENGTH);
            // retained here; released after processing via Envelope#release
            body.retain();
            return new Envelope(header, body);
        }

        /**
         * Used to indicate that a message should be dropped and not processed.
         * We do this on receipt of the first frame of a large message if sufficient capacity
         * cannot be acquired to process it and throwOnOverload is set for the connection.
         * In this case, the client has elected to shed load rather than apply backpressure
         * so we must ensure that subsequent frames are consumed from the channel. At that
         * point an error response is returned to the client, rather than processing the message.
         */
        private void markOverloaded()
        {
            overloaded = true;
        }

        // invoked once all bytes of the message have been received
        protected void onComplete()
        {
            if (overloaded)
                handleErrorAndRelease(new OverloadedException("Server is in overloaded state. " +
                                                              "Cannot accept more requests at this point"), header);
            else if (!isCorrupt)
                processRequest(assembleFrame());

        }

        // invoked when accumulation cannot complete (e.g. a corrupt subsequent frame)
        protected void abort()
        {
            if (!isCorrupt)
                releaseBuffersAndCapacity(); // release resources if in normal state when abort() is invoked
        }
    }
| } |