/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICoordinator;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.shared.RepairResult;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.messages.FinalizePropose;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.apache.cassandra.utils.progress.ProgressEventType;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class PreviewRepairTest extends TestBaseImpl
{
@BeforeClass
public static void setup()
{
DatabaseDescriptor.daemonInitialization();
}
/**
 * Makes sure that the repaired sstables do not match on the two nodes, by disabling autocompaction on node2
 * and then running an incremental repair, and verifies that a subsequent preview repair still succeeds
 * without reporting any mismatch
 */
@Test
public void testWithMismatchingPending() throws Throwable
{
try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
cluster.get(1).callOnInstance(repair(options(false, false)));
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
// make sure that all sstables have moved to repaired by triggering a compaction
// also disables autocompaction on the nodes
cluster.forEach((node) -> node.runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
cfs.disableAutoCompaction();
}));
long[] marks = logMark(cluster);
cluster.get(1).callOnInstance(repair(options(false, false)));
// now re-enable autocompaction on node1; this moves the sstables from the new repair to repaired
cluster.get(1).runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
cfs.enableAutoCompaction();
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
});
waitLogsRepairFullyFinished(cluster, marks);
RepairResult rs = cluster.get(1).callOnInstance(repair(options(true, false)));
assertTrue(rs.success); // preview repair should succeed
assertFalse(rs.wasInconsistent); // and we should see no mismatches
}
}
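/**
 * Waits, on every node, for a "Finalized local repair session" log line to appear after the given log marks,
 * i.e. for the incremental repair to be fully finalized locally
 */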
public static void waitLogsRepairFullyFinished(Cluster cluster, long[] marks) throws TimeoutException
{
for (int i = 0; i < cluster.size(); i++)
cluster.get(i + 1).logs().watchFor(marks[i], "Finalized local repair session");
}
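/**
 * Records the current log position on every node, so that waitLogsRepairFullyFinished only watches for
 * log lines written after this point
 */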
public static long[] logMark(Cluster cluster)
{
long [] marks = new long[cluster.size()];
for (int i = 0; i < cluster.size(); i++)
{
marks[i] = cluster.get(i + 1).logs().mark();
}
return marks;
}
/**
 * Another case where the repaired datasets could mismatch is when an incremental repair finishes just as the
 * preview repair is starting up.
 *
 * This tests that case:
 * 1. start a preview repair
 * 2. pause the validation requests from node1 -> node2
 * 3. node1 starts its validation
 * 4. run an incremental repair, which completes fine
 * 5. node2 resumes its validation
 *
 * Node2 would now include sstables from the second incremental repair, but node1 would not.
 * The preview repair should fail, since we fail any preview repair that is still ongoing when an incremental
 * repair finishes (step 4 above).
 */
@Test
public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
{
ExecutorService es = Executors.newSingleThreadExecutor();
try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
cluster.get(1).callOnInstance(repair(options(false, false)));
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
SimpleCondition previewRepairStarted = new SimpleCondition();
SimpleCondition continuePreviewRepair = new SimpleCondition();
DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair);
// this pauses the validation request sent from node1 to node2 until the incremental repair below has completed
cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, false))));
previewRepairStarted.await();
// this needs to finish before the preview repair is unpaused on node2
cluster.get(1).callOnInstance(repair(options(false, false)));
continuePreviewRepair.signalAll();
RepairResult rs = rsFuture.get();
assertFalse(rs.success); // preview repair should have failed
assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
}
finally
{
es.shutdown();
}
}
/**
 * Tests the case where an IR is running but has not completed before the preview validation compaction starts
 */
@Test
public void testConcurrentIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
{
try (Cluster cluster = init(Cluster.build(2).withConfig(config ->
config.with(GOSSIP)
.with(NETWORK)).start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
cluster.get(1).callOnInstance(repair(options(false, false)));
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
SimpleCondition previewRepairStarted = new SimpleCondition();
SimpleCondition continuePreviewRepair = new SimpleCondition();
// this pauses the validation request sent from node1 to node2 until the inc repair below has run
cluster.filters()
.outbound()
.verbs(Verb.VALIDATION_REQ.id)
.from(1).to(2)
.messagesMatching(DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair))
.drop();
SimpleCondition irRepairStarted = new SimpleCondition();
SimpleCondition continueIrRepair = new SimpleCondition();
// this blocks the IR from committing, so we can re-enable the preview repair first
cluster.filters()
.outbound()
.verbs(Verb.FINALIZE_PROPOSE_MSG.id)
.from(1).to(2)
.messagesMatching(DelayFirstRepairTypeMessageFilter.finalizePropose(irRepairStarted, continueIrRepair))
.drop();
Future<RepairResult> previewResult = cluster.get(1).asyncCallsOnInstance(repair(options(true, false))).call();
previewRepairStarted.await();
// trigger the IR and wait until it is ready to commit
Future<RepairResult> irResult = cluster.get(1).asyncCallsOnInstance(repair(options(false, false))).call();
irRepairStarted.await();
// unblock preview repair and wait for it to complete
continuePreviewRepair.signalAll();
RepairResult rs = previewResult.get();
assertFalse(rs.success); // preview repair should have failed
assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
continueIrRepair.signalAll();
RepairResult ir = irResult.get();
assertTrue(ir.success);
assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
}
}
/**
 * Same as testFinishingIncRepairDuringPreview, but the previewed range does not intersect the incrementally
 * repaired range, so both the preview and the incremental repair should finish fine (without any mismatches)
 */
@Test
public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
{
ExecutorService es = Executors.newSingleThreadExecutor();
try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
assertTrue(cluster.get(1).callOnInstance(repair(options(false, false))).success);
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
// pause preview repair validation messages on node2 until node1 has finished
SimpleCondition previewRepairStarted = new SimpleCondition();
SimpleCondition continuePreviewRepair = new SimpleCondition();
DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair);
cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
// get local ranges to repair two separate ranges:
List<String> localRanges = cluster.get(1).callOnInstance(() -> {
List<String> res = new ArrayList<>();
for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
res.add(r.left.getTokenValue() + ":" + r.right.getTokenValue());
return res;
});
assertEquals(2, localRanges.size());
Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, false, localRanges.get(0)))));
previewRepairStarted.await(); // wait for node1 to start validation compaction
// this needs to finish before the preview repair is unpaused on node2
assertTrue(cluster.get(1).callOnInstance(repair(options(false, false, localRanges.get(1)))).success);
continuePreviewRepair.signalAll();
RepairResult rs = repairStatusFuture.get();
assertTrue(rs.success); // repair should succeed
assertFalse(rs.wasInconsistent); // and no mismatches
}
finally
{
es.shutdown();
}
}
/**
* Makes sure we can start a non-intersecting preview repair while there are other pending sstables on disk
*/
@Test
public void testStartNonIntersectingPreviewRepair() throws IOException, InterruptedException, ExecutionException
{
ExecutorService es = Executors.newSingleThreadExecutor();
try(Cluster cluster = init(Cluster.build(2).withConfig(config ->
config.with(GOSSIP)
.with(NETWORK))
.start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl").asserts().success();
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
// pause inc repair validation messages on node2 until node1 has finished
SimpleCondition incRepairStarted = new SimpleCondition();
SimpleCondition continueIncRepair = new SimpleCondition();
DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(incRepairStarted, continueIncRepair);
cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
// get local ranges to repair two separate ranges:
List<String> localRanges = cluster.get(1).callOnInstance(() -> {
List<String> res = new ArrayList<>();
for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
res.add(r.left.getTokenValue() + ":" + r.right.getTokenValue());
return res;
});
assertEquals(2, localRanges.size());
String [] previewedRange = localRanges.get(0).split(":");
String [] repairedRange = localRanges.get(1).split(":");
Future<NodeToolResult> repairStatusFuture = es.submit(() -> cluster.get(1).nodetoolResult("repair", "-st", repairedRange[0], "-et", repairedRange[1], KEYSPACE, "tbl"));
incRepairStarted.await(); // wait for node1 to start validation compaction
// now that we have pending sstables in "repairedRange", make sure we can preview "previewedRange"
cluster.get(1).nodetoolResult("repair", "-vd", "-st", previewedRange[0], "-et", previewedRange[1], KEYSPACE, "tbl")
.asserts()
.success()
.notificationContains("Repaired data is in sync");
continueIncRepair.signalAll();
repairStatusFuture.get().asserts().success();
}
finally
{
es.shutdown();
}
}
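/**
 * With snapshot_on_repaired_data_mismatch enabled, a repaired-data mismatch found by a preview repair should
 * snapshot the mismatching table (and only that table) on every node, and a later preview repair should not
 * snapshot it again
 */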
@Test
public void snapshotTest() throws IOException, InterruptedException
{
try(Cluster cluster = init(Cluster.build(3).withConfig(config ->
config.set("snapshot_on_repaired_data_mismatch", true)
.with(GOSSIP)
.with(NETWORK))
.start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)");
// populate 2 tables
insert(cluster.coordinator(1), 0, 100, "tbl");
insert(cluster.coordinator(1), 0, 100, "tbl2");
cluster.forEach((n) -> n.flush(KEYSPACE));
// make sure everything is marked repaired
cluster.get(1).callOnInstance(repair(options(false, false)));
waitMarkedRepaired(cluster);
// make node2 mismatch
unmarkRepaired(cluster.get(2), "tbl");
verifySnapshots(cluster, "tbl", true);
verifySnapshots(cluster, "tbl2", true);
AtomicInteger snapshotMessageCounter = new AtomicInteger();
cluster.filters().verbs(Verb.SNAPSHOT_REQ.id).messagesMatching((from, to, message) -> {
snapshotMessageCounter.incrementAndGet();
return false;
}).drop();
cluster.get(1).callOnInstance(repair(options(true, true)));
verifySnapshots(cluster, "tbl", false);
// tbl2 should not have a mismatch, so the snapshots should be empty here
verifySnapshots(cluster, "tbl2", true);
assertEquals(3, snapshotMessageCounter.get());
// and make sure that we don't try to snapshot again
snapshotMessageCounter.set(0);
cluster.get(3).callOnInstance(repair(options(true, true)));
assertEquals(0, snapshotMessageCounter.get());
}
}
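/**
 * Waits until all live sstables of tbl and tbl2 are marked repaired on every node
 */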
private void waitMarkedRepaired(Cluster cluster)
{
cluster.forEach(node -> node.runOnInstance(() -> {
for (String table : Arrays.asList("tbl", "tbl2"))
{
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
// the early return in the original loop skipped tbl2; keep waiting per table instead
while (!cfs.getLiveSSTables().stream().allMatch(SSTableReader::isRepaired))
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}));
}
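/**
 * Marks all live sstables of the given table as unrepaired on the given instance, creating a repaired-data
 * mismatch with the other nodes
 */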
private void unmarkRepaired(IInvokableInstance instance, String table)
{
instance.runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
try
{
cfs.getCompactionStrategyManager().mutateRepaired(cfs.getLiveSSTables(), ActiveRepairService.UNREPAIRED_SSTABLE, null, false);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
});
}
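/**
 * Asserts that the given table has no snapshots on any node when shouldBeEmpty is true; otherwise waits until
 * every node has at least one snapshot of the table
 */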
private void verifySnapshots(Cluster cluster, String table, boolean shouldBeEmpty)
{
cluster.forEach(node -> node.runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(table);
if(shouldBeEmpty)
{
assertTrue(cfs.getSnapshotDetails().isEmpty());
}
else
{
while (cfs.getSnapshotDetails().isEmpty())
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
}
}));
}
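/**
 * Message filter that delays the first matching repair message: it signals the pause condition, waits on the
 * resume condition, and then lets the message (and any later ones) through without dropping anything
 */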
static abstract class DelayFirstRepairMessageFilter implements IMessageFilters.Matcher
{
private final SimpleCondition pause;
private final SimpleCondition resume;
private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
protected DelayFirstRepairMessageFilter(SimpleCondition pause, SimpleCondition resume)
{
this.pause = pause;
this.resume = resume;
}
protected abstract boolean matchesMessage(RepairMessage message);
public final boolean matches(int from, int to, IMessage message)
{
try
{
Message<?> msg = Instance.deserializeMessage(message);
RepairMessage repairMessage = (RepairMessage) msg.payload;
// only the first message should be delayed:
if (matchesMessage(repairMessage) && waitForRepair.compareAndSet(true, false))
{
pause.signalAll();
resume.await();
}
}
catch (Exception e)
{
throw new RuntimeException(e);
}
return false; // don't drop the message
}
}
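/**
 * Delays the first repair message whose payload is of the given type, e.g. ValidationRequest or FinalizePropose
 */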
static class DelayFirstRepairTypeMessageFilter extends DelayFirstRepairMessageFilter
{
private final Class<? extends RepairMessage> type;
public DelayFirstRepairTypeMessageFilter(SimpleCondition pause, SimpleCondition resume, Class<? extends RepairMessage> type)
{
super(pause, resume);
this.type = type;
}
public static DelayFirstRepairTypeMessageFilter validationRequest(SimpleCondition pause, SimpleCondition resume)
{
return new DelayFirstRepairTypeMessageFilter(pause, resume, ValidationRequest.class);
}
public static DelayFirstRepairTypeMessageFilter finalizePropose(SimpleCondition pause, SimpleCondition resume)
{
return new DelayFirstRepairTypeMessageFilter(pause, resume, FinalizePropose.class);
}
protected boolean matchesMessage(RepairMessage repairMessage)
{
return repairMessage.getClass() == type;
}
}
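/**
 * Inserts count rows with ids (and values) start..start+count-1 into tbl (or the given table) at CL ALL
 */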
static void insert(ICoordinator coordinator, int start, int count)
{
insert(coordinator, start, count, "tbl");
}
static void insert(ICoordinator coordinator, int start, int count, String table)
{
for (int i = start; i < start + count; i++)
coordinator.execute("insert into " + KEYSPACE + "." + table + " (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
}
/**
 * Returns a callable that runs a repair with the given options on the instance and
 * returns a pair of [repair success, was inconsistent]
 */
private static IIsolatedExecutor.SerializableCallable<RepairResult> repair(Map<String, String> options)
{
return () -> {
SimpleCondition await = new SimpleCondition();
AtomicBoolean success = new AtomicBoolean(true);
AtomicBoolean wasInconsistent = new AtomicBoolean(false);
StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
if (event.getType() == ProgressEventType.ERROR)
{
success.set(false);
await.signalAll();
}
else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent"))
{
wasInconsistent.set(true);
}
else if (event.getType() == ProgressEventType.COMPLETE)
await.signalAll();
}));
try
{
await.await(1, TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
return new RepairResult(success.get(), wasInconsistent.get());
};
}
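/**
 * Builds repair options: incremental and parallel by default; preview runs a repaired-data preview instead,
 * and full switches to a full (non-incremental) repair
 */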
private static Map<String, String> options(boolean preview, boolean full)
{
Map<String, String> config = new HashMap<>();
config.put(RepairOption.INCREMENTAL_KEY, "true");
config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
if (preview)
config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
if (full)
config.put(RepairOption.INCREMENTAL_KEY, "false");
return config;
}
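/**
 * Same as options(preview, full), but restricted to the given token range (start:end)
 */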
private static Map<String, String> options(boolean preview, boolean full, String range)
{
Map<String, String> options = options(preview, full);
options.put(RepairOption.RANGES_KEY, range);
return options;
}
}