/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.impl.InstanceKiller;
import org.apache.cassandra.io.sstable.CorruptSSTableException;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.io.util.ChannelProxy;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.repair.messages.RepairOption;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import org.apache.cassandra.service.StorageService;
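/**
 * Verifies that a repair fails cleanly when a replica's sstables are corrupt: each parameterized run injects
 * failing sstable readers on the replica, runs a repair from the other node, and asserts that the parent repair
 * session reports FAILED and that only the replica (whose disk_failure_policy is "die") is killed.
 */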
@RunWith(Parameterized.class)
public class FailingRepairTest extends TestBaseImpl implements Serializable
{
private static ICluster<IInvokableInstance> CLUSTER;
private final Verb messageType;
private final RepairParallelism parallelism;
private final boolean withTracing;
private final SerializableRunnable setup;
public FailingRepairTest(Verb messageType, RepairParallelism parallelism, boolean withTracing, SerializableRunnable setup)
{
this.messageType = messageType;
this.parallelism = parallelism;
this.withTracing = withTracing;
this.setup = setup;
}
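// one test run per (verb, parallelism, tracing) combination; currently only VALIDATION_REQ is exercised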
@Parameters(name = "{0}/{1}/{2}")
public static Collection<Object[]> messages()
{
List<Object[]> tests = new ArrayList<>();
for (RepairParallelism parallelism : RepairParallelism.values())
{
for (Boolean withTracing : Arrays.asList(Boolean.TRUE, Boolean.FALSE))
{
tests.add(new Object[]{ Verb.VALIDATION_REQ, parallelism, withTracing, failingReaders(Verb.VALIDATION_REQ, parallelism, withTracing) });
}
}
return tests;
}
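// returns a task, run on the replica, that flushes the table and swaps its live sstables for wrappers whose
// scanners throw CorruptSSTableException, so the validation phase of the repair is guaranteed to fail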
private static SerializableRunnable failingReaders(Verb type, RepairParallelism parallelism, boolean withTracing)
{
return () -> {
String cfName = getCfName(type, parallelism, withTracing);
ColumnFamilyStore cf = Keyspace.open(KEYSPACE).getColumnFamilyStore(cfName);
cf.forceBlockingFlush();
Set<SSTableReader> remove = cf.getLiveSSTables();
Set<SSTableReader> replace = new HashSet<>();
if (type == Verb.VALIDATION_REQ)
{
for (SSTableReader r : remove)
replace.add(new FailingSSTableReader(r));
}
else
{
throw new UnsupportedOperationException("verb: " + type);
}
cf.getTracker().removeUnsafe(remove);
cf.addSSTables(replace);
};
}
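// each parameter combination gets its own table so runs do not interfere with each other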
private static String getCfName(Verb type, RepairParallelism parallelism, boolean withTracing)
{
return type.name().toLowerCase() + "_" + parallelism.name().toLowerCase() + "_" + withTracing;
}
@BeforeClass
public static void setupCluster() throws IOException
{
// streaming requires networking at the moment
// streaming also requires gossip, otherwise it isn't set up properly
CLUSTER = init(Cluster.build()
.withNodes(2)
.withConfig(c -> c.with(Feature.NETWORK)
.with(Feature.GOSSIP)
.set("disk_failure_policy", "die"))
.start());
CLUSTER.setUncaughtExceptionsFilter((throwable) -> {
// can't check with instanceof as the exception is thrown by a different classloader
return throwable.getClass().toString().contains("InstanceShutdown")
|| (throwable.getMessage() != null && throwable.getMessage().contains("Parent repair session with id"));
});
}
@AfterClass
public static void teardownCluster() throws Exception
{
if (CLUSTER != null)
CLUSTER.close();
}
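// reset the per-instance kill counter so assertions on killAttempts see only this run's kills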
@Before
public void cleanupState()
{
for (int i = 1; i <= CLUSTER.size(); i++)
CLUSTER.get(i).runOnInstance(InstanceKiller::clear);
}
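// end-to-end flow: write diverging data to both nodes, inject the failure on the replica,
// run a full repair from the coordinator, and assert the repair fails and only the replica dies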
@Test(timeout = 10 * 60 * 1000)
public void testFailingMessage() throws IOException
{
final int replica = 1;
final int coordinator = 2;
String tableName = getCfName(messageType, parallelism, withTracing);
String fqtn = KEYSPACE + "." + tableName;
CLUSTER.schemaChange("CREATE TABLE " + fqtn + " (k INT, PRIMARY KEY (k))");
// create data which will NOT conflict
int lhsOffset = 10;
int rhsOffset = 20;
int limit = rhsOffset + (rhsOffset - lhsOffset);
// setup data which is consistent on both sides
for (int i = 0; i < lhsOffset; i++)
CLUSTER.coordinator(replica)
.execute("INSERT INTO " + fqtn + " (k) VALUES (?)", ConsistencyLevel.ALL, i);
// create data on LHS which does NOT exist in RHS
for (int i = lhsOffset; i < rhsOffset; i++)
CLUSTER.get(replica).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i);
// create data on RHS which does NOT exist in LHS
for (int i = rhsOffset; i < limit; i++)
CLUSTER.get(coordinator).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i);
// at this point, the two nodes should be out of sync, so confirm missing data
// node 1
Object[][] node1Records = toRows(IntStream.range(0, rhsOffset));
Object[][] node1Actuals = toNaturalOrder(CLUSTER.get(replica).executeInternal("SELECT k FROM " + fqtn));
Assert.assertArrayEquals(node1Records, node1Actuals);
// node 2
Object[][] node2Records = toRows(IntStream.concat(IntStream.range(0, lhsOffset), IntStream.range(rhsOffset, limit)));
Object[][] node2Actuals = toNaturalOrder(CLUSTER.get(coordinator).executeInternal("SELECT k FROM " + fqtn));
Assert.assertArrayEquals(node2Records, node2Actuals);
// Inject the failure: replace the replica's live sstables with failing wrappers (see failingReaders)
CLUSTER.get(replica).runOnInstance(() -> setup.run());
// run a repair which is expected to fail
List<String> repairStatus = CLUSTER.get(coordinator).callOnInstance(() -> {
// need all ranges on the host
String ranges = StorageService.instance.getLocalAndPendingRanges(KEYSPACE).stream()
.map(r -> r.left + ":" + r.right)
.collect(Collectors.joining(","));
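// request a full (non-incremental) repair over all local and pending ranges, restricted to this table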
Map<String, String> args = new HashMap<String, String>()
{{
put(RepairOption.PARALLELISM_KEY, parallelism.getName());
put(RepairOption.PRIMARY_RANGE_KEY, "false");
put(RepairOption.INCREMENTAL_KEY, "false");
put(RepairOption.TRACE_KEY, Boolean.toString(withTracing));
put(RepairOption.PULL_REPAIR_KEY, "false");
put(RepairOption.FORCE_REPAIR_KEY, "false");
put(RepairOption.RANGES_KEY, ranges);
put(RepairOption.COLUMNFAMILIES_KEY, tableName);
}};
int cmd = StorageService.instance.repairAsync(KEYSPACE, args);
Assert.assertFalse("repair return status was 0, expected non-zero return status, 0 indicates repair not submitted", cmd == 0);
List<String> status;
do
{
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
status = StorageService.instance.getParentRepairStatus(cmd);
} while (status == null || status.get(0).equals(ParentRepairStatus.IN_PROGRESS.name()));
return status;
});
Assert.assertEquals(repairStatus.toString(), ParentRepairStatus.FAILED, ParentRepairStatus.valueOf(repairStatus.get(0)));
// it's possible that the coordinator gets the message that the replica failed before the replica completes
// shutting down; this means isKilled could be updated after the fact
IInvokableInstance replicaInstance = CLUSTER.get(replica);
while (replicaInstance.killAttempts() <= 0)
Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
Assert.assertEquals("replica should be killed", 1, replicaInstance.killAttempts());
Assert.assertEquals("coordinator should not be killed", 0, CLUSTER.get(coordinator).killAttempts());
}
private static Object[][] toNaturalOrder(Object[][] actuals)
{
// results are returned in token order, so rather than ordering the expected values by token,
// convert the actual values to natural order
int[] values = new int[actuals.length];
for (int i = 0; i < values.length; i++)
values[i] = (Integer) actuals[i][0];
Arrays.sort(values);
return toRows(IntStream.of(values));
}
private static Object[][] toRows(IntStream values)
{
return values
.mapToObj(v -> new Object[]{ v })
.toArray(Object[][]::new);
}
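// sstable wrapper that delegates everything to the real reader except the scanners (which fail on
// iteration) and the data channel (which fails immediately)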
private static final class FailingSSTableReader extends ForwardingSSTableReader
{
private FailingSSTableReader(SSTableReader delegate)
{
super(delegate);
}
public ISSTableScanner getScanner()
{
return new FailingISSTableScanner();
}
public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
{
return new FailingISSTableScanner();
}
public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
{
return new FailingISSTableScanner();
}
public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener)
{
return new FailingISSTableScanner();
}
public ChannelProxy getDataChannel()
{
throw new RuntimeException();
}
public String toString()
{
return "FailingSSTableReader[" + super.toString() + "]";
}
}
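// scanner that throws CorruptSSTableException from hasNext()/next(); with disk_failure_policy=die this
// causes the replica instance to be killed when validation scans the table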
private static final class FailingISSTableScanner implements ISSTableScanner
{
public long getLengthInBytes()
{
return 0;
}
public long getCompressedLengthInBytes()
{
return 0;
}
public long getCurrentPosition()
{
return 0;
}
public long getBytesScanned()
{
return 0;
}
public Set<SSTableReader> getBackingSSTables()
{
return Collections.emptySet();
}
public TableMetadata metadata()
{
return null;
}
public void close()
{
}
public boolean hasNext()
{
throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!");
}
public UnfilteredRowIterator next()
{
throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!");
}
}
}