// blob: f8e78161a200f95b3c03ec683b0c5fbc89f27a1f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.analytics;
import java.io.IOException;
import org.junit.jupiter.api.Test;
import com.vdurmont.semver4j.Semver;
import org.apache.cassandra.distributed.UpgradeableCluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.api.TokenSupplier;
import org.apache.cassandra.distributed.shared.Versions;
import org.apache.cassandra.sidecar.testing.JvmDTestSharedClassesPredicate;
import org.apache.cassandra.sidecar.testing.QualifiedName;
import org.apache.cassandra.testing.TestVersion;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.jetbrains.annotations.NotNull;
import static org.apache.cassandra.testing.CassandraTestTemplate.fixDistributedSchemas;
import static org.apache.cassandra.testing.CassandraTestTemplate.waitForHealthyRing;
import static org.apache.cassandra.testing.TestUtils.DC1_RF3;
import static org.apache.cassandra.testing.TestUtils.ROW_COUNT;
import static org.apache.cassandra.testing.TestUtils.TEST_KEYSPACE;
import static org.assertj.core.api.Assertions.assertThat;
class BulkWriteUdtTest extends SharedClusterSparkIntegrationTestBase
{
static final QualifiedName UDT_TABLE_NAME = new QualifiedName(TEST_KEYSPACE, "test_udt");
static final QualifiedName NESTED_TABLE_NAME = new QualifiedName(TEST_KEYSPACE, "test_nested_udt");
public static final String TWO_FIELD_UDT_NAME = "two_field_udt";
public static final String NESTED_FIELD_UDT_NAME = "nested_udt";
public static final String UDT_TABLE_CREATE = "CREATE TABLE " + UDT_TABLE_NAME + " (\n"
+ " id BIGINT PRIMARY KEY,\n"
+ " udtfield " + TWO_FIELD_UDT_NAME + ");";
public static final String TWO_FIELD_UDT_DEF = "CREATE TYPE " + UDT_TABLE_NAME.keyspace() + "."
+ TWO_FIELD_UDT_NAME + " (\n"
+ " f1 text,\n"
+ " f2 int);";
public static final String NESTED_UDT_DEF = "CREATE TYPE " + NESTED_TABLE_NAME.keyspace() + "."
+ NESTED_FIELD_UDT_NAME + " (\n"
+ " n1 BIGINT,\n"
+ " n2 frozen<" + TWO_FIELD_UDT_NAME + ">"
+ ");";
public static final String NESTED_TABLE_CREATE = "CREATE TABLE " + NESTED_TABLE_NAME + "(\n"
+ " id BIGINT PRIMARY KEY,\n"
+ " nested " + NESTED_FIELD_UDT_NAME + ");";
@Test
void testWriteWithUdt()
{
SparkSession spark = getOrCreateSparkSession();
Dataset<Row> df = DataGenerationUtils.generateUdtData(spark, ROW_COUNT);
bulkWriterDataFrameWriter(df, UDT_TABLE_NAME).save();
SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT * FROM " + UDT_TABLE_NAME, ConsistencyLevel.ALL);
assertThat(result.hasNext()).isTrue();
validateWritesWithDriverResultSet(df.collectAsList(),
queryAllDataWithDriver(cluster, UDT_TABLE_NAME,
shaded.com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM),
BulkWriteUdtTest::defaultRowFormatter);
}
@Test
void testWriteWithNestedUdt()
{
SparkSession spark = getOrCreateSparkSession();
Dataset<Row> df = DataGenerationUtils.generateNestedUdtData(spark, ROW_COUNT);
bulkWriterDataFrameWriter(df, NESTED_TABLE_NAME).save();
SimpleQueryResult result = cluster.coordinator(1).executeWithResult("SELECT * FROM " + NESTED_TABLE_NAME, ConsistencyLevel.ALL);
assertThat(result.hasNext()).isTrue();
validateWritesWithDriverResultSet(df.collectAsList(),
queryAllDataWithDriver(cluster, NESTED_TABLE_NAME, shaded.com.datastax.driver.core.ConsistencyLevel.LOCAL_QUORUM),
BulkWriteUdtTest::defaultRowFormatter);
}
@NotNull
public static String defaultRowFormatter(shaded.com.datastax.driver.core.Row row)
{
return row.getLong(0) +
":" +
row.getUDTValue(1); // Formats as field:value with no whitespaces, and strings quoted
}
@Override
protected UpgradeableCluster provisionCluster(TestVersion testVersion) throws IOException
{
// spin up a C* cluster using the in-jvm dtest
Versions versions = Versions.find();
Versions.Version requestedVersion = versions.getLatest(new Semver(testVersion.version(), Semver.SemverType.LOOSE));
UpgradeableCluster.Builder clusterBuilder =
UpgradeableCluster.build(3)
.withDynamicPortAllocation(true)
.withVersion(requestedVersion)
.withDCs(1)
.withDataDirCount(1)
.withSharedClasses(JvmDTestSharedClassesPredicate.INSTANCE)
.withConfig(config -> config.with(Feature.NATIVE_PROTOCOL)
.with(Feature.GOSSIP)
.with(Feature.JMX));
TokenSupplier tokenSupplier = TokenSupplier.evenlyDistributedTokens(3, clusterBuilder.getTokenCount());
clusterBuilder.withTokenSupplier(tokenSupplier);
UpgradeableCluster cluster = clusterBuilder.createWithoutStarting();
cluster.startup();
waitForHealthyRing(cluster);
fixDistributedSchemas(cluster);
return cluster;
}
@Override
protected void initializeSchemaForTest()
{
createTestKeyspace(UDT_TABLE_NAME, DC1_RF3);
cluster.schemaChangeIgnoringStoppedInstances(TWO_FIELD_UDT_DEF);
cluster.schemaChangeIgnoringStoppedInstances(NESTED_UDT_DEF);
cluster.schemaChangeIgnoringStoppedInstances(UDT_TABLE_CREATE);
cluster.schemaChangeIgnoringStoppedInstances(NESTED_TABLE_CREATE);
}
}