/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.spark.bulkwriter;

import java.io.IOException;
import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.apache.cassandra.spark.bulkwriter.token.CassandraRing;
import org.apache.cassandra.spark.common.model.CassandraInstance;
import org.jetbrains.annotations.NotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

public class StreamSessionTest
{
public static final String LOAD_RANGE_ERROR_PREFIX = "Failed to load 1 ranges with LOCAL_QUORUM";
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private static final int FILES_PER_SSTABLE = 8;
private static final int REPLICATION_FACTOR = 3;
private StreamSession ss;
private MockBulkWriterContext writerContext;
private List<String> expectedInstances;
private CassandraRing<RingInstance> ring;
private MockScheduledExecutorService executor;
private MockTableWriter tableWriter;
private Range<BigInteger> range;
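
    // Shared fixture: a closed token range [101, 199] on a 12-instance DC1 ring; the three
    // expected replicas reflect REPLICATION_FACTOR, and the mock executor lets close() force
    // execution of any scheduled stream futures.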
@Before
public void setup()
{
range = Range.range(BigInteger.valueOf(101L), BoundType.CLOSED, BigInteger.valueOf(199L), BoundType.CLOSED);
ring = RingUtils.buildRing(0, "app", "cluster", "DC1", "test", 12);
writerContext = getBulkWriterContext();
tableWriter = new MockTableWriter(folder.getRoot().toPath());
executor = new MockScheduledExecutorService();
ss = new StreamSession(writerContext, "sessionId", range, executor);
expectedInstances = Lists.newArrayList("DC1-i1", "DC1-i2", "DC1-i3");
}
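
    // The session should resolve the replicas for its token range to the three expected DC1 instances.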
@Test
public void testGetReplicasReturnsCorrectData()
{
List<RingInstance> replicas = ss.getReplicas();
assertNotNull(replicas);
List<String> actualInstances = replicas.stream().map(RingInstance::getNodeName).collect(Collectors.toList());
assertThat(actualInstances, containsInAnyOrder(expectedInstances.toArray()));
}
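
    // Scheduling a single SSTable should upload FILES_PER_SSTABLE files to each of the
    // REPLICATION_FACTOR replicas, and only to the expected instances.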
@Test
public void testScheduleStreamSendsCorrectFilesToCorrectInstances(
) throws IOException, ExecutionException, InterruptedException
{
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(102L), row);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
ss.close(); // Force "execution" of futures
executor.assertFuturesCalled();
assertThat(executor.futures.size(), equalTo(1)); // We only scheduled one SSTable
assertThat(writerContext.getUploads().values().stream()
.mapToInt(Collection::size)
.sum(),
equalTo(REPLICATION_FACTOR * FILES_PER_SSTABLE));
List<String> instances = writerContext.getUploads().keySet().stream()
.map(CassandraInstance::getNodeName)
.collect(Collectors.toList());
assertThat(instances, containsInAnyOrder(expectedInstances.toArray()));
}
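
    // Constructing a session over an empty token range, [0, 0), should be rejected up front.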
@Test
public void testEmptyTokenRangeFails() throws IOException
{
Exception exception = assertThrows(IllegalStateException.class, () -> ss = new StreamSession(
writerContext,
"sessionId",
Range.range(BigInteger.valueOf(0L), BoundType.CLOSED, BigInteger.valueOf(0L), BoundType.OPEN)));
        assertThat(exception.getMessage(), startsWith("Partition range [0‥0) is mapping more than one range {}"));
}
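
    // An SSTable whose tokens fall outside the session's partition range must be rejected at scheduling time.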
@Test
public void testMismatchedTokenRangeFails() throws IOException
{
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(9999L), row);
tr.close(writerContext, 1);
IllegalStateException illegalStateException = assertThrows(IllegalStateException.class,
() -> ss.scheduleStream(tr));
        assertEquals("SSTable range [9999‥9999] should be enclosed in the partition range [101‥199]",
                     illegalStateException.getMessage());
}
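
    // When uploads fail, the session should clean up after itself on every replica.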
@Test
public void testUploadFailureCallsClean() throws IOException, ExecutionException, InterruptedException
{
runFailedUpload();
List<String> actualInstances = writerContext.getCleanedInstances().stream()
.map(CassandraInstance::getNodeName)
.collect(Collectors.toList());
assertThat(actualInstances, containsInAnyOrder(expectedInstances.toArray()));
}
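
    // With skip-clean-on-failures set, failed uploads should leave the replicas uncleaned,
    // even though every upload was attempted and none succeeded.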
@Test
public void testUploadFailureSkipsCleanWhenConfigured() throws IOException, ExecutionException, InterruptedException
{
writerContext.setSkipCleanOnFailures(true);
runFailedUpload();
List<String> actualInstances = writerContext.getCleanedInstances().stream()
.map(CassandraInstance::getNodeName)
.collect(Collectors.toList());
assertThat(actualInstances, iterableWithSize(0));
List<UploadRequest> uploads = writerContext.getUploads().values().stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertTrue(uploads.size() > 0);
assertTrue(uploads.stream().noneMatch(u -> u.uploadSucceeded));
}
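
    // A failed upload should trigger a cluster-info refresh; the expected count of 3 lines up
    // with the three replicas in this fixture.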
@Test
public void testUploadFailureRefreshesClusterInfo() throws IOException, ExecutionException, InterruptedException
{
runFailedUpload();
assertThat(writerContext.refreshClusterInfoCallCount, equalTo(3));
}
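
    // Removing the SSTable output directory before streaming should fail the session and still
    // clean up on all replicas.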
@Test
public void testOutDirCreationFailureCleansAllReplicas()
{
assertThrows(RuntimeException.class, () -> {
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, tableWriter.getOutDir());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(102L), row);
tr.close(writerContext, 1);
tableWriter.removeOutDir();
ss.scheduleStream(tr);
ss.close();
});
List<String> actualInstances = writerContext.getCleanedInstances().stream()
.map(CassandraInstance::getNodeName)
.collect(Collectors.toList());
assertThat(actualInstances, containsInAnyOrder(expectedInstances.toArray()));
}
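
    // Scheduling a stream when the replica instances are unavailable should fail the session on close().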
@Test
public void unavailableInstancesCreateErrors() throws IOException, ExecutionException, InterruptedException
{
writerContext.setInstancesAreAvailable(false);
ss = new StreamSession(writerContext, "sessionId", range, executor);
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(102L), row);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
        RuntimeException exception = assertThrows(RuntimeException.class, () -> ss.close());
        assertThat(exception.getMessage(), startsWith(LOAD_RANGE_ERROR_PREFIX));
}
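
    // Closing a session that never scheduled any SSTables should return an empty result rather than fail.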
@Test
public void streamWithNoWritersReturnsEmptyStreamResult() throws ExecutionException, InterruptedException
{
writerContext.setInstancesAreAvailable(false);
ss = new StreamSession(writerContext, "sessionId", range, executor);
StreamResult result = ss.close();
assertThat(result.failures.size(), equalTo(0));
assertThat(result.passed.size(), equalTo(0));
assertThat(result.sessionID, equalTo("sessionId"));
assertThat(result.tokenRange, equalTo(range));
}
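
    // A failure while cleaning up after a failed upload should not propagate or change the reported error.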
@Test
public void failedCleanDoesNotThrow() throws IOException, ExecutionException, InterruptedException
{
writerContext.setCleanShouldThrow(true);
runFailedUpload();
}
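
    // One failed commit out of three replicas still satisfies LOCAL_QUORUM, so close() succeeds
    // and every replica receives the full set of SSTable files.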
@Test
public void testLocalQuorumSucceedsWhenSingleCommitFails(
) throws IOException, ExecutionException, InterruptedException
{
ss = new StreamSession(writerContext, "sessionId", range, executor);
AtomicBoolean success = new AtomicBoolean(true);
writerContext.setCommitResultSupplier((uuids, dc) -> {
// Return failed result for 1st result, success for the rest
if (success.getAndSet(false))
{
return new DataTransferApi.RemoteCommitResult(false, uuids, null, "");
}
else
{
return new DataTransferApi.RemoteCommitResult(true, null, uuids, "");
}
});
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(102L), row);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
ss.close(); // Force "execution" of futures
executor.assertFuturesCalled();
assertThat(writerContext.getUploads().values().stream()
.mapToInt(Collection::size)
.sum(),
equalTo(REPLICATION_FACTOR * FILES_PER_SSTABLE));
List<String> instances = writerContext.getUploads().keySet().stream()
.map(CassandraInstance::getNodeName)
.collect(Collectors.toList());
assertThat(instances, containsInAnyOrder(expectedInstances.toArray()));
}
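
    // With only one successful commit out of three, LOCAL_QUORUM cannot be met: close() should
    // fail with the load-range error even though all uploads completed.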
@Test
public void testLocalQuorumFailsWhenCommitsFail() throws IOException, ExecutionException, InterruptedException
{
ss = new StreamSession(writerContext, "sessionId", range, executor);
AtomicBoolean success = new AtomicBoolean(true);
// Return successful result for 1st result, failed for the rest
writerContext.setCommitResultSupplier((uuids, dc) -> {
if (success.getAndSet(false))
{
return new DataTransferApi.RemoteCommitResult(true, null, uuids, "");
}
else
{
return new DataTransferApi.RemoteCommitResult(false, uuids, null, "");
}
});
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(102L), row);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
RuntimeException exception = assertThrows(RuntimeException.class, () -> ss.close()); // Force "execution" of futures
assertEquals("Failed to load 1 ranges with LOCAL_QUORUM for job " + writerContext.job().getId()
+ " in phase UploadAndCommit", exception.getMessage());
executor.assertFuturesCalled();
assertThat(writerContext.getUploads().values().stream()
.mapToInt(Collection::size)
.sum(),
equalTo(REPLICATION_FACTOR * FILES_PER_SSTABLE));
List<String> instances = writerContext.getUploads().keySet().stream()
.map(CassandraInstance::getNodeName)
.collect(Collectors.toList());
assertThat(instances, containsInAnyOrder(expectedInstances.toArray()));
}
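
    // Helper: force every upload to fail, stream one SSTable, and verify that closing the session
    // fails with the LOCAL_QUORUM load-range error.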
private void runFailedUpload() throws IOException, ExecutionException, InterruptedException
{
writerContext.setUploadSupplier(instance -> false);
SSTableWriter tr = new NonValidatingTestSSTableWriter(tableWriter, folder.getRoot().toPath());
Object[] row = {0, 1, "course", 2};
tr.addRow(BigInteger.valueOf(102L), row);
tr.close(writerContext, 1);
ss.scheduleStream(tr);
        RuntimeException exception = assertThrows(RuntimeException.class, () -> ss.close());
        assertThat(exception.getMessage(), startsWith(LOAD_RANGE_ERROR_PREFIX));
}
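
    // Builds a fresh MockBulkWriterContext over the fixture ring.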
@NotNull
private MockBulkWriterContext getBulkWriterContext()
{
return new MockBulkWriterContext(ring);
}
}