/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.cassandra;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.ProtocolVersion;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.TypeCodec;
import com.datastax.driver.core.exceptions.NoHostAvailableException;
import com.datastax.driver.mapping.annotations.ClusteringColumn;
import com.datastax.driver.mapping.annotations.Column;
import com.datastax.driver.mapping.annotations.Computed;
import com.datastax.driver.mapping.annotations.PartitionKey;
import com.datastax.driver.mapping.annotations.Table;
import info.archinnov.achilles.embedded.CassandraEmbeddedServerBuilder;
import info.archinnov.achilles.embedded.CassandraShutDownHook;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.JMX;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.cassandra.service.StorageServiceMBean;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Tests of {@link CassandraIO}. */
@RunWith(JUnit4.class)
@SuppressWarnings({
"rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
public class CassandraIOTest implements Serializable {
private static final long NUM_ROWS = 22L;
private static final String CASSANDRA_KEYSPACE = "beam_ks";
private static final String CASSANDRA_HOST = "127.0.0.1";
private static final String CASSANDRA_TABLE = "scientist";
private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class);
private static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService";
private static final float ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE = 0.5f;
private static final int FLUSH_TIMEOUT = 30000;
private static final int JMX_CONF_TIMEOUT = 1000;
private static int jmxPort;
private static int cassandraPort;
private static Cluster cluster;
private static Session session;
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
@Rule public transient TestPipeline pipeline = TestPipeline.create();
private static CassandraShutDownHook shutdownHook;
@BeforeClass
public static void beforeClass() throws Exception {
jmxPort = NetworkTestHelper.getAvailableLocalPort();
shutdownHook = new CassandraShutDownHook();
String data = TEMPORARY_FOLDER.newFolder("data").getAbsolutePath();
String commitLog = TEMPORARY_FOLDER.newFolder("commit-log").getAbsolutePath();
String cdcRaw = TEMPORARY_FOLDER.newFolder("cdc-raw").getAbsolutePath();
String hints = TEMPORARY_FOLDER.newFolder("hints").getAbsolutePath();
String savedCache = TEMPORARY_FOLDER.newFolder("saved-cache").getAbsolutePath();
Files.createDirectories(Paths.get(savedCache));
CassandraEmbeddedServerBuilder builder =
CassandraEmbeddedServerBuilder.builder()
.withKeyspaceName(CASSANDRA_KEYSPACE)
.withDataFolder(data)
.withCommitLogFolder(commitLog)
.withCdcRawFolder(cdcRaw)
.withHintsFolder(hints)
.withSavedCachesFolder(savedCache)
.withShutdownHook(shutdownHook)
            // the CQL port is randomized at startup; only the JMX port is pinned
            .withJMXPort(jmxPort)
.cleanDataFilesAtStartup(false);
    // under load, cluster creation can fail with a NoHostAvailableException,
    // so retry up to 5 times with a 5 second delay between attempts.
cluster = buildCluster(builder);
cassandraPort = cluster.getConfiguration().getProtocolOptions().getPort();
    session = cluster.newSession();
insertData();
disableAutoCompaction();
}
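  /**
   * Builds the native cluster, retrying on {@link NoHostAvailableException} because the embedded
   * node can be slow to accept connections while it is still starting up.
   */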
private static Cluster buildCluster(CassandraEmbeddedServerBuilder builder) {
int tried = 0;
int delay = 5000;
Exception exception = null;
while (tried < 5) {
try {
return builder.buildNativeCluster();
} catch (NoHostAvailableException e) {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
tried++;
try {
Thread.sleep(delay);
} catch (InterruptedException e1) {
Thread thread = Thread.currentThread();
thread.interrupt();
        throw new RuntimeException(
            String.format("Thread %s was interrupted", thread.getName()), e1);
}
}
}
    throw new RuntimeException(
        String.format(
            "Unable to create embedded Cassandra cluster: tried %d times with %d ms delay",
            tried, delay),
        exception);
}
@AfterClass
public static void afterClass() throws InterruptedException, IOException {
shutdownHook.shutDownNow();
}
private static void insertData() throws Exception {
LOG.info("Create Cassandra tables");
session.execute(
String.format(
"CREATE TABLE IF NOT EXISTS %s.%s(person_department text, person_id int, person_name text, PRIMARY KEY"
+ "((person_department), person_id));",
CASSANDRA_KEYSPACE, CASSANDRA_TABLE));
session.execute(
String.format(
"CREATE TABLE IF NOT EXISTS %s.%s(person_department text, person_id int, person_name text, PRIMARY KEY"
+ "((person_department), person_id));",
CASSANDRA_KEYSPACE, CASSANDRA_TABLE_WRITE));
LOG.info("Insert records");
String[][] scientists = {
new String[] {"phys", "Einstein"},
new String[] {"bio", "Darwin"},
new String[] {"phys", "Copernicus"},
new String[] {"bio", "Pasteur"},
new String[] {"bio", "Curie"},
new String[] {"phys", "Faraday"},
new String[] {"math", "Newton"},
new String[] {"phys", "Bohr"},
new String[] {"phys", "Galileo"},
new String[] {"math", "Maxwell"},
new String[] {"logic", "Russel"},
};
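    // Cycle through the 11 scientists so that, with NUM_ROWS = 22, each one is inserted twice.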
for (int i = 0; i < NUM_ROWS; i++) {
int index = i % scientists.length;
      String insertStr =
          String.format(
              "INSERT INTO %s.%s(person_department, person_id, person_name) values('%s', %d, '%s');",
              CASSANDRA_KEYSPACE,
              CASSANDRA_TABLE,
              scientists[index][0],
              i,
              scientists[index][1]);
session.execute(insertStr);
}
flushMemTablesAndRefreshSizeEstimates();
}
  /**
   * Force the flush of Cassandra memtables to SSTables in order to update size_estimates.
   * https://wiki.apache.org/cassandra/MemtableSSTable This is what the Cassandra Spark connector
   * does through a nodetool binary call. See:
   * https://github.com/datastax/spark-cassandra-connector/blob/master/spark-cassandra-connector
   * /src/it/scala/com/datastax/spark/connector/rdd/partitioner/DataSizeEstimatesSpec.scala which
   * uses the same JMX service as below. See:
   * https://github.com/apache/cassandra/blob/cassandra-3.X
   * /src/java/org/apache/cassandra/tools/nodetool/Flush.java
   */
@SuppressWarnings("unused")
private static void flushMemTablesAndRefreshSizeEstimates() throws Exception {
JMXServiceURL url =
new JMXServiceURL(
String.format(
"service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi",
CASSANDRA_HOST, CASSANDRA_HOST, jmxPort));
JMXConnector jmxConnector = JMXConnectorFactory.connect(url, null);
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
StorageServiceMBean mBeanProxy =
JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
mBeanProxy.forceKeyspaceFlush(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
mBeanProxy.refreshSizeEstimates();
jmxConnector.close();
Thread.sleep(FLUSH_TIMEOUT);
}
  /**
   * Disable auto compaction on the embedded Cassandra host, to avoid a race condition when
   * cleaning temporary files.
   */
@SuppressWarnings("unused")
private static void disableAutoCompaction() throws Exception {
JMXServiceURL url =
new JMXServiceURL(
String.format(
"service:jmx:rmi://%s/jndi/rmi://%s:%s/jmxrmi",
CASSANDRA_HOST, CASSANDRA_HOST, jmxPort));
JMXConnector jmxConnector = JMXConnectorFactory.connect(url, null);
MBeanServerConnection mBeanServerConnection = jmxConnector.getMBeanServerConnection();
ObjectName objectName = new ObjectName(STORAGE_SERVICE_MBEAN);
StorageServiceMBean mBeanProxy =
JMX.newMBeanProxy(mBeanServerConnection, objectName, StorageServiceMBean.class);
mBeanProxy.disableAutoCompaction(CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
jmxConnector.close();
Thread.sleep(JMX_CONF_TIMEOUT);
}
@Test
public void testRead() throws Exception {
PCollection<Scientist> output =
pipeline.apply(
CassandraIO.<Scientist>read()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE)
.withMinNumberOfSplits(50)
.withCoder(SerializableCoder.of(Scientist.class))
.withEntity(Scientist.class));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(NUM_ROWS);
PCollection<KV<String, Integer>> mapped =
output.apply(
MapElements.via(
new SimpleFunction<Scientist, KV<String, Integer>>() {
@Override
public KV<String, Integer> apply(Scientist scientist) {
return KV.of(scientist.name, scientist.id);
}
}));
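    // 11 distinct scientists are inserted round-robin, so each name appears NUM_ROWS / 11 = 2
    // times.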
PAssert.that(mapped.apply("Count occurrences per scientist", Count.perKey()))
.satisfies(
input -> {
int count = 0;
for (KV<String, Long> element : input) {
count++;
                assertEquals(element.getKey(), NUM_ROWS / 11, element.getValue().longValue());
}
assertEquals(11, count);
return null;
});
pipeline.run();
}
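  /** Returns a read transform restricted to the given token ring ranges. */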
private CassandraIO.Read<Scientist> getReadWithRingRange(RingRange... rr) {
return CassandraIO.<Scientist>read()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withRingRanges(new HashSet<>(Arrays.asList(rr)))
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE)
.withCoder(SerializableCoder.of(Scientist.class))
.withEntity(Scientist.class);
}
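  /** Returns a read transform that executes the given CQL query. */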
private CassandraIO.Read<Scientist> getReadWithQuery(String query) {
return CassandraIO.<Scientist>read()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withQuery(query)
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE)
.withCoder(SerializableCoder.of(Scientist.class))
.withEntity(Scientist.class);
}
@Test
public void testReadAllQuery() {
String physQuery =
String.format(
"SELECT * From %s.%s WHERE person_department='phys' AND person_id=0;",
CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
String mathQuery =
String.format(
"SELECT * From %s.%s WHERE person_department='math' AND person_id=6;",
CASSANDRA_KEYSPACE, CASSANDRA_TABLE);
PCollection<Scientist> output =
pipeline
.apply(Create.of(getReadWithQuery(physQuery), getReadWithQuery(mathQuery)))
.apply(
CassandraIO.<Scientist>readAll().withCoder(SerializableCoder.of(Scientist.class)));
PCollection<String> mapped =
output.apply(
MapElements.via(
new SimpleFunction<Scientist, String>() {
@Override
public String apply(Scientist scientist) {
return scientist.name;
}
}));
PAssert.that(mapped).containsInAnyOrder("Einstein", "Newton");
PAssert.thatSingleton(output.apply("count", Count.globally())).isEqualTo(2L);
pipeline.run();
}
@Test
public void testReadAllRingRange() {
RingRange physRR =
fromEncodedKey(
cluster.getMetadata(), TypeCodec.varchar().serialize("phys", ProtocolVersion.V3));
RingRange mathRR =
fromEncodedKey(
cluster.getMetadata(), TypeCodec.varchar().serialize("math", ProtocolVersion.V3));
RingRange logicRR =
fromEncodedKey(
cluster.getMetadata(), TypeCodec.varchar().serialize("logic", ProtocolVersion.V3));
PCollection<Scientist> output =
pipeline
.apply(Create.of(getReadWithRingRange(physRR), getReadWithRingRange(mathRR, logicRR)))
.apply(
CassandraIO.<Scientist>readAll().withCoder(SerializableCoder.of(Scientist.class)));
PCollection<KV<String, Integer>> mapped =
output.apply(
MapElements.via(
new SimpleFunction<Scientist, KV<String, Integer>>() {
@Override
public KV<String, Integer> apply(Scientist scientist) {
return KV.of(scientist.department, scientist.id);
}
}));
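    // Expected row counts per department partition: phys = 5 scientists x 2 rows = 10,
    // math = 2 x 2 = 4, logic = 1 x 2 = 2.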
PAssert.that(mapped.apply("Count occurrences per department", Count.perKey()))
.satisfies(
input -> {
HashMap<String, Long> map = new HashMap<>();
for (KV<String, Long> element : input) {
map.put(element.getKey(), element.getValue());
}
              assertEquals(3, map.size()); // do we have all three departments?
              assertEquals(10L, map.get("phys").longValue());
              assertEquals(4L, map.get("math").longValue());
              assertEquals(2L, map.get("logic").longValue());
return null;
});
pipeline.run();
}
@Test
public void testReadWithQuery() throws Exception {
PCollection<Scientist> output =
pipeline.apply(
CassandraIO.<Scientist>read()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE)
.withMinNumberOfSplits(20)
.withQuery(
"select person_id, writetime(person_name) from beam_ks.scientist where person_id=10 AND person_department='logic'")
.withCoder(SerializableCoder.of(Scientist.class))
.withEntity(Scientist.class));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(1L);
PAssert.that(output)
.satisfies(
input -> {
for (Scientist sci : input) {
assertNull(sci.name);
assertTrue(sci.nameTs != null && sci.nameTs > 0);
}
return null;
});
pipeline.run();
}
@Test
public void testWrite() {
ArrayList<ScientistWrite> data = new ArrayList<>();
for (int i = 0; i < NUM_ROWS; i++) {
ScientistWrite scientist = new ScientistWrite();
scientist.id = i;
scientist.name = "Name " + i;
scientist.department = "bio";
data.add(scientist);
}
pipeline
.apply(Create.of(data))
.apply(
CassandraIO.<ScientistWrite>write()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withEntity(ScientistWrite.class));
    // the table to write to is specified in the entity's @Table annotation (in this case
    // scientist_write)
pipeline.run();
List<Row> results = getRows(CASSANDRA_TABLE_WRITE);
assertEquals(NUM_ROWS, results.size());
for (Row row : results) {
assertTrue(row.getString("person_name").matches("Name (\\d*)"));
}
}
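  // Shared counter used by the NOOP mapper tests below to record mapper operations.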
private static final AtomicInteger counter = new AtomicInteger();
private static class NOOPMapperFactory implements SerializableFunction<Session, Mapper> {
@Override
public Mapper apply(Session input) {
return new NOOPMapper();
}
}
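  /** Mapper stub that counts rows and asynchronous save/delete calls instead of mapping them. */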
private static class NOOPMapper implements Mapper<String>, Serializable {
private final ListeningExecutorService executor =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
    final Callable<Void> asyncTask = () -> null;
@Override
public Iterator map(ResultSet resultSet) {
if (!resultSet.isExhausted()) {
resultSet.iterator().forEachRemaining(r -> counter.getAndIncrement());
}
return Collections.emptyIterator();
}
@Override
public Future<Void> deleteAsync(String entity) {
counter.incrementAndGet();
return executor.submit(asyncTask);
}
@Override
public Future<Void> saveAsync(String entity) {
counter.incrementAndGet();
return executor.submit(asyncTask);
}
}
@Test
public void testReadWithMapper() throws Exception {
counter.set(0);
SerializableFunction<Session, Mapper> factory = new NOOPMapperFactory();
pipeline.apply(
CassandraIO.<String>read()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withTable(CASSANDRA_TABLE)
.withCoder(SerializableCoder.of(String.class))
.withEntity(String.class)
.withMapperFactoryFn(factory));
pipeline.run();
assertEquals(NUM_ROWS, counter.intValue());
}
@Test
public void testCustomMapperImplWrite() throws Exception {
counter.set(0);
SerializableFunction<Session, Mapper> factory = new NOOPMapperFactory();
pipeline
.apply(Create.of(""))
.apply(
CassandraIO.<String>write()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withMapperFactoryFn(factory)
.withEntity(String.class));
pipeline.run();
assertEquals(1, counter.intValue());
}
@Test
public void testCustomMapperImplDelete() {
counter.set(0);
SerializableFunction<Session, Mapper> factory = new NOOPMapperFactory();
pipeline
.apply(Create.of(""))
.apply(
CassandraIO.<String>delete()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withMapperFactoryFn(factory)
.withEntity(String.class));
pipeline.run();
assertEquals(1, counter.intValue());
}
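  /** Fetches the id and name columns of every row in the given table, for verification. */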
private List<Row> getRows(String table) {
ResultSet result =
session.execute(
String.format("select person_id,person_name from %s.%s", CASSANDRA_KEYSPACE, table));
return result.all();
}
@Test
public void testDelete() throws Exception {
List<Row> results = getRows(CASSANDRA_TABLE);
assertEquals(NUM_ROWS, results.size());
Scientist einstein = new Scientist();
einstein.id = 0;
einstein.department = "phys";
einstein.name = "Einstein";
pipeline
.apply(Create.of(einstein))
.apply(
CassandraIO.<Scientist>delete()
.withHosts(Collections.singletonList(CASSANDRA_HOST))
.withPort(cassandraPort)
.withKeyspace(CASSANDRA_KEYSPACE)
.withEntity(Scientist.class));
pipeline.run();
results = getRows(CASSANDRA_TABLE);
assertEquals(NUM_ROWS - 1, results.size());
// re-insert suppressed doc to make the test autonomous
session.execute(
String.format(
"INSERT INTO %s.%s(person_department, person_id, person_name) values("
+ "'phys', "
+ einstein.id
+ ", '"
+ einstein.name
+ "');",
CASSANDRA_KEYSPACE,
CASSANDRA_TABLE));
}
/** Simple Cassandra entity used in read tests. */
@Table(name = CASSANDRA_TABLE, keyspace = CASSANDRA_KEYSPACE)
static class Scientist implements Serializable {
@Column(name = "person_name")
String name;
@Computed("writetime(person_name)")
Long nameTs;
@ClusteringColumn()
@Column(name = "person_id")
int id;
@PartitionKey
@Column(name = "person_department")
String department;
@Override
public String toString() {
return id + ":" + name;
}
@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Scientist scientist = (Scientist) o;
return id == scientist.id
&& Objects.equal(name, scientist.name)
&& Objects.equal(department, scientist.department);
}
@Override
public int hashCode() {
return Objects.hashCode(name, id);
}
}
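  /** Builds a single-token RingRange from the token of the given encoded partition key. */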
private static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) {
BigInteger bi = BigInteger.valueOf((long) metadata.newToken(bb).getValue());
    return RingRange.of(bi, bi.add(BigInteger.ONE));
}
private static final String CASSANDRA_TABLE_WRITE = "scientist_write";
/** Simple Cassandra entity used in write tests. */
@Table(name = CASSANDRA_TABLE_WRITE, keyspace = CASSANDRA_KEYSPACE)
static class ScientistWrite extends Scientist {}
}