/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.db;
import java.io.IOException;
import java.io.OutputStream;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.IntStream;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.Util;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.ReversedType;
import org.apache.cassandra.db.marshal.SetType;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.SerializationHelper;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.DeserializationHelper;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.WrappedDataOutputStreamPlus;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.metrics.ClearableHistogram;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
import org.apache.cassandra.schema.CachingParams;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.schema.KeyspaceParams;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableParams;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class ReadCommandTest
{
private static final String KEYSPACE = "ReadCommandTest";
private static final String CF1 = "Standard1";
private static final String CF2 = "Standard2";
private static final String CF3 = "Standard3";
private static final String CF4 = "Standard4";
private static final String CF5 = "Standard5";
private static final String CF6 = "Standard6";
private static final String CF7 = "Counter7";
private static final String CF8 = "Standard8";
private static final String CF9 = "Standard9";
private static final InetAddressAndPort REPAIR_COORDINATOR;
static {
try
{
REPAIR_COORDINATOR = InetAddressAndPort.getByName("10.0.0.1");
}
catch (UnknownHostException e)
{
throw new AssertionError(e);
}
}
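    // Creates the keyspace and tables (CF1-CF9) used by the tests below, covering simple, wide,
    // static-column, counter, collection and reversed-clustering schemas.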
@BeforeClass
public static void defineSchema() throws ConfigurationException
{
DatabaseDescriptor.daemonInitialization();
TableMetadata.Builder metadata1 = SchemaLoader.standardCFMD(KEYSPACE, CF1);
TableMetadata.Builder metadata2 =
TableMetadata.builder(KEYSPACE, CF2)
.addPartitionKeyColumn("key", BytesType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("a", AsciiType.instance)
.addRegularColumn("b", AsciiType.instance);
TableMetadata.Builder metadata3 =
TableMetadata.builder(KEYSPACE, CF3)
.addPartitionKeyColumn("key", BytesType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("a", AsciiType.instance)
.addRegularColumn("b", AsciiType.instance)
.addRegularColumn("c", AsciiType.instance)
.addRegularColumn("d", AsciiType.instance)
.addRegularColumn("e", AsciiType.instance)
.addRegularColumn("f", AsciiType.instance);
TableMetadata.Builder metadata4 =
TableMetadata.builder(KEYSPACE, CF4)
.addPartitionKeyColumn("key", BytesType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("a", AsciiType.instance)
.addRegularColumn("b", AsciiType.instance)
.addRegularColumn("c", AsciiType.instance)
.addRegularColumn("d", AsciiType.instance)
.addRegularColumn("e", AsciiType.instance)
.addRegularColumn("f", AsciiType.instance);
TableMetadata.Builder metadata5 =
TableMetadata.builder(KEYSPACE, CF5)
.addPartitionKeyColumn("key", BytesType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("a", AsciiType.instance)
.addRegularColumn("b", AsciiType.instance)
.addRegularColumn("c", AsciiType.instance)
.addRegularColumn("d", AsciiType.instance)
.addRegularColumn("e", AsciiType.instance)
.addRegularColumn("f", AsciiType.instance);
TableMetadata.Builder metadata6 =
TableMetadata.builder(KEYSPACE, CF6)
.addPartitionKeyColumn("key", BytesType.instance)
.addStaticColumn("s", AsciiType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("a", AsciiType.instance)
.addRegularColumn("b", AsciiType.instance)
.caching(CachingParams.CACHE_EVERYTHING);
TableMetadata.Builder metadata7 =
TableMetadata.builder(KEYSPACE, CF7)
.flags(EnumSet.of(TableMetadata.Flag.COUNTER))
.addPartitionKeyColumn("key", BytesType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("c", CounterColumnType.instance);
TableMetadata.Builder metadata8 =
TableMetadata.builder(KEYSPACE, CF8)
.addPartitionKeyColumn("key", BytesType.instance)
.addClusteringColumn("col", AsciiType.instance)
.addRegularColumn("a", AsciiType.instance)
.addRegularColumn("b", AsciiType.instance)
.addRegularColumn("c", SetType.getInstance(AsciiType.instance, true));
TableMetadata.Builder metadata9 =
TableMetadata.builder(KEYSPACE, CF9)
.addPartitionKeyColumn("key", Int32Type.instance)
.addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance))
.addRegularColumn("a", AsciiType.instance);
SchemaLoader.prepareServer();
SchemaLoader.createKeyspace(KEYSPACE,
KeyspaceParams.simple(1),
metadata1,
metadata2,
metadata3,
metadata4,
metadata5,
metadata6,
metadata7,
metadata8,
metadata9);
LocalSessionAccessor.startup();
}
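    // Verifies that once a partition range read command has been aborted, re-executing it returns no results.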
@Test
public void testPartitionRangeAbort() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF1);
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key1"))
.clustering("Column1")
.add("val", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key2"))
.clustering("Column1")
.add("val", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
ReadCommand readCommand = Util.cmd(cfs).build();
assertEquals(2, Util.getAll(readCommand).size());
readCommand.abort();
assertEquals(0, Util.getAll(readCommand).size());
}
@Test
public void testSinglePartitionSliceAbort() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
cfs.truncateBlocking();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("cc")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("dd")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
List<FilteredPartition> partitions = Util.getAll(readCommand);
assertEquals(1, partitions.size());
assertEquals(2, partitions.get(0).rowCount());
readCommand.abort();
assertEquals(0, Util.getAll(readCommand).size());
}
@Test
public void testSinglePartitionNamesAbort() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
cfs.truncateBlocking();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("cc")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("dd")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
List<FilteredPartition> partitions = Util.getAll(readCommand);
assertEquals(1, partitions.size());
assertEquals(2, partitions.get(0).rowCount());
readCommand.abort();
assertEquals(0, Util.getAll(readCommand).size());
}
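    /**
     * Writes several groups of rows (including some row deletions), serializes each group's unfiltered
     * partitions with the intra-node serializer, then deserializes, merges and filters all of the groups,
     * checking that the expected clustering rows are returned in the expected order.
     */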
@Test
public void testSinglePartitionGroupMerge() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF3);
String[][][] groups = new String[][][] {
new String[][] {
new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the row
new String[] { "1", "key2", "bb", "b" },
new String[] { "1", "key3", "cc", "c" }
},
new String[][] {
new String[] { "1", "key3", "dd", "d" },
new String[] { "1", "key2", "ee", "e" },
new String[] { "1", "key1", "ff", "f" }
},
new String[][] {
new String[] { "1", "key6", "aa", "a" },
new String[] { "1", "key5", "bb", "b" },
new String[] { "1", "key4", "cc", "c" }
},
new String[][] {
new String[] { "-1", "key6", "aa", "a" },
new String[] { "-1", "key2", "bb", "b" }
}
};
// Given the data above, when the keys are sorted and the deletions removed, we should
// get these clustering rows in this order
String[] expectedRows = new String[] { "aa", "ff", "ee", "cc", "dd", "cc", "bb"};
List<ByteBuffer> buffers = new ArrayList<>(groups.length);
int nowInSeconds = FBUtilities.nowInSeconds();
ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata()).build();
RowFilter rowFilter = RowFilter.create();
Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(Slices.with(cfs.metadata().comparator, slice), false);
for (String[][] group : groups)
{
cfs.truncateBlocking();
List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
for (String[] data : group)
{
if (data[0].equals("1"))
{
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
.clustering(data[2])
.add(data[3], ByteBufferUtil.bytes("blah"))
.build()
.apply();
}
else
{
RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(), ByteBufferUtil.bytes(data[1]), data[2]).apply();
}
commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter, DataLimits.NONE, Util.dk(data[1]), sliceFilter));
}
cfs.forceBlockingFlush();
ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
try (ReadExecutionController executionController = query.executionController();
UnfilteredPartitionIterator iter = query.executeLocally(executionController);
DataOutputBuffer buffer = new DataOutputBuffer())
{
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
columnFilter,
buffer,
MessagingService.current_version);
buffers.add(buffer.buffer());
}
}
// deserialize, merge and check the results are all there
List<UnfilteredPartitionIterator> iterators = new ArrayList<>();
for (ByteBuffer buffer : buffers)
{
try (DataInputBuffer in = new DataInputBuffer(buffer, true))
{
iterators.add(UnfilteredPartitionIterators.serializerForIntraNode().deserialize(in,
MessagingService.current_version,
cfs.metadata(),
columnFilter,
DeserializationHelper.Flag.LOCAL));
}
}
UnfilteredPartitionIterators.MergeListener listener =
new UnfilteredPartitionIterators.MergeListener()
{
public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
{
return null;
}
public void close()
{
}
};
try (PartitionIterator partitionIterator = UnfilteredPartitionIterators.filter(UnfilteredPartitionIterators.merge(iterators, listener), nowInSeconds))
{
int i = 0;
int numPartitions = 0;
while (partitionIterator.hasNext())
{
numPartitions++;
try(RowIterator rowIterator = partitionIterator.next())
{
while (rowIterator.hasNext())
{
Row row = rowIterator.next();
assertEquals("col=" + expectedRows[i++], row.clustering().toString(cfs.metadata()));
//System.out.print(row.toString(cfs.metadata, true));
}
}
}
assertEquals(5, numPartitions);
assertEquals(expectedRows.length, i);
}
}
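    // Checks that the serialized size reported for a READ_REQ message matches the number of bytes actually written.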
@Test
public void testSerializer() throws IOException
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key"))
.clustering("dd")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").build();
int messagingVersion = MessagingService.current_version;
FakeOutputStream out = new FakeOutputStream();
Tracing.instance.newSession(Tracing.TraceType.QUERY);
Message<ReadCommand> messageOut = Message.out(Verb.READ_REQ, readCommand);
long size = messageOut.serializedSize(messagingVersion);
Message.serializer.serialize(messageOut, new WrappedDataOutputStreamPlus(out), messagingVersion);
Assert.assertEquals(size, out.count);
}
static class FakeOutputStream extends OutputStream
{
long count;
public void write(int b) throws IOException
{
count++;
}
}
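    // Writes groups of rows and row deletions, then checks that the scanned tombstones are reflected
    // in the tombstoneScannedHistogram metric.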
@Test
public void testCountDeletedRows() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF4);
String[][][] groups = new String[][][] {
new String[][] {
new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the
// row
new String[] { "1", "key2", "bb", "b" },
new String[] { "1", "key3", "cc", "c" }
},
new String[][] {
new String[] { "1", "key3", "dd", "d" },
new String[] { "1", "key2", "ee", "e" },
new String[] { "1", "key1", "ff", "f" }
},
new String[][] {
new String[] { "1", "key6", "aa", "a" },
new String[] { "1", "key5", "bb", "b" },
new String[] { "1", "key4", "cc", "c" }
},
new String[][] {
new String[] { "1", "key2", "aa", "a" },
new String[] { "1", "key2", "cc", "c" },
new String[] { "1", "key2", "dd", "d" }
},
new String[][] {
new String[] { "-1", "key6", "aa", "a" },
new String[] { "-1", "key2", "bb", "b" },
new String[] { "-1", "key2", "ee", "e" },
new String[] { "-1", "key2", "aa", "a" },
new String[] { "-1", "key2", "cc", "c" },
new String[] { "-1", "key2", "dd", "d" }
}
};
List<ByteBuffer> buffers = new ArrayList<>(groups.length);
int nowInSeconds = FBUtilities.nowInSeconds();
ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata()).build();
RowFilter rowFilter = RowFilter.create();
Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(
Slices.with(cfs.metadata().comparator, slice), false);
for (String[][] group : groups)
{
cfs.truncateBlocking();
List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
for (String[] data : group)
{
if (data[0].equals("1"))
{
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
.clustering(data[2])
.add(data[3], ByteBufferUtil.bytes("blah"))
.build()
.apply();
}
else
{
RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(),
ByteBufferUtil.bytes(data[1]), data[2]).apply();
}
commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter,
DataLimits.NONE, Util.dk(data[1]), sliceFilter));
}
cfs.forceBlockingFlush();
ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
try (ReadExecutionController executionController = query.executionController();
UnfilteredPartitionIterator iter = query.executeLocally(executionController);
DataOutputBuffer buffer = new DataOutputBuffer())
{
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
columnFilter,
buffer,
MessagingService.current_version);
buffers.add(buffer.buffer());
}
}
assertEquals(5, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
}
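    // Same flow as testCountDeletedRows but without any row deletions, so the tombstone scanned
    // histogram maximum should stay at 1.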
@Test
public void testCountWithNoDeletedRow() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF5);
String[][][] groups = new String[][][] {
new String[][] {
new String[] { "1", "key1", "aa", "a" }, // "1" indicates to create the data, "-1" to delete the
// row
new String[] { "1", "key2", "bb", "b" },
new String[] { "1", "key3", "cc", "c" }
},
new String[][] {
new String[] { "1", "key3", "dd", "d" },
new String[] { "1", "key2", "ee", "e" },
new String[] { "1", "key1", "ff", "f" }
},
new String[][] {
new String[] { "1", "key6", "aa", "a" },
new String[] { "1", "key5", "bb", "b" },
new String[] { "1", "key4", "cc", "c" }
}
};
List<ByteBuffer> buffers = new ArrayList<>(groups.length);
int nowInSeconds = FBUtilities.nowInSeconds();
ColumnFilter columnFilter = ColumnFilter.allRegularColumnsBuilder(cfs.metadata()).build();
RowFilter rowFilter = RowFilter.create();
Slice slice = Slice.make(ClusteringBound.BOTTOM, ClusteringBound.TOP);
ClusteringIndexSliceFilter sliceFilter = new ClusteringIndexSliceFilter(
Slices.with(cfs.metadata().comparator, slice), false);
for (String[][] group : groups)
{
cfs.truncateBlocking();
List<SinglePartitionReadCommand> commands = new ArrayList<>(group.length);
for (String[] data : group)
{
if (data[0].equals("1"))
{
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes(data[1]))
.clustering(data[2])
.add(data[3], ByteBufferUtil.bytes("blah"))
.build()
.apply();
}
else
{
RowUpdateBuilder.deleteRow(cfs.metadata(), FBUtilities.timestampMicros(),
ByteBufferUtil.bytes(data[1]), data[2]).apply();
}
commands.add(SinglePartitionReadCommand.create(cfs.metadata(), nowInSeconds, columnFilter, rowFilter,
DataLimits.NONE, Util.dk(data[1]), sliceFilter));
}
cfs.forceBlockingFlush();
ReadQuery query = new SinglePartitionReadCommand.Group(commands, DataLimits.NONE);
try (ReadExecutionController executionController = query.executionController();
UnfilteredPartitionIterator iter = query.executeLocally(executionController);
DataOutputBuffer buffer = new DataOutputBuffer())
{
UnfilteredPartitionIterators.serializerForIntraNode().serialize(iter,
columnFilter,
buffer,
MessagingService.current_version);
buffers.add(buffer.buffer());
}
}
assertEquals(1, cfs.metric.tombstoneScannedHistogram.cf.getSnapshot().getMax());
}
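    // The next three tests exercise repaired data tracking with the three read command shapes:
    // single partition slice, partition range and single partition names queries.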
@Test
public void testSinglePartitionSliceRepairedDataTracking() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
testRepairedDataTracking(cfs, readCommand);
}
@Test
public void testPartitionRangeRepairedDataTracking() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
ReadCommand readCommand = Util.cmd(cfs).build();
testRepairedDataTracking(cfs, readCommand);
}
@Test
public void testSinglePartitionNamesRepairedDataTracking() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("cc").includeRow("dd").build();
testRepairedDataTracking(cfs, readCommand);
}
@Test
public void testSinglePartitionNamesSkipsOptimisationsIfTrackingRepairedData()
{
        // when tracking repaired status, the optimizations of querying sstables in timestamp order and
        // returning early once all requested columns have been found are disabled, so just assert that
        // all sstables are read when performing such queries
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF2);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("dd")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
.clustering("dd")
.add("a", ByteBufferUtil.bytes("wxyz"))
.build()
.apply();
cfs.forceBlockingFlush();
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
assertEquals(2, sstables.size());
Collections.sort(sstables, SSTableReader.maxTimestampDescending);
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).includeRow("dd").columns("a").build();
assertEquals(0, readCount(sstables.get(0)));
assertEquals(0, readCount(sstables.get(1)));
ReadCommand withTracking = readCommand.copy();
withTracking.trackRepairedStatus();
Util.getAll(withTracking);
assertEquals(1, readCount(sstables.get(0)));
assertEquals(1, readCount(sstables.get(1)));
        // the same command without tracking touches only the sstable with the higher timestamp
Util.getAll(readCommand.copy());
assertEquals(2, readCount(sstables.get(0)));
assertEquals(1, readCount(sstables.get(1)));
}
@Test
public void dontIncludeLegacyCounterContextInDigest() throws IOException
{
// Serializations of a CounterContext containing legacy (pre-2.1) shards
// can legitimately differ across replicas. For this reason, the context
// bytes are omitted from the repaired digest if they contain legacy shards.
        // This clearly involves a tradeoff with the efficacy of the digest: without doing
        // so, false positive digest mismatches would be reported for scenarios where
        // there is nothing that can be done to "fix" the replicas
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF7);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
// insert a row with the counter column having value 0, in a legacy shard.
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("aa")
.addLegacyCounterCell("c", 0L)
.build()
.apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
// execute a read and capture the digest
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
ByteBuffer digestWithLegacyCounter0 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithLegacyCounter0));
// truncate, then re-insert the same partition, but this time with a legacy
        // shard having the value 1. The repaired digest should match the previous one, as
// the values (context) are not included, only the cell metadata (ttl, timestamp, etc)
cfs.truncateBlocking();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("aa")
.addLegacyCounterCell("c", 1L)
.build()
.apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
ByteBuffer digestWithLegacyCounter1 = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
assertEquals(digestWithLegacyCounter0, digestWithLegacyCounter1);
// truncate, then re-insert the same partition, but this time with a non-legacy
// counter cell present. The repaired digest should not match the previous ones
// as this time the value (context) is included.
cfs.truncateBlocking();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("aa")
.add("c", 1L)
.build()
.apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
ByteBuffer digestWithCounterCell = performReadAndVerifyRepairedInfo(readCommand, 1, 1, true);
assertFalse(EMPTY_BYTE_BUFFER.equals(digestWithCounterCell));
assertFalse(digestWithLegacyCounter0.equals(digestWithCounterCell));
assertFalse(digestWithLegacyCounter1.equals(digestWithCounterCell));
}
/**
* Writes a single partition containing a single row and reads using a partition read. The single
* row includes 1 live simple column, 1 simple tombstone and 1 complex column with a complex
* deletion and a live cell. The repaired data digests generated by executing the same query
* before and after the tombstones become eligible for purging should not match each other.
* Also, neither digest should be empty as the partition is not made empty by the purging.
*/
@Test
public void purgeGCableTombstonesBeforeCalculatingDigest() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF8);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
setGCGrace(cfs, 600);
DecoratedKey[] keys = new DecoratedKey[] { Util.dk("key0"), Util.dk("key1"), Util.dk("key2"), Util.dk("key3") };
int nowInSec = FBUtilities.nowInSeconds();
TableMetadata cfm = cfs.metadata();
// A simple tombstone
new RowUpdateBuilder(cfs.metadata(), 0, keys[0]).clustering("cc").delete("a").build().apply();
// Collection with an associated complex deletion
PartitionUpdate.SimpleBuilder builder = PartitionUpdate.simpleBuilder(cfs.metadata(), keys[1]).timestamp(0);
builder.row("cc").add("c", ImmutableSet.of("element1", "element2"));
builder.buildAsMutation().apply();
// RangeTombstone and a row (not covered by the RT). The row contains a regular tombstone which will not be
// purged. This is to prevent the partition from being fully purged and removed from the final results
new RowUpdateBuilder(cfs.metadata(), nowInSec, 0L, keys[2]).addRangeTombstone("aa", "bb").build().apply();
new RowUpdateBuilder(cfs.metadata(), nowInSec+ 1000, 1000L, keys[2]).clustering("cc").delete("a").build().apply();
// Partition with 2 rows, one fully deleted
new RowUpdateBuilder(cfs.metadata.get(), 0, keys[3]).clustering("bb").add("a", ByteBufferUtil.bytes("a")).delete("b").build().apply();
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, keys[3], "cc").apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
Map<DecoratedKey, ByteBuffer> digestsWithTombstones = new HashMap<>();
        // Tombstones are not yet purgeable
for (DecoratedKey key : keys)
{
ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
cmd.trackRepairedStatus();
Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
assertFalse(partition.isEmpty());
partition.unfilteredIterator().forEachRemaining(u -> {
// must be either a RT, or a row containing some kind of deletion
assertTrue(u.isRangeTombstoneMarker() || ((Row)u).hasDeletion(cmd.nowInSec()));
});
ByteBuffer digestWithTombstones = cmd.getRepairedDataDigest();
// None should generate an empty digest
assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithTombstones);
digestsWithTombstones.put(key, digestWithTombstones);
}
// Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
setGCGrace(cfs, 0);
        // Tombstones are now purgeable, so they won't be in the read results and will produce different digests
for (DecoratedKey key : keys)
{
ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec + 60).build();
cmd.trackRepairedStatus();
Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
assertFalse(partition.isEmpty());
partition.unfilteredIterator().forEachRemaining(u -> {
// After purging, only rows without any deletions should remain.
// The one exception is "key2:cc" which has a regular column tombstone which is not
// eligible for purging. This is to prevent the partition from being fully purged
// when its RT is removed.
assertTrue(u.isRow());
Row r = (Row)u;
assertTrue(!r.hasDeletion(cmd.nowInSec())
|| (key.equals(keys[2]) && r.clustering()
.get(0)
.equals(AsciiType.instance.fromString("cc"))));
});
ByteBuffer digestWithoutTombstones = cmd.getRepairedDataDigest();
// not an empty digest
assertDigestsDiffer(EMPTY_BYTE_BUFFER, digestWithoutTombstones);
// should not match the pre-purge digest
assertDigestsDiffer(digestsWithTombstones.get(key), digestWithoutTombstones);
}
}
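    /**
     * Verifies the repaired data overread metric: no overreads are recorded when tracking is disabled,
     * up to (limit - 1) overreads are recorded when tracking is enabled, and none are recorded when the
     * limit already requires reading all of the repaired data. Covered for both single partition and
     * range reads.
     */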
@Test
public void testRepairedDataOverreadMetrics()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF9);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
cfs.metadata().withSwapped(cfs.metadata().params.unbuild()
.caching(CachingParams.CACHE_NOTHING)
.build());
// Insert and repair
insert(cfs, IntStream.range(0, 10), () -> IntStream.range(0, 10));
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
// Insert and leave unrepaired
insert(cfs, IntStream.range(0, 10), () -> IntStream.range(10, 20));
// Single partition reads
int limit = 5;
ReadCommand cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
assertEquals(0, getAndResetOverreadCount(cfs));
// No overreads if not tracking
readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
assertEquals(0, getAndResetOverreadCount(cfs));
// Overread up to (limit - 1) if tracking is enabled
cmd = cmd.copy();
cmd.trackRepairedStatus();
readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
// overread count is always < limit as the first read is counted during merging (and so is expected)
assertEquals(limit - 1, getAndResetOverreadCount(cfs));
// if limit already requires reading all repaired data, no overreads should be recorded
limit = 20;
cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build();
readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit);
assertEquals(0, getAndResetOverreadCount(cfs));
// Range reads
limit = 5;
cmd = Util.cmd(cfs).withLimit(limit).build();
assertEquals(0, getAndResetOverreadCount(cfs));
// No overreads if not tracking
readAndCheckRowCount(Util.getAll(cmd), limit);
assertEquals(0, getAndResetOverreadCount(cfs));
// Overread up to (limit - 1) if tracking is enabled
cmd = cmd.copy();
cmd.trackRepairedStatus();
readAndCheckRowCount(Util.getAll(cmd), limit);
assertEquals(limit - 1, getAndResetOverreadCount(cfs));
// if limit already requires reading all repaired data, no overreads should be recorded
limit = 100;
cmd = Util.cmd(cfs).withLimit(limit).build();
readAndCheckRowCount(Util.getAll(cmd), limit);
assertEquals(0, getAndResetOverreadCount(cfs));
}
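    // Adjusts gc_grace_seconds via a schema reload so tests can control when tombstones become purgeable.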
private void setGCGrace(ColumnFamilyStore cfs, int gcGrace)
{
TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build();
KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace);
Schema.instance.load(
keyspaceMetadata.withSwapped(
keyspaceMetadata.tables.withSwapped(
cfs.metadata().withSwapped(newParams))));
}
private long getAndResetOverreadCount(ColumnFamilyStore cfs)
{
// always clear the histogram after reading to make comparisons & asserts easier
long rows = cfs.metric.repairedDataTrackingOverreadRows.cf.getSnapshot().getMax();
((ClearableHistogram)cfs.metric.repairedDataTrackingOverreadRows.cf).clear();
return rows;
}
private void readAndCheckRowCount(Iterable<FilteredPartition> partitions, int expected)
{
int count = 0;
for (Partition partition : partitions)
{
assertFalse(partition.isEmpty());
try (UnfilteredRowIterator iter = partition.unfilteredIterator())
{
while (iter.hasNext())
{
iter.next();
count++;
}
}
}
assertEquals(expected, count);
}
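    // Inserts a row for every combination of the supplied partition and clustering ids, writing a fixed
    // value to column "a".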
private void insert(ColumnFamilyStore cfs, IntStream partitionIds, Supplier<IntStream> rowIds)
{
partitionIds.mapToObj(ByteBufferUtil::bytes)
.forEach( pk ->
rowIds.get().forEach( c ->
new RowUpdateBuilder(cfs.metadata(), 0, pk)
.clustering(c)
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply()
));
}
private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1)
{
assertTrue(ByteBufferUtil.compareUnsigned(b0, b1) != 0);
}
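    // The next two tests run fullyPurgedPartitionCreatesEmptyDigest (below) with a single partition read
    // and a partition range read respectively.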
@Test
public void partitionReadFullyPurged() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
ReadCommand partitionRead = Util.cmd(cfs, Util.dk("key")).build();
fullyPurgedPartitionCreatesEmptyDigest(cfs, partitionRead);
}
@Test
public void rangeReadFullyPurged() throws Exception
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
ReadCommand rangeRead = Util.cmd(cfs).build();
fullyPurgedPartitionCreatesEmptyDigest(cfs, rangeRead);
}
/**
* Writes a single partition containing only a single row deletion and reads with either a range or
* partition query. Before the row deletion is eligible for purging, it should appear in the query
* results and cause a non-empty repaired data digest to be generated. Repeating the query after
* the row deletion is eligible for purging, both the result set and the repaired data digest should
* be empty.
*/
private void fullyPurgedPartitionCreatesEmptyDigest(ColumnFamilyStore cfs, ReadCommand command) throws Exception
{
cfs.truncateBlocking();
cfs.disableAutoCompaction();
setGCGrace(cfs, 600);
// Partition with a fully deleted static row and a single, fully deleted regular row
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key")).apply();
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key"), "cc").apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
command.trackRepairedStatus();
List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
assertEquals(1, partitions.size());
ByteBuffer digestWithTombstones = command.getRepairedDataDigest();
assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithTombstones) != 0);
// Make tombstones eligible for purging and re-run cmd with an incremented nowInSec
setGCGrace(cfs, 0);
AbstractReadCommandBuilder builder = command instanceof PartitionRangeReadCommand
? Util.cmd(cfs)
: Util.cmd(cfs, Util.dk("key"));
builder.withNowInSeconds(command.nowInSec() + 60);
command = builder.build();
command.trackRepairedStatus();
partitions = Util.getAllUnfiltered(command);
assertTrue(partitions.isEmpty());
ByteBuffer digestWithoutTombstones = command.getRepairedDataDigest();
assertEquals(0, ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutTombstones));
}
/**
* Verifies that during range reads which include multiple partitions, fully purged partitions
* have no material effect on the calculated digest. This test writes two sstables, each containing
* a single partition; the first is live and the second fully deleted and eligible for purging.
* Initially, only the sstable containing the live partition is marked repaired, while a range read
* which covers both partitions is performed to generate a digest. Then the sstable containing the
* purged partition is also marked repaired and the query reexecuted. The digests produced by both
* queries should match as the digest calculation should exclude the fully purged partition.
*/
@Test
public void mixedPurgedAndNonPurgedPartitions()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
setGCGrace(cfs, 0);
ReadCommand command = Util.cmd(cfs).withNowInSeconds(FBUtilities.nowInSeconds() + 60).build();
// Live partition in a repaired sstable, so included in the digest calculation
new RowUpdateBuilder(cfs.metadata.get(), 0, ByteBufferUtil.bytes("key-0")).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
        // Fully deleted partition (static and regular rows) in an unrepaired sstable, so not included in the initial digest
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1")).apply();
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, ByteBufferUtil.bytes("key-1"), "cc").apply();
cfs.forceBlockingFlush();
command.trackRepairedStatus();
List<ImmutableBTreePartition> partitions = Util.getAllUnfiltered(command);
assertEquals(1, partitions.size());
ByteBuffer digestWithoutPurgedPartition = command.getRepairedDataDigest();
assertTrue(ByteBufferUtil.compareUnsigned(EMPTY_BYTE_BUFFER, digestWithoutPurgedPartition) != 0);
// mark the sstable containing the purged partition as repaired, so both partitions are now
        // read during the digest calculation. Because the purged partition is entirely
// discarded, the resultant digest should match the earlier one.
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
command = Util.cmd(cfs).withNowInSeconds(command.nowInSec()).build();
command.trackRepairedStatus();
partitions = Util.getAllUnfiltered(command);
assertEquals(1, partitions.size());
ByteBuffer digestWithPurgedPartition = command.getRepairedDataDigest();
assertEquals(0, ByteBufferUtil.compareUnsigned(digestWithPurgedPartition, digestWithoutPurgedPartition));
}
@Test
public void purgingConsidersRepairedDataOnly() throws Exception
{
        // 2 sstables: the first is repaired and contains data that is all purgeable;
        // the second is unrepaired and contains non-purgeable data. Even though
// the partition itself is not fully purged, the repaired data digest
// should be empty as there was no non-purgeable, repaired data read.
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
setGCGrace(cfs, 0);
// Partition with a fully deleted static row and a single, fully deleted row which will be fully purged
DecoratedKey key = Util.dk("key");
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key).apply();
RowUpdateBuilder.deleteRow(cfs.metadata(), 0, key, "cc").apply();
cfs.forceBlockingFlush();
cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null));
new RowUpdateBuilder(cfs.metadata(), 1, key).clustering("cc").add("a", ByteBufferUtil.bytes("a")).build().apply();
cfs.forceBlockingFlush();
int nowInSec = FBUtilities.nowInSeconds() + 10;
ReadCommand cmd = Util.cmd(cfs, key).withNowInSeconds(nowInSec).build();
cmd.trackRepairedStatus();
Partition partition = Util.getOnlyPartitionUnfiltered(cmd);
assertFalse(partition.isEmpty());
        // check that only the single live row from the unrepaired sstable remains and that it has no deletion
try (UnfilteredRowIterator rows = partition.unfilteredIterator())
{
assertFalse(rows.isEmpty());
Unfiltered unfiltered = rows.next();
assertFalse(rows.hasNext());
assertTrue(unfiltered.isRow());
assertFalse(((Row) unfiltered).hasDeletion(nowInSec));
}
assertEquals(EMPTY_BYTE_BUFFER, cmd.getRepairedDataDigest());
}
private long readCount(SSTableReader sstable)
{
return sstable.getReadMeter().count();
}
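    // Reads which track repaired data must bypass the row cache, so the cache hit count should not
    // increase for such reads.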
@Test
public void skipRowCacheIfTrackingRepairedData()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
cfs.truncateBlocking();
cfs.disableAutoCompaction();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("cc")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
assertTrue(cfs.isRowCacheEnabled());
// warm the cache
assertFalse(Util.getAll(readCommand).isEmpty());
long cacheHits = cfs.metric.rowCacheHit.getCount();
Util.getAll(readCommand);
assertTrue(cfs.metric.rowCacheHit.getCount() > cacheHits);
cacheHits = cfs.metric.rowCacheHit.getCount();
ReadCommand withRepairedInfo = readCommand.copy();
withRepairedInfo.trackRepairedStatus();
Util.getAll(withRepairedInfo);
assertEquals(cacheHits, cfs.metric.rowCacheHit.getCount());
}
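    // The following tests check that copyAsTransientQuery rejects full replicas and that
    // copyAsDigestQuery rejects transient replicas, whether passed individually or within a replica set.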
@Test (expected = IllegalArgumentException.class)
public void copyFullAsTransientTest()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
readCommand.copyAsTransientQuery(ReplicaUtils.full(FBUtilities.getBroadcastAddressAndPort()));
}
@Test (expected = IllegalArgumentException.class)
public void copyTransientAsDigestQuery()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
ReadCommand readCommand = Util.cmd(cfs, Util.dk("key")).build();
readCommand.copyAsDigestQuery(ReplicaUtils.trans(FBUtilities.getBroadcastAddressAndPort()));
}
@Test (expected = IllegalArgumentException.class)
public void copyMultipleFullAsTransientTest()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
DecoratedKey key = Util.dk("key");
Token token = key.getToken();
// Address is unimportant for this test
InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
ReadCommand readCommand = Util.cmd(cfs, key).build();
readCommand.copyAsTransientQuery(EndpointsForToken.of(token,
ReplicaUtils.trans(addr, token),
ReplicaUtils.full(addr, token)));
}
@Test (expected = IllegalArgumentException.class)
public void copyMultipleTransientAsDigestQuery()
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF6);
DecoratedKey key = Util.dk("key");
Token token = key.getToken();
// Address is unimportant for this test
InetAddressAndPort addr = FBUtilities.getBroadcastAddressAndPort();
ReadCommand readCommand = Util.cmd(cfs, key).build();
readCommand.copyAsDigestQuery(EndpointsForToken.of(token,
ReplicaUtils.trans(addr, token),
ReplicaUtils.full(addr, token)));
}
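    /**
     * Shared assertions for the repaired data tracking tests: writes two sstables, then steps through
     * their repaired / pending-repair states, checking at each step that the repaired data digest and
     * its conclusiveness change as expected.
     */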
private void testRepairedDataTracking(ColumnFamilyStore cfs, ReadCommand readCommand) throws IOException
{
cfs.truncateBlocking();
cfs.disableAutoCompaction();
new RowUpdateBuilder(cfs.metadata(), 0, ByteBufferUtil.bytes("key"))
.clustering("cc")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
new RowUpdateBuilder(cfs.metadata(), 1, ByteBufferUtil.bytes("key"))
.clustering("dd")
.add("a", ByteBufferUtil.bytes("abcd"))
.build()
.apply();
cfs.forceBlockingFlush();
List<SSTableReader> sstables = new ArrayList<>(cfs.getLiveSSTables());
assertEquals(2, sstables.size());
sstables.forEach(sstable -> assertFalse(sstable.isRepaired() || sstable.isPendingRepair()));
SSTableReader sstable1 = sstables.get(0);
SSTableReader sstable2 = sstables.get(1);
int numPartitions = 1;
int rowsPerPartition = 2;
// Capture all the digest versions as we mutate the table's repaired status. Each time
// we make a change, we expect a different digest.
Set<ByteBuffer> digests = new HashSet<>();
        // first time round, nothing has been marked repaired so we expect the digest to be an empty buffer and to be marked conclusive
ByteBuffer digest = performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true);
assertEquals(EMPTY_BYTE_BUFFER, digest);
digests.add(digest);
        // add a pending repair session to sstable1, the digest should remain the same but now we expect it to be marked inconclusive
UUID session1 = UUIDGen.getTimeUUID();
mutateRepaired(cfs, sstable1, ActiveRepairService.UNREPAIRED_SSTABLE, session1);
digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
assertEquals(1, digests.size());
        // add a different pending session to sstable2, the digest should remain the same and the result should still be considered inconclusive
UUID session2 = UUIDGen.getTimeUUID();
mutateRepaired(cfs, sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, session2);
digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
assertEquals(1, digests.size());
        // mark one sstable repaired
mutateRepaired(cfs, sstable1, 111, null);
// this time, digest should not be empty, session2 still means that the result is inconclusive
digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, false));
assertEquals(2, digests.size());
        // mark the second sstable repaired
mutateRepaired(cfs, sstable2, 222, null);
// digest should be updated again and as there are no longer any pending sessions, it should be considered conclusive
digests.add(performReadAndVerifyRepairedInfo(readCommand, numPartitions, rowsPerPartition, true));
assertEquals(3, digests.size());
// insert a partition tombstone into the memtable, then re-check the repaired info.
        // This is to ensure that the optimisations which skip reading from sstables, when a newer
        // partition tombstone has already been encountered, cause the digest to be marked
        // as inconclusive.
// the exception to this case is for partition range reads, where we always read
// and generate digests for all sstables, so we only test this path for single partition reads
if (readCommand.isLimitedToOnePartition())
{
new Mutation(PartitionUpdate.simpleBuilder(cfs.metadata(), ByteBufferUtil.bytes("key"))
.delete()
.build()).apply();
digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
assertEquals(EMPTY_BYTE_BUFFER, digest);
// now flush so we have an unrepaired table with the deletion and repeat the check
cfs.forceBlockingFlush();
digest = performReadAndVerifyRepairedInfo(readCommand, 0, rowsPerPartition, false);
assertEquals(EMPTY_BYTE_BUFFER, digest);
}
}
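    // Marks an sstable as repaired (or pending repair) directly in its metadata and, when a pending
    // session is supplied, registers a minimal parent repair session so it is not treated as expired.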
private void mutateRepaired(ColumnFamilyStore cfs, SSTableReader sstable, long repairedAt, UUID pendingSession)
{
try
{
sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingSession, false);
sstable.reloadSSTableMetadata();
}
catch (IOException e)
{
e.printStackTrace();
fail("Caught IOException when mutating sstable metadata");
}
if (pendingSession != null)
{
            // set up a minimal repair session. This is necessary because we
            // check for sessions which have exceeded their timeout and been purged
Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(),
cfs.metadata().partitioner.getRandomToken());
ActiveRepairService.instance.registerParentRepairSession(pendingSession,
REPAIR_COORDINATOR,
Lists.newArrayList(cfs),
Sets.newHashSet(range),
true,
repairedAt,
true,
PreviewKind.NONE);
LocalSessionAccessor.prepareUnsafe(pendingSession, null, Sets.newHashSet(REPAIR_COORDINATOR));
}
}
private ByteBuffer performReadAndVerifyRepairedInfo(ReadCommand command,
int expectedPartitions,
int expectedRowsPerPartition,
boolean expectConclusive)
{
// perform equivalent read command multiple times and assert that
// the repaired data info is always consistent. Return the digest
// so we can verify that it changes when the repaired status of
// the queried tables does.
Set<ByteBuffer> digests = new HashSet<>();
for (int i = 0; i < 10; i++)
{
ReadCommand withRepairedInfo = command.copy();
withRepairedInfo.trackRepairedStatus();
List<FilteredPartition> partitions = Util.getAll(withRepairedInfo);
assertEquals(expectedPartitions, partitions.size());
partitions.forEach(p -> assertEquals(expectedRowsPerPartition, p.rowCount()));
ByteBuffer digest = withRepairedInfo.getRepairedDataDigest();
digests.add(digest);
assertEquals(1, digests.size());
assertEquals(expectConclusive, withRepairedInfo.isRepairedDataDigestConclusive());
}
return digests.iterator().next();
}
}