blob: 72650725c7ece5673d9d0bafbc6f4cc468cf5c56 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.io;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.tests.integration.containers.CassandraContainer;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import java.util.List;
import java.util.Map;
import static org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase.randomName;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertEquals;
/**
* A tester for testing cassandra sink.
*/
@Slf4j
public class CassandraSinkTester extends SinkTester<CassandraContainer> {
public static CassandraSinkTester createTester(boolean builtin) {
if (builtin) {
return new CassandraSinkTester(builtin);
} else {
return new CassandraSinkTester();
}
}
private static final String NAME = "cassandra";
private static final String ROOTS = "cassandra";
private static final String KEY = "key";
private static final String COLUMN = "col";
private final String keySpace;
private final String tableName;
private Cluster cluster;
private Session session;
private CassandraSinkTester() {
super(NAME, "/pulsar/connectors/pulsar-io-cassandra-2.2.0-incubating-SNAPSHOT.nar", "org.apache.pulsar.io.cassandra.CassandraStringSink");
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.keySpace = "keySpace_" + suffix;
this.tableName = "tableName_" + suffix;
sinkConfig.put("roots", ROOTS);
sinkConfig.put("keyspace", keySpace);
sinkConfig.put("columnFamily", tableName);
sinkConfig.put("keyname", KEY);
sinkConfig.put("columnName", COLUMN);
}
private CassandraSinkTester(boolean builtin) {
super(NAME, SinkType.CASSANDRA);
String suffix = randomName(8) + "_" + System.currentTimeMillis();
this.keySpace = "keySpace_" + suffix;
this.tableName = "tableName_" + suffix;
sinkConfig.put("roots", ROOTS);
sinkConfig.put("keyspace", keySpace);
sinkConfig.put("columnFamily", tableName);
sinkConfig.put("keyname", KEY);
sinkConfig.put("columnName", COLUMN);
}
@Override
protected CassandraContainer createSinkService(PulsarCluster cluster) {
return new CassandraContainer(cluster.getClusterName());
}
@Override
public void prepareSink() {
// build the sink
cluster = Cluster.builder()
.addContactPoint("localhost")
.withPort(serviceContainer.getCassandraPort())
.build();
// connect to the cluster
session = cluster.connect();
log.info("Connecting to cassandra cluster at localhost:{}", serviceContainer.getCassandraPort());
String createKeySpace =
"CREATE KEYSPACE " + keySpace
+ " WITH replication = {'class':'SimpleStrategy', 'replication_factor':1}; ";
log.info(createKeySpace);
session.execute(createKeySpace);
session.execute("USE " + keySpace);
String createTable = "CREATE TABLE " + tableName
+ "(" + KEY + " text PRIMARY KEY, "
+ COLUMN + " text);";
log.info(createTable);
session.execute(createTable);
}
@Override
public void validateSinkResult(Map<String, String> kvs) {
String query = "SELECT * FROM " + tableName + ";";
ResultSet result = session.execute(query);
List<Row> rows = result.all();
assertEquals(kvs.size(), rows.size());
for (Row row : rows) {
String key = row.getString(KEY);
String value = row.getString(COLUMN);
String expectedValue = kvs.get(key);
assertNotNull(expectedValue);
assertEquals(expectedValue, value);
}
}
}