/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cassandra.spark.bulkwriter;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import com.google.common.collect.BoundType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;

import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
import org.apache.cassandra.spark.bulkwriter.token.ConsistencyLevel;
import org.apache.cassandra.spark.common.model.CassandraInstance;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

@RunWith(Parameterized.class)
public class StreamSessionConsistencyTest
{
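    // Fixture topology: two DCs with RF 3 each, i.e. six instances in total. FILES_PER_SSTABLE is the
    // number of per-SSTable component files the mock upload path streams (presumably the usual eight
    // components such as Data.db, Index.db, Summary.db, etc.; the exact set is a detail of the mocks).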
    private static final int NUMBER_DCS = 2;
    private static final int FILES_PER_SSTABLE = 8;
    private static final int REPLICATION_FACTOR = 3;
    private static final List<String> EXPECTED_INSTANCES =
            ImmutableList.of("DC1-i1", "DC1-i2", "DC1-i3", "DC2-i1", "DC2-i2", "DC2-i3");
    private static final Range<BigInteger> RANGE = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED,
                                                               BigInteger.valueOf(199L), BoundType.CLOSED);
    private static final CassandraRing<RingInstance> RING = RingUtils.buildRing(0,
                                                                                "app",
                                                                                "cluster",
                                                                                ImmutableMap.of("DC1", 3, "DC2", 3),
                                                                                "test",
                                                                                6);

    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    private MockTableWriter tableWriter;
    private StreamSession streamSession;
    private MockBulkWriterContext writerContext;
    private final MockScheduledExecutorService executor = new MockScheduledExecutorService();

    @Parameterized.Parameter(0)
    public ConsistencyLevel.CL consistencyLevel; // CHECKSTYLE IGNORE: Public mutable field for parameterized testing

    @Parameterized.Parameter(1)
    public List<Integer> failuresPerDc; // CHECKSTYLE IGNORE: Public mutable field for parameterized testing

    @Parameterized.Parameters(name = "CL: {0}, numFailures: {1}")
    public static Collection<Object[]> data()
    {
        // Cross every consistency level with every (dc1Failures, dc2Failures) pair in [0, RF] x [0, RF],
        // giving 16 failure combinations per CL
        List<ConsistencyLevel.CL> cls = Arrays.stream(ConsistencyLevel.CL.values()).collect(Collectors.toList());
        List<Integer> failures = IntStream.rangeClosed(0, REPLICATION_FACTOR).boxed().collect(Collectors.toList());
        List<List<Integer>> failuresPerDc = Lists.cartesianProduct(failures, failures);
        List<List<Object>> clsToFailures = Lists.cartesianProduct(cls, failuresPerDc);
        return clsToFailures.stream().map(List::toArray).collect(Collectors.toList());
    }

    @Before
    public void setup()
    {
        tableWriter = new MockTableWriter(folder.getRoot().toPath());
        writerContext = new MockBulkWriterContext(RING, "cassandra-4.0.0", consistencyLevel);
        streamSession = new StreamSession(writerContext, "sessionId", RANGE, executor);
    }

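    // Commit-phase scenario: uploads are left to succeed, while the commit result supplier below fails the
    // first N commits per DC (N taken from the test parameters); the session should fail exactly when
    // calculateFailure() determines the consistency level can no longer be satisfied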
    @Test
    public void testConsistencyLevelAndFailureInCommit() throws IOException, ExecutionException, InterruptedException
    {
        streamSession = new StreamSession(writerContext, "sessionId", RANGE, executor);
        AtomicInteger dc1Failures = new AtomicInteger(failuresPerDc.get(0));
        AtomicInteger dc2Failures = new AtomicInteger(failuresPerDc.get(1));
        ImmutableMap<String, AtomicInteger> dcFailures = ImmutableMap.of("DC1", dc1Failures, "DC2", dc2Failures);
        boolean shouldFail = calculateFailure(dc1Failures.get(), dc2Failures.get());
        // Return failed commit results while the DC's failure counter is still positive,
        // then successful results for the rest
        writerContext.setCommitResultSupplier((uuids, dc) -> {
            if (dcFailures.get(dc).getAndDecrement() > 0)
            {
                return new DataTransferApi.RemoteCommitResult(false, null, uuids, "");
            }
            else
            {
                return new DataTransferApi.RemoteCommitResult(true, uuids, null, "");
            }
        });
        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
        Object[] row = {0, 1, "course", 2};
        tr.addRow(BigInteger.valueOf(102L), row);
        tr.close(writerContext, 1);
        streamSession.scheduleStream(tr);
        if (shouldFail)
        {
            RuntimeException exception = assertThrows(RuntimeException.class,
                                                      () -> streamSession.close()); // Force "execution" of futures
            assertEquals("Failed to load 1 ranges with " + consistencyLevel
                       + " for job " + writerContext.job().getId()
                       + " in phase UploadAndCommit", exception.getMessage());
        }
        else
        {
            streamSession.close(); // Force "execution" of futures
        }
        executor.assertFuturesCalled();
        // Commit failures do not stop uploads, so every component still reaches every replica in both DCs
        assertThat(writerContext.getUploads().values().stream()
                                .mapToInt(Collection::size)
                                .sum(),
                   equalTo(REPLICATION_FACTOR * NUMBER_DCS * FILES_PER_SSTABLE));
        List<String> instances = writerContext.getUploads().keySet().stream()
                                              .map(CassandraInstance::getNodeName)
                                              .collect(Collectors.toList());
        assertThat(instances, containsInAnyOrder(EXPECTED_INSTANCES.toArray()));
    }

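    // Upload-phase scenario: the upload supplier fails the first N uploads per DC; a failed upload also
    // causes the remaining components of that SSTable to be skipped on that instance (see the skipped-file
    // arithmetic below)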
    @Test
    public void testConsistencyLevelAndFailureInUpload() throws IOException, ExecutionException, InterruptedException
    {
        streamSession = new StreamSession(writerContext, "sessionId", RANGE, executor);
        AtomicInteger dc1Failures = new AtomicInteger(failuresPerDc.get(0));
        AtomicInteger dc2Failures = new AtomicInteger(failuresPerDc.get(1));
        int numFailures = dc1Failures.get() + dc2Failures.get();
        ImmutableMap<String, AtomicInteger> dcFailures = ImmutableMap.of("DC1", dc1Failures, "DC2", dc2Failures);
        boolean shouldFail = calculateFailure(dc1Failures.get(), dc2Failures.get());
        // Fail the upload while the instance's DC still has a positive failure counter, succeed afterwards
        writerContext.setUploadSupplier(instance -> dcFailures.get(instance.getDataCenter()).getAndDecrement() <= 0);
        SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
        Object[] row = {0, 1, "course", 2};
        tr.addRow(BigInteger.valueOf(102L), row);
        tr.close(writerContext, 1);
        streamSession.scheduleStream(tr);
        if (shouldFail)
        {
            RuntimeException exception = assertThrows(RuntimeException.class,
                                                      () -> streamSession.close()); // Force "execution" of futures
            assertEquals("Failed to load 1 ranges with " + consistencyLevel
                       + " for job " + writerContext.job().getId()
                       + " in phase UploadAndCommit", exception.getMessage());
        }
        else
        {
            streamSession.close(); // Force "execution" of futures
        }
        executor.assertFuturesCalled();
        int totalFilesToUpload = REPLICATION_FACTOR * NUMBER_DCS * FILES_PER_SSTABLE;
        // Once a file fails to upload, the remaining components of that SSTable are not attempted
        int filesSkipped = (numFailures * (FILES_PER_SSTABLE - 1));
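        // e.g. with one failure per DC (numFailures == 2), 2 * (8 - 1) = 14 component uploads are skipped,
        // so 48 - 14 = 34 uploads are expected below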
        assertThat(writerContext.getUploads().values().stream()
                                .mapToInt(Collection::size)
                                .sum(),
                   equalTo(totalFilesToUpload - filesSkipped));
        List<String> instances = writerContext.getUploads().keySet().stream()
                                              .map(CassandraInstance::getNodeName)
                                              .collect(Collectors.toList());
        assertThat(instances, containsInAnyOrder(EXPECTED_INSTANCES.toArray()));
    }

    private boolean calculateFailure(int dc1Failures, int dc2Failures)
    {
        // Assuming the local DC is DC1: given the current consistency level and the per-DC failure counts,
        // should the stream session fail?
        int localQuorum = REPLICATION_FACTOR / 2 + 1;
        int localFailuresViolatingQuorum = REPLICATION_FACTOR - localQuorum;
        int totalInstances = NUMBER_DCS * REPLICATION_FACTOR;
        int allDcsQuorum = totalInstances / 2 + 1;
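        // With RF = 3 and 2 DCs: localQuorum = 2, so LOCAL_QUORUM tolerates at most 1 local failure;
        // totalInstances = 6 and allDcsQuorum = 4, so QUORUM tolerates at most 2 failures across both DCs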
        switch (consistencyLevel)
        {
            case ALL:
                return dc1Failures + dc2Failures > 0;
            case TWO:
                return dc1Failures + dc2Failures > (totalInstances - 2);
            case QUORUM:
                return dc1Failures + dc2Failures > totalInstances - allDcsQuorum; // Total instances - quorum
            case LOCAL_QUORUM:
                return (dc1Failures > localFailuresViolatingQuorum);
            case EACH_QUORUM:
                return (dc1Failures > localFailuresViolatingQuorum) || (dc2Failures > localFailuresViolatingQuorum);
            case LOCAL_ONE:
                return dc1Failures > REPLICATION_FACTOR - 1;
            case ONE:
                return (dc1Failures + dc2Failures) > totalInstances - 1;
            default:
                throw new IllegalArgumentException("CL: " + consistencyLevel + " not supported");
        }
    }
}