CASSANDRA-19842: Consistency level check incorrectly passes when majority of the replica set is unavailable for write (#75)

Patch by Yifan Cai; Reviewed by Doug Rohrer, Francisco Guerrero for CASSANDRA-19842
diff --git a/CHANGES.txt b/CHANGES.txt
index cec6e0a..2922b0a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 1.0.0
+ * Fix consistency level check for write (CASSANDRA-19842)
  * Fix NPE when writing UDT values (CASSANDRA-19836)
  * Add job_timeout_seconds writer option (CASSANDRA-19827)
  * Prevent double closing sstable writer (CASSANDRA-19821)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
index 13c465a..3820efd 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
@@ -27,5 +27,27 @@
 
     String datacenter();
 
+    /**
+     * IP address string of a Cassandra instance.
+     * Mainly used in blocked instance list to identify instances.
+     * Prefer to use {@link #ipAddressWithPort} as instance identifier,
+     * unless knowing the compared is IP address without port for sure.
+     */
     String ipAddress();
+
+    /**
+     * Equivalent to EndpointWithPort in Cassandra
+     * @return ip address with port string in the format, "ip:port"
+     */
+    String ipAddressWithPort();
+
+    /**
+     * @return state of the node
+     */
+    NodeState nodeState();
+
+    /**
+     * @return status of the node
+     */
+    NodeStatus nodeStatus();
 }
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeState.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeState.java
new file mode 100644
index 0000000..93f148f
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common.model;
+
+public enum NodeState
+{
+    NORMAL(false),
+    JOINING(true),
+    LEAVING(true),
+    MOVING(true),
+    REPLACING(true); // state added in sidecar
+
+    public final boolean isPending;
+
+    NodeState(boolean isPending)
+    {
+        this.isPending = isPending;
+    }
+
+    public static NodeState fromNameIgnoreCase(String name)
+    {
+        String uppercase = name.toUpperCase();
+        return valueOf(uppercase);
+    }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeStatus.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeStatus.java
new file mode 100644
index 0000000..8b64cdd
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common.model;
+
+public enum NodeStatus
+{
+    UP,
+    DOWN,
+    UNKNOWN;
+
+    public static NodeStatus fromNameIgnoreCase(String name)
+    {
+        String uppercase = name.toUpperCase();
+        try
+        {
+            return valueOf(uppercase);
+        }
+        catch (Exception ex)
+        {
+            // default to UNKNOWN
+            return UNKNOWN;
+        }
+    }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/AnalyticsException.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/AnalyticsException.java
new file mode 100644
index 0000000..ab92fb2
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/AnalyticsException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.exception;
+
+/**
+ * Cassandra Analytics exceptions base class for exception handling
+ * Note that it is a RuntimeException (unchecked). It gives call-sites the flexibility of handling the exceptions they cared about without forcing the
+ * catch blocks (and making the code overly verbose). In most cases, the call-sites simply converts a checked exception into unchecked and rethrow.
+ */
+public abstract class AnalyticsException extends RuntimeException
+{
+    private static final long serialVersionUID = 3980444570316598756L;
+
+    public AnalyticsException(String message)
+    {
+        super(message);
+    }
+
+    public AnalyticsException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+
+    public AnalyticsException(Throwable cause)
+    {
+        super(cause);
+    }
+
+    protected AnalyticsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
+    {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/ConsistencyNotSatisfiedException.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/ConsistencyNotSatisfiedException.java
new file mode 100644
index 0000000..d82a988
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/ConsistencyNotSatisfiedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.exception;
+
+/**
+ * Consistency cannot be satisfied by the bulk operations, i.e. read or write
+ */
+public class ConsistencyNotSatisfiedException extends AnalyticsException
+{
+    private static final long serialVersionUID = 992947698403422384L;
+
+    public ConsistencyNotSatisfiedException(String message)
+    {
+        super(message);
+    }
+
+    public ConsistencyNotSatisfiedException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/SidecarApiCallException.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/SidecarApiCallException.java
new file mode 100644
index 0000000..d59c138
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/SidecarApiCallException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.exception;
+
+/**
+ * Exception due to Cassandra Sidecar Api call failure
+ */
+public class SidecarApiCallException extends AnalyticsException
+{
+    public SidecarApiCallException(String message, Throwable cause)
+    {
+        super(message, cause);
+    }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 41b2315..1ad92a6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -164,7 +164,13 @@
         this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
                                                       "skip extended verification of SSTables by Cassandra");
         this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
-        this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
+        String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
+        if (!consistencyLevel.isLocal() && dc != null)
+        {
+            LOGGER.warn("localDc is present for non-local consistency level {} specified in writer options. Correcting localDc to null", consistencyLevel);
+            dc = null;
+        }
+        this.localDC = dc;
         this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
         this.sstableDataSizeInMiB = resolveSSTableDataSizeInMiB(options);
         this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
index 42413d0..9946271 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
@@ -20,18 +20,17 @@
 package org.apache.cassandra.spark.bulkwriter;
 
 import java.math.BigInteger;
-import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Range;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
 
 public class BulkWriteValidator
 {
@@ -57,8 +56,8 @@
                                         String phase,
                                         JobInfo job)
     {
-        Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<RingInstance, String>>> failedRanges =
-        failureHandler.getFailedEntries(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
+        List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges =
+        failureHandler.getFailedRanges(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
 
         if (failedRanges.isEmpty())
         {
@@ -66,18 +65,11 @@
         }
         else
         {
-            String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.",
+            String message = String.format("Failed to write %s ranges with %s for job %s in phase %s.",
                                            failedRanges.size(), job.getConsistencyLevel(), job.getId(), phase);
             logger.error(message);
-            failedRanges.forEach(failedRange ->
-                                 failedRange.getValue()
-                                            .keySet()
-                                            .forEach(instance ->
-                                                     logger.error("Failed {} for {} on {}",
-                                                                  phase,
-                                                                  failedRange.getKey(),
-                                                                  instance.toString())));
-            throw new RuntimeException(message);
+            logFailedRanges(logger, phase, failedRanges);
+            throw new ConsistencyNotSatisfiedException(message);
         }
     }
 
@@ -117,6 +109,21 @@
         });
     }
 
+    private static void logFailedRanges(Logger logger, String phase,
+                                        List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges)
+    {
+        for (ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange failedRange : failedRanges)
+        {
+            failedRange.failuresPerInstance.forEachInstance((instance, errors) -> {
+                logger.error("Failed in phase {} for {} on {}. Failure: {}",
+                             phase,
+                             failedRange.range,
+                             instance.toString(),
+                             errors);
+            });
+        }
+    }
+
     public void updateFailureHandler(Range<BigInteger> failedRange, RingInstance instance, String reason)
     {
         failureHandler.addFailure(failedRange, instance, reason);
@@ -124,8 +131,16 @@
 
     public void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping)
     {
-        // Updates failures by looking up instance metadata
-        updateInstanceAvailability();
+        validateClOrFail(tokenRangeMapping, true);
+    }
+
+    public void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping, boolean refreshInstanceAvailability)
+    {
+        if (refreshInstanceAvailability)
+        {
+            // Updates failures by looking up instance metadata
+            updateInstanceAvailability();
+        }
         // Fails if the failures violate consistency requirements
         validateClOrFail(tokenRangeMapping, failureHandler, LOGGER, phase, job);
     }
@@ -147,12 +162,9 @@
                                                     + "Please rerun import once topology changes are complete.",
                                                     instance.nodeName(), cluster.getInstanceState(instance));
                 throw new RuntimeException(errorMessage);
-            // Check for blocked instances and ranges for the purpose of logging only.
-            // We check for blocked instances while validating consistency level requirements
+            // Both 'blocked' and 'down' instances are considered as failure
             case UNAVAILABLE_BLOCKED:
             case UNAVAILABLE_DOWN:
-                boolean shouldAddFailure = availability == InstanceAvailability.UNAVAILABLE_DOWN;
-
                 Collection<Range<BigInteger>> unavailableRanges = cluster.getTokenRangeMapping(true)
                                                                          .getTokenRanges()
                                                                          .get(instance);
@@ -160,10 +172,7 @@
                     String nodeDisplayName = instance.nodeName();
                     String message = String.format("%s %s", nodeDisplayName, availability.getMessage());
                     LOGGER.warn("{} failed in phase {} on {} because {}", failedRange, phase, nodeDisplayName, message);
-                    if (shouldAddFailure)
-                    {
-                        failureHandler.addFailure(failedRange, instance, message);
-                    }
+                    failureHandler.addFailure(failedRange, instance, message);
                 });
                 break;
 
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index c208ed4..13c616e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -34,6 +34,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -60,8 +62,10 @@
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.client.InstanceState;
 import org.apache.cassandra.spark.common.client.InstanceStatus;
+import org.apache.cassandra.spark.common.model.NodeState;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.exception.SidecarApiCallException;
 import org.apache.cassandra.spark.utils.CqlUtils;
 import org.apache.cassandra.spark.utils.FutureUtils;
 import org.jetbrains.annotations.NotNull;
@@ -227,10 +231,18 @@
         return schemaResponse.schema();
     }
 
-    private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() throws ExecutionException, InterruptedException
+    private TokenRangeReplicasResponse getTokenRangesAndReplicaSets()
     {
         CassandraContext context = getCassandraContext();
-        return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get();
+        try
+        {
+            return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get();
+        }
+        catch (ExecutionException | InterruptedException exception)
+        {
+            LOGGER.error("Failed to get token ranges for keyspace {}", conf.keyspace, exception);
+            throw new SidecarApiCallException("Failed to get token ranges for keyspace" + conf.keyspace, exception);
+        }
     }
 
     private Set<String> readReplicasFromTokenRangeResponse(TokenRangeReplicasResponse response)
@@ -381,67 +393,66 @@
 
     private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
     {
+        return getTokenRangeReplicas(this::getTokenRangesAndReplicaSets,
+                                     this::getPartitioner,
+                                     this::getReplicationFactor,
+                                     this::instanceIsBlocked);
+    }
+
+    @VisibleForTesting
+    static TokenRangeMapping<RingInstance> getTokenRangeReplicas(Supplier<TokenRangeReplicasResponse> topologySupplier,
+                                                                 Supplier<Partitioner> partitionerSupplier,
+                                                                 Supplier<ReplicationFactor> replicationFactorSupplier,
+                                                                 Predicate<? super RingInstance> blockedInstancePredicate)
+    {
+        long start = System.nanoTime();
+        TokenRangeReplicasResponse response = topologySupplier.get();
+        long elapsedTimeNanos = System.nanoTime() - start;
+        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(),
+                                                                                                   response.replicaMetadata());
+        LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} milliseconds",
+                    tokenRangesByInstance.size(),
+                    TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos));
+
+        Set<RingInstance> instances = response.replicaMetadata()
+                                              .values()
+                                              .stream()
+                                              .map(RingInstance::new)
+                                              .collect(Collectors.toSet());
+
+        Set<RingInstance> replacementInstances = instances.stream()
+                                                          .filter(i -> i.nodeState() == NodeState.REPLACING)
+                                                          .collect(Collectors.toSet());
+
+        Set<RingInstance> blockedInstances = instances.stream()
+                                                      .filter(blockedInstancePredicate)
+                                                      .collect(Collectors.toSet());
+
+        Set<String> blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address())
+                                                 .collect(Collectors.toSet());
+
+        // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
         Map<String, Set<String>> writeReplicasByDC;
-        Map<String, Set<String>> pendingReplicasByDC;
-        Map<String, ReplicaMetadata> replicaMetadata;
-        Set<RingInstance> blockedInstances;
-        Set<RingInstance> replacementInstances;
-        Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
-        try
+        writeReplicasByDC = response.writeReplicas()
+                                    .stream()
+                                    .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+                                    .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+                                                              (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+        Map<String, Set<String>> pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+        if (LOGGER.isDebugEnabled())
         {
-            long start = System.nanoTime();
-            TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
-            long elapsedTimeNanos = System.nanoTime() - start;
-            replicaMetadata = response.replicaMetadata();
-
-            tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
-            LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} nanoseconds",
-                        tokenRangesByInstance.size(),
-                        elapsedTimeNanos);
-
-            replacementInstances = response.replicaMetadata()
-                                           .values()
-                                           .stream()
-                                           .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name()))
-                                           .map(RingInstance::new)
-                                           .collect(Collectors.toSet());
-
-            blockedInstances = response.replicaMetadata()
-                                       .values()
-                                       .stream()
-                                       .map(RingInstance::new)
-                                       .filter(this::instanceIsBlocked)
-                                       .collect(Collectors.toSet());
-
-            Set<String> blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address())
-                                                     .collect(Collectors.toSet());
-
-            // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
-            writeReplicasByDC = response.writeReplicas()
-                                        .stream()
-                                        .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
-                                        .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
-                                                                  (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
-
-            pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
-
-            if (LOGGER.isDebugEnabled())
-            {
-                LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
-                             writeReplicasByDC.keySet(),
-                             writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
-                             pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
-            }
-        }
-        catch (ExecutionException | InterruptedException exception)
-        {
-            LOGGER.error("Failed to get token ranges, ", exception);
-            throw new RuntimeException(exception);
+            LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
+                         writeReplicasByDC.keySet(),
+                         writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+                         pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
         }
 
+        Map<String, ReplicaMetadata> replicaMetadata = response.replicaMetadata();
         // Include availability info so CL checks can use it to exclude replacement hosts
-        return new TokenRangeMapping<>(getPartitioner(),
-                                       getReplicationFactor(),
+        return new TokenRangeMapping<>(partitionerSupplier.get(),
+                                       replicationFactorSupplier.get(),
                                        writeReplicasByDC,
                                        pendingReplicasByDC,
                                        tokenRangesByInstance,
@@ -450,7 +461,7 @@
                                        replacementInstances);
     }
 
-    private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
+    private static Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
     {
         Set<String> merged = new HashSet<>();
         // Removes blocked instances. If this is included, remove blockedInstances from CL checks
@@ -460,7 +471,7 @@
         return merged;
     }
 
-    private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
+    private static Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
     {
 
         Set<String> pendingReplicas = response.replicaMetadata()
@@ -478,7 +489,7 @@
         // Filter writeReplica entries and the value replicaSet to only include those with pending replicas
         return writeReplicasByDC.entrySet()
                                 .stream()
-                                .filter(e -> e.getValue().stream()
+                                .filter(e -> e.getValue().stream() // todo: transformToHostWithoutPort is called twice for entries
                                               .anyMatch(v -> pendingReplicas.contains(transformToHostWithoutPort(v))))
                                 .collect(Collectors.toMap(Map.Entry::getKey,
                                                           e -> e.getValue().stream()
@@ -486,13 +497,13 @@
                                                                 .collect(Collectors.toSet())));
     }
 
-    private String transformToHostWithoutPort(String v)
+    private static String transformToHostWithoutPort(String v)
     {
         return v.contains(":") ? v.split(":")[0] : v;
     }
 
-    private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
-                                                                               Map<String, ReplicaMetadata> replicaMetadata)
+    private static Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+                                                                                      Map<String, ReplicaMetadata> replicaMetadata)
     {
         Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create();
         for (ReplicaInfo rInfo : writeReplicas)
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java
index d1f458a..228ed91 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java
@@ -357,7 +357,7 @@
         // it either passes or throw if consistency level cannot be satisfied
         try
         {
-            writeValidator.validateClOrFail(cassandraTopologyMonitor.initialTopology());
+            writeValidator.validateClOrFail(cassandraTopologyMonitor.initialTopology(), false);
         }
         catch (RuntimeException rte)
         {
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 60628bd..a454827 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -25,6 +25,7 @@
 import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
 import org.apache.cassandra.spark.data.QualifiedTableName;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 public interface JobInfo extends Serializable
 {
@@ -32,6 +33,7 @@
     // Job Information API - should this really just move back to Config? Here to try to reduce the violations of the Law of Demeter more than anything else
     ConsistencyLevel getConsistencyLevel();
 
+    @Nullable
     String getLocalDC();
 
     /**
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 3169cff..d0111ef 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -337,9 +337,10 @@
         if (haveMappingsChanged)
         {
             Set<Range<BigInteger>> rangeDelta = symmetricDifference(startMapping.keySet(), endMapping.keySet());
-            Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances).stream()
-                                                                                           .map(RingInstance::ipAddress)
-                                                                                           .collect(Collectors.toSet());
+            Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances)
+                                        .stream()
+                                        .map(RingInstance::ipAddressWithPort)
+                                        .collect(Collectors.toSet());
             String message = String.format("[%s] Token range mappings have changed since the task started " +
                                            "with non-overlapping instances: %s and ranges: %s",
                                            partitionId,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index 7efe001..ae04447 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -25,9 +25,11 @@
 import java.io.Serializable;
 import java.util.Objects;
 
-import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
 import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
+import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.common.model.NodeState;
+import org.apache.cassandra.spark.common.model.NodeStatus;
 import org.jetbrains.annotations.Nullable;
 
 public class RingInstance implements CassandraInstance, Serializable
@@ -77,9 +79,29 @@
         return ringEntry.address();
     }
 
+    @Override
+    public String ipAddressWithPort()
+    {
+        return ringEntry.address() + ':' + ringEntry.port();
+    }
+
+    @Override
+    public NodeState nodeState()
+    {
+        return NodeState.fromNameIgnoreCase(ringEntry.state());
+    }
+
+    @Override
+    public NodeStatus nodeStatus()
+    {
+        return NodeStatus.fromNameIgnoreCase(ringEntry.status());
+    }
+
     /**
      * Custom equality that compares the token, fully qualified domain name, the port, and the datacenter
      *
+     * Note that node state and status are not part of the calculation.
+     *
      * @param other the other instance
      * @return true if both instances are equal, false otherwise
      */
@@ -101,6 +123,8 @@
     /**
      * Custom hashCode that compares the token, fully qualified domain name, the port, and the datacenter
      *
+     * Note that node state and status are not part of the calculation.
+     *
      * @return The hashcode of this instance based on the important fields
      */
     @Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java
index e1da5c3..168e044 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java
@@ -20,6 +20,7 @@
 package org.apache.cassandra.spark.bulkwriter.blobupload;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -106,7 +107,7 @@
         }
 
         if (!succeededInstances().isEmpty()
-            && consistencyLevel.canBeSatisfied(succeededInstances(), replicationFactor, localDC))
+            && consistencyLevel.canBeSatisfied(succeededInstances(), Collections.emptyList(), replicationFactor, localDC))
         {
             isSatisfied = true;
             return ConsistencyLevelCheckResult.SATISFIED;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java
index dd48bfe..3372dd9 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java
@@ -21,7 +21,6 @@
 
 import java.util.Collection;
 import java.util.Objects;
-import java.util.Set;
 
 import com.google.common.base.Preconditions;
 
@@ -38,15 +37,28 @@
     boolean isLocal();
 
     /**
-     * Check consistency level with the collection of the succeeded instances
+     * Check write consistency can be satisfied with the collection of the succeeded instances
+     * <p>
+     * When pendingReplicas is non-empty, the minimum number of success is increased by the size of pendingReplicas,
+     * keeping the same semantics defined in Cassandra.
+     * See <a href="https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/ConsistencyLevel.java#L172">blockForWrite</a>
+     * <p>
+     * For example, say RF == 3, and there is 2 pending replicas.
+     * <ul>
+     *     <li>QUORUM write consistency requires at least 4 replicas to succeed, i.e. quorum(3) + 2, tolerating 1 failure</li>
+     *     <li>ONE write consistency requires at least 3 replicas to succeed, i.e. 1 + 2, tolerating 2 failure</li>
+     *     <li>TWO write consistency requires at least 4 replicas to succeed, i.e. 2 + 2, tolerating 1 failure</li>
+     * </ul>
      *
      * @param succeededInstances the succeeded instances in the replica set
+     * @param pendingInstances the pending instances, i.e. JOINING, LEAVING, MOVING
      * @param replicationFactor replication factor to check with
      * @param localDC the local data center name if required for the check
      * @return true means the consistency level is _definitively_ satisfied.
      *         Meanwhile, returning false means no conclusion can be drawn
      */
     boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                           Collection<? extends CassandraInstance> pendingInstances,
                            ReplicationFactor replicationFactor,
                            String localDC);
 
@@ -56,35 +68,6 @@
                                     cl.name() + " only make sense for NetworkTopologyStrategy keyspaces");
     }
 
-    /**
-     * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor.
-     * <pre>
-     * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes.
-     *   This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy.
-     * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy
-     *   (and non-pending) nodes.
-     *
-     *   For both the above cases, blocked instances are also considered as failures while performing consistency checks.
-     *   Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances.
-     *   This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes.
-     * </pre>
-     *
-     * @param writeReplicas        the set of replicas for write operations
-     * @param pendingReplicas      the set of replicas pending status
-     * @param replacementInstances the set of instances that are replacing the other instances
-     * @param blockedInstances     the set of instances that have been blocked for the bulk operation
-     * @param failedInstanceIps    the set of instances where there were failures
-     * @param localDC              the local datacenter used for consistency level, or {@code null} if not provided
-     * @return {@code true} if the consistency has been met, {@code false} otherwise
-     */
-    boolean checkConsistency(Set<String> writeReplicas,
-                             Set<String> pendingReplicas,
-                             Set<String> replacementInstances,
-                             Set<String> blockedInstances,
-                             Set<String> failedInstanceIps,
-                             String localDC); // todo: simplify the parameter list. not all are required in impl
-
-    // Check if successful writes forms quorum of non-replacing nodes - N/A as quorum is if there are no failures/blocked
     enum CL implements ConsistencyLevel
     {
         ALL
@@ -96,26 +79,13 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                int failedExcludingReplacements = failedInstanceIps.size() - replacementInstances.size();
-                return failedExcludingReplacements <= 0 && blockedInstances.isEmpty();
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
                                           String localDC)
             {
-                int rf = replicationFactor.getTotalReplicationFactor();
-                // The effective RF during expansion could be larger than the defined RF
-                // The check for CL satisfaction should consider the scenario and use >=
-                return succeededInstances.size() >= rf;
+                int blockedFor = replicationFactor.getTotalReplicationFactor() + pendingInstances.size();
+                return succeededInstances.size() >= blockedFor;
             }
         },
 
@@ -128,31 +98,20 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
-                                          String localDC)
+                                          String localDC) // localDc is ignored for EACH_QUORUM
             {
                 ensureNetworkTopologyStrategy(replicationFactor, EACH_QUORUM);
-                Objects.requireNonNull(localDC, "localDC cannot be null");
 
                 for (String datacenter : replicationFactor.getOptions().keySet())
                 {
                     int rf = replicationFactor.getOptions().get(datacenter);
-                    int majority = rf / 2 + 1;
-                    if (succeededInstances.stream()
-                                          .filter(instance -> instance.datacenter().equalsIgnoreCase(datacenter))
-                                          .count() < majority)
+                    int pendingCountOfDc = countInDc(pendingInstances, datacenter);
+                    int succeededCountOfDc = countInDc(succeededInstances, datacenter);
+                    int blockedFor = quorum(rf) + pendingCountOfDc;
+                    if (succeededCountOfDc < blockedFor)
                     {
                         return false;
                     }
@@ -169,23 +128,14 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
                                           String localDC)
             {
                 int rf = replicationFactor.getTotalReplicationFactor();
-                return succeededInstances.size() > rf / 2;
+                int blockedFor = quorum(rf) + pendingInstances.size();
+                return succeededInstances.size() >= blockedFor;
             }
         },
         LOCAL_QUORUM
@@ -197,18 +147,8 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
                                           String localDC)
             {
@@ -216,9 +156,10 @@
                 Objects.requireNonNull(localDC, "localDC cannot be null");
 
                 int rf = replicationFactor.getOptions().get(localDC);
-                return succeededInstances.stream()
-                                         .filter(instance -> instance.datacenter().equalsIgnoreCase(localDC))
-                                         .count() > rf / 2;
+                int pendingCountInDc = countInDc(pendingInstances, localDC);
+                int succeededCountInDc = countInDc(succeededInstances, localDC);
+                int blockFor = quorum(rf) + pendingCountInDc;
+                return succeededCountInDc >= blockFor;
             }
         },
         ONE
@@ -230,23 +171,13 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                return (failedInstanceIps.size() + blockedInstances.size())
-                       <= (writeReplicas.size() - pendingReplicas.size() - 1);
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
                                           String localDC)
             {
-                return !succeededInstances.isEmpty();
+                int blockFor = 1 + pendingInstances.size();
+                return succeededInstances.size() >= blockFor;
             }
         },
         TWO
@@ -258,23 +189,13 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                return (failedInstanceIps.size() + blockedInstances.size())
-                       <= (writeReplicas.size() - pendingReplicas.size() - 2);
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
                                           String localDC)
             {
-                return succeededInstances.size() >= 2;
+                int blockFor = 2 + pendingInstances.size();
+                return succeededInstances.size() >= blockFor;
             }
         },
         LOCAL_ONE
@@ -286,26 +207,29 @@
             }
 
             @Override
-            public boolean checkConsistency(Set<String> writeReplicas,
-                                            Set<String> pendingReplicas,
-                                            Set<String> replacementInstances,
-                                            Set<String> blockedInstances,
-                                            Set<String> failedInstanceIps,
-                                            String localDC)
-            {
-                return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - pendingReplicas.size() - 1);
-            }
-
-            @Override
             public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+                                          Collection<? extends CassandraInstance> pendingInstances,
                                           ReplicationFactor replicationFactor,
                                           String localDC)
             {
                 ensureNetworkTopologyStrategy(replicationFactor, LOCAL_ONE);
                 Objects.requireNonNull(localDC, "localDC cannot be null");
 
-                return succeededInstances.stream().anyMatch(instance -> instance.datacenter().equalsIgnoreCase(localDC));
+                int pendingCountInDc = countInDc(pendingInstances, localDC);
+                int succeededCountInDc = countInDc(succeededInstances, localDC);
+                int blockFor = pendingCountInDc + 1;
+                return succeededCountInDc >= blockFor;
             }
         };
+
+        private static int quorum(int replicaSetSize)
+        {
+            return replicaSetSize / 2 + 1;
+        }
+
+        private static int countInDc(Collection<? extends CassandraInstance> instances, String localDC)
+        {
+            return (int) instances.stream().filter(i -> i.datacenter().equalsIgnoreCase(localDC)).count();
+        }
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
index 06382f8..fe2c36b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
@@ -20,13 +20,12 @@
 package org.apache.cassandra.spark.bulkwriter.token;
 
 import java.math.BigInteger;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
@@ -37,16 +36,72 @@
 import com.google.common.collect.TreeRangeMap;
 
 import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.common.model.NodeStatus;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
 
 public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
 {
-    private final RangeMap<BigInteger, Multimap<Instance, String>> failedRangesMap = TreeRangeMap.create();
+    public class FailuresPerInstance
+    {
+        private final Multimap<Instance, String> errorMessagesPerInstance;
+
+        public FailuresPerInstance()
+        {
+            this.errorMessagesPerInstance = ArrayListMultimap.create();
+        }
+
+        public FailuresPerInstance(Multimap<Instance, String> errorMessagesPerInstance)
+        {
+            this.errorMessagesPerInstance = ArrayListMultimap.create(errorMessagesPerInstance);
+        }
+
+        public FailuresPerInstance copy()
+        {
+            return new FailuresPerInstance(this.errorMessagesPerInstance);
+        }
+
+        public Set<Instance> instances()
+        {
+            return errorMessagesPerInstance.keySet();
+        }
+
+        public void addErrorForInstance(Instance instance, String errorMessage)
+        {
+            errorMessagesPerInstance.put(instance, errorMessage);
+        }
+
+        public boolean hasError(Instance instance)
+        {
+            return errorMessagesPerInstance.containsKey(instance)
+                   && !errorMessagesPerInstance.get(instance).isEmpty();
+        }
+
+        public void forEachInstance(BiConsumer<Instance, Collection<String>> instanceErrorsConsumer)
+        {
+            errorMessagesPerInstance.asMap().forEach(instanceErrorsConsumer);
+        }
+    }
+
+    public class ConsistencyFailurePerRange
+    {
+        public final Range<BigInteger> range;
+        public final FailuresPerInstance failuresPerInstance;
+
+        public ConsistencyFailurePerRange(Range<BigInteger> range, FailuresPerInstance failuresPerInstance)
+        {
+            this.range = range;
+            this.failuresPerInstance = failuresPerInstance;
+        }
+    }
+
+    // failures captures per each range; note that failures do not necessarily fail a range, as long as consistency level is considered
+    private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create();
 
     public ReplicaAwareFailureHandler(Partitioner partitioner)
     {
-        failedRangesMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), ArrayListMultimap.create());
+        rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance());
     }
 
     /**
@@ -66,25 +121,25 @@
      */
     public synchronized void addFailure(Range<BigInteger> tokenRange, Instance casInstance, String errMessage)
     {
-        RangeMap<BigInteger, Multimap<Instance, String>> overlappingFailures = failedRangesMap.subRangeMap(tokenRange);
-        RangeMap<BigInteger, Multimap<Instance, String>> mappingsToAdd = TreeRangeMap.create();
+        RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange);
+        RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create();
 
-        for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet())
+        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet())
         {
-            Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue());
-            newErrorMap.put(casInstance, errMessage);
+            FailuresPerInstance newErrorMap = entry.getValue().copy();
+            newErrorMap.addErrorForInstance(casInstance, errMessage);
             mappingsToAdd.put(entry.getKey(), newErrorMap);
         }
-        failedRangesMap.putAll(mappingsToAdd);
+        rangeFailuresMap.putAll(mappingsToAdd);
     }
 
     public Set<Instance> getFailedInstances()
     {
-        return failedRangesMap.asMapOfRanges().values()
-                              .stream()
-                              .map(Multimap::keySet)
-                              .flatMap(Collection::stream)
-                              .collect(Collectors.toSet());
+        return rangeFailuresMap.asMapOfRanges().values()
+                               .stream()
+                               .map(FailuresPerInstance::instances)
+                               .flatMap(Collection::stream)
+                               .collect(Collectors.toSet());
     }
 
     /**
@@ -94,119 +149,75 @@
      * @param tokenRangeMapping the mapping of token ranges to a Cassandra instance
      * @param cl                the desired consistency level
      * @param localDC           the local datacenter
-     * @return list of failed entries for token ranges that break consistency. This should ideally be empty for a
+     * @return list of failed token ranges that break consistency. This should ideally be empty for a
      * successful operation.
      */
-    public synchronized Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>>
-    getFailedEntries(TokenRangeMapping<? extends CassandraInstance> tokenRangeMapping,
-                     ConsistencyLevel cl,
-                     String localDC)
+    public synchronized List<ConsistencyFailurePerRange>
+    getFailedRanges(TokenRangeMapping<Instance> tokenRangeMapping,
+                    ConsistencyLevel cl,
+                    @Nullable String localDC)
     {
+        Preconditions.checkArgument((cl.isLocal() && localDC != null) || (!cl.isLocal() && localDC == null),
+                                    "Not a valid pair of consistency level configuration. " +
+                                    "Consistency level: " + cl + " localDc: " + localDC);
+        List<ConsistencyFailurePerRange> failedRanges = new ArrayList<>();
 
-        List<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>> failedEntries =
-        new ArrayList<>();
-
-        for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> failedRangeEntry : failedRangesMap.asMapOfRanges()
-                                                                                                        .entrySet())
+        for (Map.Entry<Range<BigInteger>, FailuresPerInstance> failedRangeEntry : rangeFailuresMap.asMapOfRanges()
+                                                                                                  .entrySet())
         {
-            Multimap<Instance, String> errorMap = failedRangeEntry.getValue();
-            Collection<Instance> failedInstances = errorMap.keySet()
-                                                           .stream()
-                                                           .filter(inst ->
-                                                                   !errorMap.get(inst).isEmpty())
-                                                           .collect(Collectors.toList());
+            Range<BigInteger> range = failedRangeEntry.getKey();
+            FailuresPerInstance errorMap = failedRangeEntry.getValue();
+            Set<Instance> failedReplicas = errorMap.instances()
+                                                   .stream()
+                                                   .filter(errorMap::hasError)
+                                                   .collect(Collectors.toSet());
 
-
-            if (!validateConsistency(tokenRangeMapping, failedInstances, cl, localDC))
+            // no failures found for the range; skip consistency check on this one and move on
+            if (failedReplicas.isEmpty())
             {
-                failedEntries.add(new AbstractMap.SimpleEntry<>(failedRangeEntry.getKey(),
-                                                                failedRangeEntry.getValue()));
+                continue;
             }
+
+            tokenRangeMapping.getWriteReplicasOfRange(range, localDC)
+                             .forEach((subrange, liveAndDown) -> {
+                                 if (!checkSubrange(cl, localDC, tokenRangeMapping.replicationFactor(), liveAndDown, failedReplicas))
+                                 {
+                                     failedRanges.add(new ConsistencyFailurePerRange(subrange, errorMap));
+                                 }
+                             });
         }
 
-        return failedEntries;
+        return failedRanges;
     }
 
-    private boolean validateConsistency(TokenRangeMapping<? extends CassandraInstance> tokenRangeMapping,
-                                        Collection<Instance> failedInstances,
-                                        ConsistencyLevel cl,
-                                        String localDC)
+    /**
+     * Check whether a CL can be satisfied for each sub-range.
+     * @return true if consistency is satisfied; false otherwise.
+     */
+    private boolean checkSubrange(ConsistencyLevel cl,
+                                  @Nullable String localDC,
+                                  ReplicationFactor replicationFactor,
+                                  Set<Instance> liveAndDown,
+                                  Set<Instance> failedReplicas)
     {
-        boolean isConsistencyLevelMet = true;
+        Set<Instance> liveReplicas = liveAndDown.stream()
+                                                .filter(instance -> instance.nodeStatus() == NodeStatus.UP)
+                                                .collect(Collectors.toSet());
+        Set<Instance> pendingReplicas = liveAndDown.stream()
+                                                   .filter(instance -> instance.nodeState().isPending)
+                                                   .collect(Collectors.toSet());
+        // success is assumed if not failed
+        Set<Instance> succeededReplicas = liveReplicas.stream()
+                                                      .filter(instance -> !failedReplicas.contains(instance))
+                                                      .collect(Collectors.toSet());
 
-        Set<String> failedInstanceIPs = failedInstances.stream()
-                                                       .map(CassandraInstance::ipAddress)
-                                                       .collect(Collectors.toSet());
-        Set<String> datacenters = Collections.emptySet();
-        ReplicationFactor replicationFactor = tokenRangeMapping.replicationFactor();
-        if (cl == ConsistencyLevel.CL.EACH_QUORUM)
-        {
-            datacenters = replicationFactor.getOptions().keySet();
-            Preconditions.checkArgument(replicationFactor.getReplicationStrategy() == ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
-                                        "%s requires %s replication strategy", cl, ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy);
-        }
-
-        if (cl.isLocal())
-        {
-            datacenters = Collections.singleton(localDC);
-            Preconditions.checkArgument(replicationFactor.getReplicationStrategy() == ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
-                                        "%s requires %s replication strategy", cl, ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy);
-        }
-
-        if (!datacenters.isEmpty())
-        {
-            for (String dc : datacenters)
-            {
-                Set<String> failedIpsPerDC = failedInstances.stream()
-                                                            .filter(inst -> inst.datacenter().matches(dc))
-                                                            .map(CassandraInstance::ipAddress)
-                                                            .collect(Collectors.toSet());
-
-                Set<String> dcWriteReplicas = maybeUpdateWriteReplicasForReplacements(tokenRangeMapping.getWriteReplicas(dc),
-                                                                                      tokenRangeMapping.getReplacementInstances(dc),
-                                                                                      failedIpsPerDC);
-
-                isConsistencyLevelMet = isConsistencyLevelMet &&
-                                        cl.checkConsistency(dcWriteReplicas,
-                                                            tokenRangeMapping.getPendingReplicas(dc),
-                                                            tokenRangeMapping.getReplacementInstances(dc),
-                                                            tokenRangeMapping.getBlockedInstances(dc),
-                                                            failedIpsPerDC,
-                                                            localDC);
-            }
-        }
-        else
-        {
-            Set<String> replacementInstances = tokenRangeMapping.getReplacementInstances();
-            Set<String> dcWriteReplicas = maybeUpdateWriteReplicasForReplacements(tokenRangeMapping.getWriteReplicas(),
-                                                                                  replacementInstances,
-                                                                                  failedInstanceIPs);
-
-            isConsistencyLevelMet = cl.checkConsistency(dcWriteReplicas,
-                                                        tokenRangeMapping.getPendingReplicas(),
-                                                        replacementInstances,
-                                                        tokenRangeMapping.getBlockedInstances(),
-                                                        failedInstanceIPs,
-                                                        localDC);
-        }
-        return isConsistencyLevelMet;
+        return cl.canBeSatisfied(succeededReplicas, pendingReplicas, replicationFactor, localDC);
     }
 
-    public boolean hasFailed(TokenRangeMapping<? extends CassandraInstance> tokenRange,
+    public boolean hasFailed(TokenRangeMapping<Instance> tokenRange,
                              ConsistencyLevel cl,
                              String localDC)
     {
-        return !getFailedEntries(tokenRange, cl, localDC).isEmpty();
-    }
-
-    private static Set<String> maybeUpdateWriteReplicasForReplacements(Set<String> writeReplicas, Set<String> replacingInstances, Set<String> failedInstances)
-    {
-        // Exclude replacement nodes from write-replicas if replacements are NOT among failed instances
-        if (!replacingInstances.isEmpty() && Collections.disjoint(failedInstances, replacingInstances))
-        {
-
-            return writeReplicas.stream().filter(r -> !replacingInstances.contains(r)).collect(Collectors.toSet());
-        }
-        return writeReplicas;
+        return !getFailedRanges(tokenRange, cl, localDC).isEmpty();
     }
 }
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
index 6e5ecac..b247916 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
@@ -27,6 +27,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import com.google.common.base.Preconditions;
@@ -40,7 +41,9 @@
 import org.apache.cassandra.spark.common.model.CassandraInstance;
 import org.apache.cassandra.spark.data.ReplicationFactor;
 import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
 
+// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup
 public class TokenRangeMapping<Instance extends CassandraInstance> implements Serializable
 {
     private static final long serialVersionUID = -7284933683815811160L;
@@ -137,6 +140,31 @@
         return new HashSet<>(pendingReplicasByDC.get(datacenter));
     }
 
+    /**
+     * Get the write replicas of sub-ranges that overlap with the input range.
+     *
+     * @param range range to check. The range can potentially overlap with multiple ranges.
+     *              For example, a down node adds one failure of a token range that covers multiple primary token ranges that replicate to it.
+     * @param localDc local DC name to filter out non-local-DC instances. The parameter is optional. When not present, i.e. null, no filtering is applied
+     * @return the write replicas of sub-ranges
+     */
+    public Map<Range<BigInteger>, Set<Instance>> getWriteReplicasOfRange(Range<BigInteger> range, @Nullable String localDc)
+    {
+        Map<Range<BigInteger>, List<Instance>> subRangeReplicas = replicasByTokenRange.subRangeMap(range).asMapOfRanges();
+        Function<List<Instance>, Set<Instance>> inDcInstances = instances -> {
+            if (localDc != null)
+            {
+                return instances.stream()
+                                .filter(instance -> instance.datacenter().equalsIgnoreCase(localDc))
+                                .collect(Collectors.toSet());
+            }
+            return new HashSet<>(instances);
+        };
+        return subRangeReplicas.entrySet()
+                               .stream()
+                               .collect(Collectors.toMap(Map.Entry::getKey, entry -> inDcInstances.apply(entry.getValue())));
+    }
+
     public Set<String> getWriteReplicas()
     {
         return (writeReplicasByDC == null || writeReplicasByDC.isEmpty())
@@ -168,7 +196,7 @@
     public Set<String> getReplacementInstances()
     {
         return replacementInstances.stream()
-                                   .map(RingInstance::ipAddress)
+                                   .map(RingInstance::ipAddressWithPort)
                                    .collect(Collectors.toSet());
     }
 
@@ -176,7 +204,7 @@
     {
         return replacementInstances.stream()
                                    .filter(r -> r.datacenter().equalsIgnoreCase(datacenter))
-                                   .map(RingInstance::ipAddress)
+                                   .map(RingInstance::ipAddressWithPort)
                                    .collect(Collectors.toSet());
     }
 
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
new file mode 100644
index 0000000..8f9aef9
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.data.QualifiedTableName;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class BulkWriteValidatorTest
+{
+    @Test
+    void testConsistencyCheckFailureWhenBlockedInstancesFailQuorum()
+    {
+        BulkWriterContext mockWriterContext = mock(BulkWriterContext.class);
+        ClusterInfo mockClusterInfo = mock(ClusterInfo.class);
+        when(mockWriterContext.cluster()).thenReturn(mockClusterInfo);
+
+        CassandraContext mockCassandraContext = mock(CassandraContext.class);
+        when(mockClusterInfo.getCassandraContext()).thenReturn(mockCassandraContext);
+        Map<String, String> replicationOptions = new HashMap<>();
+        replicationOptions.put("class", "SimpleStrategy");
+        replicationOptions.put("replication_factor", "3");
+        TokenRangeMapping<RingInstance> topology = CassandraClusterInfo.getTokenRangeReplicas(
+        () -> mockSimpleTokenRangeReplicasResponse(10, 3),
+        () -> Partitioner.Murmur3Partitioner,
+        () -> new ReplicationFactor(replicationOptions),
+        ringInstance -> {
+            int nodeId = Integer.parseInt(ringInstance.ipAddress().replace("localhost", ""));
+            return nodeId <= 2; // block nodes 0, 1, 2
+        });
+        when(mockClusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(topology);
+        Map<RingInstance, InstanceAvailability> instanceAvailabilityMap = new HashMap<>(10);
+        Set<String> blocked = topology.getBlockedInstances();
+        for (RingInstance instance : topology.getTokenRanges().keySet())
+        {
+            String ip = instance.ringInstance().address();
+            instanceAvailabilityMap.put(instance, blocked.contains(ip) ? InstanceAvailability.UNAVAILABLE_BLOCKED : InstanceAvailability.AVAILABLE);
+        }
+        when(mockClusterInfo.getInstanceAvailability()).thenReturn(instanceAvailabilityMap);
+
+        JobInfo mockJobInfo = mock(JobInfo.class);
+        UUID jobId = UUID.randomUUID();
+        when(mockJobInfo.getId()).thenReturn(jobId.toString());
+        when(mockJobInfo.getRestoreJobId()).thenReturn(jobId);
+        when(mockJobInfo.qualifiedTableName()).thenReturn(new QualifiedTableName("testkeyspace", "testtable"));
+        when(mockJobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.QUORUM);
+        when(mockJobInfo.effectiveSidecarPort()).thenReturn(9043);
+        when(mockJobInfo.jobKeepAliveMinutes()).thenReturn(-1);
+        when(mockWriterContext.job()).thenReturn(mockJobInfo);
+
+        BulkWriteValidator writerValidator = new BulkWriteValidator(mockWriterContext, new ReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner));
+        assertThatThrownBy(() -> writerValidator.validateClOrFail(topology))
+        .isExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
+        .hasMessageContaining("Failed to write");
+    }
+
+    private TokenRangeReplicasResponse mockSimpleTokenRangeReplicasResponse(int instancesCount, int replicationFactor)
+    {
+        long startToken = 0;
+        long rangeLength = 100;
+        List<ReplicaInfo> replicaInfoList = new ArrayList<>(instancesCount);
+        Map<String, ReplicaMetadata> replicaMetadata = new HashMap<>(instancesCount);
+        for (int i = 0; i < instancesCount; i++)
+        {
+            long endToken = startToken + rangeLength;
+            List<String> replicas = new ArrayList<>(replicationFactor);
+            for (int r = 0; r < replicationFactor; r++)
+            {
+                replicas.add("localhost" + (i + r) % instancesCount);
+            }
+            Map<String, List<String>> replicasPerDc = new HashMap<>();
+            replicasPerDc.put("ignored", replicas);
+            ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken), String.valueOf(endToken), replicasPerDc);
+            replicaInfoList.add(ri);
+            String address = "localhost" + i;
+            ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "ignored");
+            replicaMetadata.put(address, rm);
+            startToken = endToken;
+        }
+        return new TokenRangeReplicasResponse(replicaInfoList, replicaInfoList, replicaMetadata);
+    }
+}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
index 966de28..ae7a839 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
@@ -62,7 +62,7 @@
 
 public class DirectStreamSessionTest
 {
-    public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 ranges with LOCAL_QUORUM";
+    public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to write 1 ranges with LOCAL_QUORUM";
     private static final Map<String, Object> COLUMN_BOUND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
     @TempDir
     private Path folder;
@@ -88,7 +88,7 @@
         tableWriter = new MockTableWriter(folder);
         transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext();
         executor = new MockScheduledExecutorService();
-        expectedInstances = Lists.newArrayList("DC1-i1", "DC1-i2", "DC1-i3");
+        expectedInstances = Lists.newArrayList("DC1-i2", "DC1-i3", "DC1-i4");
     }
 
     @Test
@@ -246,7 +246,7 @@
         ss.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
         ExecutionException exception = assertThrows(ExecutionException.class,
                                                     () -> ss.finalizeStreamAsync().get());
-        assertEquals("Failed to load 1 ranges with LOCAL_QUORUM for job " + writerContext.job().getId()
+        assertEquals("Failed to write 1 ranges with LOCAL_QUORUM for job " + writerContext.job().getId()
                      + " in phase UploadAndCommit.", exception.getCause().getMessage());
         executor.assertFuturesCalled();
         assertThat(writerContext.getUploads().values().stream().mapToInt(Collection::size).sum(), equalTo(RF * FILES_PER_SSTABLE));
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
index fa886a4..f84280f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
@@ -188,7 +188,7 @@
                                        .waitForCompletion();
         });
         assertNotNull(exception.getMessage());
-        assertTrue(exception.getMessage().contains("Failed to load"));
+        assertTrue(exception.getMessage().contains("Failed to write"), "Actual error message: " + exception.getMessage());
         assertTrue(exception.getMessage().contains(errorMessage));
         assertNotNull(exception.getCause());
         validateAllSlicesWereCalledAtMostOnce(resultList);
@@ -407,8 +407,8 @@
             int noProgressPerReplicaSet = noProgressInstanceCount;
             // create one distinct slice per instance
             CreateSliceRequestPayload mockCreateSliceRequestPayload = mock(CreateSliceRequestPayload.class);
-            when(mockCreateSliceRequestPayload.startToken()).thenReturn(BigInteger.valueOf(100 * i));
-            when(mockCreateSliceRequestPayload.endToken()).thenReturn(BigInteger.valueOf(100 * (1 + i)));
+            when(mockCreateSliceRequestPayload.startToken()).thenReturn(BigInteger.valueOf(100 * (i - 1)));
+            when(mockCreateSliceRequestPayload.endToken()).thenReturn(BigInteger.valueOf(100 * i));
             when(mockCreateSliceRequestPayload.sliceId()).thenReturn(UUID.randomUUID().toString());
             when(mockCreateSliceRequestPayload.key()).thenReturn("key_for_instance_" + i); // to be captured by extension mock
             when(mockCreateSliceRequestPayload.bucket()).thenReturn("bucket"); // to be captured by extension mock
@@ -495,8 +495,8 @@
         return new RingInstance(new RingEntry.Builder()
                                 .datacenter("DC1")
                                 .address("127.0.0." + instanceInRing)
-                                .token(String.valueOf(i * 100))
-                                .fqdn("instance-" + instanceInRing)
+                                .token(String.valueOf(i * 100_000))
+                                .fqdn("DC1-i" + instanceInRing)
                                 .build());
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 23813ab..1a86680 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -231,7 +231,11 @@
     @Override
     public String getLocalDC()
     {
-        return "DC1";
+        if (getConsistencyLevel().isLocal())
+        {
+            return "DC1";
+        }
+        return null;
     }
 
     @Override
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index bb2c4f9..455dc40 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -149,7 +149,6 @@
     @Test
     void testWriteWithBlockedInstances()
     {
-
         String blockedInstanceIp = "127.0.0.2";
         TokenRangeMapping<RingInstance> testMapping =
         TokenRangeMappingUtils.buildTokenRangeMappingWithBlockedInstance(100000,
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index f08a2a5..f48ffbf 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -108,7 +108,7 @@
                 .multiply(BigInteger.valueOf(index))).subtract(BigInteger.valueOf(2).pow(63));
     }
 
-    private static ReplicaAwareFailureHandler<CassandraInstance> ntsStrategyHandler(Partitioner partitioner)
+    private static ReplicaAwareFailureHandler<RingInstance> ntsStrategyHandler(Partitioner partitioner)
     {
         return new ReplicaAwareFailureHandler<>(partitioner);
     }
@@ -211,15 +211,16 @@
         Partitioner partitioner = Partitioner.Murmur3Partitioner;
         BigInteger[] tokens = getTokens(partitioner, 5);
         List<RingInstance> instances = getInstances(tokens, DATACENTER_1);
-        CassandraInstance instance1 = instances.get(0);
-        CassandraInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter(), "?");
-        ReplicaAwareFailureHandler<CassandraInstance> replicationFactor3 = ntsStrategyHandler(partitioner);
+        RingInstance instance1 = instances.get(0);
+        RingInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter(), "?");
+        ReplicaAwareFailureHandler<RingInstance> replicationFactor3 = ntsStrategyHandler(partitioner);
         ReplicationFactor repFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
                                                             ntsOptions(new String[]{DATACENTER_1 }, new int[]{3 }));
-        Map<String, Set<String>> writeReplicas = instances.stream()
-                                                          .collect(Collectors.groupingBy(CassandraInstance::datacenter,
-                                                                                         Collectors.mapping(CassandraInstance::nodeName,
-                                                                                                            Collectors.toSet())));
+        Map<String, Set<String>> writeReplicas =
+        instances.stream().collect(Collectors.groupingBy(CassandraInstance::datacenter,
+                                                         // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+                                                         // The returned values are ip addresses.
+                                                         Collectors.mapping(CassandraInstance::ipAddressWithPort, Collectors.toSet())));
         Multimap<RingInstance, Range<BigInteger>> tokenRanges = TokenRangeMappingUtils.setupTokenRangeMap(partitioner, repFactor, instances);
         TokenRangeMapping<RingInstance> tokenRange = new TokenRangeMapping<>(partitioner,
                                                                              repFactor,
@@ -236,7 +237,7 @@
         replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE),
                                                        tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2");
 
-        replicationFactor3.getFailedEntries(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
+        replicationFactor3.getFailedRanges(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
         assertFalse(replicationFactor3.hasFailed(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1));
     }
 }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 2e2265d..8be92d2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -45,6 +45,7 @@
 import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
 import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
 import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
 import org.apache.cassandra.spark.utils.DigestAlgorithm;
 import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
 
@@ -52,6 +53,7 @@
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class StreamSessionConsistencyTest
@@ -59,7 +61,8 @@
     private static final int NUMBER_DCS = 2;
     private static final int FILES_PER_SSTABLE = 8;
     private static final int REPLICATION_FACTOR = 3;
-    private static final List<String> EXPECTED_INSTANCES = ImmutableList.of("DC1-i1", "DC1-i2", "DC1-i3", "DC2-i1", "DC2-i2", "DC2-i3");
+    // range [101, 199] is replicated to those instances. i1 has token 0/1, which are before the range
+    private static final List<String> EXPECTED_INSTANCES = ImmutableList.of("DC1-i2", "DC1-i3", "DC1-i4", "DC2-i2", "DC2-i3", "DC2-i4");
     private static final Range<BigInteger> RANGE = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, BigInteger.valueOf(199L), BoundType.CLOSED);
     private static final TokenRangeMapping<RingInstance> TOKEN_RANGE_MAPPING =
     TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3, "DC2", 3), 6);
@@ -82,7 +85,7 @@
         return clsToFailures.stream().map(List::toArray).collect(Collectors.toList());
     }
 
-    private void setup(ConsistencyLevel.CL consistencyLevel, List<Integer> failuresPerDc)
+    private void setup(ConsistencyLevel.CL consistencyLevel)
     {
         digestAlgorithm = new XXHash32DigestAlgorithm();
         tableWriter = new MockTableWriter(folder);
@@ -96,20 +99,21 @@
                                                        List<Integer> failuresPerDc)
     throws IOException, ExecutionException, InterruptedException
     {
-        setup(consistencyLevel, failuresPerDc);
+        setup(consistencyLevel);
         AtomicInteger dc1Failures = new AtomicInteger(failuresPerDc.get(0));
         AtomicInteger dc2Failures = new AtomicInteger(failuresPerDc.get(1));
         ImmutableMap<String, AtomicInteger> dcFailures = ImmutableMap.of("DC1", dc1Failures, "DC2", dc2Failures);
         boolean shouldFail = calculateFailure(consistencyLevel, dc1Failures.get(), dc2Failures.get());
-        // Return successful result for 1st result, failed for the rest
         writerContext.setCommitResultSupplier((uuids, dc) -> {
             if (dcFailures.get(dc).getAndDecrement() > 0)
             {
-                return new DirectDataTransferApi.RemoteCommitResult(false, null, uuids, "");
+                // failure
+                return new DirectDataTransferApi.RemoteCommitResult(false, uuids, null, "");
             }
             else
             {
-                return new DirectDataTransferApi.RemoteCommitResult(true, uuids, null, "");
+                // success
+                return new DirectDataTransferApi.RemoteCommitResult(true, null, null, "");
             }
         });
         StreamSession<?> streamSession = createStreamSession(NonValidatingTestSortedSSTableWriter::new);
@@ -118,7 +122,7 @@
         if (shouldFail)
         {
             ExecutionException exception = assertThrows(ExecutionException.class, fut::get);
-            assertEquals("Failed to load 1 ranges with " + consistencyLevel
+            assertEquals("Failed to write 1 ranges with " + consistencyLevel
                          + " for job " + writerContext.job().getId()
                          + " in phase UploadAndCommit.", exception.getCause().getMessage());
         }
@@ -143,7 +147,7 @@
                                                        List<Integer> failuresPerDc)
     throws IOException, ExecutionException, InterruptedException
     {
-        setup(consistencyLevel, failuresPerDc);
+        setup(consistencyLevel);
         AtomicInteger dc1Failures = new AtomicInteger(failuresPerDc.get(0));
         AtomicInteger dc2Failures = new AtomicInteger(failuresPerDc.get(1));
         int numFailures = dc1Failures.get() + dc2Failures.get();
@@ -156,7 +160,8 @@
         if (shouldFail)
         {
             ExecutionException exception = assertThrows(ExecutionException.class, fut::get);
-            assertEquals("Failed to load 1 ranges with " + consistencyLevel
+            assertInstanceOf(ConsistencyNotSatisfiedException.class, exception.getCause());
+            assertEquals("Failed to write 1 ranges with " + consistencyLevel
                          + " for job " + writerContext.job().getId()
                          + " in phase UploadAndCommit.", exception.getCause().getMessage());
         }
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index bab7db5..117e78c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -65,9 +65,11 @@
                                                 .get();
         ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
         Map<String, Set<String>> writeReplicas =
-        instances.stream().collect(Collectors.groupingBy(RingInstance::datacenter,
-                                                         Collectors.mapping(RingInstance::nodeName,
-                                                                            Collectors.toSet())));
+        instances.stream()
+                 .collect(Collectors.groupingBy(RingInstance::datacenter,
+                                                // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+                                                // The returned values are ip addresses with port.
+                                                Collectors.mapping(RingInstance::ipAddressWithPort, Collectors.toSet())));
         writeReplicas.replaceAll((key, value) -> {
             value.removeIf(e -> value.size() > 3);
             return value;
@@ -77,7 +79,7 @@
                                                          .map(i -> new ReplicaMetadata(i.ringInstance().state(),
                                                                                        i.ringInstance().status(),
                                                                                        i.nodeName(),
-                                                                                       i.ipAddress(),
+                                                                                       i.ipAddressWithPort(),
                                                                                        7012,
                                                                                        i.datacenter()))
                                                          .collect(Collectors.toList());
@@ -118,18 +120,16 @@
         ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
         Map<String, Set<String>> writeReplicas =
         instances.stream().collect(Collectors.groupingBy(RingInstance::datacenter,
-                                                         Collectors.mapping(RingInstance::nodeName,
+                                                         // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+                                                         // The returned values are ip addresses with port.
+                                                         Collectors.mapping(RingInstance::ipAddressWithPort,
                                                                             Collectors.toSet())));
-        writeReplicas.replaceAll((key, value) -> {
-            value.removeIf(e -> value.size() > 3);
-            return value;
-        });
 
         List<ReplicaMetadata> replicaMetadata = instances.stream()
                                                          .map(i -> new ReplicaMetadata(i.ringInstance().state(),
                                                                                        i.ringInstance().status(),
                                                                                        i.nodeName(),
-                                                                                       i.ipAddress(),
+                                                                                       i.ipAddressWithPort(),
                                                                                        7012,
                                                                                        i.datacenter()))
                                                          .collect(Collectors.toList());
@@ -177,18 +177,16 @@
         ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
         Map<String, Set<String>> writeReplicas =
         instances.stream().collect(Collectors.groupingBy(RingInstance::datacenter,
-                                                         Collectors.mapping(RingInstance::nodeName,
+                                                         // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+                                                         // The returned values are ip addresses with port.
+                                                         Collectors.mapping(RingInstance::ipAddressWithPort,
                                                                             Collectors.toSet())));
-        writeReplicas.replaceAll((key, value) -> {
-            value.removeIf(e -> value.size() > 3);
-            return value;
-        });
 
         List<ReplicaMetadata> replicaMetadata = instances.stream()
                                                          .map(i -> new ReplicaMetadata(i.ringInstance().state(),
                                                                                        i.ringInstance().status(),
                                                                                        i.nodeName(),
-                                                                                       i.ipAddress(),
+                                                                                       i.ipAddressWithPort(),
                                                                                        7012,
                                                                                        i.datacenter()))
                                                          .collect(Collectors.toList());
@@ -261,13 +259,14 @@
             String datacenter = rfForDC.getKey();
             for (int i = 0; i < instancesPerDc; i++)
             {
+                int instanceId = i + 1;
                 RingEntry.Builder ringEntry = new RingEntry.Builder()
-                                              .address("127.0." + dcOffset + "." + i)
+                                              .address("127.0." + dcOffset + "." + instanceId)
                                               .datacenter(datacenter)
                                               .load("0")
                                               // Single DC tokens will be in multiples of 100000
                                               .token(Integer.toString(initialToken + dcOffset + 100_000 * i))
-                                              .fqdn(datacenter + "-i" + i)
+                                              .fqdn(datacenter + "-i" + instanceId)
                                               .rack("Rack")
                                               .hostId("")
                                               .status("UP")
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java
index 858934a..e02a3d3 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java
@@ -21,10 +21,9 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import com.google.common.collect.ImmutableMap;
 import org.junit.jupiter.api.BeforeAll;
@@ -42,27 +41,51 @@
 {
     private static final ReplicationFactor replicationFactor = new ReplicationFactor(ImmutableMap.of(
     "class", "NetworkTopologyStrategy",
-    "dc1", "3"));
+    "dc1", "3",
+    "dc2", "3"));
 
     private static List<CassandraInstance> succeededNone = Collections.emptyList();
     private static List<CassandraInstance> succeededOne;
+    private static List<CassandraInstance> succeededOnePerDc;
     private static List<CassandraInstance> succeededTwo;
+    private static List<CassandraInstance> succeededTwoPerDc;
     private static List<CassandraInstance> succeededThree;
+    private static List<CassandraInstance> succeededThreePerDc;
+    private static List<CassandraInstance> succeededQuorum; // across DC
+    private static List<CassandraInstance> succeededHalf; // across DC; half < quorum
+    private static List<CassandraInstance> succeededAll; // all the nodes
 
-    private static Set<String> zero = Collections.emptySet();
-    private static Set<String> one = intToSet(1);
-    private static Set<String> two = intToSet(2);
-    private static Set<String> three = intToSet(3);
+    private static Set<CassandraInstance> zero = Collections.emptySet();
+    private static Set<CassandraInstance> onePerDc = intToSet(1);
+    private static Set<CassandraInstance> one = intToSet(1, true);
+    private static Set<CassandraInstance> twoPerDc = intToSet(2);
+    private static Set<CassandraInstance> two = intToSet(2, true);
+    private static Set<CassandraInstance> threePerDc = intToSet(3);
+    private static Set<CassandraInstance> three = intToSet(3, true);
 
     @BeforeAll
     static void setup()
     {
-        CassandraInstance i1 = mockInstance("dc1");
-        CassandraInstance i2 = mockInstance("dc1");
-        CassandraInstance i3 = mockInstance("dc1");
-        succeededOne = Arrays.asList(i1);
-        succeededTwo = Arrays.asList(i1, i2);
-        succeededThree = Arrays.asList(i1, i2, i3);
+        CassandraInstance dc1i1 = mockInstance("dc1");
+        CassandraInstance dc1i2 = mockInstance("dc1");
+        CassandraInstance dc1i3 = mockInstance("dc1");
+        CassandraInstance dc2i1 = mockInstance("dc2");
+        CassandraInstance dc2i2 = mockInstance("dc2");
+        CassandraInstance dc2i3 = mockInstance("dc2");
+        succeededOne = Arrays.asList(dc1i1);
+        succeededTwo = Arrays.asList(dc1i1, dc1i2);
+        succeededThree = Arrays.asList(dc1i1, dc1i2, dc1i3);
+        succeededOnePerDc = Arrays.asList(dc1i1,
+                                          dc2i1);
+        succeededTwoPerDc = Arrays.asList(dc1i1, dc1i2,
+                                          dc2i1, dc2i2);
+        succeededThreePerDc = Arrays.asList(dc1i1, dc1i2, dc1i3,
+                                            dc2i1, dc2i2, dc2i3);
+        succeededQuorum = Arrays.asList(dc1i1, dc1i2, dc1i3,
+                                        dc2i1);
+        succeededHalf = Arrays.asList(dc1i1, dc1i2,
+                                      dc2i1);
+        succeededAll = succeededThreePerDc;
     }
 
     @Test
@@ -82,13 +105,14 @@
         testCanBeSatisfied(CL.LOCAL_QUORUM, succeededTwo, true);
         testCanBeSatisfied(CL.LOCAL_QUORUM, succeededThree, true);
 
-        testCanBeSatisfied(CL.EACH_QUORUM, succeededTwo, true);
-        testCanBeSatisfied(CL.EACH_QUORUM, succeededThree, true);
+        testCanBeSatisfied(CL.EACH_QUORUM, succeededTwoPerDc, true);
+        testCanBeSatisfied(CL.EACH_QUORUM, succeededThreePerDc, true);
 
-        testCanBeSatisfied(CL.QUORUM, succeededTwo, true);
-        testCanBeSatisfied(CL.QUORUM, succeededThree, true);
+        testCanBeSatisfied(CL.QUORUM, succeededTwoPerDc, true);
+        testCanBeSatisfied(CL.QUORUM, succeededThreePerDc, true);
+        testCanBeSatisfied(CL.QUORUM, succeededQuorum, true);
 
-        testCanBeSatisfied(CL.ALL, succeededThree, true);
+        testCanBeSatisfied(CL.ALL, succeededAll, true);
     }
 
     @Test
@@ -106,79 +130,107 @@
 
         testCanBeSatisfied(CL.EACH_QUORUM, succeededNone, false);
         testCanBeSatisfied(CL.EACH_QUORUM, succeededOne, false);
+        testCanBeSatisfied(CL.EACH_QUORUM, succeededOnePerDc, false);
 
         testCanBeSatisfied(CL.QUORUM, succeededNone, false);
         testCanBeSatisfied(CL.QUORUM, succeededOne, false);
+        testCanBeSatisfied(CL.QUORUM, succeededOnePerDc, false);
+        testCanBeSatisfied(CL.QUORUM, succeededTwo, false);
+        testCanBeSatisfied(CL.QUORUM, succeededHalf, false);
 
         testCanBeSatisfied(CL.ALL, succeededNone, false);
         testCanBeSatisfied(CL.ALL, succeededOne, false);
+        testCanBeSatisfied(CL.ALL, succeededOnePerDc, false);
         testCanBeSatisfied(CL.ALL, succeededTwo, false);
+        testCanBeSatisfied(CL.ALL, succeededTwoPerDc, false);
+        testCanBeSatisfied(CL.ALL, succeededQuorum, false);
+        testCanBeSatisfied(CL.ALL, succeededHalf, false);
     }
 
     @Test
-    void testCheckConsistencyReturnsTrue()
+    void testCanBeSatisfiedWithPendingReturnsTrue()
     {
-        testCheckConsistency(CL.ONE, /* total */ three, /* failed */ zero, zero, true);
-        testCheckConsistency(CL.ONE, /* total */ three, /* failed */ one, zero, true);
-        testCheckConsistency(CL.ONE, /* total */ three, /* failed */ two, zero, true);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ three, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ three, /* pending */ two, true);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ three, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ two, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ two, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ one, /* pending */ zero, true);
 
-        testCheckConsistency(CL.TWO, /* total */ three, /* failed */ zero, zero, true);
-        testCheckConsistency(CL.TWO, /* total */ three, /* failed */ one, zero, true);
+        testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ three, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ three, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ two, /* pending */ zero, true);
 
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ zero, /* pending */ zero, true);
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ zero, /* pending */ one, true);
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ zero, /* pending */ two, true);
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ one, /* pending */ one, true);
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ two, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ three, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ three, /* pending */ two, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ three, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ two, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ two, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ one, /* pending */ zero, true);
 
-        testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ zero, zero, true);
-        testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ one, zero, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ three, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ three, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ three, /* pending */ onePerDc, true);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ two, /* pending */ zero, true);
 
-        testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ zero, zero, true);
-        testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ one, zero, true);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ threePerDc, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ threePerDc, /* pending */ one, true);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ threePerDc, /* pending */ onePerDc, true);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ twoPerDc, /* pending */ zero, true);
 
-        testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ zero, zero, true);
-        testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ one, zero, true);
+        testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ threePerDc, /* pending */ zero, true);
+        testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ threePerDc, /* pending */ onePerDc, true);
+        testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ twoPerDc, /* pending */ zero, true);
 
-        testCheckConsistency(CL.ALL, /* total */ three, /* failed */ zero, zero, true);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ threePerDc, /* pending */ zero, true);
     }
 
     @Test
-    void testCheckConsistencyReturnsFalse()
+    void testCanBeSatisfiedWithPendingReturnsFalse()
     {
-        testCheckConsistency(CL.ONE, /* total */ three, /* failed */ three, zero, false);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ onePerDc, /* pending */ onePerDc, false);
 
-        testCheckConsistency(CL.TWO, /* total */ three, /* failed */ three, zero, false);
-        testCheckConsistency(CL.TWO, /* total */ three, /* failed */ two, zero, false);
+        testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ one, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ two, /* pending */ one, false);
 
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ three, /* pending */ zero, false);
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ two, /* pending */ one, false);
-        testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ one, /* pending */ two, false);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ one, /* pending */ one, false);
+        testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ two, /* pending */ two, false);
 
-        testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ three, zero, false);
-        testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ two, zero, false);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ one, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ two, /* pending */ one, false);
 
-        testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ three, zero, false);
-        testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ two, zero, false);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ onePerDc, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ twoPerDc, /* pending */ onePerDc, false);
 
-        testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ three, zero, false);
-        testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ two, zero, false);
+        testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ onePerDc, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ twoPerDc, /* pending */ onePerDc, false);
 
-        testCheckConsistency(CL.ALL, /* total */ three, /* failed */ one, zero, false);
-        testCheckConsistency(CL.ALL, /* total */ three, /* failed */ two, zero, false);
-        testCheckConsistency(CL.ALL, /* total */ three, /* failed */ three, zero, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ two, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ twoPerDc, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ one, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ onePerDc, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ zero, /* pending */ zero, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ threePerDc, /* pending */ one, false);
+        testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ threePerDc, /* pending */ onePerDc, false);
     }
 
     private void testCanBeSatisfied(ConsistencyLevel cl, List<CassandraInstance> succeeded, boolean expectedResult)
     {
-        assertThat(cl.canBeSatisfied(succeeded, replicationFactor, "dc1")).isEqualTo(expectedResult);
+        assertThat(cl.canBeSatisfied(succeeded, zero, replicationFactor, "dc1")).isEqualTo(expectedResult);
     }
 
-    private void testCheckConsistency(ConsistencyLevel cl, Set<String> total, Set<String> failed, Set<String> pending, boolean expectedResult)
+    private void testCanBeSatisfiedWithPending(ConsistencyLevel cl,
+                                               Set<CassandraInstance> succeeded,
+                                               Set<CassandraInstance> pending,
+                                               boolean expectedResult)
     {
-        assertThat(cl.checkConsistency(total, pending, zero, // replacement is not used
-                                       zero, // include blocking instance set in failed set
-                                       failed, "dc1")).isEqualTo(expectedResult);
+        assertThat(cl.canBeSatisfied(succeeded, pending, replicationFactor, "dc1")).isEqualTo(expectedResult);
     }
 
     private static CassandraInstance mockInstance(String dc)
@@ -188,8 +240,27 @@
         return i;
     }
 
-    private static Set<String> intToSet(int i)
+    private static Set<CassandraInstance> intToSet(int i)
     {
-        return IntStream.range(0, i).mapToObj(String::valueOf).collect(Collectors.toSet());
+        return intToSet(i, false);
+    }
+
+    private static Set<CassandraInstance> intToSet(int i, boolean singleDc)
+    {
+        Set<CassandraInstance> res = new HashSet<>();
+        for (int j = 0; j < i; j++)
+        {
+            CassandraInstance dc1Instance = mock(CassandraInstance.class);
+            when(dc1Instance.datacenter()).thenReturn("dc1");
+            res.add(dc1Instance);
+            if (singleDc)
+            {
+                continue;
+            }
+            CassandraInstance dc2Instance = mock(CassandraInstance.class);
+            when(dc2Instance.datacenter()).thenReturn("dc2");
+            res.add(dc2Instance);
+        }
+        return res;
     }
 }
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
index fbd5479..969c29b 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
@@ -114,13 +114,13 @@
                           .hasMessageContaining("java.lang.RuntimeException: Bulk Write to Cassandra has failed");
 
         Throwable cause = thrown;
-        while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load"))
+        while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to write"))
         {
             cause = cause.getCause();
         }
 
         assertThat(cause).isNotNull()
-                         .hasMessageFindingMatch(String.format("Failed to load (\\d+) ranges with %s for " +
+                         .hasMessageFindingMatch(String.format("Failed to write (\\d+) ranges with %s for " +
                                                                "job ([a-zA-Z0-9-]+) in phase Environment Validation.", cl.writeCL));
 
         expectedInstanceData.entrySet()
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 9628bed..5fc3425 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -216,13 +216,13 @@
         Throwable cause = thrown;
 
         // Find the cause
-        while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load"))
+        while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to write"))
         {
             cause = cause.getCause();
         }
 
         assertThat(cause).isNotNull()
-                         .hasMessageFindingMatch("Failed to load (\\d+) ranges with " + writeCL +
+                         .hasMessageFindingMatch("Failed to write (\\d+) ranges with " + writeCL +
                                                  " for job ([a-zA-Z0-9-]+) in phase .*");
     }
 
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
index a69f6c8..19c902d 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
@@ -43,13 +43,11 @@
 import org.apache.cassandra.spark.bulkwriter.WriterOptions;
 import org.apache.cassandra.testing.ClusterBuilderConfiguration;
 
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static net.bytebuddy.matcher.ElementMatchers.named;
 import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
 import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
@@ -107,8 +105,7 @@
         return Stream.of(
         Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, LOCAL_QUORUM)),
         Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, EACH_QUORUM)),
-        Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM)),
-        Arguments.of(TestConsistencyLevel.of(ONE, ALL))
+        Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM))
         );
     }
 
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index b11a816..9cbbeba 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -49,7 +49,6 @@
 import org.apache.spark.sql.SparkSession;
 
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
 import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
 import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
@@ -191,7 +190,6 @@
     static Stream<Arguments> singleDCTestInputs()
     {
         return Stream.of(
-        Arguments.of(TestConsistencyLevel.of(ONE, ALL)),
         Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM))
         );
     }