Improved cassandra appender
diff --git a/appender/cassandra/pom.xml b/appender/cassandra/pom.xml
index 68ea240..861bae1 100644
--- a/appender/cassandra/pom.xml
+++ b/appender/cassandra/pom.xml
@@ -33,13 +33,10 @@
<dependencies>
- <!-- Decanter API -->
<dependency>
<groupId>org.apache.karaf.decanter</groupId>
<artifactId>org.apache.karaf.decanter.api</artifactId>
</dependency>
-
- <!-- SLF4J -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
@@ -64,8 +61,14 @@
</exclusion>
</exclusions>
</dependency>
-
- <dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <version>1.7.21</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>${cassandra.version}</version>
diff --git a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
index 6175f87..e3e7d17 100644
--- a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
+++ b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
@@ -23,6 +23,7 @@
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventConstants;
@@ -46,40 +47,41 @@
private final static Logger LOGGER = LoggerFactory.getLogger(CassandraAppender.class);
- private String cassandraHost;
- private Integer cassandraPort;
- private String cassandraUser;
- private String cassandraPassword;
private String keyspace;
private String tableName;
private Marshaller marshaller;
- private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS TABLENAME (timeStamp timestamp PRIMARY KEY, content Text);";
+ private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS %s (timeStamp timestamp PRIMARY KEY, content Text);";
- private final static String upsertQueryTemplate = "INSERT INTO TABLENAME(timeStamp, content) VALUES(?,?);";
+ private final static String insertTemplate = "INSERT INTO %s (timeStamp, content) VALUES(?,?);";
+
+ private Cluster cluster;
public CassandraAppender() {
}
- public CassandraAppender(Marshaller marshaller, String keyspace, String tableName, String cassandraHost,
- Integer cassandraPort, String cassandraUser, String cassandraPassword) {
- this.marshaller = marshaller;
- this.keyspace = keyspace;
- this.tableName = tableName;
- this.cassandraHost = cassandraHost;
- this.cassandraPort = cassandraPort;
- this.cassandraUser = cassandraUser;
- this.cassandraPassword = cassandraPassword;
- }
-
@SuppressWarnings("unchecked")
@Activate
public void activate(ComponentContext context) {
Dictionary<String, Object> config = context.getProperties();
+ activate(config);
+ }
+
+ void activate(Dictionary<String, Object> config) {
this.keyspace = getValue(config, "keyspace.name", "decanter");
this.tableName = getValue(config, "table.name", "decanter");
- this.cassandraHost = getValue(config, "cassandra.host", "localhost");
- this.cassandraPort = Integer.parseInt(getValue(config, "cassandra.port", "9042"));
+ String host = getValue(config, "cassandra.host", "localhost");
+ Integer port = Integer.parseInt(getValue(config, "cassandra.port", "9042"));
+ Builder clusterBuilder = Cluster.builder().addContactPoint(host);
+ if (port != null) {
+ clusterBuilder.withPort(port);
+ }
+ cluster = clusterBuilder.build();
+ }
+
+ @Deactivate
+ public void deactivate() {
+ cluster.close();
}
private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -90,40 +92,16 @@
@Override
public void handleEvent(Event event) {
LOGGER.trace("Looking for the Cassandra datasource");
- try (Session session = createSession()){
- ResultSet execute;
- try {
- execute = session.execute("USE " + keyspace + ";");
- } catch (InvalidQueryException e) {
- session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };");
- session.execute("USE " + keyspace + ";");
- }
-
- execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';");
- List<Row> all = execute.all();
- boolean found = false;
- for(Row row : all) {
- String table = row.getString("columnfamily_name");
- if (table.equalsIgnoreCase(tableName)) {
- found = true;
- break;
- }
- }
- if (!found) {
- session.execute(createTableTemplate.replace("TABLENAME", tableName));
- LOGGER.debug("Table {} has been created", tableName);
- }
+ try (Session session = cluster.connect()){
+ useKeyspace(session, keyspace);
+ createTable(session, keyspace, tableName);
Long timestamp = (Long) event.getProperty("timestamp");
- java.util.Date date = timestamp != null ? new java.util.Date(timestamp) : new java.util.Date();
- String jsonSt = marshaller.marshal(event);
-
- String upsertQuery = upsertQueryTemplate.replaceAll("TABLENAME", tableName);
-
if (timestamp == null) {
timestamp = System.currentTimeMillis();
}
- session.execute(upsertQuery, timestamp, jsonSt);
+ String jsonSt = marshaller.marshal(event);
+ session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
LOGGER.trace("Data inserted into {} table", tableName);
} catch (Exception e) {
@@ -131,17 +109,32 @@
}
}
- private Session createSession() {
- Session session;
- Builder clusterBuilder = Cluster.builder().addContactPoint(cassandraHost);
- if (cassandraPort != null) {
- clusterBuilder.withPort(cassandraPort);
+ private static void useKeyspace(Session session, String keyspace) {
+ try {
+ session.execute("USE " + keyspace + ";");
+ } catch (InvalidQueryException e) {
+ session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 };");
+ session.execute("USE " + keyspace + ";");
}
- Cluster cluster = clusterBuilder.build();
- session = cluster.connect();
- return session;
}
-
+
+ private static void createTable(Session session, String keyspace, String tableName) {
+ ResultSet execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';");
+ List<Row> all = execute.all();
+ boolean found = false;
+ for(Row row : all) {
+ String table = row.getString("columnfamily_name");
+ if (table.equalsIgnoreCase(tableName)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ session.execute(String.format(createTableTemplate, tableName));
+ LOGGER.debug("Table {} has been created", tableName);
+ }
+ }
+
@Reference
public void setMarshaller(Marshaller marshaller) {
this.marshaller = marshaller;
diff --git a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
index 5cd9e81..ec28736 100644
--- a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
+++ b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
@@ -5,7 +5,9 @@
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;
+import java.util.Dictionary;
import java.util.HashMap;
+import java.util.Hashtable;
import java.util.List;
import java.util.Map;
@@ -30,7 +32,7 @@
public class CassandraAppenderTest {
private static final String KEYSPACE = "decanter";
- private static final int CASSANDRA_PORT = 9142;
+ private static final String CASSANDRA_PORT = "9142";
private static final String CASSANDRA_HOST = "localhost";
private static final String TABLE_NAME = "decanter";
private static final String TOPIC = "decanter/collect/jmx";
@@ -68,8 +70,14 @@
@Test
public void testHandleEvent() throws Exception {
Marshaller marshaller = new JsonMarshaller();
- CassandraAppender appender = new CassandraAppender(marshaller, KEYSPACE, TABLE_NAME, CASSANDRA_HOST,
- CASSANDRA_PORT, null, null);
+ CassandraAppender appender = new CassandraAppender();
+ Dictionary<String, Object> config = new Hashtable<String, Object>();
+ config.put("cassandra.host", CASSANDRA_HOST);
+ config.put("cassandra.port", CASSANDRA_PORT);
+ config.put("keyspace.name", KEYSPACE);
+ config.put("table.name", TABLE_NAME);
+ appender.setMarshaller(marshaller);
+ appender.activate(config);
Map<String, Object> properties = new HashMap<>();
properties.put(EventConstants.TIMESTAMP, TIMESTAMP);
@@ -90,7 +98,7 @@
private Session getSesion() {
Builder clusterBuilder = Cluster.builder().addContactPoint(CASSANDRA_HOST);
- clusterBuilder.withPort(CASSANDRA_PORT);
+ clusterBuilder.withPort(Integer.valueOf(CASSANDRA_PORT));
Cluster cluster = clusterBuilder.build();
return cluster.connect();