blob: 7e5a33acdeabd7f4ffac6568c6ff81bca232ed34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) since 2016 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.javadsl;
// #init-session
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.stream.connectors.cassandra.CassandraSessionSettings;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSession;
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSessionRegistry;
// #init-session
// #cql
import org.apache.pekko.stream.connectors.cassandra.javadsl.CassandraSource;
// #cql
import org.apache.pekko.stream.connectors.cassandra.scaladsl.CassandraAccess;
import org.apache.pekko.stream.connectors.testkit.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Sink;
// #statement
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
// #statement
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static docs.javadsl.CassandraTestHelper.await;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
public class CassandraSourceTest {
static final String TEST_NAME = "CassandraSourceTest";
static CassandraTestHelper helper;
static String idtable;
static final List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
@Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();
@BeforeClass
public static void beforeAll() throws InterruptedException, ExecutionException, TimeoutException {
helper = new CassandraTestHelper(TEST_NAME);
idtable = helper.keyspaceName + ".inttable";
await(
helper.cassandraAccess.withSchemaMetadataDisabled(
() ->
helper
.cassandraAccess
.lifecycleSession()
.executeDDL(
"CREATE TABLE IF NOT EXISTS " + idtable + " (id int PRIMARY KEY);")));
await(
helper.cassandraAccess.executeCqlList(
data.stream()
.map(i -> "INSERT INTO " + idtable + "(id) VALUES (" + i + ")")
.collect(Collectors.toList())));
}
@AfterClass
public static void afterAll() {
helper.shutdown();
}
CassandraSession cassandraSession = helper.cassandraSession;
CassandraAccess cassandraAccess = helper.cassandraAccess;
@Test
public void createSession() throws InterruptedException, ExecutionException, TimeoutException {
// #init-session
ActorSystem system = // ???
// #init-session
helper.system;
// #init-session
CassandraSessionSettings sessionSettings = CassandraSessionSettings.create();
CassandraSession cassandraSession =
CassandraSessionRegistry.get(system).sessionFor(sessionSettings);
CompletionStage<String> version =
cassandraSession
.select("SELECT release_version FROM system.local;")
.map(row -> row.getString("release_version"))
.runWith(Sink.head(), system);
// #init-session
assertThat(await(version).isEmpty(), is(false));
}
@Test
public void select() throws InterruptedException, ExecutionException, TimeoutException {
ActorSystem system = helper.system;
// #cql
CompletionStage<List<Integer>> select =
CassandraSource.create(cassandraSession, "SELECT id FROM " + idtable + ";")
.map(r -> r.getInt("id"))
.runWith(Sink.seq(), system);
// #cql
List<Integer> rows = await(select);
assertThat(new ArrayList<>(rows), hasItems(data.toArray()));
}
@Test
public void selectVarArgs() throws InterruptedException, ExecutionException, TimeoutException {
ActorSystem system = helper.system;
int value = 5;
// #cql
CompletionStage<Integer> select =
CassandraSource.create(
cassandraSession, "SELECT * FROM " + idtable + " WHERE id = ?;", value)
.map(r -> r.getInt("id"))
.runWith(Sink.head(), system);
// #cql
assertThat(await(select), is(value));
}
@Test
public void statement() throws InterruptedException, ExecutionException, TimeoutException {
ActorSystem system = helper.system;
int value = 5;
// #statement
Statement<?> stmt =
SimpleStatement.newInstance("SELECT * FROM " + idtable + ";").setPageSize(20);
CompletionStage<List<Integer>> select =
CassandraSource.create(cassandraSession, stmt)
.map(r -> r.getInt("id"))
.runWith(Sink.seq(), system);
// #statement
assertThat(new ArrayList<>(await(select)), hasItems(data.toArray()));
}
public void compileOnlyDiscovery() {
ActorSystem system = helper.system;
// #discovery
CassandraSessionSettings sessionSettings =
CassandraSessionSettings.create("example-with-pekko-discovery");
CassandraSession session = CassandraSessionRegistry.get(system).sessionFor(sessionSettings);
// #discovery
session.close(system.dispatcher());
}
}