blob: 29fa784d39963d195fedd028d1fbaf61bf5ca2ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ReadResponseTest
{
private final Random random = new Random();
private TableMetadata metadata;
@Before
public void setup()
{
metadata = TableMetadata.builder("ks", "t1")
.addPartitionKeyColumn("p", Int32Type.instance)
.addRegularColumn("v", Int32Type.instance)
.partitioner(Murmur3Partitioner.instance)
.build();
}
@Test
public void fromCommandWithConclusiveRepairedDigest()
{
ByteBuffer digest = digest();
ReadCommand command = command(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(digest, true);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertTrue(response.isRepairedDigestConclusive());
assertEquals(digest, response.repairedDataDigest());
verifySerDe(response);
}
@Test
public void fromCommandWithInconclusiveRepairedDigest()
{
ByteBuffer digest = digest();
ReadCommand command = command(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(digest, false);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertFalse(response.isRepairedDigestConclusive());
assertEquals(digest, response.repairedDataDigest());
verifySerDe(response);
}
@Test
public void fromCommandWithConclusiveEmptyRepairedDigest()
{
ReadCommand command = command(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertTrue(response.isRepairedDigestConclusive());
assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
verifySerDe(response);
}
@Test
public void fromCommandWithInconclusiveEmptyRepairedDigest()
{
ReadCommand command = command(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, false);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertFalse(response.isRepairedDigestConclusive());
assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, response.repairedDataDigest());
verifySerDe(response);
}
/*
* Digest responses should never include repaired data tracking as we only request
* it in read repair or for range queries
*/
@Test (expected = UnsupportedOperationException.class)
public void digestResponseErrorsIfRepairedDataDigestRequested()
{
ReadCommand command = digestCommand(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertTrue(response.isDigestResponse());
assertFalse(response.mayIncludeRepairedDigest());
response.repairedDataDigest();
}
@Test (expected = UnsupportedOperationException.class)
public void digestResponseErrorsIfIsConclusiveRequested()
{
ReadCommand command = digestCommand(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertTrue(response.isDigestResponse());
assertFalse(response.mayIncludeRepairedDigest());
response.isRepairedDigestConclusive();
}
@Test (expected = UnsupportedOperationException.class)
public void digestResponseErrorsIfIteratorRequested()
{
ReadCommand command = digestCommand(key(), metadata);
StubRepairedDataInfo rdi = new StubRepairedDataInfo(ByteBufferUtil.EMPTY_BYTE_BUFFER, true);
ReadResponse response = command.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi);
assertTrue(response.isDigestResponse());
assertFalse(response.mayIncludeRepairedDigest());
response.makeIterator(command);
}
@Test
public void makeDigestDoesntConsiderRepairedDataInfo()
{
// It shouldn't be possible to get false positive DigestMismatchExceptions based
// on differing repaired data tracking info because it isn't requested on initial
// requests, only following a digest mismatch. Having a test doesn't hurt though
int key = key();
ByteBuffer digest1 = digest();
ReadCommand command1 = command(key, metadata);
StubRepairedDataInfo rdi1 = new StubRepairedDataInfo(digest1, true);
ReadResponse response1 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi1);
ByteBuffer digest2 = digest();
ReadCommand command2 = command(key, metadata);
StubRepairedDataInfo rdi2 = new StubRepairedDataInfo(digest2, false);
ReadResponse response2 = command1.createResponse(EmptyIterators.unfilteredPartition(metadata), rdi2);
assertEquals(response1.digest(command1), response2.digest(command2));
}
private void verifySerDe(ReadResponse response) {
// check that roundtripping through ReadResponse.serializer behaves as expected.
// ReadResponses from pre-4.0 nodes will never contain repaired data digest
// or pending session info, but we run all messages through both pre/post 4.0
// serde to check that the defaults are correctly applied
roundTripSerialization(response, MessagingService.current_version);
roundTripSerialization(response, MessagingService.VERSION_30);
}
private void roundTripSerialization(ReadResponse response, int version)
{
try
{
DataOutputBuffer out = new DataOutputBuffer();
ReadResponse.serializer.serialize(response, out, version);
DataInputBuffer in = new DataInputBuffer(out.buffer(), false);
ReadResponse deser = ReadResponse.serializer.deserialize(in, version);
if (version < MessagingService.VERSION_40)
{
assertFalse(deser.mayIncludeRepairedDigest());
// even though that means they should never be used, verify that the default values are present
assertEquals(ByteBufferUtil.EMPTY_BYTE_BUFFER, deser.repairedDataDigest());
assertTrue(deser.isRepairedDigestConclusive());
}
else
{
assertTrue(deser.mayIncludeRepairedDigest());
assertEquals(response.repairedDataDigest(), deser.repairedDataDigest());
assertEquals(response.isRepairedDigestConclusive(), deser.isRepairedDigestConclusive());
}
}
catch (IOException e)
{
fail("Caught unexpected IOException during SerDe: " + e.getMessage());
}
}
private int key()
{
return random.nextInt();
}
private ByteBuffer digest()
{
byte[] bytes = new byte[4];
random.nextBytes(bytes);
return ByteBuffer.wrap(bytes);
}
private ReadCommand digestCommand(int key, TableMetadata metadata)
{
return new StubReadCommand(key, metadata, true);
}
private ReadCommand command(int key, TableMetadata metadata)
{
return new StubReadCommand(key, metadata, false);
}
private static class StubRepairedDataInfo extends RepairedDataInfo
{
private final ByteBuffer repairedDigest;
private final boolean conclusive;
public StubRepairedDataInfo(ByteBuffer repairedDigest, boolean conclusive)
{
super(null);
this.repairedDigest = repairedDigest;
this.conclusive = conclusive;
}
@Override
public ByteBuffer getDigest()
{
return repairedDigest;
}
@Override
public boolean isConclusive()
{
return conclusive;
}
}
private static class StubReadCommand extends SinglePartitionReadCommand
{
StubReadCommand(int key, TableMetadata metadata, boolean isDigest)
{
super(isDigest,
0,
false,
metadata,
FBUtilities.nowInSeconds(),
ColumnFilter.all(metadata),
RowFilter.NONE,
DataLimits.NONE,
metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)),
null,
null);
}
@Override
public boolean selectsFullPartition()
{
return true;
}
public UnfilteredPartitionIterator executeLocally(ReadExecutionController controller)
{
return EmptyIterators.unfilteredPartition(this.metadata());
}
}
}