Filter remote DC replicas out when constructing the initial replica plan for the local read repair

patch by Runtian Liu; reviewed by Blake Eggleston and Stefan Miklosovic for CASSANDRA-19120
diff --git a/CHANGES.txt b/CHANGES.txt
index 86b6c18..e4b8692 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.13
+ * Filter remote DC replicas out when constructing the initial replica plan for the local read repair (CASSANDRA-19120)
  * Remove redundant code in StorageProxy#sendToHintedReplicas (CASSANDRA-19412)
  * Remove bashisms for mx4j tool in cassandra-env.sh (CASSANDRA-19416)
  * Add new concurrent_merkle_tree_requests config property to prevent OOM during multi-range and/or multi-table repairs (CASSANDRA-19336)
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index 67b89e5..7e7a6a1 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -19,7 +19,6 @@
 package org.apache.cassandra.locator;
 
 import com.carrotsearch.hppc.ObjectIntHashMap;
-import com.carrotsearch.hppc.cursors.ObjectIntCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ArrayListMultimap;
@@ -439,6 +438,7 @@
     {
         return new Selector()
         {
+
             @Override
             public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
             E select(ConsistencyLevel consistencyLevel, L liveAndDown, L live)
@@ -455,7 +455,8 @@
                     int add = consistencyLevel.blockForWrite(liveAndDown.replicationStrategy(), liveAndDown.pending()) - contacts.size();
                     if (add > 0)
                     {
-                        for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+                        E all = consistencyLevel.isDatacenterLocal() ? live.all().filter(InOurDcTester.replicas()) : live.all();
+                        for (Replica replica : filter(all, r -> !contacts.contains(r)))
                         {
                             contacts.add(replica);
                             if (--add == 0)
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index edcf14d..742d961 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -23,28 +23,26 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InOurDcTester;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.Replicas;
-import org.apache.cassandra.locator.InOurDcTester;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Message;
@@ -53,6 +51,7 @@
 import org.apache.cassandra.tracing.Tracing;
 
 import static org.apache.cassandra.net.Verb.*;
+import static com.google.common.collect.Iterables.all;
 
 public class BlockingPartitionRepair
         extends AbstractFuture<Object> implements RequestCallback<Object>
@@ -61,31 +60,30 @@
     private final ReplicaPlan.ForTokenWrite writePlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
-    private final Predicate<InetAddressAndPort> shouldBlockOn;
 
     private volatile long mutationsSentTime;
 
     public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        this(key, repairs, writePlan,
-             writePlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
-    }
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn)
-    {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
         this.writePlan = writePlan;
-        this.shouldBlockOn = shouldBlockOn;
+
+        // make sure all the read repair targets are contact of the repair write plan
+        Preconditions.checkState(all(repairs.keySet(), (r) -> writePlan.contacts().contains(r)),
+                                 "All repair targets should be part of contacts of read repair write plan.");
 
         int blockFor = writePlan.blockFor();
         // here we remove empty repair mutations from the block for total, since
         // we're not sending them mutations
         for (Replica participant : writePlan.contacts())
         {
-            // remote dcs can sometimes get involved in dc-local reads. We want to repair
-            // them if they do, but they shouldn't interfere with blocking the client read.
-            if (!repairs.containsKey(participant) && shouldBlockOn.test(participant.endpoint()))
+            if (!repairs.containsKey(participant))
                 blockFor--;
+
+            // make sure for local consistency, all contacts are local replicas
+            Preconditions.checkState(!writePlan.consistencyLevel().isDatacenterLocal() || InOurDcTester.replicas().test(participant),
+                                     "Local consistency blocking read repair is trying to contact remote DC node: " + participant.endpoint());
         }
 
         // there are some cases where logically identical data can return different digests
@@ -110,11 +108,8 @@
     @VisibleForTesting
     void ack(InetAddressAndPort from)
     {
-        if (shouldBlockOn.test(from))
-        {
-            pendingRepairs.remove(writePlan.lookup(from));
-            latch.countDown();
-        }
+        pendingRepairs.remove(writePlan.lookup(from));
+        latch.countDown();
     }
 
     @Override
@@ -160,9 +155,6 @@
             // use a separate verb here to avoid writing hints on timeouts
             sendRR(Message.out(READ_REPAIR_REQ, mutation), destination.endpoint());
             ColumnFamilyStore.metricsFor(tableId).readRepairRequests.mark();
-
-            if (!shouldBlockOn.test(destination.endpoint()))
-                pendingRepairs.remove(destination);
             ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation);
         }
     }
@@ -205,7 +197,7 @@
         if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
             return;
 
-        EndpointsForToken newCandidates = writePlan.liveUncontacted();
+        EndpointsForToken newCandidates = writePlan.consistencyLevel().isDatacenterLocal() ? writePlan.liveUncontacted().filter(InOurDcTester.replicas()) : writePlan.liveUncontacted();
         if (newCandidates.isEmpty())
             return;
 
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 7247704..9992bd5 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -21,7 +21,6 @@
 import java.util.List;
 
 import org.apache.cassandra.db.Columns;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.RegularAndStaticColumns;
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index d36808f..83865b1 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -33,6 +33,7 @@
 import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.AbstractNetworkTopologySnitch;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
@@ -90,13 +91,20 @@
     static InetAddressAndPort target1;
     static InetAddressAndPort target2;
     static InetAddressAndPort target3;
+    static InetAddressAndPort remote1;
+    static InetAddressAndPort remote2;
+    static InetAddressAndPort remote3;
     static List<InetAddressAndPort> targets;
+    static List<InetAddressAndPort> remotes;
 
     static Replica replica1;
     static Replica replica2;
     static Replica replica3;
     static EndpointsForRange replicas;
-    static ReplicaPlan.ForRead<?> replicaPlan;
+    static Replica remoteReplica1;
+    static Replica remoteReplica2;
+    static Replica remoteReplica3;
+    static EndpointsForRange remoteReplicas;
 
     static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
     static DecoratedKey key;
@@ -214,6 +222,66 @@
     static void configureClass(ReadRepairStrategy repairStrategy) throws Throwable
     {
         SchemaLoader.loadSchema();
+
+        DatabaseDescriptor.setEndpointSnitch(new AbstractNetworkTopologySnitch()
+        {
+            public String getRack(InetAddressAndPort endpoint)
+            {
+                return "rack1";
+            }
+
+            public String getDatacenter(InetAddressAndPort endpoint)
+            {
+                byte[] address = endpoint.addressBytes;
+                if (address[1] == 2) {
+                    return "datacenter2";
+                }
+                return "datacenter1";
+            }
+        });
+
+        target1 = InetAddressAndPort.getByName("127.1.0.255");
+        target2 = InetAddressAndPort.getByName("127.1.0.254");
+        target3 = InetAddressAndPort.getByName("127.1.0.253");
+
+        remote1 = InetAddressAndPort.getByName("127.2.0.255");
+        remote2 = InetAddressAndPort.getByName("127.2.0.254");
+        remote3 = InetAddressAndPort.getByName("127.2.0.253");
+
+        targets = ImmutableList.of(target1, target2, target3);
+        remotes = ImmutableList.of(remote1, remote2, remote3);
+
+        replica1 = fullReplica(target1, FULL_RANGE);
+        replica2 = fullReplica(target2, FULL_RANGE);
+        replica3 = fullReplica(target3, FULL_RANGE);
+        replicas = EndpointsForRange.of(replica1, replica2, replica3);
+
+        remoteReplica1 = fullReplica(remote1, FULL_RANGE);
+        remoteReplica2 = fullReplica(remote2, FULL_RANGE);
+        remoteReplica3 = fullReplica(remote3, FULL_RANGE);
+        remoteReplicas = EndpointsForRange.of(remoteReplica1, remoteReplica2, remoteReplica3);
+
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 3 })), remoteReplica1.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 4 })), remoteReplica2.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 5 })), remoteReplica3.endpoint());
+
+        for (Replica replica : replicas)
+        {
+            UUID hostId = UUID.randomUUID();
+            Gossiper.instance.initializeNodeUnsafe(replica.endpoint(), hostId, 1);
+            StorageService.instance.getTokenMetadata().updateHostId(hostId, replica.endpoint());
+        }
+        for (Replica replica : remoteReplicas)
+        {
+            UUID hostId = UUID.randomUUID();
+            Gossiper.instance.initializeNodeUnsafe(replica.endpoint(), hostId, 1);
+            StorageService.instance.getTokenMetadata().updateHostId(hostId, replica.endpoint());
+        }
+
         String ksName = "ks";
 
         String ddl = String.format("CREATE TABLE tbl (k int primary key, v text) WITH read_repair='%s'",
@@ -221,8 +289,8 @@
 
         cfm = CreateTableStatement.parse(ddl, ksName).build();
         assert cfm.params.readRepair == repairStrategy;
-        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm));
-        MigrationManager.announceNewKeyspace(ksm, false);
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.nts("datacenter1", 3, "datacenter2", 3), Tables.of(cfm));
+        MigrationManager.announceNewKeyspace(ksm, true);
 
         ks = Keyspace.open(ksName);
         cfs = ks.getColumnFamilyStore("tbl");
@@ -230,27 +298,6 @@
         cfs.sampleReadLatencyNanos = 0;
         cfs.additionalWriteLatencyNanos = 0;
 
-        target1 = InetAddressAndPort.getByName("127.0.0.255");
-        target2 = InetAddressAndPort.getByName("127.0.0.254");
-        target3 = InetAddressAndPort.getByName("127.0.0.253");
-
-        targets = ImmutableList.of(target1, target2, target3);
-
-        replica1 = fullReplica(target1, FULL_RANGE);
-        replica2 = fullReplica(target2, FULL_RANGE);
-        replica3 = fullReplica(target3, FULL_RANGE);
-        replicas = EndpointsForRange.of(replica1, replica2, replica3);
-
-        replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
-
-        StorageService.instance.getTokenMetadata().clearUnsafe();
-        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
-        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
-        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
-        Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1);
-        Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1);
-        Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1);
-
         // default test values
         key  = dk(5);
         cell1 = cell("v", "val1", now);
@@ -292,12 +339,23 @@
         return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown);
     }
 
+    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown, EndpointsForToken pending)
+    {
+        Token token = readPlan.range().left.getToken();
+        return ReplicaPlans.forWrite(readPlan.keyspace(),
+                                     readPlan.consistencyLevel(),
+                                     liveAndDown.forToken(token),
+                                     pending,
+                                     replica -> true,
+                                     ReplicaPlans.writeReadRepair(readPlan));
+    }
+
     static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown)
     {
         Token token = readPlan.range().left.getToken();
         EndpointsForToken pending = EndpointsForToken.empty(token);
         return ReplicaPlans.forWrite(readPlan.keyspace(),
-                                     ConsistencyLevel.TWO,
+                                     readPlan.consistencyLevel(),
                                      liveAndDown.forToken(token),
                                      pending,
                                      replica -> true,
@@ -305,7 +363,7 @@
     }
     static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
     {
-        return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
+        return replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, replicas, targets);
     }
     static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
     {
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 43a1275..53a7219 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -39,7 +39,6 @@
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
-import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.reads.ReadCallback;
 
@@ -50,7 +49,7 @@
     {
         public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(Util.dk("not a real usable value"), repairs, writePlan, e -> targets.contains(e));
+            super(Util.dk("not a real usable value"), repairs, writePlan);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -121,10 +120,10 @@
     {
         AbstractReplicationStrategy rs = ks.getReplicationStrategy();
         Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, rs));
-        Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, rs));
-        Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, rs));
-        Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, rs));
-        Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, rs));
+        Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
+        Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
+        Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
+        Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.LOCAL_QUORUM, rs));
     }
 
 
@@ -253,36 +252,18 @@
     }
 
     /**
-     * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor
+     * For dc local consistency levels, we will run into assertion error because no remote DC replicas should be contacted
      */
-    @Test
-    public void remoteDCTest() throws Exception
+    @Test(expected = IllegalStateException.class)
+    public void remoteDCSpeculativeRetryTest() throws Exception
     {
         Map<Replica, Mutation> repairs = new HashMap<>();
         repairs.put(replica1, mutation(cell1));
+        repairs.put(remoteReplica1, mutation(cell1));
 
-        Replica remote1 = ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.1"));
-        Replica remote2 = ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.2"));
-        repairs.put(remote1, mutation(cell1));
-
-        EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
+        EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remoteReplica1, remoteReplica2);
         ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants));
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
-        handler.sendInitialRepairs();
-        Assert.assertEquals(2, handler.mutationsSent.size());
-        Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
-        Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
-
-        Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(getCurrentRepairStatus(handler));
-
-        handler.ack(remote1.endpoint());
-        Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(getCurrentRepairStatus(handler));
-
-        handler.ack(replica1.endpoint());
-        Assert.assertEquals(0, handler.waitingOn());
-        Assert.assertTrue(getCurrentRepairStatus(handler));
+        createRepairHandler(repairs, writePlan);
     }
 
     private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index ae83efb..66d05a2 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -178,7 +178,7 @@
 
         DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(key, repairs, writePlan, isLocal());
+            super(key, repairs, writePlan);
             DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
         }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
index 81ab07e..721b6f5 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
@@ -22,7 +22,6 @@
 
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.ReadCallback;
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index 5ea790b..4dfe2bc 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -32,7 +32,6 @@
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.service.reads.ReadCallback;
 
 public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index dad9aa4..d931672 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -75,7 +75,7 @@
     {
         public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(Util.dk("not a valid key"), repairs, writePlan, e -> targets.endpoints().contains(e));
+            super(Util.dk("not a valid key"), repairs, writePlan);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -313,10 +313,10 @@
     }
 
     /**
-     * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor
+     * For dc local consistency levels, if repair map has remote DC node, we will get assertion failure
      */
-    @Test
-    public void remoteDCTest() throws Exception
+    @Test(expected = IllegalStateException.class)
+    public void remoteDCNodeInvolveInLocalConsistencyTest() throws Exception
     {
         Map<Replica, Mutation> repairs = new HashMap<>();
         repairs.put(target1, mutation(cell1));
@@ -328,22 +328,7 @@
         EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
         EndpointsForRange targets = EndpointsForRange.of(target1, target2);
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, participants, targets);
-        handler.sendInitialRepairs();
-        Assert.assertEquals(2, handler.mutationsSent.size());
-        Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
-        Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
-
-        Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(getCurrentRepairStatus(handler));
-
-        handler.ack(remote1.endpoint());
-        Assert.assertEquals(1, handler.waitingOn());
-        Assert.assertFalse(getCurrentRepairStatus(handler));
-
-        handler.ack(target1.endpoint());
-        Assert.assertEquals(0, handler.waitingOn());
-        Assert.assertTrue(getCurrentRepairStatus(handler));
+        createRepairHandler(repairs, participants, targets);
     }
 
     private boolean getCurrentRepairStatus(BlockingPartitionRepair handler)
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 84276d5..5d7996d 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -23,7 +23,6 @@
 import java.util.Map;
 import java.util.function.Consumer;
 
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
@@ -35,7 +34,6 @@
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;