CASSANDRA-19842: Consistency level check incorrectly passes when majority of the replica set is unavailable for write (#75)
Patch by Yifan Cai; Reviewed by Doug Rohrer, Francisco Guerrero for CASSANDRA-19842
diff --git a/CHANGES.txt b/CHANGES.txt
index cec6e0a..2922b0a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.0.0
+ * Fix consistency level check for write (CASSANDRA-19842)
* Fix NPE when writing UDT values (CASSANDRA-19836)
* Add job_timeout_seconds writer option (CASSANDRA-19827)
* Prevent double closing sstable writer (CASSANDRA-19821)
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
index 13c465a..3820efd 100644
--- a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/CassandraInstance.java
@@ -27,5 +27,27 @@
String datacenter();
+ /**
+ * IP address string of a Cassandra instance.
+ * Mainly used in the blocked instance list to identify instances.
+ * Prefer {@link #ipAddressWithPort} as the instance identifier,
+ * unless it is certain that the value compared against is an IP address without a port.
+ */
String ipAddress();
+
+ /**
+ * Equivalent to EndpointWithPort in Cassandra.
+ * @return IP address with port string in the format "ip:port"
+ */
+ String ipAddressWithPort();
+
+ /**
+ * @return state of the node
+ */
+ NodeState nodeState();
+
+ /**
+ * @return status of the node
+ */
+ NodeStatus nodeStatus();
}
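
For illustration, a minimal sketch of a class honoring the extended identifier contract. It assumes, for the sketch, that the interface declares only the methods visible in the hunk above; the class and its field values are hypothetical:

import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.common.model.NodeState;
import org.apache.cassandra.spark.common.model.NodeStatus;

class SimpleInstance implements CassandraInstance
{
    private final String datacenter;
    private final String ip;
    private final int port;

    SimpleInstance(String datacenter, String ip, int port)
    {
        this.datacenter = datacenter;
        this.ip = ip;
        this.port = port;
    }

    @Override
    public String datacenter()
    {
        return datacenter;
    }

    @Override
    public String ipAddress()
    {
        return ip;
    }

    @Override
    public String ipAddressWithPort()
    {
        return ip + ':' + port; // "ip:port", matching Cassandra's EndpointWithPort formatting
    }

    @Override
    public NodeState nodeState()
    {
        return NodeState.NORMAL; // a healthy, non-pending node in this sketch
    }

    @Override
    public NodeStatus nodeStatus()
    {
        return NodeStatus.UP;
    }
}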
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeState.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeState.java
new file mode 100644
index 0000000..93f148f
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common.model;
+
+public enum NodeState
+{
+ NORMAL(false),
+ JOINING(true),
+ LEAVING(true),
+ MOVING(true),
+ REPLACING(true); // state added in sidecar
+
+ public final boolean isPending;
+
+ NodeState(boolean isPending)
+ {
+ this.isPending = isPending;
+ }
+
+ public static NodeState fromNameIgnoreCase(String name)
+ {
+ String uppercase = name.toUpperCase();
+ return valueOf(uppercase);
+ }
+}
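
A small usage sketch of the pending-state semantics; the state list is hypothetical:

import java.util.Arrays;
import java.util.List;

import org.apache.cassandra.spark.common.model.NodeState;

class NodeStateSketch
{
    public static void main(String[] args)
    {
        // parsing is case-insensitive; unknown names throw IllegalArgumentException from valueOf
        NodeState joining = NodeState.fromNameIgnoreCase("joining");

        // isPending marks the replicas that raise the write blockFor count downstream
        List<NodeState> states = Arrays.asList(NodeState.NORMAL, joining, NodeState.REPLACING);
        long pending = states.stream().filter(state -> state.isPending).count();
        System.out.println("pending replicas: " + pending); // prints 2
    }
}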
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeStatus.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeStatus.java
new file mode 100644
index 0000000..8b64cdd
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/common/model/NodeStatus.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.common.model;
+
+public enum NodeStatus
+{
+ UP,
+ DOWN,
+ UNKNOWN;
+
+ public static NodeStatus fromNameIgnoreCase(String name)
+ {
+ String uppercase = name.toUpperCase();
+ try
+ {
+ return valueOf(uppercase);
+ }
+ catch (Exception ex)
+ {
+ // default to UNKNOWN
+ return UNKNOWN;
+ }
+ }
+}
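
A quick sketch of the lenient parsing; "starting" stands in for any unrecognized status string:

import org.apache.cassandra.spark.common.model.NodeStatus;

class NodeStatusSketch
{
    public static void main(String[] args)
    {
        System.out.println(NodeStatus.fromNameIgnoreCase("up"));       // UP
        System.out.println(NodeStatus.fromNameIgnoreCase("Down"));     // DOWN
        // unlike NodeState.fromNameIgnoreCase, an unrecognized name does not throw;
        // it falls back to UNKNOWN, so unexpected status strings cannot break topology parsing
        System.out.println(NodeStatus.fromNameIgnoreCase("starting")); // UNKNOWN
    }
}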
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/AnalyticsException.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/AnalyticsException.java
new file mode 100644
index 0000000..ab92fb2
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/AnalyticsException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.exception;
+
+/**
+ * Base class of Cassandra Analytics exceptions for exception handling.
+ * Note that it is a RuntimeException (unchecked). It gives call sites the flexibility of handling the exceptions they care about without forcing
+ * catch blocks (and making the code overly verbose). In most cases, a call site simply converts a checked exception into an unchecked one and rethrows it.
+ */
+public abstract class AnalyticsException extends RuntimeException
+{
+ private static final long serialVersionUID = 3980444570316598756L;
+
+ public AnalyticsException(String message)
+ {
+ super(message);
+ }
+
+ public AnalyticsException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public AnalyticsException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ protected AnalyticsException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace)
+ {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
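
A minimal sketch of the wrap-and-rethrow pattern the javadoc describes, using the SidecarApiCallException subclass introduced later in this patch; the helper method itself is hypothetical:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.cassandra.spark.exception.SidecarApiCallException;

class WrapAndRethrowSketch
{
    // hypothetical call-site helper: converts checked future failures into the unchecked hierarchy
    static <T> T getOrWrap(CompletableFuture<T> future, String action)
    {
        try
        {
            return future.get();
        }
        catch (ExecutionException | InterruptedException exception)
        {
            // mirrors the pattern used in CassandraClusterInfo.getTokenRangesAndReplicaSets below
            throw new SidecarApiCallException("Failed to " + action, exception);
        }
    }
}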
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/ConsistencyNotSatisfiedException.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/ConsistencyNotSatisfiedException.java
new file mode 100644
index 0000000..d82a988
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/ConsistencyNotSatisfiedException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.exception;
+
+/**
+ * Thrown when consistency cannot be satisfied by a bulk operation, i.e. read or write.
+ */
+public class ConsistencyNotSatisfiedException extends AnalyticsException
+{
+ private static final long serialVersionUID = 992947698403422384L;
+
+ public ConsistencyNotSatisfiedException(String message)
+ {
+ super(message);
+ }
+
+ public ConsistencyNotSatisfiedException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/SidecarApiCallException.java b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/SidecarApiCallException.java
new file mode 100644
index 0000000..d59c138
--- /dev/null
+++ b/cassandra-analytics-common/src/main/java/org/apache/cassandra/spark/exception/SidecarApiCallException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.exception;
+
+/**
+ * Exception due to a Cassandra Sidecar API call failure.
+ */
+public class SidecarApiCallException extends AnalyticsException
+{
+ public SidecarApiCallException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
index 41b2315..1ad92a6 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkSparkConf.java
@@ -164,7 +164,13 @@
this.skipExtendedVerify = MapUtils.getBoolean(options, WriterOptions.SKIP_EXTENDED_VERIFY.name(), true,
"skip extended verification of SSTables by Cassandra");
this.consistencyLevel = ConsistencyLevel.CL.valueOf(MapUtils.getOrDefault(options, WriterOptions.BULK_WRITER_CL.name(), "EACH_QUORUM"));
- this.localDC = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
+ String dc = MapUtils.getOrDefault(options, WriterOptions.LOCAL_DC.name(), null);
+ if (!consistencyLevel.isLocal() && dc != null)
+ {
+ LOGGER.warn("localDc is present for non-local consistency level {} specified in writer options. Correcting localDc to null", consistencyLevel);
+ dc = null;
+ }
+ this.localDC = dc;
this.numberSplits = MapUtils.getInt(options, WriterOptions.NUMBER_SPLITS.name(), DEFAULT_NUM_SPLITS, "number of splits");
this.sstableDataSizeInMiB = resolveSSTableDataSizeInMiB(options);
this.commitBatchSize = MapUtils.getInt(options, WriterOptions.COMMIT_BATCH_SIZE.name(), DEFAULT_COMMIT_BATCH_SIZE, "commit batch size");
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
index 42413d0..9946271 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidator.java
@@ -20,18 +20,17 @@
package org.apache.cassandra.spark.bulkwriter;
import java.math.BigInteger;
-import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import com.google.common.collect.Multimap;
import com.google.common.collect.Range;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
public class BulkWriteValidator
{
@@ -57,8 +56,8 @@
String phase,
JobInfo job)
{
- Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<RingInstance, String>>> failedRanges =
- failureHandler.getFailedEntries(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
+ List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges =
+ failureHandler.getFailedRanges(tokenRangeMapping, job.getConsistencyLevel(), job.getLocalDC());
if (failedRanges.isEmpty())
{
@@ -66,18 +65,11 @@
}
else
{
- String message = String.format("Failed to load %s ranges with %s for job %s in phase %s.",
+ String message = String.format("Failed to write %s ranges with %s for job %s in phase %s.",
failedRanges.size(), job.getConsistencyLevel(), job.getId(), phase);
logger.error(message);
- failedRanges.forEach(failedRange ->
- failedRange.getValue()
- .keySet()
- .forEach(instance ->
- logger.error("Failed {} for {} on {}",
- phase,
- failedRange.getKey(),
- instance.toString())));
- throw new RuntimeException(message);
+ logFailedRanges(logger, phase, failedRanges);
+ throw new ConsistencyNotSatisfiedException(message);
}
}
@@ -117,6 +109,21 @@
});
}
+ private static void logFailedRanges(Logger logger, String phase,
+ List<ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange> failedRanges)
+ {
+ for (ReplicaAwareFailureHandler<RingInstance>.ConsistencyFailurePerRange failedRange : failedRanges)
+ {
+ failedRange.failuresPerInstance.forEachInstance((instance, errors) -> {
+ logger.error("Failed in phase {} for {} on {}. Failure: {}",
+ phase,
+ failedRange.range,
+ instance.toString(),
+ errors);
+ });
+ }
+ }
+
public void updateFailureHandler(Range<BigInteger> failedRange, RingInstance instance, String reason)
{
failureHandler.addFailure(failedRange, instance, reason);
@@ -124,8 +131,16 @@
public void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping)
{
- // Updates failures by looking up instance metadata
- updateInstanceAvailability();
+ validateClOrFail(tokenRangeMapping, true);
+ }
+
+ public void validateClOrFail(TokenRangeMapping<RingInstance> tokenRangeMapping, boolean refreshInstanceAvailability)
+ {
+ if (refreshInstanceAvailability)
+ {
+ // Updates failures by looking up instance metadata
+ updateInstanceAvailability();
+ }
// Fails if the failures violate consistency requirements
validateClOrFail(tokenRangeMapping, failureHandler, LOGGER, phase, job);
}
@@ -147,12 +162,9 @@
+ "Please rerun import once topology changes are complete.",
instance.nodeName(), cluster.getInstanceState(instance));
throw new RuntimeException(errorMessage);
- // Check for blocked instances and ranges for the purpose of logging only.
- // We check for blocked instances while validating consistency level requirements
+ // Both 'blocked' and 'down' instances are considered failures
case UNAVAILABLE_BLOCKED:
case UNAVAILABLE_DOWN:
- boolean shouldAddFailure = availability == InstanceAvailability.UNAVAILABLE_DOWN;
-
Collection<Range<BigInteger>> unavailableRanges = cluster.getTokenRangeMapping(true)
.getTokenRanges()
.get(instance);
@@ -160,10 +172,7 @@
String nodeDisplayName = instance.nodeName();
String message = String.format("%s %s", nodeDisplayName, availability.getMessage());
LOGGER.warn("{} failed in phase {} on {} because {}", failedRange, phase, nodeDisplayName, message);
- if (shouldAddFailure)
- {
- failureHandler.addFailure(failedRange, instance, message);
- }
+ failureHandler.addFailure(failedRange, instance, message);
});
break;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
index c208ed4..13c616e 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/CassandraClusterInfo.java
@@ -34,6 +34,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
@@ -60,8 +62,10 @@
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.client.InstanceState;
import org.apache.cassandra.spark.common.client.InstanceStatus;
+import org.apache.cassandra.spark.common.model.NodeState;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.exception.SidecarApiCallException;
import org.apache.cassandra.spark.utils.CqlUtils;
import org.apache.cassandra.spark.utils.FutureUtils;
import org.jetbrains.annotations.NotNull;
@@ -227,10 +231,18 @@
return schemaResponse.schema();
}
- private TokenRangeReplicasResponse getTokenRangesAndReplicaSets() throws ExecutionException, InterruptedException
+ private TokenRangeReplicasResponse getTokenRangesAndReplicaSets()
{
CassandraContext context = getCassandraContext();
- return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get();
+ try
+ {
+ return context.getSidecarClient().tokenRangeReplicas(new ArrayList<>(context.getCluster()), conf.keyspace).get();
+ }
+ catch (ExecutionException | InterruptedException exception)
+ {
+ LOGGER.error("Failed to get token ranges for keyspace {}", conf.keyspace, exception);
+ throw new SidecarApiCallException("Failed to get token ranges for keyspace " + conf.keyspace, exception);
+ }
}
private Set<String> readReplicasFromTokenRangeResponse(TokenRangeReplicasResponse response)
@@ -381,67 +393,66 @@
private TokenRangeMapping<RingInstance> getTokenRangeReplicas()
{
+ return getTokenRangeReplicas(this::getTokenRangesAndReplicaSets,
+ this::getPartitioner,
+ this::getReplicationFactor,
+ this::instanceIsBlocked);
+ }
+
+ @VisibleForTesting
+ static TokenRangeMapping<RingInstance> getTokenRangeReplicas(Supplier<TokenRangeReplicasResponse> topologySupplier,
+ Supplier<Partitioner> partitionerSupplier,
+ Supplier<ReplicationFactor> replicationFactorSupplier,
+ Predicate<? super RingInstance> blockedInstancePredicate)
+ {
+ long start = System.nanoTime();
+ TokenRangeReplicasResponse response = topologySupplier.get();
+ long elapsedTimeNanos = System.nanoTime() - start;
+ Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(),
+ response.replicaMetadata());
+ LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} milliseconds",
+ tokenRangesByInstance.size(),
+ TimeUnit.NANOSECONDS.toMillis(elapsedTimeNanos));
+
+ Set<RingInstance> instances = response.replicaMetadata()
+ .values()
+ .stream()
+ .map(RingInstance::new)
+ .collect(Collectors.toSet());
+
+ Set<RingInstance> replacementInstances = instances.stream()
+ .filter(i -> i.nodeState() == NodeState.REPLACING)
+ .collect(Collectors.toSet());
+
+ Set<RingInstance> blockedInstances = instances.stream()
+ .filter(blockedInstancePredicate)
+ .collect(Collectors.toSet());
+
+ Set<String> blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address())
+ .collect(Collectors.toSet());
+
+ // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
Map<String, Set<String>> writeReplicasByDC;
- Map<String, Set<String>> pendingReplicasByDC;
- Map<String, ReplicaMetadata> replicaMetadata;
- Set<RingInstance> blockedInstances;
- Set<RingInstance> replacementInstances;
- Multimap<RingInstance, Range<BigInteger>> tokenRangesByInstance;
- try
+ writeReplicasByDC = response.writeReplicas()
+ .stream()
+ .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
+ (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
+
+ Map<String, Set<String>> pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
+
+ if (LOGGER.isDebugEnabled())
{
- long start = System.nanoTime();
- TokenRangeReplicasResponse response = getTokenRangesAndReplicaSets();
- long elapsedTimeNanos = System.nanoTime() - start;
- replicaMetadata = response.replicaMetadata();
-
- tokenRangesByInstance = getTokenRangesByInstance(response.writeReplicas(), response.replicaMetadata());
- LOGGER.info("Retrieved token ranges for {} instances from write replica set in {} nanoseconds",
- tokenRangesByInstance.size(),
- elapsedTimeNanos);
-
- replacementInstances = response.replicaMetadata()
- .values()
- .stream()
- .filter(m -> m.state().equalsIgnoreCase(InstanceState.REPLACING.name()))
- .map(RingInstance::new)
- .collect(Collectors.toSet());
-
- blockedInstances = response.replicaMetadata()
- .values()
- .stream()
- .map(RingInstance::new)
- .filter(this::instanceIsBlocked)
- .collect(Collectors.toSet());
-
- Set<String> blockedIps = blockedInstances.stream().map(i -> i.ringInstance().address())
- .collect(Collectors.toSet());
-
- // Each token range has hosts by DC. We collate them across all ranges into all hosts by DC
- writeReplicasByDC = response.writeReplicas()
- .stream()
- .flatMap(wr -> wr.replicasByDatacenter().entrySet().stream())
- .collect(Collectors.toMap(Map.Entry::getKey, e -> new HashSet<>(e.getValue()),
- (l1, l2) -> filterAndMergeInstances(l1, l2, blockedIps)));
-
- pendingReplicasByDC = getPendingReplicas(response, writeReplicasByDC);
-
- if (LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
- writeReplicasByDC.keySet(),
- writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
- pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
- }
- }
- catch (ExecutionException | InterruptedException exception)
- {
- LOGGER.error("Failed to get token ranges, ", exception);
- throw new RuntimeException(exception);
+ LOGGER.debug("Fetched token-ranges with dcs={}, write_replica_count={}, pending_replica_count={}",
+ writeReplicasByDC.keySet(),
+ writeReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size(),
+ pendingReplicasByDC.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()).size());
}
+ Map<String, ReplicaMetadata> replicaMetadata = response.replicaMetadata();
// Include availability info so CL checks can use it to exclude replacement hosts
- return new TokenRangeMapping<>(getPartitioner(),
- getReplicationFactor(),
+ return new TokenRangeMapping<>(partitionerSupplier.get(),
+ replicationFactorSupplier.get(),
writeReplicasByDC,
pendingReplicasByDC,
tokenRangesByInstance,
@@ -450,7 +461,7 @@
replacementInstances);
}
- private Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
+ private static Set<String> filterAndMergeInstances(Set<String> instancesList1, Set<String> instancesList2, Set<String> blockedIPs)
{
Set<String> merged = new HashSet<>();
// Removes blocked instances. If this is included, remove blockedInstances from CL checks
@@ -460,7 +471,7 @@
return merged;
}
- private Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
+ private static Map<String, Set<String>> getPendingReplicas(TokenRangeReplicasResponse response, Map<String, Set<String>> writeReplicasByDC)
{
Set<String> pendingReplicas = response.replicaMetadata()
@@ -478,7 +489,7 @@
// Filter writeReplica entries and the value replicaSet to only include those with pending replicas
return writeReplicasByDC.entrySet()
.stream()
- .filter(e -> e.getValue().stream()
+ .filter(e -> e.getValue().stream() // todo: transformToHostWithoutPort is called twice for entries
.anyMatch(v -> pendingReplicas.contains(transformToHostWithoutPort(v))))
.collect(Collectors.toMap(Map.Entry::getKey,
e -> e.getValue().stream()
@@ -486,13 +497,13 @@
.collect(Collectors.toSet())));
}
- private String transformToHostWithoutPort(String v)
+ private static String transformToHostWithoutPort(String v)
{
return v.contains(":") ? v.split(":")[0] : v;
}
- private Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
- Map<String, ReplicaMetadata> replicaMetadata)
+ private static Multimap<RingInstance, Range<BigInteger>> getTokenRangesByInstance(List<ReplicaInfo> writeReplicas,
+ Map<String, ReplicaMetadata> replicaMetadata)
{
Multimap<RingInstance, Range<BigInteger>> instanceToRangeMap = ArrayListMultimap.create();
for (ReplicaInfo rInfo : writeReplicas)
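
The refactor above replaces direct field access with injected Supplier and Predicate parameters, so getTokenRangeReplicas can be exercised in tests with canned topology data instead of a live Sidecar. A generic sketch of that pattern, with hypothetical stand-in types:

import java.util.function.Predicate;
import java.util.function.Supplier;

class SupplierInjectionSketch
{
    static final class Topology { }  // stand-in for TokenRangeReplicasResponse
    static final class Instance { }  // stand-in for RingInstance

    // production code passes method references (this::getTokenRangesAndReplicaSets, this::instanceIsBlocked);
    // tests pass lambdas over fixed data, with no Sidecar client involved
    static boolean anyBlocked(Supplier<Topology> topologySupplier, Predicate<Instance> blocked)
    {
        Topology topology = topologySupplier.get(); // fetched once, timed in the real method
        return blocked.test(new Instance());
    }

    public static void main(String[] args)
    {
        System.out.println(anyBlocked(Topology::new, instance -> false)); // false
    }
}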
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java
index d1f458a..228ed91 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinator.java
@@ -357,7 +357,7 @@
// it either passes or throws if the consistency level cannot be satisfied
try
{
- writeValidator.validateClOrFail(cassandraTopologyMonitor.initialTopology());
+ writeValidator.validateClOrFail(cassandraTopologyMonitor.initialTopology(), false);
}
catch (RuntimeException rte)
{
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
index 60628bd..a454827 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/JobInfo.java
@@ -25,6 +25,7 @@
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.data.QualifiedTableName;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
public interface JobInfo extends Serializable
{
@@ -32,6 +33,7 @@
// Job Information API - should this really just move back to Config? Here to try to reduce the violations of the Law of Demeter more than anything else
ConsistencyLevel getConsistencyLevel();
+ @Nullable
String getLocalDC();
/**
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
index 3169cff..d0111ef 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RecordWriter.java
@@ -337,9 +337,10 @@
if (haveMappingsChanged)
{
Set<Range<BigInteger>> rangeDelta = symmetricDifference(startMapping.keySet(), endMapping.keySet());
- Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances).stream()
- .map(RingInstance::ipAddress)
- .collect(Collectors.toSet());
+ Set<String> instanceDelta = symmetricDifference(initialInstances, endInstances)
+ .stream()
+ .map(RingInstance::ipAddressWithPort)
+ .collect(Collectors.toSet());
String message = String.format("[%s] Token range mappings have changed since the task started " +
"with non-overlapping instances: %s and ranges: %s",
partitionId,
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
index 7efe001..ae04447 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/RingInstance.java
@@ -25,9 +25,11 @@
import java.io.Serializable;
import java.util.Objects;
-import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
+import o.a.c.sidecar.client.shaded.common.response.data.RingEntry;
import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.common.model.NodeState;
+import org.apache.cassandra.spark.common.model.NodeStatus;
import org.jetbrains.annotations.Nullable;
public class RingInstance implements CassandraInstance, Serializable
@@ -77,9 +79,29 @@
return ringEntry.address();
}
+ @Override
+ public String ipAddressWithPort()
+ {
+ return ringEntry.address() + ':' + ringEntry.port();
+ }
+
+ @Override
+ public NodeState nodeState()
+ {
+ return NodeState.fromNameIgnoreCase(ringEntry.state());
+ }
+
+ @Override
+ public NodeStatus nodeStatus()
+ {
+ return NodeStatus.fromNameIgnoreCase(ringEntry.status());
+ }
+
/**
* Custom equality that compares the token, fully qualified domain name, the port, and the datacenter
*
+ * Note that node state and status are not part of the equality comparison.
+ *
* @param other the other instance
* @return true if both instances are equal, false otherwise
*/
@@ -101,6 +123,8 @@
/**
* Custom hashCode that compares the token, fully qualified domain name, the port, and the datacenter
*
+ * Note that node state and status are not part of the hash computation.
+ *
* @return The hashcode of this instance based on the important fields
*/
@Override
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java
index e1da5c3..168e044 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/blobupload/CreatedRestoreSlice.java
@@ -20,6 +20,7 @@
package org.apache.cassandra.spark.bulkwriter.blobupload;
import java.io.Serializable;
+import java.util.Collections;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -106,7 +107,7 @@
}
if (!succeededInstances().isEmpty()
- && consistencyLevel.canBeSatisfied(succeededInstances(), replicationFactor, localDC))
+ && consistencyLevel.canBeSatisfied(succeededInstances(), Collections.emptyList(), replicationFactor, localDC))
{
isSatisfied = true;
return ConsistencyLevelCheckResult.SATISFIED;
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java
index dd48bfe..3372dd9 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ConsistencyLevel.java
@@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.Objects;
-import java.util.Set;
import com.google.common.base.Preconditions;
@@ -38,15 +37,28 @@
boolean isLocal();
/**
- * Check consistency level with the collection of the succeeded instances
+ * Check whether write consistency can be satisfied with the collection of the succeeded instances
+ * <p>
+ * When pendingInstances is non-empty, the minimum number of successes is increased by the size of pendingInstances,
+ * keeping the same semantics defined in Cassandra.
+ * See <a href="https://github.com/apache/cassandra/blob/trunk/src/java/org/apache/cassandra/db/ConsistencyLevel.java#L172">blockForWrite</a>
+ * <p>
+ * For example, say RF == 3, and there are 2 pending replicas.
+ * <ul>
+ * <li>QUORUM write consistency requires at least 4 replicas to succeed, i.e. quorum(3) + 2, tolerating 1 failure</li>
+ * <li>ONE write consistency requires at least 3 replicas to succeed, i.e. 1 + 2, tolerating 2 failures</li>
+ * <li>TWO write consistency requires at least 4 replicas to succeed, i.e. 2 + 2, tolerating 1 failure</li>
+ * </ul>
*
* @param succeededInstances the succeeded instances in the replica set
+ * @param pendingInstances the pending instances, e.g. JOINING, LEAVING, MOVING
* @param replicationFactor replication factor to check with
* @param localDC the local data center name if required for the check
* @return true means the consistency level is _definitively_ satisfied;
* returning false only means that no conclusion can be drawn
*/
boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC);
@@ -56,35 +68,6 @@
cl.name() + " only make sense for NetworkTopologyStrategy keyspaces");
}
- /**
- * Checks if the consistency guarantees are maintained, given the failed, blocked and replacing instances, consistency-level and the replication-factor.
- * <pre>
- * - QUORUM based consistency levels check for quorum using the write-replica-set (instead of RF) as they include healthy and pending nodes.
- * This is done to ensure that writes go to a quorum of healthy nodes while accounting for potential failure in pending nodes becoming healthy.
- * - ONE and TWO consistency guarantees are maintained by ensuring that the failures leave us with at-least the corresponding healthy
- * (and non-pending) nodes.
- *
- * For both the above cases, blocked instances are also considered as failures while performing consistency checks.
- * Write replicas are adjusted to exclude replacement nodes for consistency checks, if we have replacement nodes that are not among the failed instances.
- * This is to ensure that we are writing to sufficient non-replacement nodes as replacements can potentially fail leaving us with fewer nodes.
- * </pre>
- *
- * @param writeReplicas the set of replicas for write operations
- * @param pendingReplicas the set of replicas pending status
- * @param replacementInstances the set of instances that are replacing the other instances
- * @param blockedInstances the set of instances that have been blocked for the bulk operation
- * @param failedInstanceIps the set of instances where there were failures
- * @param localDC the local datacenter used for consistency level, or {@code null} if not provided
- * @return {@code true} if the consistency has been met, {@code false} otherwise
- */
- boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC); // todo: simplify the parameter list. not all are required in impl
-
- // Check if successful writes forms quorum of non-replacing nodes - N/A as quorum is if there are no failures/blocked
enum CL implements ConsistencyLevel
{
ALL
@@ -96,26 +79,13 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- int failedExcludingReplacements = failedInstanceIps.size() - replacementInstances.size();
- return failedExcludingReplacements <= 0 && blockedInstances.isEmpty();
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC)
{
- int rf = replicationFactor.getTotalReplicationFactor();
- // The effective RF during expansion could be larger than the defined RF
- // The check for CL satisfaction should consider the scenario and use >=
- return succeededInstances.size() >= rf;
+ int blockedFor = replicationFactor.getTotalReplicationFactor() + pendingInstances.size();
+ return succeededInstances.size() >= blockedFor;
}
},
@@ -128,31 +98,20 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
- String localDC)
+ String localDC) // localDc is ignored for EACH_QUORUM
{
ensureNetworkTopologyStrategy(replicationFactor, EACH_QUORUM);
- Objects.requireNonNull(localDC, "localDC cannot be null");
for (String datacenter : replicationFactor.getOptions().keySet())
{
int rf = replicationFactor.getOptions().get(datacenter);
- int majority = rf / 2 + 1;
- if (succeededInstances.stream()
- .filter(instance -> instance.datacenter().equalsIgnoreCase(datacenter))
- .count() < majority)
+ int pendingCountOfDc = countInDc(pendingInstances, datacenter);
+ int succeededCountOfDc = countInDc(succeededInstances, datacenter);
+ int blockedFor = quorum(rf) + pendingCountOfDc;
+ if (succeededCountOfDc < blockedFor)
{
return false;
}
@@ -169,23 +128,14 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC)
{
int rf = replicationFactor.getTotalReplicationFactor();
- return succeededInstances.size() > rf / 2;
+ int blockedFor = quorum(rf) + pendingInstances.size();
+ return succeededInstances.size() >= blockedFor;
}
},
LOCAL_QUORUM
@@ -197,18 +147,8 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - (writeReplicas.size() / 2 + 1));
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC)
{
@@ -216,9 +156,10 @@
Objects.requireNonNull(localDC, "localDC cannot be null");
int rf = replicationFactor.getOptions().get(localDC);
- return succeededInstances.stream()
- .filter(instance -> instance.datacenter().equalsIgnoreCase(localDC))
- .count() > rf / 2;
+ int pendingCountInDc = countInDc(pendingInstances, localDC);
+ int succeededCountInDc = countInDc(succeededInstances, localDC);
+ int blockFor = quorum(rf) + pendingCountInDc;
+ return succeededCountInDc >= blockFor;
}
},
ONE
@@ -230,23 +171,13 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- return (failedInstanceIps.size() + blockedInstances.size())
- <= (writeReplicas.size() - pendingReplicas.size() - 1);
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC)
{
- return !succeededInstances.isEmpty();
+ int blockFor = 1 + pendingInstances.size();
+ return succeededInstances.size() >= blockFor;
}
},
TWO
@@ -258,23 +189,13 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- return (failedInstanceIps.size() + blockedInstances.size())
- <= (writeReplicas.size() - pendingReplicas.size() - 2);
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC)
{
- return succeededInstances.size() >= 2;
+ int blockFor = 2 + pendingInstances.size();
+ return succeededInstances.size() >= blockFor;
}
},
LOCAL_ONE
@@ -286,26 +207,29 @@
}
@Override
- public boolean checkConsistency(Set<String> writeReplicas,
- Set<String> pendingReplicas,
- Set<String> replacementInstances,
- Set<String> blockedInstances,
- Set<String> failedInstanceIps,
- String localDC)
- {
- return (failedInstanceIps.size() + blockedInstances.size()) <= (writeReplicas.size() - pendingReplicas.size() - 1);
- }
-
- @Override
public boolean canBeSatisfied(Collection<? extends CassandraInstance> succeededInstances,
+ Collection<? extends CassandraInstance> pendingInstances,
ReplicationFactor replicationFactor,
String localDC)
{
ensureNetworkTopologyStrategy(replicationFactor, LOCAL_ONE);
Objects.requireNonNull(localDC, "localDC cannot be null");
- return succeededInstances.stream().anyMatch(instance -> instance.datacenter().equalsIgnoreCase(localDC));
+ int pendingCountInDc = countInDc(pendingInstances, localDC);
+ int succeededCountInDc = countInDc(succeededInstances, localDC);
+ int blockFor = pendingCountInDc + 1;
+ return succeededCountInDc >= blockFor;
}
};
+
+ private static int quorum(int replicaSetSize)
+ {
+ return replicaSetSize / 2 + 1;
+ }
+
+ private static int countInDc(Collection<? extends CassandraInstance> instances, String localDC)
+ {
+ return (int) instances.stream().filter(i -> i.datacenter().equalsIgnoreCase(localDC)).count();
+ }
}
}
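
To make the blockFor arithmetic documented on canBeSatisfied concrete, a self-contained sketch of the QUORUM write check; the RF and counts are the hypothetical values from the javadoc example, and the quorum helper mirrors the private one in the enum:

class BlockForSketch
{
    static int quorum(int replicaSetSize)
    {
        return replicaSetSize / 2 + 1;
    }

    // QUORUM write: a quorum of RF, plus one extra required success per pending replica
    static boolean quorumSatisfied(int succeeded, int rf, int pending)
    {
        return succeeded >= quorum(rf) + pending;
    }

    public static void main(String[] args)
    {
        int rf = 3;
        int pending = 2; // 5 write replicas in total, blockFor = quorum(3) + 2 = 4
        System.out.println(quorumSatisfied(4, rf, pending)); // true: tolerates exactly 1 failure
        System.out.println(quorumSatisfied(3, rf, pending)); // false: 2 unavailable replicas break QUORUM
    }
}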
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
index 06382f8..fe2c36b 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/ReplicaAwareFailureHandler.java
@@ -20,13 +20,12 @@
package org.apache.cassandra.spark.bulkwriter.token;
import java.math.BigInteger;
-import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
@@ -37,16 +36,72 @@
import com.google.common.collect.TreeRangeMap;
import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.common.model.NodeStatus;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
public class ReplicaAwareFailureHandler<Instance extends CassandraInstance>
{
- private final RangeMap<BigInteger, Multimap<Instance, String>> failedRangesMap = TreeRangeMap.create();
+ public class FailuresPerInstance
+ {
+ private final Multimap<Instance, String> errorMessagesPerInstance;
+
+ public FailuresPerInstance()
+ {
+ this.errorMessagesPerInstance = ArrayListMultimap.create();
+ }
+
+ public FailuresPerInstance(Multimap<Instance, String> errorMessagesPerInstance)
+ {
+ this.errorMessagesPerInstance = ArrayListMultimap.create(errorMessagesPerInstance);
+ }
+
+ public FailuresPerInstance copy()
+ {
+ return new FailuresPerInstance(this.errorMessagesPerInstance);
+ }
+
+ public Set<Instance> instances()
+ {
+ return errorMessagesPerInstance.keySet();
+ }
+
+ public void addErrorForInstance(Instance instance, String errorMessage)
+ {
+ errorMessagesPerInstance.put(instance, errorMessage);
+ }
+
+ public boolean hasError(Instance instance)
+ {
+ return errorMessagesPerInstance.containsKey(instance)
+ && !errorMessagesPerInstance.get(instance).isEmpty();
+ }
+
+ public void forEachInstance(BiConsumer<Instance, Collection<String>> instanceErrorsConsumer)
+ {
+ errorMessagesPerInstance.asMap().forEach(instanceErrorsConsumer);
+ }
+ }
+
+ public class ConsistencyFailurePerRange
+ {
+ public final Range<BigInteger> range;
+ public final FailuresPerInstance failuresPerInstance;
+
+ public ConsistencyFailurePerRange(Range<BigInteger> range, FailuresPerInstance failuresPerInstance)
+ {
+ this.range = range;
+ this.failuresPerInstance = failuresPerInstance;
+ }
+ }
+
+ // failures captured per range; note that failures do not necessarily fail a range once the consistency level is taken into account
+ private final RangeMap<BigInteger, FailuresPerInstance> rangeFailuresMap = TreeRangeMap.create();
public ReplicaAwareFailureHandler(Partitioner partitioner)
{
- failedRangesMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), ArrayListMultimap.create());
+ rangeFailuresMap.put(Range.openClosed(partitioner.minToken(), partitioner.maxToken()), new FailuresPerInstance());
}
/**
@@ -66,25 +121,25 @@
*/
public synchronized void addFailure(Range<BigInteger> tokenRange, Instance casInstance, String errMessage)
{
- RangeMap<BigInteger, Multimap<Instance, String>> overlappingFailures = failedRangesMap.subRangeMap(tokenRange);
- RangeMap<BigInteger, Multimap<Instance, String>> mappingsToAdd = TreeRangeMap.create();
+ RangeMap<BigInteger, FailuresPerInstance> overlappingFailures = rangeFailuresMap.subRangeMap(tokenRange);
+ RangeMap<BigInteger, FailuresPerInstance> mappingsToAdd = TreeRangeMap.create();
- for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> entry : overlappingFailures.asMapOfRanges().entrySet())
+ for (Map.Entry<Range<BigInteger>, FailuresPerInstance> entry : overlappingFailures.asMapOfRanges().entrySet())
{
- Multimap<Instance, String> newErrorMap = ArrayListMultimap.create(entry.getValue());
- newErrorMap.put(casInstance, errMessage);
+ FailuresPerInstance newErrorMap = entry.getValue().copy();
+ newErrorMap.addErrorForInstance(casInstance, errMessage);
mappingsToAdd.put(entry.getKey(), newErrorMap);
}
- failedRangesMap.putAll(mappingsToAdd);
+ rangeFailuresMap.putAll(mappingsToAdd);
}
public Set<Instance> getFailedInstances()
{
- return failedRangesMap.asMapOfRanges().values()
- .stream()
- .map(Multimap::keySet)
- .flatMap(Collection::stream)
- .collect(Collectors.toSet());
+ return rangeFailuresMap.asMapOfRanges().values()
+ .stream()
+ .map(FailuresPerInstance::instances)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toSet());
}
/**
@@ -94,119 +149,75 @@
* @param tokenRangeMapping the mapping of token ranges to a Cassandra instance
* @param cl the desired consistency level
* @param localDC the local datacenter
- * @return list of failed entries for token ranges that break consistency. This should ideally be empty for a
+ * @return list of failed token ranges that break consistency. This should ideally be empty for a
* successful operation.
*/
- public synchronized Collection<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>>
- getFailedEntries(TokenRangeMapping<? extends CassandraInstance> tokenRangeMapping,
- ConsistencyLevel cl,
- String localDC)
+ public synchronized List<ConsistencyFailurePerRange>
+ getFailedRanges(TokenRangeMapping<Instance> tokenRangeMapping,
+ ConsistencyLevel cl,
+ @Nullable String localDC)
{
+ Preconditions.checkArgument((cl.isLocal() && localDC != null) || (!cl.isLocal() && localDC == null),
+ "Not a valid pair of consistency level configuration. " +
+ "Consistency level: " + cl + " localDc: " + localDC);
+ List<ConsistencyFailurePerRange> failedRanges = new ArrayList<>();
- List<AbstractMap.SimpleEntry<Range<BigInteger>, Multimap<Instance, String>>> failedEntries =
- new ArrayList<>();
-
- for (Map.Entry<Range<BigInteger>, Multimap<Instance, String>> failedRangeEntry : failedRangesMap.asMapOfRanges()
- .entrySet())
+ for (Map.Entry<Range<BigInteger>, FailuresPerInstance> failedRangeEntry : rangeFailuresMap.asMapOfRanges()
+ .entrySet())
{
- Multimap<Instance, String> errorMap = failedRangeEntry.getValue();
- Collection<Instance> failedInstances = errorMap.keySet()
- .stream()
- .filter(inst ->
- !errorMap.get(inst).isEmpty())
- .collect(Collectors.toList());
+ Range<BigInteger> range = failedRangeEntry.getKey();
+ FailuresPerInstance errorMap = failedRangeEntry.getValue();
+ Set<Instance> failedReplicas = errorMap.instances()
+ .stream()
+ .filter(errorMap::hasError)
+ .collect(Collectors.toSet());
-
- if (!validateConsistency(tokenRangeMapping, failedInstances, cl, localDC))
+ // no failures found for the range; skip consistency check on this one and move on
+ if (failedReplicas.isEmpty())
{
- failedEntries.add(new AbstractMap.SimpleEntry<>(failedRangeEntry.getKey(),
- failedRangeEntry.getValue()));
+ continue;
}
+
+ tokenRangeMapping.getWriteReplicasOfRange(range, localDC)
+ .forEach((subrange, liveAndDown) -> {
+ if (!checkSubrange(cl, localDC, tokenRangeMapping.replicationFactor(), liveAndDown, failedReplicas))
+ {
+ failedRanges.add(new ConsistencyFailurePerRange(subrange, errorMap));
+ }
+ });
}
- return failedEntries;
+ return failedRanges;
}
- private boolean validateConsistency(TokenRangeMapping<? extends CassandraInstance> tokenRangeMapping,
- Collection<Instance> failedInstances,
- ConsistencyLevel cl,
- String localDC)
+ /**
+ * Check whether the CL can be satisfied for a sub-range.
+ * @return true if consistency is satisfied; false otherwise.
+ */
+ private boolean checkSubrange(ConsistencyLevel cl,
+ @Nullable String localDC,
+ ReplicationFactor replicationFactor,
+ Set<Instance> liveAndDown,
+ Set<Instance> failedReplicas)
{
- boolean isConsistencyLevelMet = true;
+ Set<Instance> liveReplicas = liveAndDown.stream()
+ .filter(instance -> instance.nodeStatus() == NodeStatus.UP)
+ .collect(Collectors.toSet());
+ Set<Instance> pendingReplicas = liveAndDown.stream()
+ .filter(instance -> instance.nodeState().isPending)
+ .collect(Collectors.toSet());
+ // a live replica is assumed to have succeeded unless it is in the failed set
+ Set<Instance> succeededReplicas = liveReplicas.stream()
+ .filter(instance -> !failedReplicas.contains(instance))
+ .collect(Collectors.toSet());
- Set<String> failedInstanceIPs = failedInstances.stream()
- .map(CassandraInstance::ipAddress)
- .collect(Collectors.toSet());
- Set<String> datacenters = Collections.emptySet();
- ReplicationFactor replicationFactor = tokenRangeMapping.replicationFactor();
- if (cl == ConsistencyLevel.CL.EACH_QUORUM)
- {
- datacenters = replicationFactor.getOptions().keySet();
- Preconditions.checkArgument(replicationFactor.getReplicationStrategy() == ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
- "%s requires %s replication strategy", cl, ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy);
- }
-
- if (cl.isLocal())
- {
- datacenters = Collections.singleton(localDC);
- Preconditions.checkArgument(replicationFactor.getReplicationStrategy() == ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
- "%s requires %s replication strategy", cl, ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy);
- }
-
- if (!datacenters.isEmpty())
- {
- for (String dc : datacenters)
- {
- Set<String> failedIpsPerDC = failedInstances.stream()
- .filter(inst -> inst.datacenter().matches(dc))
- .map(CassandraInstance::ipAddress)
- .collect(Collectors.toSet());
-
- Set<String> dcWriteReplicas = maybeUpdateWriteReplicasForReplacements(tokenRangeMapping.getWriteReplicas(dc),
- tokenRangeMapping.getReplacementInstances(dc),
- failedIpsPerDC);
-
- isConsistencyLevelMet = isConsistencyLevelMet &&
- cl.checkConsistency(dcWriteReplicas,
- tokenRangeMapping.getPendingReplicas(dc),
- tokenRangeMapping.getReplacementInstances(dc),
- tokenRangeMapping.getBlockedInstances(dc),
- failedIpsPerDC,
- localDC);
- }
- }
- else
- {
- Set<String> replacementInstances = tokenRangeMapping.getReplacementInstances();
- Set<String> dcWriteReplicas = maybeUpdateWriteReplicasForReplacements(tokenRangeMapping.getWriteReplicas(),
- replacementInstances,
- failedInstanceIPs);
-
- isConsistencyLevelMet = cl.checkConsistency(dcWriteReplicas,
- tokenRangeMapping.getPendingReplicas(),
- replacementInstances,
- tokenRangeMapping.getBlockedInstances(),
- failedInstanceIPs,
- localDC);
- }
- return isConsistencyLevelMet;
+ return cl.canBeSatisfied(succeededReplicas, pendingReplicas, replicationFactor, localDC);
}
- public boolean hasFailed(TokenRangeMapping<? extends CassandraInstance> tokenRange,
+ public boolean hasFailed(TokenRangeMapping<Instance> tokenRange,
ConsistencyLevel cl,
String localDC)
{
- return !getFailedEntries(tokenRange, cl, localDC).isEmpty();
- }
-
- private static Set<String> maybeUpdateWriteReplicasForReplacements(Set<String> writeReplicas, Set<String> replacingInstances, Set<String> failedInstances)
- {
- // Exclude replacement nodes from write-replicas if replacements are NOT among failed instances
- if (!replacingInstances.isEmpty() && Collections.disjoint(failedInstances, replacingInstances))
- {
-
- return writeReplicas.stream().filter(r -> !replacingInstances.contains(r)).collect(Collectors.toSet());
- }
- return writeReplicas;
+ return !getFailedRanges(tokenRange, cl, localDC).isEmpty();
}
}
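
A condensed sketch of what checkSubrange computes for one sub-range: live replicas are those with status UP, pending replicas raise the required success count, and a live replica counts as succeeded unless it is in the failed set. Types are simplified to a small stand-in class; the node names and RF are hypothetical:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class SubrangeCheckSketch
{
    static final class Node
    {
        final String name;
        final boolean up;
        final boolean pending;

        Node(String name, boolean up, boolean pending)
        {
            this.name = name;
            this.up = up;
            this.pending = pending;
        }
    }

    // mirrors checkSubrange for QUORUM: succeeded = live minus failed, blockFor = quorum(rf) + pending
    static boolean quorumOk(Set<Node> liveAndDown, Set<String> failedNames, int rf)
    {
        long pending = liveAndDown.stream().filter(node -> node.pending).count();
        long succeeded = liveAndDown.stream()
                                    .filter(node -> node.up)
                                    .filter(node -> !failedNames.contains(node.name))
                                    .count();
        return succeeded >= rf / 2 + 1 + pending;
    }

    public static void main(String[] args)
    {
        Set<Node> replicas = new HashSet<>(Arrays.asList(new Node("a", true, false),
                                                         new Node("b", true, false),
                                                         new Node("c", false, false)));
        // one unavailable replica (c is down) out of RF 3 still satisfies QUORUM
        System.out.println(quorumOk(replicas, new HashSet<>(), 3));                   // true
        // two unavailable replicas (b failed, c down) now fail the check: the 19842 fix
        System.out.println(quorumOk(replicas, new HashSet<>(Arrays.asList("b")), 3)); // false
    }
}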
diff --git a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
index 6e5ecac..b247916 100644
--- a/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
+++ b/cassandra-analytics-core/src/main/java/org/apache/cassandra/spark/bulkwriter/token/TokenRangeMapping.java
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.base.Preconditions;
@@ -40,7 +41,9 @@
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.apache.cassandra.spark.data.ReplicationFactor;
import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.jetbrains.annotations.Nullable;
+// TODO: refactor to improve the return types of methods to use `Instance` instead of String and cleanup
public class TokenRangeMapping<Instance extends CassandraInstance> implements Serializable
{
private static final long serialVersionUID = -7284933683815811160L;
@@ -137,6 +140,31 @@
return new HashSet<>(pendingReplicasByDC.get(datacenter));
}
+ /**
+ * Get the write replicas of sub-ranges that overlap with the input range.
+ *
+ * @param range range to check. The range can potentially overlap with multiple ranges.
+ * For example, a down node registers a failure over a token range that spans multiple primary token ranges replicated to it.
+ * @param localDc local DC name used to filter out non-local-DC instances. The parameter is optional; when absent, i.e. null, no filtering is applied
+ * @return the write replicas of sub-ranges
+ */
+ public Map<Range<BigInteger>, Set<Instance>> getWriteReplicasOfRange(Range<BigInteger> range, @Nullable String localDc)
+ {
+ Map<Range<BigInteger>, List<Instance>> subRangeReplicas = replicasByTokenRange.subRangeMap(range).asMapOfRanges();
+ Function<List<Instance>, Set<Instance>> inDcInstances = instances -> {
+ if (localDc != null)
+ {
+ return instances.stream()
+ .filter(instance -> instance.datacenter().equalsIgnoreCase(localDc))
+ .collect(Collectors.toSet());
+ }
+ return new HashSet<>(instances);
+ };
+ return subRangeReplicas.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, entry -> inDcInstances.apply(entry.getValue())));
+ }
+
public Set<String> getWriteReplicas()
{
return (writeReplicasByDC == null || writeReplicasByDC.isEmpty())
@@ -168,7 +196,7 @@
public Set<String> getReplacementInstances()
{
return replacementInstances.stream()
- .map(RingInstance::ipAddress)
+ .map(RingInstance::ipAddressWithPort)
.collect(Collectors.toSet());
}
@@ -176,7 +204,7 @@
{
return replacementInstances.stream()
.filter(r -> r.datacenter().equalsIgnoreCase(datacenter))
- .map(RingInstance::ipAddress)
+ .map(RingInstance::ipAddressWithPort)
.collect(Collectors.toSet());
}
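
As a usage sketch of the new getWriteReplicasOfRange (hypothetical caller code; `mapping` stands in for a TokenRangeMapping<RingInstance> built elsewhere):

    // A failed range may straddle several primary token ranges, so the result
    // maps each overlapping sub-range to its own replica set.
    Range<BigInteger> failedRange = Range.openClosed(BigInteger.valueOf(100), BigInteger.valueOf(300));
    Map<Range<BigInteger>, Set<RingInstance>> bySubRange =
            mapping.getWriteReplicasOfRange(failedRange, "DC1"); // pass null to skip DC filtering
    bySubRange.forEach((subRange, replicas) ->
            System.out.println(subRange + " -> " + replicas.size() + " replicas"));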
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
new file mode 100644
index 0000000..8f9aef9
--- /dev/null
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/BulkWriteValidatorTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cassandra.spark.bulkwriter;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.junit.jupiter.api.Test;
+
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaInfo;
+import o.a.c.sidecar.client.shaded.common.response.TokenRangeReplicasResponse.ReplicaMetadata;
+import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
+import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
+import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
+import org.apache.cassandra.spark.data.QualifiedTableName;
+import org.apache.cassandra.spark.data.ReplicationFactor;
+import org.apache.cassandra.spark.data.partitioner.Partitioner;
+import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class BulkWriteValidatorTest
+{
+ @Test
+ void testConsistencyCheckFailureWhenBlockedInstancesFailQuorum()
+ {
+ BulkWriterContext mockWriterContext = mock(BulkWriterContext.class);
+ ClusterInfo mockClusterInfo = mock(ClusterInfo.class);
+ when(mockWriterContext.cluster()).thenReturn(mockClusterInfo);
+
+ CassandraContext mockCassandraContext = mock(CassandraContext.class);
+ when(mockClusterInfo.getCassandraContext()).thenReturn(mockCassandraContext);
+ Map<String, String> replicationOptions = new HashMap<>();
+ replicationOptions.put("class", "SimpleStrategy");
+ replicationOptions.put("replication_factor", "3");
+ TokenRangeMapping<RingInstance> topology = CassandraClusterInfo.getTokenRangeReplicas(
+ () -> mockSimpleTokenRangeReplicasResponse(10, 3),
+ () -> Partitioner.Murmur3Partitioner,
+ () -> new ReplicationFactor(replicationOptions),
+ ringInstance -> {
+ int nodeId = Integer.parseInt(ringInstance.ipAddress().replace("localhost", ""));
+ return nodeId <= 2; // block nodes 0, 1, 2
+ });
+ when(mockClusterInfo.getTokenRangeMapping(anyBoolean())).thenReturn(topology);
+ Map<RingInstance, InstanceAvailability> instanceAvailabilityMap = new HashMap<>(10);
+ Set<String> blocked = topology.getBlockedInstances();
+ for (RingInstance instance : topology.getTokenRanges().keySet())
+ {
+ String ip = instance.ringInstance().address();
+ instanceAvailabilityMap.put(instance, blocked.contains(ip) ? InstanceAvailability.UNAVAILABLE_BLOCKED : InstanceAvailability.AVAILABLE);
+ }
+ when(mockClusterInfo.getInstanceAvailability()).thenReturn(instanceAvailabilityMap);
+
+ JobInfo mockJobInfo = mock(JobInfo.class);
+ UUID jobId = UUID.randomUUID();
+ when(mockJobInfo.getId()).thenReturn(jobId.toString());
+ when(mockJobInfo.getRestoreJobId()).thenReturn(jobId);
+ when(mockJobInfo.qualifiedTableName()).thenReturn(new QualifiedTableName("testkeyspace", "testtable"));
+ when(mockJobInfo.getConsistencyLevel()).thenReturn(ConsistencyLevel.CL.QUORUM);
+ when(mockJobInfo.effectiveSidecarPort()).thenReturn(9043);
+ when(mockJobInfo.jobKeepAliveMinutes()).thenReturn(-1);
+ when(mockWriterContext.job()).thenReturn(mockJobInfo);
+
+ BulkWriteValidator writerValidator = new BulkWriteValidator(mockWriterContext, new ReplicaAwareFailureHandler<>(Partitioner.Murmur3Partitioner));
+ assertThatThrownBy(() -> writerValidator.validateClOrFail(topology))
+ .isExactlyInstanceOf(ConsistencyNotSatisfiedException.class)
+ .hasMessageContaining("Failed to write");
+ }
+
+ private TokenRangeReplicasResponse mockSimpleTokenRangeReplicasResponse(int instancesCount, int replicationFactor)
+ {
+ long startToken = 0;
+ long rangeLength = 100;
+ List<ReplicaInfo> replicaInfoList = new ArrayList<>(instancesCount);
+ Map<String, ReplicaMetadata> replicaMetadata = new HashMap<>(instancesCount);
+ for (int i = 0; i < instancesCount; i++)
+ {
+ long endToken = startToken + rangeLength;
+ List<String> replicas = new ArrayList<>(replicationFactor);
+ for (int r = 0; r < replicationFactor; r++)
+ {
+ replicas.add("localhost" + (i + r) % instancesCount);
+ }
+ Map<String, List<String>> replicasPerDc = new HashMap<>();
+ replicasPerDc.put("ignored", replicas);
+ ReplicaInfo ri = new ReplicaInfo(String.valueOf(startToken), String.valueOf(endToken), replicasPerDc);
+ replicaInfoList.add(ri);
+ String address = "localhost" + i;
+ ReplicaMetadata rm = new ReplicaMetadata("NORMAL", "UP", address, address, 9042, "ignored");
+ replicaMetadata.put(address, rm);
+ startToken = endToken;
+ }
+ return new TokenRangeReplicasResponse(replicaInfoList, replicaInfoList, replicaMetadata);
+ }
+}
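
The assertion above follows from the ring the mock builds: with RF 3 and contiguous token ranges, range i is replicated to nodes i, i + 1 and i + 2 (mod 10), so blocking nodes 0-2 leaves some ranges below quorum. A worked sketch (illustrative, not test code):

    java.util.Set<Integer> blocked = new java.util.HashSet<>(java.util.Arrays.asList(0, 1, 2));
    int quorum = 3 / 2 + 1; // 2
    for (int i = 0; i < 10; i++)
    {
        int available = 0;
        for (int r = 0; r < 3; r++)
        {
            if (!blocked.contains((i + r) % 10))
            {
                available++;
            }
        }
        // Ranges 0, 1 and 9 have 0 or 1 available replicas, below quorum,
        // which validateClOrFail surfaces as ConsistencyNotSatisfiedException.
        System.out.println("range " + i + " met=" + (available >= quorum));
    }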
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
index 966de28..ae7a839 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/DirectStreamSessionTest.java
@@ -62,7 +62,7 @@
public class DirectStreamSessionTest
{
- public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 ranges with LOCAL_QUORUM";
+ public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to write 1 ranges with LOCAL_QUORUM";
private static final Map<String, Object> COLUMN_BOUND_VALUES = ImmutableMap.of("id", 0, "date", 1, "course", "course", "marks", 2);
@TempDir
private Path folder;
@@ -88,7 +88,7 @@
tableWriter = new MockTableWriter(folder);
transportContext = (TransportContext.DirectDataBulkWriterContext) writerContext.transportContext();
executor = new MockScheduledExecutorService();
- expectedInstances = Lists.newArrayList("DC1-i1", "DC1-i2", "DC1-i3");
+ expectedInstances = Lists.newArrayList("DC1-i2", "DC1-i3", "DC1-i4");
}
@Test
@@ -246,7 +246,7 @@
ss.addRow(BigInteger.valueOf(102L), COLUMN_BOUND_VALUES);
ExecutionException exception = assertThrows(ExecutionException.class,
() -> ss.finalizeStreamAsync().get());
- assertEquals("Failed to load 1 ranges with LOCAL_QUORUM for job " + writerContext.job().getId()
+ assertEquals("Failed to write 1 ranges with LOCAL_QUORUM for job " + writerContext.job().getId()
+ " in phase UploadAndCommit.", exception.getCause().getMessage());
executor.assertFuturesCalled();
assertThat(writerContext.getUploads().values().stream().mapToInt(Collection::size).sum(), equalTo(RF * FILES_PER_SSTABLE));
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
index fa886a4..f84280f 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/ImportCompletionCoordinatorTest.java
@@ -188,7 +188,7 @@
.waitForCompletion();
});
assertNotNull(exception.getMessage());
- assertTrue(exception.getMessage().contains("Failed to load"));
+ assertTrue(exception.getMessage().contains("Failed to write"), "Actual error message: " + exception.getMessage());
assertTrue(exception.getMessage().contains(errorMessage));
assertNotNull(exception.getCause());
validateAllSlicesWereCalledAtMostOnce(resultList);
@@ -407,8 +407,8 @@
int noProgressPerReplicaSet = noProgressInstanceCount;
// create one distinct slice per instance
CreateSliceRequestPayload mockCreateSliceRequestPayload = mock(CreateSliceRequestPayload.class);
- when(mockCreateSliceRequestPayload.startToken()).thenReturn(BigInteger.valueOf(100 * i));
- when(mockCreateSliceRequestPayload.endToken()).thenReturn(BigInteger.valueOf(100 * (1 + i)));
+ when(mockCreateSliceRequestPayload.startToken()).thenReturn(BigInteger.valueOf(100 * (i - 1)));
+ when(mockCreateSliceRequestPayload.endToken()).thenReturn(BigInteger.valueOf(100 * i));
when(mockCreateSliceRequestPayload.sliceId()).thenReturn(UUID.randomUUID().toString());
when(mockCreateSliceRequestPayload.key()).thenReturn("key_for_instance_" + i); // to be captured by extension mock
when(mockCreateSliceRequestPayload.bucket()).thenReturn("bucket"); // to be captured by extension mock
@@ -495,8 +495,8 @@
return new RingInstance(new RingEntry.Builder()
.datacenter("DC1")
.address("127.0.0." + instanceInRing)
- .token(String.valueOf(i * 100))
- .fqdn("instance-" + instanceInRing)
+ .token(String.valueOf(i * 100_000))
+ .fqdn("DC1-i" + instanceInRing)
.build());
}
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
index 23813ab..1a86680 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/MockBulkWriterContext.java
@@ -231,7 +231,11 @@
@Override
public String getLocalDC()
{
- return "DC1";
+ if (getConsistencyLevel().isLocal())
+ {
+ return "DC1";
+ }
+ return null;
}
@Override
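
A quick sketch of what the conditional above changes for callers (CL names are from this patch; the filtering consequence follows from the localDc javadoc on getWriteReplicasOfRange earlier in the diff):

    // DC-local CLs get "DC1" so replica sets are filtered to the local DC;
    // global CLs get null, so no DC filtering is applied.
    String forLocalQuorum = ConsistencyLevel.CL.LOCAL_QUORUM.isLocal() ? "DC1" : null; // "DC1"
    String forQuorum = ConsistencyLevel.CL.QUORUM.isLocal() ? "DC1" : null;            // null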
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
index bb2c4f9..455dc40 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RecordWriterTest.java
@@ -149,7 +149,6 @@
@Test
void testWriteWithBlockedInstances()
{
-
String blockedInstanceIp = "127.0.0.2";
TokenRangeMapping<RingInstance> testMapping =
TokenRangeMappingUtils.buildTokenRangeMappingWithBlockedInstance(100000,
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
index f08a2a5..f48ffbf 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/RingInstanceTest.java
@@ -108,7 +108,7 @@
.multiply(BigInteger.valueOf(index))).subtract(BigInteger.valueOf(2).pow(63));
}
- private static ReplicaAwareFailureHandler<CassandraInstance> ntsStrategyHandler(Partitioner partitioner)
+ private static ReplicaAwareFailureHandler<RingInstance> ntsStrategyHandler(Partitioner partitioner)
{
return new ReplicaAwareFailureHandler<>(partitioner);
}
@@ -211,15 +211,16 @@
Partitioner partitioner = Partitioner.Murmur3Partitioner;
BigInteger[] tokens = getTokens(partitioner, 5);
List<RingInstance> instances = getInstances(tokens, DATACENTER_1);
- CassandraInstance instance1 = instances.get(0);
- CassandraInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter(), "?");
- ReplicaAwareFailureHandler<CassandraInstance> replicationFactor3 = ntsStrategyHandler(partitioner);
+ RingInstance instance1 = instances.get(0);
+ RingInstance instance2 = instance(tokens[0], instance1.nodeName(), instance1.datacenter(), "?");
+ ReplicaAwareFailureHandler<RingInstance> replicationFactor3 = ntsStrategyHandler(partitioner);
ReplicationFactor repFactor = new ReplicationFactor(ReplicationFactor.ReplicationStrategy.NetworkTopologyStrategy,
ntsOptions(new String[]{DATACENTER_1 }, new int[]{3 }));
- Map<String, Set<String>> writeReplicas = instances.stream()
- .collect(Collectors.groupingBy(CassandraInstance::datacenter,
- Collectors.mapping(CassandraInstance::nodeName,
- Collectors.toSet())));
+ Map<String, Set<String>> writeReplicas =
+ instances.stream().collect(Collectors.groupingBy(CassandraInstance::datacenter,
+ // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+ // The returned values are IP addresses with port.
+ Collectors.mapping(CassandraInstance::ipAddressWithPort, Collectors.toSet())));
Multimap<RingInstance, Range<BigInteger>> tokenRanges = TokenRangeMappingUtils.setupTokenRangeMap(partitioner, repFactor, instances);
TokenRangeMapping<RingInstance> tokenRange = new TokenRangeMapping<>(partitioner,
repFactor,
@@ -236,7 +237,7 @@
replicationFactor3.addFailure(Range.openClosed(tokens[0].add(BigInteger.ONE),
tokens[0].add(BigInteger.valueOf(2L))), instance2, "Failure 2");
- replicationFactor3.getFailedEntries(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
+ replicationFactor3.getFailedRanges(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1);
assertFalse(replicationFactor3.hasFailed(tokenRange, ConsistencyLevel.CL.LOCAL_QUORUM, DATACENTER_1));
}
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
index 2e2265d..8be92d2 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/StreamSessionConsistencyTest.java
@@ -45,6 +45,7 @@
import org.apache.cassandra.spark.bulkwriter.token.ReplicaAwareFailureHandler;
import org.apache.cassandra.spark.bulkwriter.token.TokenRangeMapping;
import org.apache.cassandra.spark.common.model.CassandraInstance;
+import org.apache.cassandra.spark.exception.ConsistencyNotSatisfiedException;
import org.apache.cassandra.spark.utils.DigestAlgorithm;
import org.apache.cassandra.spark.utils.XXHash32DigestAlgorithm;
@@ -52,6 +53,7 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class StreamSessionConsistencyTest
@@ -59,7 +61,8 @@
private static final int NUMBER_DCS = 2;
private static final int FILES_PER_SSTABLE = 8;
private static final int REPLICATION_FACTOR = 3;
- private static final List<String> EXPECTED_INSTANCES = ImmutableList.of("DC1-i1", "DC1-i2", "DC1-i3", "DC2-i1", "DC2-i2", "DC2-i3");
+ // range [101, 199] is replicated to these instances; DC1-i1 and DC2-i1 own tokens 0 and 1, which fall before the range
+ private static final List<String> EXPECTED_INSTANCES = ImmutableList.of("DC1-i2", "DC1-i3", "DC1-i4", "DC2-i2", "DC2-i3", "DC2-i4");
private static final Range<BigInteger> RANGE = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, BigInteger.valueOf(199L), BoundType.CLOSED);
private static final TokenRangeMapping<RingInstance> TOKEN_RANGE_MAPPING =
TokenRangeMappingUtils.buildTokenRangeMapping(0, ImmutableMap.of("DC1", 3, "DC2", 3), 6);
@@ -82,7 +85,7 @@
return clsToFailures.stream().map(List::toArray).collect(Collectors.toList());
}
- private void setup(ConsistencyLevel.CL consistencyLevel, List<Integer> failuresPerDc)
+ private void setup(ConsistencyLevel.CL consistencyLevel)
{
digestAlgorithm = new XXHash32DigestAlgorithm();
tableWriter = new MockTableWriter(folder);
@@ -96,20 +99,21 @@
List<Integer> failuresPerDc)
throws IOException, ExecutionException, InterruptedException
{
- setup(consistencyLevel, failuresPerDc);
+ setup(consistencyLevel);
AtomicInteger dc1Failures = new AtomicInteger(failuresPerDc.get(0));
AtomicInteger dc2Failures = new AtomicInteger(failuresPerDc.get(1));
ImmutableMap<String, AtomicInteger> dcFailures = ImmutableMap.of("DC1", dc1Failures, "DC2", dc2Failures);
boolean shouldFail = calculateFailure(consistencyLevel, dc1Failures.get(), dc2Failures.get());
- // Return successful result for 1st result, failed for the rest
writerContext.setCommitResultSupplier((uuids, dc) -> {
if (dcFailures.get(dc).getAndDecrement() > 0)
{
- return new DirectDataTransferApi.RemoteCommitResult(false, null, uuids, "");
+ // failure
+ return new DirectDataTransferApi.RemoteCommitResult(false, uuids, null, "");
}
else
{
- return new DirectDataTransferApi.RemoteCommitResult(true, uuids, null, "");
+ // success
+ return new DirectDataTransferApi.RemoteCommitResult(true, null, null, "");
}
});
StreamSession<?> streamSession = createStreamSession(NonValidatingTestSortedSSTableWriter::new);
@@ -118,7 +122,7 @@
if (shouldFail)
{
ExecutionException exception = assertThrows(ExecutionException.class, fut::get);
- assertEquals("Failed to load 1 ranges with " + consistencyLevel
+ assertEquals("Failed to write 1 ranges with " + consistencyLevel
+ " for job " + writerContext.job().getId()
+ " in phase UploadAndCommit.", exception.getCause().getMessage());
}
@@ -143,7 +147,7 @@
List<Integer> failuresPerDc)
throws IOException, ExecutionException, InterruptedException
{
- setup(consistencyLevel, failuresPerDc);
+ setup(consistencyLevel);
AtomicInteger dc1Failures = new AtomicInteger(failuresPerDc.get(0));
AtomicInteger dc2Failures = new AtomicInteger(failuresPerDc.get(1));
int numFailures = dc1Failures.get() + dc2Failures.get();
@@ -156,7 +160,8 @@
if (shouldFail)
{
ExecutionException exception = assertThrows(ExecutionException.class, fut::get);
- assertEquals("Failed to load 1 ranges with " + consistencyLevel
+ assertInstanceOf(ConsistencyNotSatisfiedException.class, exception.getCause());
+ assertEquals("Failed to write 1 ranges with " + consistencyLevel
+ " for job " + writerContext.job().getId()
+ " in phase UploadAndCommit.", exception.getCause().getMessage());
}
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
index bab7db5..117e78c 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/TokenRangeMappingUtils.java
@@ -65,9 +65,11 @@
.get();
ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
Map<String, Set<String>> writeReplicas =
- instances.stream().collect(Collectors.groupingBy(RingInstance::datacenter,
- Collectors.mapping(RingInstance::nodeName,
- Collectors.toSet())));
+ instances.stream()
+ .collect(Collectors.groupingBy(RingInstance::datacenter,
+ // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+ // The returned values are IP addresses with port.
+ Collectors.mapping(RingInstance::ipAddressWithPort, Collectors.toSet())));
writeReplicas.replaceAll((key, value) -> {
value.removeIf(e -> value.size() > 3);
return value;
@@ -77,7 +79,7 @@
.map(i -> new ReplicaMetadata(i.ringInstance().state(),
i.ringInstance().status(),
i.nodeName(),
- i.ipAddress(),
+ i.ipAddressWithPort(),
7012,
i.datacenter()))
.collect(Collectors.toList());
@@ -118,18 +120,16 @@
ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
Map<String, Set<String>> writeReplicas =
instances.stream().collect(Collectors.groupingBy(RingInstance::datacenter,
- Collectors.mapping(RingInstance::nodeName,
+ // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+ // The returned values are IP addresses with port.
+ Collectors.mapping(RingInstance::ipAddressWithPort,
Collectors.toSet())));
- writeReplicas.replaceAll((key, value) -> {
- value.removeIf(e -> value.size() > 3);
- return value;
- });
List<ReplicaMetadata> replicaMetadata = instances.stream()
.map(i -> new ReplicaMetadata(i.ringInstance().state(),
i.ringInstance().status(),
i.nodeName(),
- i.ipAddress(),
+ i.ipAddressWithPort(),
7012,
i.datacenter()))
.collect(Collectors.toList());
@@ -177,18 +177,16 @@
ReplicationFactor replicationFactor = getReplicationFactor(rfByDC);
Map<String, Set<String>> writeReplicas =
instances.stream().collect(Collectors.groupingBy(RingInstance::datacenter,
- Collectors.mapping(RingInstance::nodeName,
+ // writeReplicas are ultimately created from StorageService#getRangeToEndpointMap in Cassandra
+ // The returned values are IP addresses with port.
+ Collectors.mapping(RingInstance::ipAddressWithPort,
Collectors.toSet())));
- writeReplicas.replaceAll((key, value) -> {
- value.removeIf(e -> value.size() > 3);
- return value;
- });
List<ReplicaMetadata> replicaMetadata = instances.stream()
.map(i -> new ReplicaMetadata(i.ringInstance().state(),
i.ringInstance().status(),
i.nodeName(),
- i.ipAddress(),
+ i.ipAddressWithPort(),
7012,
i.datacenter()))
.collect(Collectors.toList());
@@ -261,13 +259,14 @@
String datacenter = rfForDC.getKey();
for (int i = 0; i < instancesPerDc; i++)
{
+ int instanceId = i + 1;
RingEntry.Builder ringEntry = new RingEntry.Builder()
- .address("127.0." + dcOffset + "." + i)
+ .address("127.0." + dcOffset + "." + instanceId)
.datacenter(datacenter)
.load("0")
// Single DC tokens will be in multiples of 100000
.token(Integer.toString(initialToken + dcOffset + 100_000 * i))
- .fqdn(datacenter + "-i" + i)
+ .fqdn(datacenter + "-i" + instanceId)
.rack("Rack")
.hostId("")
.status("UP")
diff --git a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java
index 858934a..e02a3d3 100644
--- a/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java
+++ b/cassandra-analytics-core/src/test/java/org/apache/cassandra/spark/bulkwriter/token/BulkWriterConsistencyLevelTest.java
@@ -21,10 +21,9 @@
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.BeforeAll;
@@ -42,27 +41,51 @@
{
private static final ReplicationFactor replicationFactor = new ReplicationFactor(ImmutableMap.of(
"class", "NetworkTopologyStrategy",
- "dc1", "3"));
+ "dc1", "3",
+ "dc2", "3"));
private static List<CassandraInstance> succeededNone = Collections.emptyList();
private static List<CassandraInstance> succeededOne;
+ private static List<CassandraInstance> succeededOnePerDc;
private static List<CassandraInstance> succeededTwo;
+ private static List<CassandraInstance> succeededTwoPerDc;
private static List<CassandraInstance> succeededThree;
+ private static List<CassandraInstance> succeededThreePerDc;
+ private static List<CassandraInstance> succeededQuorum; // across DC
+ private static List<CassandraInstance> succeededHalf; // across DC; half < quorum
+ private static List<CassandraInstance> succeededAll; // all the nodes
- private static Set<String> zero = Collections.emptySet();
- private static Set<String> one = intToSet(1);
- private static Set<String> two = intToSet(2);
- private static Set<String> three = intToSet(3);
+ private static Set<CassandraInstance> zero = Collections.emptySet();
+ private static Set<CassandraInstance> onePerDc = intToSet(1);
+ private static Set<CassandraInstance> one = intToSet(1, true);
+ private static Set<CassandraInstance> twoPerDc = intToSet(2);
+ private static Set<CassandraInstance> two = intToSet(2, true);
+ private static Set<CassandraInstance> threePerDc = intToSet(3);
+ private static Set<CassandraInstance> three = intToSet(3, true);
@BeforeAll
static void setup()
{
- CassandraInstance i1 = mockInstance("dc1");
- CassandraInstance i2 = mockInstance("dc1");
- CassandraInstance i3 = mockInstance("dc1");
- succeededOne = Arrays.asList(i1);
- succeededTwo = Arrays.asList(i1, i2);
- succeededThree = Arrays.asList(i1, i2, i3);
+ CassandraInstance dc1i1 = mockInstance("dc1");
+ CassandraInstance dc1i2 = mockInstance("dc1");
+ CassandraInstance dc1i3 = mockInstance("dc1");
+ CassandraInstance dc2i1 = mockInstance("dc2");
+ CassandraInstance dc2i2 = mockInstance("dc2");
+ CassandraInstance dc2i3 = mockInstance("dc2");
+ succeededOne = Arrays.asList(dc1i1);
+ succeededTwo = Arrays.asList(dc1i1, dc1i2);
+ succeededThree = Arrays.asList(dc1i1, dc1i2, dc1i3);
+ succeededOnePerDc = Arrays.asList(dc1i1,
+ dc2i1);
+ succeededTwoPerDc = Arrays.asList(dc1i1, dc1i2,
+ dc2i1, dc2i2);
+ succeededThreePerDc = Arrays.asList(dc1i1, dc1i2, dc1i3,
+ dc2i1, dc2i2, dc2i3);
+ succeededQuorum = Arrays.asList(dc1i1, dc1i2, dc1i3,
+ dc2i1);
+ succeededHalf = Arrays.asList(dc1i1, dc1i2,
+ dc2i1);
+ succeededAll = succeededThreePerDc;
}
@Test
@@ -82,13 +105,14 @@
testCanBeSatisfied(CL.LOCAL_QUORUM, succeededTwo, true);
testCanBeSatisfied(CL.LOCAL_QUORUM, succeededThree, true);
- testCanBeSatisfied(CL.EACH_QUORUM, succeededTwo, true);
- testCanBeSatisfied(CL.EACH_QUORUM, succeededThree, true);
+ testCanBeSatisfied(CL.EACH_QUORUM, succeededTwoPerDc, true);
+ testCanBeSatisfied(CL.EACH_QUORUM, succeededThreePerDc, true);
- testCanBeSatisfied(CL.QUORUM, succeededTwo, true);
- testCanBeSatisfied(CL.QUORUM, succeededThree, true);
+ testCanBeSatisfied(CL.QUORUM, succeededTwoPerDc, true);
+ testCanBeSatisfied(CL.QUORUM, succeededThreePerDc, true);
+ testCanBeSatisfied(CL.QUORUM, succeededQuorum, true);
- testCanBeSatisfied(CL.ALL, succeededThree, true);
+ testCanBeSatisfied(CL.ALL, succeededAll, true);
}
@Test
@@ -106,79 +130,107 @@
testCanBeSatisfied(CL.EACH_QUORUM, succeededNone, false);
testCanBeSatisfied(CL.EACH_QUORUM, succeededOne, false);
+ testCanBeSatisfied(CL.EACH_QUORUM, succeededOnePerDc, false);
testCanBeSatisfied(CL.QUORUM, succeededNone, false);
testCanBeSatisfied(CL.QUORUM, succeededOne, false);
+ testCanBeSatisfied(CL.QUORUM, succeededOnePerDc, false);
+ testCanBeSatisfied(CL.QUORUM, succeededTwo, false);
+ testCanBeSatisfied(CL.QUORUM, succeededHalf, false);
testCanBeSatisfied(CL.ALL, succeededNone, false);
testCanBeSatisfied(CL.ALL, succeededOne, false);
+ testCanBeSatisfied(CL.ALL, succeededOnePerDc, false);
testCanBeSatisfied(CL.ALL, succeededTwo, false);
+ testCanBeSatisfied(CL.ALL, succeededTwoPerDc, false);
+ testCanBeSatisfied(CL.ALL, succeededQuorum, false);
+ testCanBeSatisfied(CL.ALL, succeededHalf, false);
}
@Test
- void testCheckConsistencyReturnsTrue()
+ void testCanBeSatisfiedWithPendingReturnsTrue()
{
- testCheckConsistency(CL.ONE, /* total */ three, /* failed */ zero, zero, true);
- testCheckConsistency(CL.ONE, /* total */ three, /* failed */ one, zero, true);
- testCheckConsistency(CL.ONE, /* total */ three, /* failed */ two, zero, true);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ three, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ three, /* pending */ two, true);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ three, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ two, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ two, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ one, /* pending */ zero, true);
- testCheckConsistency(CL.TWO, /* total */ three, /* failed */ zero, zero, true);
- testCheckConsistency(CL.TWO, /* total */ three, /* failed */ one, zero, true);
+ testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ three, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ three, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ two, /* pending */ zero, true);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ zero, /* pending */ zero, true);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ zero, /* pending */ one, true);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ zero, /* pending */ two, true);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ one, /* pending */ one, true);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ two, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ three, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ three, /* pending */ two, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ three, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ two, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ two, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ one, /* pending */ zero, true);
- testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ zero, zero, true);
- testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ one, zero, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ three, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ three, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ three, /* pending */ onePerDc, true);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ two, /* pending */ zero, true);
- testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ zero, zero, true);
- testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ one, zero, true);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ threePerDc, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ threePerDc, /* pending */ one, true);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ threePerDc, /* pending */ onePerDc, true);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ twoPerDc, /* pending */ zero, true);
- testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ zero, zero, true);
- testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ one, zero, true);
+ testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ threePerDc, /* pending */ zero, true);
+ testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ threePerDc, /* pending */ onePerDc, true);
+ testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ twoPerDc, /* pending */ zero, true);
- testCheckConsistency(CL.ALL, /* total */ three, /* failed */ zero, zero, true);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ threePerDc, /* pending */ zero, true);
}
@Test
- void testCheckConsistencyReturnsFalse()
+ void testCanBeSatisfiedWithPendingReturnsFalse()
{
- testCheckConsistency(CL.ONE, /* total */ three, /* failed */ three, zero, false);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.ONE, /* succeeded */ onePerDc, /* pending */ onePerDc, false);
- testCheckConsistency(CL.TWO, /* total */ three, /* failed */ three, zero, false);
- testCheckConsistency(CL.TWO, /* total */ three, /* failed */ two, zero, false);
+ testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ one, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.TWO, /* succeeded */ two, /* pending */ one, false);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ three, /* pending */ zero, false);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ two, /* pending */ one, false);
- testCheckConsistency(CL.LOCAL_ONE, /* total */ three, /* failed */ one, /* pending */ two, false);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ one, /* pending */ one, false);
+ testCanBeSatisfiedWithPending(CL.LOCAL_ONE, /* succeeded */ two, /* pending */ two, false);
- testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ three, zero, false);
- testCheckConsistency(CL.LOCAL_QUORUM, /* total */ three, /* failed */ two, zero, false);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ one, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.LOCAL_QUORUM, /* succeeded */ two, /* pending */ one, false);
- testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ three, zero, false);
- testCheckConsistency(CL.EACH_QUORUM, /* total */ three, /* failed */ two, zero, false);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ onePerDc, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.EACH_QUORUM, /* succeeded */ twoPerDc, /* pending */ onePerDc, false);
- testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ three, zero, false);
- testCheckConsistency(CL.QUORUM, /* total */ three, /* failed */ two, zero, false);
+ testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ onePerDc, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.QUORUM, /* succeeded */ twoPerDc, /* pending */ onePerDc, false);
- testCheckConsistency(CL.ALL, /* total */ three, /* failed */ one, zero, false);
- testCheckConsistency(CL.ALL, /* total */ three, /* failed */ two, zero, false);
- testCheckConsistency(CL.ALL, /* total */ three, /* failed */ three, zero, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ two, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ twoPerDc, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ one, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ onePerDc, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ zero, /* pending */ zero, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ threePerDc, /* pending */ one, false);
+ testCanBeSatisfiedWithPending(CL.ALL, /* succeeded */ threePerDc, /* pending */ onePerDc, false);
}
private void testCanBeSatisfied(ConsistencyLevel cl, List<CassandraInstance> succeeded, boolean expectedResult)
{
- assertThat(cl.canBeSatisfied(succeeded, replicationFactor, "dc1")).isEqualTo(expectedResult);
+ assertThat(cl.canBeSatisfied(succeeded, zero, replicationFactor, "dc1")).isEqualTo(expectedResult);
}
- private void testCheckConsistency(ConsistencyLevel cl, Set<String> total, Set<String> failed, Set<String> pending, boolean expectedResult)
+ private void testCanBeSatisfiedWithPending(ConsistencyLevel cl,
+ Set<CassandraInstance> succeeded,
+ Set<CassandraInstance> pending,
+ boolean expectedResult)
{
- assertThat(cl.checkConsistency(total, pending, zero, // replacement is not used
- zero, // include blocking instance set in failed set
- failed, "dc1")).isEqualTo(expectedResult);
+ assertThat(cl.canBeSatisfied(succeeded, pending, replicationFactor, "dc1")).isEqualTo(expectedResult);
}
private static CassandraInstance mockInstance(String dc)
@@ -188,8 +240,27 @@
return i;
}
- private static Set<String> intToSet(int i)
+ private static Set<CassandraInstance> intToSet(int i)
{
- return IntStream.range(0, i).mapToObj(String::valueOf).collect(Collectors.toSet());
+ return intToSet(i, false);
+ }
+
+ private static Set<CassandraInstance> intToSet(int i, boolean singleDc)
+ {
+ Set<CassandraInstance> res = new HashSet<>();
+ for (int j = 0; j < i; j++)
+ {
+ CassandraInstance dc1Instance = mock(CassandraInstance.class);
+ when(dc1Instance.datacenter()).thenReturn("dc1");
+ res.add(dc1Instance);
+ if (singleDc)
+ {
+ continue;
+ }
+ CassandraInstance dc2Instance = mock(CassandraInstance.class);
+ when(dc2Instance.datacenter()).thenReturn("dc2");
+ res.add(dc2Instance);
+ }
+ return res;
}
}
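
The multi-DC cases above reduce to simple counts (a worked sketch; two DCs with RF 3 each, per the replicationFactor declared at the top of this test):

    int rfPerDc = 3;
    int dcs = 2;
    int totalRf = rfPerDc * dcs;        // 6
    int globalQuorum = totalRf / 2 + 1; // 4: succeededQuorum (4 acks) passes QUORUM
    int half = totalRf / 2;             // 3: succeededHalf (3 acks) fails QUORUM
    int perDcQuorum = rfPerDc / 2 + 1;  // 2: EACH_QUORUM needs 2 acks in every DC,
                                        //    so succeededOnePerDc (1 per DC) fails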
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
index fbd5479..969c29b 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/BlockedInstancesTest.java
@@ -114,13 +114,13 @@
.hasMessageContaining("java.lang.RuntimeException: Bulk Write to Cassandra has failed");
Throwable cause = thrown;
- while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load"))
+ while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to write"))
{
cause = cause.getCause();
}
assertThat(cause).isNotNull()
- .hasMessageFindingMatch(String.format("Failed to load (\\d+) ranges with %s for " +
+ .hasMessageFindingMatch(String.format("Failed to write (\\d+) ranges with %s for " +
"job ([a-zA-Z0-9-]+) in phase Environment Validation.", cl.writeCL));
expectedInstanceData.entrySet()
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
index 9628bed..5fc3425 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/SparkTestUtils.java
@@ -216,13 +216,13 @@
Throwable cause = thrown;
// Find the cause
- while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load"))
+ while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to write"))
{
cause = cause.getCause();
}
assertThat(cause).isNotNull()
- .hasMessageFindingMatch("Failed to load (\\d+) ranges with " + writeCL +
+ .hasMessageFindingMatch("Failed to write (\\d+) ranges with " + writeCL +
" for job ([a-zA-Z0-9-]+) in phase .*");
}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
index a69f6c8..19c902d 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementMultiDCTest.java
@@ -43,13 +43,11 @@
import org.apache.cassandra.spark.bulkwriter.WriterOptions;
import org.apache.cassandra.testing.ClusterBuilderConfiguration;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.EACH_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_QUORUM;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.testing.TestUtils.CREATE_TEST_TABLE_STATEMENT;
import static org.apache.cassandra.testing.TestUtils.DC1_RF3_DC2_RF3;
import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
@@ -107,8 +105,7 @@
return Stream.of(
Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, LOCAL_QUORUM)),
Arguments.of(TestConsistencyLevel.of(LOCAL_QUORUM, EACH_QUORUM)),
- Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM)),
- Arguments.of(TestConsistencyLevel.of(ONE, ALL))
+ Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM))
);
}
diff --git a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
index b11a816..9cbbeba 100644
--- a/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
+++ b/cassandra-analytics-integration-tests/src/test/java/org/apache/cassandra/analytics/replacement/HostReplacementTestBase.java
@@ -49,7 +49,6 @@
import org.apache.spark.sql.SparkSession;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ONE;
import static org.apache.cassandra.distributed.api.ConsistencyLevel.QUORUM;
import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
@@ -191,7 +190,6 @@
static Stream<Arguments> singleDCTestInputs()
{
return Stream.of(
- Arguments.of(TestConsistencyLevel.of(ONE, ALL)),
Arguments.of(TestConsistencyLevel.of(QUORUM, QUORUM))
);
}