/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.cassandra.concurrent.SEPExecutor;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.utils.Throwables;
import org.junit.Assert;
import org.junit.Test;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.MethodDelegation;
import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
import org.apache.cassandra.io.sstable.metadata.MetadataType;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.utils.DiagnosticSnapshotService;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
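/**
 * In-JVM dtests for repaired data tracking. The tests verify that mismatches between replicas' repaired data
 * digests are detected (or correctly ignored, e.g. for purgeable tombstones and unrepaired data), counted via
 * the confirmedRepairedInconsistencies table metric, and optionally snapshotted on mismatch.
 */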
public class RepairDigestTrackingTest extends TestBaseImpl
{
private static final String TABLE = "tbl";
private static final String KS_TABLE = KEYSPACE + '.' + TABLE;
@SuppressWarnings("Convert2MethodRef")
@Test
public void testInconsistenciesFound() throws Throwable
{
try (Cluster cluster = init(builder().withNodes(2).start()))
{
cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableRepairedDataTrackingForRangeReads());
cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c)) WITH read_repair='NONE'");
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
ConsistencyLevel.ALL,
i, i, i);
}
cluster.forEach(i -> i.flush(KEYSPACE));
for (int i = 10; i < 20; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)",
ConsistencyLevel.ALL,
i, i, i);
}
cluster.forEach(i -> i.flush(KEYSPACE));
cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// mark everything on node 2 repaired
cluster.get(2).runOnInstance(markAllRepaired());
cluster.get(2).runOnInstance(assertRepaired());
// insert more data on node1 to generate an initial mismatch
cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (?, ?, ?)", 5, 5, 55);
cluster.get(1).runOnInstance(assertNotRepaired());
long ccBefore = getConfirmedInconsistencies(cluster.get(1));
cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
long ccAfter = getConfirmedInconsistencies(cluster.get(1));
Assert.assertEquals("confirmed count should differ by 1 after range read", ccBefore + 1, ccAfter);
}
}
@SuppressWarnings("Convert2MethodRef")
@Test
public void testPurgeableTombstonesAreIgnored() throws Throwable
{
try (Cluster cluster = init(builder().withNodes(2).start()))
{
cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableRepairedDataTrackingForRangeReads());
cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, v2 INT, PRIMARY KEY (k,c)) WITH gc_grace_seconds=0");
// on node1 only insert some tombstones, then flush
for (int i = 0; i < 10; i++)
{
cluster.get(1).executeInternal("DELETE v1 FROM " + KS_TABLE + " USING TIMESTAMP 0 WHERE k=? and c=? ", i, i);
}
cluster.get(1).flush(KEYSPACE);
// insert data on both nodes and flush
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 1",
ConsistencyLevel.ALL,
i, i, i);
}
cluster.forEach(i -> i.flush(KEYSPACE));
// nothing is repaired yet
cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// mark everything repaired
cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
cluster.forEach(i -> i.runOnInstance(assertRepaired()));
// now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected
for (int i = 0; i < 10; i++)
{
cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v2) VALUES (?, ?, ?) USING TIMESTAMP 2", i, i, i * 2);
}
long ccBefore = getConfirmedInconsistencies(cluster.get(1));
// Unfortunately we need to sleep here to ensure that nowInSec > the local deletion time of the tombstones, making them purgeable
TimeUnit.SECONDS.sleep(2);
cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE, ConsistencyLevel.ALL);
long ccAfter = getConfirmedInconsistencies(cluster.get(1));
Assert.assertEquals("No repaired data inconsistencies should be detected", ccBefore, ccAfter);
}
}
@SuppressWarnings("Convert2MethodRef")
@Test
public void testSnapshottingOnInconsistency() throws Throwable
{
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableRepairedDataTrackingForPartitionReads());
cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v INT, PRIMARY KEY (k,c))");
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
ConsistencyLevel.ALL, i, i);
}
cluster.forEach(c -> c.flush(KEYSPACE));
for (int i = 10; i < 20; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)",
ConsistencyLevel.ALL, i, i);
}
cluster.forEach(c -> c.flush(KEYSPACE));
cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// Mark everything repaired on node2
cluster.get(2).runOnInstance(markAllRepaired());
cluster.get(2).runOnInstance(assertRepaired());
// now overwrite on node1 only to generate digest mismatches
cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 55);
cluster.get(1).runOnInstance(assertNotRepaired());
// Execute a partition read and assert inconsistency is detected (as nothing is repaired on node1)
long ccBefore = getConfirmedInconsistencies(cluster.get(1));
cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
long ccAfter = getConfirmedInconsistencies(cluster.get(1));
Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 1, ccAfter);
String snapshotName = DiagnosticSnapshotService.getSnapshotName(DiagnosticSnapshotService.REPAIRED_DATA_MISMATCH_SNAPSHOT_PREFIX);
cluster.forEach(i -> i.runOnInstance(assertSnapshotNotPresent(snapshotName)));
// re-introduce a mismatch, enable snapshotting and try again
cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v) VALUES (0, ?, ?)", 5, 555);
cluster.get(1).runOnInstance(() -> StorageProxy.instance.enableSnapshotOnRepairedDataMismatch());
cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0", ConsistencyLevel.ALL);
ccAfter = getConfirmedInconsistencies(cluster.get(1));
Assert.assertEquals("confirmed count should increment by 1 after each partition read", ccBefore + 2, ccAfter);
cluster.forEach(i -> i.runOnInstance(assertSnapshotPresent(snapshotName)));
}
}
@Test
public void testRepairedReadCountNormalizationWithInitialUnderread() throws Throwable
{
// Asserts that the amount of repaired data read for digest generation is consistent
// across replicas where one has to read less repaired data to satisfy the original
// limits of the read request.
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.get(1).runOnInstance(() -> {
StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
});
cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
"WITH CLUSTERING ORDER BY (c DESC)");
// insert data on both nodes and flush
for (int i = 0; i < 20; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
ConsistencyLevel.ALL, i, i);
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
ConsistencyLevel.ALL, i, i);
}
cluster.forEach(c -> c.flush(KEYSPACE));
// nothing is repaired yet
cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// mark everything repaired
cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
cluster.forEach(i -> i.runOnInstance(assertRepaired()));
// Add some unrepaired data to both nodes
for (int i = 20; i < 30; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
ConsistencyLevel.ALL, i, i);
}
// And some more unrepaired data to node2 only. This causes node2 to read less repaired data than node1
// when satisfying the limits of the read. So node2 needs to overread more repaired data than node1 when
// calculating the repaired data digest.
cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", 30, 30);
// Verify single partition read
long ccBefore = getConfirmedInconsistencies(cluster.get(1));
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=1 LIMIT 20", ConsistencyLevel.ALL),
rows(1, 30, 11));
long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
// Recreate a mismatch in unrepaired data and verify partition range read
cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?)", 31, 31);
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 30", ConsistencyLevel.ALL),
rows(1, 31, 2));
long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
if (ccAfterPartitionRead != ccAfterRangeRead)
{
if (ccAfterPartitionRead != ccBefore)
fail("Both range and partition reads reported data inconsistencies but none were expected");
else
fail("Reported inconsistency during range read but none were expected");
}
else if (ccAfterPartitionRead != ccBefore)
{
fail("Reported inconsistency during partition read but none were expected");
}
}
}
@Test
public void testRepairedReadCountNormalizationWithInitialOverread() throws Throwable
{
// Asserts that the amount of repaired data read for digest generation is consistent
// across replicas where one has to read more repaired data to satisfy the original
// limits of the read request.
try (Cluster cluster = init(Cluster.create(2)))
{
cluster.get(1).runOnInstance(() -> {
StorageProxy.instance.enableRepairedDataTrackingForRangeReads();
StorageProxy.instance.enableRepairedDataTrackingForPartitionReads();
});
cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " +
"WITH CLUSTERING ORDER BY (c DESC)");
// insert data on both nodes and flush
for (int i = 0; i < 10; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0",
ConsistencyLevel.ALL, i, i);
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
ConsistencyLevel.ALL, i, i);
}
cluster.forEach(c -> c.flush(KEYSPACE));
// nothing is repaired yet
cluster.forEach(i -> i.runOnInstance(assertNotRepaired()));
// mark everything repaired
cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
cluster.forEach(i -> i.runOnInstance(assertRepaired()));
// Add some unrepaired data to both nodes
for (int i = 10; i < 13; i++)
{
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 1",
ConsistencyLevel.ALL, i, i);
cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1",
ConsistencyLevel.ALL, i, i);
}
cluster.forEach(c -> c.flush(KEYSPACE));
// And some row deletions on node2 only which cover data in the repaired set
// This will cause node2 to read more repaired data in satisfying the limit of the read request
// so it should overread less than node1 (in fact, it should not overread at all) in order to
// calculate the repaired data digest.
for (int i = 7; i < 10; i++)
{
cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 0 AND c = ?", i);
cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 1 AND c = ?", i);
}
// Verify single partition read
long ccBefore = getConfirmedInconsistencies(cluster.get(1));
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0 LIMIT 5", ConsistencyLevel.ALL),
rows(rows(0, 12, 10), rows(0, 6, 5)));
long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1));
// Verify partition range read
assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 11", ConsistencyLevel.ALL),
rows(rows(1, 12, 10), rows(1, 6, 0), rows(0, 12, 12)));
long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1));
if (ccAfterPartitionRead != ccAfterRangeRead)
{
if (ccAfterPartitionRead != ccBefore)
fail("Both range and partition reads reported data inconsistencies but none were expected");
else
fail("Reported inconsistency during range read but none were expected");
}
else if (ccAfterPartitionRead != ccBefore)
{
fail("Reported inconsistency during partition read but none were expected");
}
}
}
/**
* In CASSANDRA-16721, we discovered that if responses from remote replicas came back while the local runnable was
* still executing, the fact that {@link ReadCommand} was mutable meant that the trackRepairedStatus flag on the
* command instance could move from false to true in executeLocally(), between setting the
* RepairedDataInfo/gathering the sstables and calling extend(). When this happened, the RDI was still the
* stand-in object NO_OP_REPAIRED_DATA_INFO, which has a null repairedDataCounter, and we hit the NPE.
*
* Similarly, the trackRepairedStatus flag could be set after the point at which the RDI is set on the local
* read, assigned to the repairedDataInfo in {@link ReadCommand}, and improperly shared between the initial local read
* and the local read triggered by read repair.
*
* These problems are sidestepped completely by CASSANDRA-16721, as an RDI instance is now created and destroyed
* entirely within the scope of a single {@link LocalReadRunnable}, but this test still attempts to validate some
* assumptions about the cleanliness of the logs and the correctness of queries made when initial local reads and
* local reads triggered by read repair (after speculative reads) execute at roughly the same time.
*
* Because this test depends on whether node1 receives a data or a digest request, we force it to receive a digest
* request via the forTokenReadLiveSorted ByteBuddy rule below.
*/
@Test
public void testLocalDataAndRemoteRequestConcurrency() throws Exception
{
try (Cluster cluster = init(Cluster.build(3)
.withInstanceInitializer(BBHelper::install)
.withConfig(config -> config.set("repaired_data_tracking_for_partition_reads_enabled", true)
.with(GOSSIP)
.with(NETWORK))
.start()))
{
// A speculative read is the reason we have two remote replicas in play that can return results before
// the local replica does.
setupSchema(cluster, "CREATE TABLE " + KS_TABLE + " (id INT PRIMARY KEY, t INT) WITH speculative_retry = 'ALWAYS'");
cluster.get(1).executeInternal("INSERT INTO " + KS_TABLE + " (id, t) VALUES (0, 0)");
cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (id, t) VALUES (0, 0)");
cluster.get(3).executeInternal("INSERT INTO " + KS_TABLE + " (id, t) VALUES (0, 1)");
cluster.forEach(c -> c.flush(KEYSPACE));
cluster.forEach(i -> i.runOnInstance(markAllRepaired()));
cluster.forEach(i -> i.runOnInstance(assertRepaired()));
long logPositionBeforeQuery = cluster.get(1).logs().mark();
Object[][] rows = cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE id=0", ConsistencyLevel.QUORUM);
assertEquals(1, rows.length);
// Given we didn't write at QUORUM, both 0 and 1 are acceptable values.
assertTrue((int) rows[0][1] == 0 || (int) rows[0][1] == 1);
List<String> errors = cluster.get(1).logs().grepForErrors(logPositionBeforeQuery).getResult();
Assert.assertTrue("Encountered errors in the log: " + errors, errors.isEmpty());
}
}
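/**
 * ByteBuddy helper installed on node1 only. It forces local read runnables onto separate threads, makes
 * concurrent local reads of the test table rendezvous on a barrier (so the initial local read and the one
 * triggered by read repair run at roughly the same time), and pins the replica ordering so that node1
 * receives a digest request (see forTokenReadLiveSorted below).
 */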
public static class BBHelper
{
private static final CyclicBarrier barrier = new CyclicBarrier(2);
public static void install(ClassLoader classLoader, Integer num)
{
// Only install on the coordinating node, which is also a replica...
if (num == 1)
{
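// Intercept SEPExecutor.maybeExecuteImmediately to force local read runnables onto separate threads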
new ByteBuddy().rebase(SEPExecutor.class)
.method(named("maybeExecuteImmediately"))
.intercept(MethodDelegation.to(BBHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
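// Intercept SinglePartitionReadCommand.executeLocally so that concurrent local reads of the test table meet at a barrier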
new ByteBuddy().rebase(SinglePartitionReadCommand.class)
.method(named("executeLocally"))
.intercept(MethodDelegation.to(BBHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
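// Intercept ReplicaLayout.forTokenReadLiveSorted to pin the replica ordering (see forTokenReadLiveSorted below)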
new ByteBuddy().rebase(ReplicaLayout.class)
.method(named("forTokenReadLiveSorted").and(takesArguments(AbstractReplicationStrategy.class, Token.class)))
.intercept(MethodDelegation.to(BBHelper.class))
.make()
.load(classLoader, ClassLoadingStrategy.Default.INJECTION);
}
}
@SuppressWarnings("unused")
public static void maybeExecuteImmediately(Runnable command)
{
// Force local read runnables (from initial read and read-repair) to execute in separate threads.
new Thread(command).start();
}
@SuppressWarnings({ "unused" })
public static UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController,
@SuperCall Callable<UnfilteredPartitionIterator> zuperCall)
{
try
{
if (executionController.metadata().name.equals(TABLE))
{
// Force both the initial local read and the local read triggered by read-repair to proceed at
// roughly the same time.
barrier.await();
}
return zuperCall.call();
}
catch (Exception e)
{
throw Throwables.unchecked(e);
}
}
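// Pins the live-sorted replica ordering to (node3, node2, node1) so that node1 is sent a digest request rather than the data request (see the test javadoc above)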
@SuppressWarnings({ "unused" })
public static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(AbstractReplicationStrategy replicationStrategy, Token token)
{
try
{
EndpointsForToken.Builder builder = EndpointsForToken.builder(token, 3);
builder.add(ReplicaUtils.full(InetAddressAndPort.getByName("127.0.0.3")));
builder.add(ReplicaUtils.full(InetAddressAndPort.getByName("127.0.0.2")));
builder.add(ReplicaUtils.full(InetAddressAndPort.getByName("127.0.0.1")));
return new ReplicaLayout.ForTokenRead(replicationStrategy, builder.build());
}
catch (Exception e)
{
throw Throwables.unchecked(e);
}
}
}
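// Concatenates the supplied expected-row arrays into a single Object[][] for use with assertRows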
private Object[][] rows(Object[][] head, Object[][]...tail)
{
return Stream.concat(Stream.of(head),
Stream.of(tail).flatMap(Stream::of))
.toArray(Object[][]::new);
}
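// Builds expected rows of the form {partitionKey, c, c}. When start > end the clustering values run from
// start down to end inclusive, matching the DESC clustering order used in these tests; otherwise they
// ascend from start to end exclusive.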
private Object[][] rows(int partitionKey, int start, int end)
{
if (start == end)
return new Object[][] { new Object[] { partitionKey, start, end } };
IntStream clusterings = start > end
? IntStream.range(end - 1, start).map(i -> start - i + end - 1)
: IntStream.range(start, end);
return clusterings.mapToObj(i -> new Object[] {partitionKey, i, i}).toArray(Object[][]::new);
}
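// Asserts that no live sstable of the test table has repairedAt set in its stats metadata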
private IIsolatedExecutor.SerializableRunnable assertNotRepaired()
{
return () ->
{
try
{
Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
.getColumnFamilyStore(TABLE)
.getLiveSSTables()
.iterator();
while (sstables.hasNext())
{
SSTableReader sstable = sstables.next();
Descriptor descriptor = sstable.descriptor;
Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
.deserialize(descriptor, EnumSet.of(MetadataType.STATS));
StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
Assert.assertEquals("repaired at is set for sstable: " + descriptor,
stats.repairedAt,
ActiveRepairService.UNREPAIRED_SSTABLE);
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
};
}
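// Rewrites the stats metadata of every live sstable of the test table to mark it repaired at the current time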
private IIsolatedExecutor.SerializableRunnable markAllRepaired()
{
return () ->
{
try
{
Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
.getColumnFamilyStore(TABLE)
.getLiveSSTables()
.iterator();
while (sstables.hasNext())
{
SSTableReader sstable = sstables.next();
Descriptor descriptor = sstable.descriptor;
descriptor.getMetadataSerializer()
.mutateRepairMetadata(descriptor, System.currentTimeMillis(), null, false);
sstable.reloadSSTableMetadata();
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
};
}
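// Asserts that every live sstable of the test table has a non-zero repairedAt in its stats metadata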
private IIsolatedExecutor.SerializableRunnable assertRepaired()
{
return () ->
{
try
{
Iterator<SSTableReader> sstables = Keyspace.open(KEYSPACE)
.getColumnFamilyStore(TABLE)
.getLiveSSTables()
.iterator();
while (sstables.hasNext())
{
SSTableReader sstable = sstables.next();
Descriptor descriptor = sstable.descriptor;
Map<MetadataType, MetadataComponent> metadata = descriptor.getMetadataSerializer()
.deserialize(descriptor, EnumSet.of(MetadataType.STATS));
StatsMetadata stats = (StatsMetadata) metadata.get(MetadataType.STATS);
Assert.assertTrue("repaired at is not set for sstable: " + descriptor, stats.repairedAt > 0);
}
}
catch (IOException e)
{
throw new RuntimeException(e);
}
};
}
@SuppressWarnings("UnstableApiUsage")
private IInvokableInstance.SerializableRunnable assertSnapshotPresent(String snapshotName)
{
return () ->
{
// snapshots are taken asynchronously; this is crude, but it gives them a chance to happen
int attempts = 100;
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
while (!cfs.snapshotExists(snapshotName))
{
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
if (attempts-- < 0)
throw new AssertionError(String.format("Snapshot %s not found for %s", snapshotName, KS_TABLE));
}
};
}
private IInvokableInstance.SerializableRunnable assertSnapshotNotPresent(String snapshotName)
{
return () ->
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
Assert.assertFalse(cfs.snapshotExists(snapshotName));
};
}
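// Reads the table-level confirmedRepairedInconsistencies count from the given instance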
private long getConfirmedInconsistencies(IInvokableInstance instance)
{
return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
.getColumnFamilyStore(TABLE)
.metric
.confirmedRepairedInconsistencies
.table
.getCount());
}
private void setupSchema(Cluster cluster, String cql)
{
cluster.schemaChange(cql);
// disable auto compaction to prevent nodes from trying to compact
// new sstables with ones we've modified to mark repaired
cluster.forEach(i -> i.runOnInstance(() -> Keyspace.open(KEYSPACE)
.getColumnFamilyStore(TABLE)
.disableAutoCompaction()));
}
}