Filter remote DC replicas out when constructing the initial replica plan for the local read repair
patch by Runtian Liu; reviewed by Blake Eggleston and Stefan Miklosovic for CASSANDRA-19120
diff --git a/CHANGES.txt b/CHANGES.txt
index 86b6c18..e4b8692 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.13
+ * Filter remote DC replicas out when constructing the initial replica plan for the local read repair (CASSANDRA-19120)
* Remove redundant code in StorageProxy#sendToHintedReplicas (CASSANDRA-19412)
* Remove bashisms for mx4j tool in cassandra-env.sh (CASSANDRA-19416)
* Add new concurrent_merkle_tree_requests config property to prevent OOM during multi-range and/or multi-table repairs (CASSANDRA-19336)
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 67b89e5..7e7a6a1 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -19,7 +19,6 @@
package org.apache.cassandra.locator;
import com.carrotsearch.hppc.ObjectIntHashMap;
-import com.carrotsearch.hppc.cursors.ObjectIntCursor;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
@@ -439,6 +438,7 @@
{
return new Selector()
{
+
@Override
public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
@@ -455,7 +455,8 @@
int add = consistencyLevel.blockForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending()) - contacts.size();
if (add > 0)
{
- for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
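+ // for DC-local consistency levels, only consider live replicas in the local datacenter when topping up the contacts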
+ E all = consistencyLevel.isDatacenterLocal() ? live.all().filter(InOurDcTester.replicas()) : live.all();
+ for (Replica replica : filter(all, r -> !contacts.contains(r)))
{
contacts.add(replica);
if (--add == 0)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index edcf14d..742d961 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -23,28 +23,26 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractFuture;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InOurDcTester;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.locator.InOurDcTester;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.RequestCallback;
import org.apache.cassandra.net.Message;
@@ -53,6 +51,7 @@
import org.apache.cassandra.tracing.Tracing;
import static org.apache.cassandra.net.Verb.*;
+import static com.google.common.collect.Iterables.all;
public class BlockingPartitionRepair
extends AbstractFuture<Object> implements RequestCallback<Object>
@@ -61,31 +60,30 @@
private final ReplicaPlan.ForTokenWrite writePlan;
private final Map<Replica, Mutation> pendingRepairs;
private final CountDownLatch latch;
- private final Predicate<InetAddressAndPort> shouldBlockOn;
private volatile long mutationsSentTime;
public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- this(key, repairs, writePlan,
- writePlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
- }
- public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn)
- {
this.key = key;
this.pendingRepairs = new ConcurrentHashMap<>(repairs);
this.writePlan = writePlan;
- this.shouldBlockOn = shouldBlockOn;
+
+ // make sure all read repair targets are contacts of the repair write plan
+ Preconditions.checkState(all(repairs.keySet(), (r) -> writePlan.contacts().contains(r)),
+ "All repair targets should be contacts of the read repair write plan.");
int blockFor = writePlan.blockFor();
// here we remove empty repair mutations from the block for total, since
// we're not sending them mutations
for (Replica participant : writePlan.contacts())
{
- // remote dcs can sometimes get involved in dc-local reads. We want to repair
- // them if they do, but they shouldn't interfere with blocking the client read.
- if (!repairs.containsKey(participant) && shouldBlockOn.test(participant.endpoint()))
+ if (!repairs.containsKey(participant))
blockFor--;
+
+ // for DC-local consistency levels, every contact must be a replica in the local datacenter
+ Preconditions.checkState(!writePlan.consistencyLevel().isDatacenterLocal() || InOurDcTester.replicas().test(participant),
+ "Local consistency blocking read repair is trying to contact a remote DC node: %s", participant.endpoint());
}
// there are some cases where logically identical data can return different digests
@@ -110,11 +108,8 @@
@VisibleForTesting
void ack(InetAddressAndPort from)
{
- if (shouldBlockOn.test(from))
- {
- pendingRepairs.remove(writePlan.lookup(from));
- latch.countDown();
- }
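+ // every contacted replica is blocked on now; for DC-local reads the write plan no longer contains remote DC replicas (enforced in the constructor)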
+ pendingRepairs.remove(writePlan.lookup(from));
+ latch.countDown();
}
@Override
@@ -160,9 +155,6 @@
// use a separate verb here to avoid writing hints on timeouts
sendRR(Message.out(READ_REPAIR_REQ, mutation), destination.endpoint());
ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
-
- if (!shouldBlockOn.test(destination.endpoint()))
- pendingRepairs.remove(destination);
ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation);
}
}
@@ -205,7 +197,7 @@
if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
return;
- EndpointsForToken newCandidates = writePlan.liveUncontacted();
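+ // for DC-local consistency levels, additional repair candidates must also come from the local datacenter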
+ EndpointsForToken newCandidates = writePlan.consistencyLevel().isDatacenterLocal() ? writePlan.liveUncontacted().filter(InOurDcTester.replicas()) : writePlan.liveUncontacted();
if (newCandidates.isEmpty())
return;
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 7247704..9992bd5 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -21,7 +21,6 @@
import java.util.List;
import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.RegularAndStaticColumns;
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index d36808f..83865b1 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -33,6 +33,7 @@
import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
import org.apache.cassandra.locator.EndpointsForToken;
import org.apache.cassandra.locator.ReplicaPlan;
import org.junit.Assert;
@@ -90,13 +91,20 @@
static InetAddressAndPort target1;
static InetAddressAndPort target2;
static InetAddressAndPort target3;
+ static InetAddressAndPort remote1;
+ static InetAddressAndPort remote2;
+ static InetAddressAndPort remote3;
static List<InetAddressAndPort> targets;
+ static List<InetAddressAndPort> remotes;
static Replica replica1;
static Replica replica2;
static Replica replica3;
static EndpointsForRange replicas;
- static ReplicaPlan.ForRead<?> replicaPlan;
+ static Replica remoteReplica1;
+ static Replica remoteReplica2;
+ static Replica remoteReplica3;
+ static EndpointsForRange remoteReplicas;
static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
static DecoratedKey key;
@@ -214,6 +222,66 @@
static void configureClass(ReadRepairStrategy repairStrategy) throws Throwable
{
SchemaLoader.loadSchema();
+
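+ // two-DC topology for these tests: 127.2.x.x endpoints map to datacenter2, all others (127.1.x.x targets, local node) to datacenter1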
+ DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+ {
+ public String getRack(InetAddressAndPort endpoint)
+ {
+ return "rack1";
+ }
+
+ public String getDatacenter(InetAddressAndPort endpoint)
+ {
+ byte[] address = endpoint.addressBytes;
+ if (address[1] == 2)
+ return "datacenter2";
+ return "datacenter1";
+ }
+ });
+
+ target1 = InetAddressAndPort.getByName("127.1.0.255");
+ target2 = InetAddressAndPort.getByName("127.1.0.254");
+ target3 = InetAddressAndPort.getByName("127.1.0.253");
+
+ remote1 = InetAddressAndPort.getByName("127.2.0.255");
+ remote2 = InetAddressAndPort.getByName("127.2.0.254");
+ remote3 = InetAddressAndPort.getByName("127.2.0.253");
+
+ targets = ImmutableList.of(target1, target2, target3);
+ remotes = ImmutableList.of(remote1, remote2, remote3);
+
+ replica1 = fullReplica(target1, FULL_RANGE);
+ replica2 = fullReplica(target2, FULL_RANGE);
+ replica3 = fullReplica(target3, FULL_RANGE);
+ replicas = EndpointsForRange.of(replica1, replica2, replica3);
+
+ remoteReplica1 = fullReplica(remote1, FULL_RANGE);
+ remoteReplica2 = fullReplica(remote2, FULL_RANGE);
+ remoteReplica3 = fullReplica(remote3, FULL_RANGE);
+ remoteReplicas = EndpointsForRange.of(remoteReplica1, remoteReplica2, remoteReplica3);
+
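+ // register both local and remote replicas in token metadata (and gossip below) so they are recognised as ring members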
+ StorageService.instance.getTokenMetadata().clearUnsafe();
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 3 })), remoteReplica1.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 4 })), remoteReplica2.endpoint());
+ StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 5 })), remoteReplica3.endpoint());
+
+ for (Replica replica : replicas)
+ {
+ UUID hostId = UUID.randomUUID();
+ Gossiper.instance.initializeNodeUnsafe(replica.endpoint(), hostId, 1);
+ StorageService.instance.getTokenMetadata().updateHostId(hostId, replica.endpoint());
+ }
+ for (Replica replica : remoteReplicas)
+ {
+ UUID hostId = UUID.randomUUID();
+ Gossiper.instance.initializeNodeUnsafe(replica.endpoint(), hostId, 1);
+ StorageService.instance.getTokenMetadata().updateHostId(hostId, replica.endpoint());
+ }
+
String ksName = "ks";
String ddl = String.format("CREATE TABLE tbl (k int primary key, v text) WITH read_repair='%s'",
@@ -221,8 +289,8 @@
cfm = CreateTableStatement.parse(ddl, ksName).build();
assert cfm.params.readRepair == repairStrategy;
- KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm));
- MigrationManager.announceNewKeyspace(ksm, false);
+ KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), Tables.of(cfm));
+ MigrationManager.announceNewKeyspace(ksm, true);
ks = Keyspace.open(ksName);
cfs = ks.getColumnFamilyStore("tbl");
@@ -230,27 +298,6 @@
cfs.sampleReadLatencyNanos = 0;
cfs.additionalWriteLatencyNanos = 0;
- target1 = InetAddressAndPort.getByName("127.0.0.255");
- target2 = InetAddressAndPort.getByName("127.0.0.254");
- target3 = InetAddressAndPort.getByName("127.0.0.253");
-
- targets = ImmutableList.of(target1, target2, target3);
-
- replica1 = fullReplica(target1, FULL_RANGE);
- replica2 = fullReplica(target2, FULL_RANGE);
- replica3 = fullReplica(target3, FULL_RANGE);
- replicas = EndpointsForRange.of(replica1, replica2, replica3);
-
- replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
-
- StorageService.instance.getTokenMetadata().clearUnsafe();
- StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
- StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
- StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
- Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1);
- Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1);
- Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1);
-
// default test values
key = dk(5);
cell1 = cell("v", "val1", now);
@@ -292,12 +339,23 @@
return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown);
}
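+ // variant of repairPlan that lets tests supply pending endpoints explicitly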
+ static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown, EndpointsForToken pending)
+ {
+ Token token = readPlan.range().left.getToken();
+ return ReplicaPlans.forWrite(readPlan.keyspace(),
+ readPlan.consistencyLevel(),
+ liveAndDown.forToken(token),
+ pending,
+ replica -> true,
+ ReplicaPlans.writeReadRepair(readPlan));
+ }
+
static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown)
{
Token token = readPlan.range().left.getToken();
EndpointsForToken pending = EndpointsForToken.empty(token);
return ReplicaPlans.forWrite(readPlan.keyspace(),
- ConsistencyLevel.TWO,
+ readPlan.consistencyLevel(),
liveAndDown.forToken(token),
pending,
replica -> true,
@@ -305,7 +363,7 @@
}
static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
{
- return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
+ return replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, replicas, targets);
}
static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
{
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 43a1275..53a7219 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -39,7 +39,6 @@
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaPlan;
-import org.apache.cassandra.locator.ReplicaUtils;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.reads.ReadCallback;
@@ -50,7 +49,7 @@
{
public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- super(Util.dk("not a real usable value"), repairs, writePlan, e -> targets.contains(e));
+ super(Util.dk("not a real usable value"), repairs, writePlan);
}
Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -121,10 +120,10 @@
{
AbstractReplicationStrategy rs = ks.getReplicationStrategy();
Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, rs));
- Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, rs));
- Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, rs));
- Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, rs));
- Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, rs));
+ Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
+ Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
+ Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
+ Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
}
@@ -253,36 +252,18 @@
}
/**
- * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor
+ * For dc-local consistency levels, constructing the repair handler throws an IllegalStateException, since no remote DC replicas should be contacted
*/
- @Test
- public void remoteDCTest() throws Exception
+ @Test(expected = IllegalStateException.class)
+ public void remoteDCSpeculativeRetryTest() throws Exception
{
Map<Replica, Mutation> repairs = new HashMap<>();
repairs.put(replica1, mutation(cell1));
+ repairs.put(remoteReplica1, mutation(cell1));
- Replica remote1 = ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.1"));
- Replica remote2 = ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.2"));
- repairs.put(remote1, mutation(cell1));
-
- EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
+ EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remoteReplica1, remoteReplica2);
ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants));
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
- handler.sendInitialRepairs();
- Assert.assertEquals(2, handler.mutationsSent.size());
- Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
- Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
-
- Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(getCurrentRepairStatus(handler));
-
- handler.ack(remote1.endpoint());
- Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(getCurrentRepairStatus(handler));
-
- handler.ack(replica1.endpoint());
- Assert.assertEquals(0, handler.waitingOn());
- Assert.assertTrue(getCurrentRepairStatus(handler));
+ createRepairHandler(repairs, writePlan);
}
private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index ae83efb..66d05a2 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -178,7 +178,7 @@
DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- super(key, repairs, writePlan, isLocal());
+ super(key, repairs, writePlan);
DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
}
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
index 81ab07e..721b6f5 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
@@ -22,7 +22,6 @@
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.ReadCallback;
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index 5ea790b..4dfe2bc 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -32,7 +32,6 @@
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.service.reads.ReadCallback;
public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index dad9aa4..d931672 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -75,7 +75,7 @@
{
public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
{
- super(Util.dk("not a valid key"), repairs, writePlan, e -> targets.endpoints().contains(e));
+ super(Util.dk("not a valid key"), repairs, writePlan);
}
Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -313,10 +313,10 @@
}
/**
- * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor
+ * For dc-local consistency levels, an IllegalStateException is expected when the repair map contains a remote DC node
*/
- @Test
- public void remoteDCTest() throws Exception
+ @Test(expected = IllegalStateException.class)
+ public void remoteDCNodeInvolvedInLocalConsistencyTest() throws Exception
{
Map<Replica, Mutation> repairs = new HashMap<>();
repairs.put(target1, mutation(cell1));
@@ -328,22 +328,7 @@
EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
EndpointsForRange targets = EndpointsForRange.of(target1, target2);
- InstrumentedReadRepairHandler handler = createRepairHandler(repairs, participants, targets);
- handler.sendInitialRepairs();
- Assert.assertEquals(2, handler.mutationsSent.size());
- Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
- Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
-
- Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(getCurrentRepairStatus(handler));
-
- handler.ack(remote1.endpoint());
- Assert.assertEquals(1, handler.waitingOn());
- Assert.assertFalse(getCurrentRepairStatus(handler));
-
- handler.ack(target1.endpoint());
- Assert.assertEquals(0, handler.waitingOn());
- Assert.assertTrue(getCurrentRepairStatus(handler));
+ createRepairHandler(repairs, participants, targets);
}
private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 84276d5..5d7996d 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -23,7 +23,6 @@
import java.util.Map;
import java.util.function.Consumer;
-import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.ReadCommand;
@@ -35,7 +34,6 @@
import org.apache.cassandra.locator.Endpoints;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.service.reads.DigestResolver;