blob: 5c976b97c6eacd47798b2c3e76f371f3492eb2cf [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.cassandra.distributed.test;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.metrics.CassandraMetricsRegistry;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.transport.Envelope;
import org.apache.cassandra.transport.Message;
import org.apache.cassandra.transport.ProtocolVersion;
import org.apache.cassandra.transport.SimpleClient;
import org.apache.cassandra.transport.messages.OptionsMessage;
import org.assertj.core.api.Assertions;
* If a client sends a message that cannot be parsed by the server, we need to detect this and update metrics
* for monitoring.
* <p>
* An issue with paging serialization was found during 2.1 to 3.0 upgrades. Since
* this is a serialization issue, we hit similar code paths by sending bad bytes to the server, and so can simulate the mixed-mode
* paging issue without needing to send proper messages.
public class UnableToParseClientMessageTest extends TestBaseImpl
/**
 * Used by {@link #badBody()}: when this is true and a run produces no matching log lines, the log check
 * is skipped for that run.
 * NOTE(review): no visible line ever assigns true to this flag - confirm against the full file.
 */
private static boolean BAD_BODY_SEEN_LOGS = false;
// cluster shared by every test in this class; assigned in setupCluster()
private static Cluster CLUSTER;
// protocol version the client is built with in test()
// NOTE(review): presumably injected by the Parameterized runner from params(); the annotation is not visible in this view - confirm
public ProtocolVersion version;
/**
 * Supplies the test parameters: every protocol version in {@link ProtocolVersion#SUPPORTED}.
 * Each run is labelled "version={0}".
 */
@Parameterized.Parameters(name = "version={0}")
public static Iterable<ProtocolVersion> params()
return ProtocolVersion.SUPPORTED;
// NOTE(review): the body of setup() is not visible in this view, so what it initialises cannot be confirmed from here
public static void setup()
/**
 * Assigns the shared {@link #CLUSTER}: the cluster configuration enables every {@link Feature} value,
 * the cluster is started, and the result is passed through {@code init(...)}.
 * NOTE(review): the builder expression below appears truncated in this view (the lambda has no
 * parameter and the builder call is missing) - confirm against the original file.
 */
public static void setupCluster() throws IOException
CLUSTER = init( -> c.with(Feature.values())).start());
/**
 * Teardown hook for the shared cluster; it only acts when {@link #CLUSTER} was actually assigned.
 * NOTE(review): the statement guarded by the null check is not visible in this view; presumably it
 * closes the cluster - confirm.
 */
public static void teardownCluster()
if (CLUSTER != null)
/**
 * Sends a message whose header bytes are replaced by arbitrary values and expects an error response
 * reporting an invalid or unsupported protocol version.
 */
public void badHeader() throws IOException
// first header byte: 80 plus the number of the version under test; the same value appears in the expected error text
byte expectedVersion = (byte) (80 + version.asInt());
String expectedError = "Invalid or unsupported protocol version (" + expectedVersion + ")";
// the remaining nine bytes (1..9) are filler
CustomHeaderMessage msg = new CustomHeaderMessage(new byte[]{ expectedVersion, 1, 2, 3, 4, 5, 6, 7, 8, 9 });
// the predicate always returns true, so any matching log lines are always validated
test(expectedError, msg, ignore -> true);
/**
 * Sends a QUERY message whose body is the UTF-8 bytes of a plain sentence and expects an error response
 * containing the "Not enough bytes to read an UTF8 serialized string" message.
 */
public void badBody() throws IOException
String expectedError = "Not enough bytes to read an UTF8 serialized string preceded by its 4 bytes length";
// the body is just raw text bytes wrapped in a buffer
CustomBodyMessage msg = new CustomBodyMessage(Message.Type.QUERY, Unpooled.wrappedBuffer("this is not correct format".getBytes(StandardCharsets.UTF_8)));
// the predicate decides whether test() should validate the log lines it found
test(expectedError, msg, results -> {
if (BAD_BODY_SEEN_LOGS && results.isEmpty())
// ignore as there are no logs but they have been seen before
return false;
// NOTE(review): BAD_BODY_SEEN_LOGS is never set to true in the visible lines; a statement guarded by the
// check below may be missing from this view - confirm against the full file
if (!results.isEmpty())
return true;
/**
 * Sends {@code request} to node 1 through a {@link SimpleClient} and verifies how the failure is handled:
 * the reply must be an ERROR whose text contains {@code expectedError}, meters are checked on the
 * instance, and log lines written during the call are validated when {@code shouldCheckLogs} accepts them.
 *
 * @param expectedError text that must appear in the error response and in every checked log line
 * @param request the malformed request to send
 * @param shouldCheckLogs receives the log lines matched since the start of this call and returns whether
 *                        they should be validated
 * @throws IOException declared by this method; NOTE(review): presumably raised by the client connection - confirm
 */
private void test(String expectedError, Message.Request request, Predicate<List<String>> shouldCheckLogs) throws IOException
// write gibberish to the native protocol
IInvokableInstance node = CLUSTER.get(1);
// maintenance note: this error isn't required to be consistent across releases, so if this changes it's ok to update the test to reflect the new exception.
// baseline taken before the request; the check below expects it to grow by exactly one
long currentCount = getProtocolExceptionCount(node);
// remember the current log position so that only lines written after this point are grepped below
long logStart = node.logs().mark();
// NOTE(review): the host argument is an empty string in this view - confirm the intended address
try (SimpleClient client = SimpleClient.builder("", 9042).protocolVersion(version).useBeta().build())
client.connect(false, true);
// this should return a failed response
// in pre-v5 the connection isn't closed, so use `false` to avoid waiting
Message.Response response = client.execute(request, false);
Assert.assertEquals(Message.Type.ERROR, response.type);
// response.toString() is also the assertion message, so a mismatch shows the actual reply
Assert.assertTrue(response.toString(), response.toString().contains(expectedError));
node.runOnInstance(() -> {
// channelRead throws then channelInactive throws after trying to read remaining bytes
// using spinAssertEquals as the metric is updated AFTER replying back to the client
// so there is a race where we check the metric before it gets updated
// NOTE(review): both meter lookups below appear truncated in this view; which meters they read is not visible
Util.spinAssertEquals(currentCount + 1L,
() -> CassandraMetricsRegistry.Metrics.getMeters()
Assert.assertEquals(0, CassandraMetricsRegistry.Metrics.getMeters()
// the logs are noSpamLogger, so each iteration may not produce a new log; only valid if present and not seen before
List<String> results = node.logs().grep(logStart, "Protocol exception with client networking").getResult();
if (shouldCheckLogs.test(results))
// every matched log line must mention the expected error text
results.forEach(s -> Assertions.assertThat(s).contains(expectedError));
/**
 * Reads a value from the meter registry by running a lambda on the given instance.
 * NOTE(review): the lookup expression is truncated in this view; going by the method name it is
 * presumably the count of the protocol-exception meter - confirm.
 *
 * @param node instance on which the lambda is executed
 * @return the value produced by the lambda on that instance
 */
private static long getProtocolExceptionCount(IInvokableInstance node)
return node.callOnInstance(() -> CassandraMetricsRegistry.Metrics.getMeters()
/**
 * An {@link OptionsMessage} whose encoded envelope is a {@code CustomHeaderEnvelope} carrying
 * caller-supplied header bytes.
 */
public static class CustomHeaderMessage extends OptionsMessage
// caller-supplied header bytes handed to the envelope built in encode()
private final ByteBuf headerEncoded;
// NOTE(review): this constructor's body is not visible in this view; presumably it wraps the array and
// delegates to the ByteBuf constructor - confirm
public CustomHeaderMessage(byte[] headerEncoded)
/** @param headerEncoded bytes to use as the header; must not be null */
public CustomHeaderMessage(ByteBuf headerEncoded)
this.headerEncoded = Objects.requireNonNull(headerEncoded);
/**
 * Encodes through the superclass, then re-wraps the resulting header and body in a
 * {@code CustomHeaderEnvelope} together with the custom header bytes.
 */
public Envelope encode(ProtocolVersion version)
Envelope base = super.encode(version);
return new CustomHeaderEnvelope(base.header, base.body, headerEncoded);
/** An {@link Envelope} whose {@code encodeHeader()} returns caller-supplied bytes. */
private static class CustomHeaderEnvelope extends Envelope
// bytes returned as the encoded header
private final ByteBuf headerEncoded;
/**
 * @param header header passed on to the {@link Envelope} constructor
 * @param body body passed on to the {@link Envelope} constructor
 * @param headerEncoded bytes to return from {@code encodeHeader()}; must not be null
 */
public CustomHeaderEnvelope(Header header, ByteBuf body, ByteBuf headerEncoded)
super(header, body);
this.headerEncoded = Objects.requireNonNull(headerEncoded);
// for V4 and below
public ByteBuf encodeHeader()
return headerEncoded;
// for V5 and above
// NOTE(review): the body of encodeHeaderInto is not visible in this view; presumably it writes
// headerEncoded into buf - confirm
public void encodeHeaderInto(ByteBuffer buf)
/**
 * A request that carries caller-supplied body bytes. While encoding, it swaps the codec of its message
 * type for one whose encoded size is the length of those bytes, then puts the original codec back.
 */
private static class CustomBodyMessage extends Message.Request
// caller-supplied bytes for the message body
private final ByteBuf body;
/**
 * @param type message type to send as
 * @param body raw body bytes; must not be null
 */
// NOTE(review): the call to the superclass constructor is not visible in this view
protected CustomBodyMessage(Type type, ByteBuf body)
this.body = Objects.requireNonNull(body);
/** Encodes this message with a temporary codec installed for its type. */
public Envelope encode(ProtocolVersion version)
// keep the real codec so decode can delegate to it and so it can be restored afterwards
Codec<?> originalCodec = type.codec;
setCodec(type, new Codec<Message>()
// decoding is unchanged: delegate to the original codec
public Message decode(ByteBuf body, ProtocolVersion version)
return originalCodec.decode(body, version);
// NOTE(review): this method's body is not visible in this view; presumably it writes the custom body into dest - confirm
public void encode(Message message, ByteBuf dest, ProtocolVersion version)
// the reported size is the number of readable bytes in the custom body
public int encodedSize(Message message, ProtocolVersion version)
return body.readableBytes();
return super.encode(version);
// NOTE(review): as shown, this restore follows a return; presumably both sit in a try/finally in the full file - confirm
setCodec(type, originalCodec);
/** Not supported: this message only exists to be encoded and sent, so calling execute always fails. */
protected Response execute(QueryState queryState, long queryStartNanoTime, boolean traceRequest)
throw new AssertionError("execute not supported");
/**
 * Helper called by {@code CustomBodyMessage.encode} with a message type and a codec.
 * NOTE(review): the try body is not visible in this view; the caught exception types suggest the codec
 * is replaced through reflective field access - confirm.
 *
 * @param type message type passed by the caller
 * @param codec codec passed by the caller
 */
private static void setCodec(Message.Type type, Message.Codec<?> codec)
catch (NoSuchFieldException | IllegalAccessException e)
// reflection failures are rethrown as an AssertionError with the original exception as the cause
throw new AssertionError(e);