import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstance;
import org.apache.cassandra.distributed.shared.JMXUtil;
import org.apache.cassandra.sidecar.common.server.dns.DnsResolver;
import org.apache.cassandra.sidecar.testing.MtlsTestHelper;
import org.apache.cassandra.sidecar.testing.QualifiedName;
import org.apache.cassandra.spark.KryoRegister;
import org.apache.cassandra.spark.bulkwriter.BulkSparkConf;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchThrowable;
* Helper methods for Spark tests
public class SparkTestUtils
protected ICluster<? extends IInstance> cluster;
protected DnsResolver dnsResolver;
protected int sidecarPort;
protected MtlsTestHelper mtlsTestHelper;
* Runs any initialization code required for the tests
* @param cluster the cassandra cluster
* @param dnsResolver the DNS resolver used to lookup replicas
* @param sidecarPort the port where Sidecar is running
public void initialize(ICluster<? extends IInstance> cluster, DnsResolver dnsResolver, int sidecarPort,
MtlsTestHelper mtlsTestHelper)
this.cluster = Objects.requireNonNull(cluster, "cluster is required");
this.dnsResolver = Objects.requireNonNull(dnsResolver, "dnsResolver is required");
this.mtlsTestHelper = Objects.requireNonNull(mtlsTestHelper, "mtlsTestHelper is required");
this.sidecarPort = sidecarPort;
* Runs any tear down code required for tests, currently empty
public void tearDown()
* Returns a {@link DataFrameReader} with default options for performing a bulk read test, including
* required parameters.
* @param sparkConf the spark configuration to use
* @param spark the spark session to use
* @param tableName the qualified name of the table
* @return a {@link DataFrameReader} with default options for performing a bulk read test
public DataFrameReader defaultBulkReaderDataFrame(SparkConf sparkConf,
SparkSession spark,
QualifiedName tableName)
SQLContext sql = spark.sqlContext();
SparkContext sc = spark.sparkContext();
int coresPerExecutor = sparkConf.getInt("spark.executor.cores", 1);
int numExecutors = sparkConf.getInt("spark.dynamicAllocation.maxExecutors", sparkConf.getInt("spark.executor.instances", 1));
int numCores = coresPerExecutor * numExecutors;
.option("sidecar_instances", sidecarInstancesOption(cluster, dnsResolver))
.option("keyspace", tableName.keyspace()) // unquoted
.option("table", tableName.table()) // unquoted
.option("DC", "datacenter1")
.option("snapshotName", UUID.randomUUID().toString())
.option("createSnapshot", "true")
// Shutdown hooks are called after the job ends, and in the case of integration tests
// the sidecar is already shut down before this. Since the cluster will be torn
// down anyway, the integration job skips clearing snapshots.
.option("clearSnapshotStrategy", "noop")
.option("defaultParallelism", sc.defaultParallelism())
.option("numCores", numCores)
.option("sizing", "default")
.option("sidecar_port", sidecarPort);
* Returns a {@link DataFrameWriter<Row>} with default options for performing a bulk write test, including
* required parameters.
* @param df the source data frame
* @param tableName the qualified name of the table
* @param additionalOptions additional options for the data frame
* @return a {@link DataFrameWriter<Row>} with default options for performing a bulk write test
public DataFrameWriter<Row> defaultBulkWriterDataFrameWriter(Dataset<Row> df, QualifiedName tableName,
Map<String, String> additionalOptions)
return df.write()
.option("sidecar_instances", sidecarInstancesOption(cluster, dnsResolver))
.option("keyspace", tableName.keyspace())
.option("table", tableName.table())
.option("local_dc", "datacenter1")
.option("bulk_writer_cl", "LOCAL_QUORUM")
.option("number_splits", "-1")
.option("sidecar_port", sidecarPort)
public SparkConf defaultSparkConf()
SparkConf sparkConf = new SparkConf()
.setAppName("Integration test Spark Cassandra Bulk Analytics Job")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// Spark is not case-sensitive by default, but we want to make it case-sensitive for
// the quoted identifiers tests where we test mixed case
.set("spark.sql.caseSensitive", "True")
.set("spark.master", "local[8,4]")
.set("spark.cassandra_analytics.sidecar.request.retries", "5")
.set("spark.cassandra_analytics.sidecar.request.retries.delay.milliseconds", "500")
.set("spark.cassandra_analytics.sidecar.request.retries.max.delay.milliseconds", "500");
BulkSparkConf.setupSparkConf(sparkConf, true);
return sparkConf;
public void validateWrites(List<Row> sourceData, Object[][] queriedData)
// build a set of entries read from Cassandra into a set
Set<String> actualEntries =
.map((Object[] columns) -> String.format("%s:%s:%s",
// Number of entries in Cassandra must match the original datasource
// remove from actual entries to make sure that the data read is the same as the data written
sourceData.forEach(row -> {
String key = String.format("%d:%s:%d",
assertThat(actualEntries.remove(key)).as(key + " is expected to exist in the actual entries")
// If this fails, it means there was more data in the database than we expected
assertThat(actualEntries).as("All entries are expected to be read from database")
public void assertExpectedBulkWriteFailure(String writeCL, DataFrameWriter<Row> dfWriter)
Throwable thrown = catchThrowable(dfWriter::save);
.hasMessageContaining("java.lang.RuntimeException: Bulk Write to Cassandra has failed");
Throwable cause = thrown;
// Find the cause
while (cause != null && !StringUtils.contains(cause.getMessage(), "Failed to load"))
cause = cause.getCause();
.hasMessageFindingMatch("Failed to load (\\d+) ranges with " + writeCL +
" for job ([a-zA-Z0-9-]+) in phase .*");
* @return a comma-separated string with a list of all the hosts in the in-jvm dtest cluster
protected String sidecarInstancesOption(ICluster<? extends IInstance> cluster, DnsResolver dnsResolver)
return IntStream.rangeClosed(1, cluster.size())
.mapToObj(i -> {
String ipAddress = JMXUtil.getJmxHost(cluster.get(i).config());
return dnsResolver.reverseResolve(ipAddress);
catch (UnknownHostException e)
return ipAddress;