Merge pull request #93 from raviu/master

[KARAF-6342] adding location disable configuration to karaf decanter co…
diff --git a/alerting/alerter/email/pom.xml b/alerting/alerter/email/pom.xml
index fd06440..c277d7e 100644
--- a/alerting/alerter/email/pom.xml
+++ b/alerting/alerter/email/pom.xml
@@ -37,7 +37,20 @@
         <dependency>
             <groupId>javax.mail</groupId>
             <artifactId>javax.mail-api</artifactId>
-            <version>1.5.6</version>
+            <version>1.6.0</version>
+        </dependency>
+
+        <!-- testing -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.mail</groupId>
+            <artifactId>javax.mail</artifactId>
+            <version>1.6.0</version>
+            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/alerting/alerter/email/src/main/java/org/apache/karaf/decanter/alerting/email/EmailAlerter.java b/alerting/alerter/email/src/main/java/org/apache/karaf/decanter/alerting/email/EmailAlerter.java
index 02f7568..b0c2ee2 100644
--- a/alerting/alerter/email/src/main/java/org/apache/karaf/decanter/alerting/email/EmailAlerter.java
+++ b/alerting/alerter/email/src/main/java/org/apache/karaf/decanter/alerting/email/EmailAlerter.java
@@ -42,20 +42,29 @@
 
     private final static Logger LOGGER = LoggerFactory.getLogger(EmailAlerter.class);
 
-    private String from;
-    private String to;
+    private String from = null;
+    private String to = null;
+    private String cc = null;
+    private String bcc = null;
+    private String subject = null;
+    private String body = null;
+    private String bodyType = null;
 
     private Properties properties;
 
     @SuppressWarnings("unchecked")
     public void activate(ComponentContext context) throws ConfigurationException {
-        Dictionary<String, Object> config = context.getProperties();
-        requireProperty(config, "from");
-        requireProperty(config, "to");
-        requireProperty(config, "host");
+        activate(context.getProperties());
+    }
 
-        this.from = (String) config.get("from");
-        this.to = (String) config.get("to");
+    protected void activate(Dictionary<String, Object> config) throws ConfigurationException {
+        from = (config.get("from") != null) ? config.get("from").toString() : null;
+        to = (config.get("to") != null) ? config.get("to").toString() : null;
+        subject = (config.get("subject") != null) ? config.get("subject").toString() : null;
+        body = (config.get("body") != null) ? config.get("body").toString() : null;
+        bodyType = (config.get("body.type") != null) ? config.get("body.type").toString() : "text/plain";
+        cc = (config.get("cc") != null) ? config.get("cc").toString() : null;
+        bcc = (config.get("bcc")  != null) ? config.get("bcc").toString() : null;
 
         properties = new Properties();
         properties.put("mail.smtp.host", config.get("host"));
@@ -84,18 +93,106 @@
         Session session = Session.getDefaultInstance(properties);
         MimeMessage message = new MimeMessage(session);
         try {
-            message.setFrom(new InternetAddress(from));
-            message.addRecipients(Message.RecipientType.TO, to);
+            // set from
+            if (event.getProperty("from") != null) {
+                message.setFrom(new InternetAddress(event.getProperty("from").toString()));
+            } else if (event.getProperty("alert.email.from") != null) {
+                message.setFrom(new InternetAddress(event.getProperty("alert.email.from").toString()));
+            } else if (from != null){
+                message.setFrom(from);
+            } else {
+                message.setFrom("decanter@karaf.apache.org");
+            }
+            // set to
+            if (event.getProperty("to") != null) {
+                message.addRecipients(Message.RecipientType.TO, event.getProperty("to").toString());
+            } else if (event.getProperty("alert.email.to") != null) {
+                message.addRecipients(Message.RecipientType.TO, event.getProperty("alert.email.to").toString());
+            } else if (to != null) {
+                message.addRecipients(Message.RecipientType.TO, to);
+            } else {
+                LOGGER.warn("to destination is not defined");
+                return;
+            }
+            // set cc
+            if (event.getProperty("cc") != null) {
+                message.addRecipients(Message.RecipientType.CC, event.getProperty("cc").toString());
+            } else if (event.getProperty("alert.email.cc") != null) {
+                message.addRecipients(Message.RecipientType.CC, event.getProperty("alert.email.cc").toString());
+            } else if (cc != null) {
+                message.addRecipients(Message.RecipientType.CC, cc);
+            }
+            // set bcc
+            if (event.getProperty("bcc") != null) {
+                message.addRecipients(Message.RecipientType.BCC, event.getProperty("bcc").toString());
+            } else if (event.getProperty("alert.email.bcc") != null) {
+                message.addRecipients(Message.RecipientType.BCC, event.getProperty("alert.email.bcc").toString());
+            } else if (bcc != null) {
+                message.addRecipients(Message.RecipientType.BCC, bcc);
+            }
+            // set subject
+            setSubject(message, event);
+            // set body
+            setBody(message, event);
+            // send email
+            if (properties.get("mail.smtp.user") != null) {
+                Transport.send(message, (String) properties.get("mail.smtp.user"), (String) properties.get("mail.smtp.password"));
+            } else {
+                Transport.send(message);
+            }
+        } catch (Exception e) {
+            LOGGER.error("Can't send the alert e-mail", e);
+        }
+    }
+
+    /**
+     * Visible for testing.
+     */
+    protected void setSubject(MimeMessage message, Event event) throws Exception {
+        if (event.getProperty("subject") != null) {
+            message.setSubject(interpolation(event.getProperty("subject").toString(), event));
+        } else if (event.getProperty("alert.email.subject") != null) {
+            message.setSubject(interpolation(event.getProperty("alert.email.subject").toString(), event));
+        } else if (subject != null) {
+            message.setSubject(interpolation(subject, event));
+        } else {
             String alertLevel = (String) event.getProperty("alertLevel");
             String alertAttribute = (String) event.getProperty("alertAttribute");
             String alertPattern = (String) event.getProperty("alertPattern");
-            boolean recovery = (boolean) event.getProperty("alertBackToNormal");
+            boolean recovery = false;
+            if (event.getProperty("alertBackToNormal") != null) {
+                recovery = (boolean) event.getProperty("alertBackToNormal");
+            }
             if (!recovery) {
                 message.setSubject("[" + alertLevel + "] Alert on " + alertAttribute);
             } else {
                 message.setSubject("Alert on " + alertAttribute + " back to normal");
             }
-            StringBuilder builder = new StringBuilder();
+        }
+    }
+
+    /**
+     * Visible for testing.
+     */
+    protected void setBody(MimeMessage message, Event event) throws Exception {
+        String contentType = bodyType;
+        contentType = (event.getProperty("body.type") != null) ? event.getProperty("body.type").toString() : contentType;
+        contentType = (event.getProperty("alert.email.body.type") != null) ? event.getProperty("alert.email.body.type").toString() : contentType;
+        StringBuilder builder = new StringBuilder();
+        if (event.getProperty("body") != null) {
+            builder.append(interpolation(event.getProperty("body").toString(), event));
+        } else if (event.getProperty("alert.email.body") != null) {
+            builder.append(interpolation(event.getProperty("alert.email.body").toString(), event));
+        } else if (body != null) {
+            builder.append(interpolation(body, event));
+        } else {
+            String alertLevel = (String) event.getProperty("alertLevel");
+            String alertAttribute = (String) event.getProperty("alertAttribute");
+            String alertPattern = (String) event.getProperty("alertPattern");
+            boolean recovery = false;
+            if (event.getProperty("alertBackToNormal") != null) {
+                recovery = (boolean) event.getProperty("alertBackToNormal");
+            }
             if (!recovery) {
                 builder.append(alertLevel + " alert: " + alertAttribute + " is out of the pattern " + alertPattern + "\n");
             } else {
@@ -106,15 +203,20 @@
             for (String name : event.getPropertyNames()) {
                 builder.append("\t").append(name).append(": ").append(event.getProperty(name)).append("\n");
             }
-            message.setText(builder.toString());
-            if (properties.get("mail.smtp.user") != null) {
-                Transport.send(message, (String) properties.get("mail.smtp.user"), (String) properties.get("mail.smtp.password"));
-            } else {
-                Transport.send(message);
-            }
-        } catch (Exception e) {
-            LOGGER.error("Can't send the alert e-mail", e);
         }
+        message.setContent(builder.toString(), contentType);
+    }
+
+    /**
+     * Visible for testing
+     * @return the interpolated string
+     */
+    protected static String interpolation(String source, Event event) {
+        String interpolated = source;
+        for (String propertyName : event.getPropertyNames()) {
+            interpolated = interpolated.replace("${" + propertyName + "}", event.getProperty(propertyName).toString());
+        }
+        return interpolated;
     }
 
 }
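
Note: the new activate(Dictionary) overload and the per-event overrides resolve e-mail settings in the order event property, then alert.email.* property, then component configuration, then a built-in fallback. A minimal sketch of that flow, written as if from a test in the same package (same imports as EmailAlerterTest) and assuming the alerter's existing EventHandler entry point; the SMTP values are placeholders, so Transport.send would only succeed against a reachable server:

    Hashtable<String, Object> config = new Hashtable<>();
    config.put("host", "smtp.example.org");   // placeholder SMTP settings
    config.put("port", "25");
    config.put("auth", "false");
    config.put("starttls", "false");
    config.put("ssl", "false");
    config.put("to", "ops@example.org");
    config.put("subject", "[${alertLevel}] alert on ${alertAttribute}");

    EmailAlerter alerter = new EmailAlerter();
    alerter.activate(config);   // protected overload added in this change (same-package access)

    HashMap<String, Object> data = new HashMap<>();
    data.put("alertLevel", "error");
    data.put("alertAttribute", "loggerLevel");
    data.put("alert.email.to", "oncall@example.org");   // overrides the configured recipient for this event only
    alerter.handleEvent(new Event("decanter/alert/error", data));
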
diff --git a/alerting/alerter/email/src/test/java/org/apache/karaf/decanter/alerting/email/EmailAlerterTest.java b/alerting/alerter/email/src/test/java/org/apache/karaf/decanter/alerting/email/EmailAlerterTest.java
new file mode 100644
index 0000000..0f6081b
--- /dev/null
+++ b/alerting/alerter/email/src/test/java/org/apache/karaf/decanter/alerting/email/EmailAlerterTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.alerting.email;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.service.event.Event;
+
+import javax.mail.Session;
+import javax.mail.internet.MimeMessage;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Properties;
+
+public class EmailAlerterTest {
+
+    @Test
+    public void testInterpolation() throws Exception {
+        String source = "This is ${test} of the ${other} processing";
+        Map<String, Object> data = new HashMap<>();
+        data.put("test", "a test");
+        data.put("other", "interpolation");
+        data.put("not_used", "not_used");
+        Event event = new Event("topic", data);
+        String result = EmailAlerter.interpolation(source, event);
+        Assert.assertEquals("This is a test of the interpolation processing", result);
+    }
+
+    @Test
+    public void testSetSubjectWithEventProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("subject", "Test subject");
+        Event event = new Event("topic", data);
+        emailAlerter.setSubject(message, event);
+        Assert.assertEquals("Test subject", message.getSubject());
+    }
+
+    @Test
+    public void testSetSubjectWithEventInterpolatedProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        componentConfig.put("subject", "not used subject");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("subject", "Test ${alert} subject");
+        data.put("alert", "alerting");
+        Event event = new Event("topic", data);
+        emailAlerter.setSubject(message, event);
+        Assert.assertEquals("Test alerting subject", message.getSubject());
+    }
+
+    @Test
+    public void testSetSubjectWithComponentProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        componentConfig.put("subject", "This is my subject");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        Event event = new Event("topic", new HashMap<>());
+        emailAlerter.setSubject(message, event);
+        Assert.assertEquals("This is my subject", message.getSubject());
+    }
+
+    @Test
+    public void testSetSubjectWithComponentInterpolatedProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        componentConfig.put("subject", "This is my ${my.subject}");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("my.subject", "subject");
+        Event event = new Event("topic", data);
+        emailAlerter.setSubject(message, event);
+        Assert.assertEquals("This is my subject", message.getSubject());
+    }
+
+    @Test
+    public void testSetSubjectFallback() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        Event event = new Event("topic", data);
+        emailAlerter.setSubject(message, event);
+        Assert.assertTrue(message.getSubject().contains("Alert on null"));
+    }
+
+    @Test
+    public void testSetBodyWithEventProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("body", "Test body");
+        Event event = new Event("topic", data);
+        emailAlerter.setBody(message, event);
+        Assert.assertEquals("Test body", message.getContent().toString());
+    }
+
+    @Test
+    public void testSetBodyWithEventInterpolatedProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        componentConfig.put("body", "not used body");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("body", "Test ${alert} body");
+        data.put("alert", "alerting");
+        Event event = new Event("topic", data);
+        emailAlerter.setBody(message, event);
+        Assert.assertEquals("Test alerting body", message.getContent().toString());
+    }
+
+    @Test
+    public void testSetBodyWithComponentProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        componentConfig.put("body", "This is the email body");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        Event event = new Event("topic", new HashMap<>());
+        emailAlerter.setBody(message, event);
+        Assert.assertEquals("This is the email body", message.getContent().toString());
+    }
+
+    @Test
+    public void testSetBodyWithComponentInterpolatedProperties() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        componentConfig.put("body", "This is the email ${my.body}");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("my.body", "body");
+        Event event = new Event("topic", data);
+        emailAlerter.setBody(message, event);
+        Assert.assertEquals("This is the email body", message.getContent().toString());
+    }
+
+    @Test
+    public void testSetBodyWithFallback() throws Exception {
+        EmailAlerter emailAlerter = new EmailAlerter();
+        Hashtable<String, Object> componentConfig = new Hashtable<>();
+        componentConfig.put("host", "");
+        componentConfig.put("port", "8888");
+        componentConfig.put("auth", "false");
+        componentConfig.put("starttls", "false");
+        componentConfig.put("ssl", "false");
+        emailAlerter.activate(componentConfig);
+        Properties properties = new Properties();
+        Session session = Session.getDefaultInstance(properties);
+        MimeMessage message = new MimeMessage(session);
+        HashMap<String, Object> data = new HashMap<>();
+        data.put("my.body", "unused");
+        Event event = new Event("topic", data);
+        emailAlerter.setBody(message, event);
+        Assert.assertTrue(message.getContent().toString().contains("out of the pattern"));
+    }
+
+}
diff --git a/alerting/checker/pom.xml b/alerting/checker/pom.xml
index f4b7638..f76e583 100644
--- a/alerting/checker/pom.xml
+++ b/alerting/checker/pom.xml
@@ -33,6 +33,14 @@
     <packaging>bundle</packaging>
     <name>Apache Karaf :: Decanter :: Alerting :: Checker</name>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
     <build>
         <plugins>
             <plugin>
diff --git a/alerting/checker/src/main/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImpl.java b/alerting/checker/src/main/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImpl.java
index 218d934..d1ebe81 100644
--- a/alerting/checker/src/main/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImpl.java
+++ b/alerting/checker/src/main/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImpl.java
@@ -16,24 +16,72 @@
  */
 package org.apache.karaf.decanter.alerting.checker;
 
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @Component(
         immediate = true
 )
 public class AlertStoreImpl implements AlertStore {
 
+    private Logger logger = LoggerFactory.getLogger(AlertStoreImpl.class);
+
     private Set<String> errorAlerts;
     private Set<String> warnAlerts;
+    private File file;
 
     @Activate
     public void activate() {
         this.errorAlerts = new HashSet<>();
         this.warnAlerts = new HashSet<>();
+
+        // store the data file in $KARAF_DATA/decanter/alerter.db
+        file = new File(System.getProperty("karaf.data") + File.separator + "decanter" + File.separator + "alerter.db");
+
+        if (file.exists()) {
+            try {
+                Files.lines(file.toPath())
+                        .forEach(line -> {
+                            if (line.startsWith(Level.error.name().concat(":"))) {
+                                this.errorAlerts.add(line.replaceFirst(Level.error.name().concat(":"), ""));
+                            } else if (line.startsWith(Level.warn.name().concat(":"))) {
+                                this.warnAlerts.add(line.replaceFirst(Level.warn.name().concat(":"), ""));
+                            } else {
+                                logger.error("Level unknow in line '{}'", line);
+                            }
+                        });
+            } catch (IOException exception) {
+                logger.error("Error while reading alerter store file!");
+            }
+        }
+    }
+
+    @Deactivate
+    public void deactivate() {
+        try {
+            // build data to write based on level for prefix
+            Set<String> data = new HashSet<>();
+            this.errorAlerts.stream().forEach(value -> data.add(Level.error.name().concat(":").concat(value)));
+            this.warnAlerts.stream().forEach(value -> data.add(Level.warn.name().concat(":").concat(value)));
+
+            // create directories if not exists
+            Files.createDirectories(file.getParentFile().toPath());
+
+            // write data
+            Files.write(file.toPath(), data.stream().collect(Collectors.toSet()), StandardOpenOption.CREATE);
+        } catch (IOException exception) {
+            logger.error("Error while writing alerter store file!");
+        }
     }
 
     public void add(String name, Level level) {
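
Note: the store now round-trips the in-memory alert sets through a plain text file, one level-prefixed entry per line (the same error:/warn: format the new test seeds below). A short sketch of the life cycle, reusing the karaf.data layout and imports from AlertStoreImplTest:

    System.setProperty("karaf.data", "target/data");
    AlertStoreImpl store = new AlertStoreImpl();
    store.activate();   // reloads $KARAF_DATA/decanter/alerter.db if it already exists
    store.add("log service unavailable", AlertStore.Level.error);
    store.deactivate(); // persists "error:log service unavailable" back to the file
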
diff --git a/alerting/checker/src/test/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImplTest.java b/alerting/checker/src/test/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImplTest.java
new file mode 100644
index 0000000..1c40a18
--- /dev/null
+++ b/alerting/checker/src/test/java/org/apache/karaf/decanter/alerting/checker/AlertStoreImplTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.alerting.checker;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardOpenOption;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertStoreImplTest {
+
+    Logger logger = LoggerFactory.getLogger(AlertStoreImplTest.class);
+
+    @Before
+    public void setUp() {
+        System.setProperty("karaf.data", "target/data");
+    }
+
+    @Test
+    public void testWithoutInitFile() {
+        AlertStoreImpl alertStore = new AlertStoreImpl();
+        alertStore.activate();
+
+        alertStore.add("log service unavailable", AlertStore.Level.error);
+        Assert.assertTrue(alertStore.known("log service unavailable", AlertStore.Level.error));
+
+        alertStore.add("file service stopped", AlertStore.Level.warn);
+        Assert.assertTrue(alertStore.known("file service stopped", AlertStore.Level.warn));
+
+        alertStore.deactivate();
+
+        File file = new File(System.getProperty("karaf.data") + File.separator + "decanter" + File.separator + "alerter.db");
+        Assert.assertTrue(file.exists());
+
+        try {
+            Assert.assertEquals(2, Files.lines(file.toPath()).toArray().length);
+        } catch (IOException exception) {
+            logger.error("error while opening alerter db file!");
+        }
+    }
+
+    @Test
+    public void testWithInitFile() throws IOException {
+        File file = new File(System.getProperty("karaf.data") + File.separator + "decanter" + File.separator + "alerter.db");
+        Files.createDirectories(file.getParentFile().toPath());
+        Files.write(file.toPath(),
+                    Stream.of("error:log service unavailable", "warn:file service stopped").collect(Collectors.toSet()),
+                StandardOpenOption.CREATE);
+        Assert.assertTrue(file.exists());
+
+        AlertStoreImpl alertStore = new AlertStoreImpl();
+        alertStore.activate();
+
+        Assert.assertTrue(alertStore.known("log service unavailable", AlertStore.Level.error));
+        Assert.assertTrue(alertStore.known("file service stopped", AlertStore.Level.warn));
+
+        alertStore.deactivate();
+
+        Assert.assertEquals(2, Files.lines(file.toPath()).toArray().length);
+    }
+
+}
\ No newline at end of file
diff --git a/api/src/main/java/org/apache/karaf/decanter/api/marshaller/Marshaller.java b/api/src/main/java/org/apache/karaf/decanter/api/marshaller/Marshaller.java
index 74809a7..17e5a95 100644
--- a/api/src/main/java/org/apache/karaf/decanter/api/marshaller/Marshaller.java
+++ b/api/src/main/java/org/apache/karaf/decanter/api/marshaller/Marshaller.java
@@ -19,7 +19,7 @@
 import java.io.OutputStream;
 
 public interface Marshaller {
-    public static String SERVICE_KEY_DATAFORMAT = "dataFormat";
+    public static final String SERVICE_KEY_DATAFORMAT = "dataFormat";
 
     void marshal(Object obj, OutputStream out);
     String marshal(Object obj);
diff --git a/appender/cassandra/pom.xml b/appender/cassandra/pom.xml
index 92c1336..c8c69d9 100644
--- a/appender/cassandra/pom.xml
+++ b/appender/cassandra/pom.xml
@@ -46,8 +46,8 @@
         </dependency>
         
         <dependency>
-			<groupId>com.datastax.cassandra</groupId>
-			<artifactId>cassandra-driver-core</artifactId>
+			<groupId>com.datastax.oss</groupId>
+			<artifactId>java-driver-core</artifactId>
 			<version>${cassandra.driver.version}</version>
 			<exclusions>
 				<exclusion>
@@ -64,11 +64,16 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
+		<dependency>
+			<groupId>com.datastax.oss</groupId>
+			<artifactId>java-driver-query-builder</artifactId>
+			<version>${cassandra.driver.version}</version>
+		</dependency>
 
         <dependency>
         	<groupId>org.slf4j</groupId>
         	<artifactId>slf4j-jdk14</artifactId>
-        	<version>1.7.21</version>
+        	<version>1.7.26</version>
         	<scope>test</scope>
         </dependency>
         <dependency>
@@ -119,7 +124,7 @@
         <dependency>
 			<artifactId>guava</artifactId>
 			<groupId>com.google.guava</groupId>
-			<version>18.0</version>
+			<version>25.1-jre</version>
 			<scope>test</scope>
 		</dependency>
 		<dependency>
@@ -131,7 +136,7 @@
 		<dependency>
 			<groupId>io.netty</groupId>
 			<artifactId>netty-transport-native-epoll</artifactId>
-			<version>4.0.27.Final</version>
+			<version>4.1.34.Final</version>
 			<scope>test</scope>
 		</dependency>
 
diff --git a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
index e6ad035..5860bf6 100644
--- a/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
+++ b/appender/cassandra/src/main/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppender.java
@@ -16,9 +16,18 @@
  */
 package org.apache.karaf.decanter.appender.cassandra;
 
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.datastax.oss.driver.api.core.servererrors.InvalidQueryException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.List;
-
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.osgi.service.component.ComponentContext;
@@ -32,12 +41,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-import com.datastax.driver.core.exceptions.InvalidQueryException;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
 
 @Component(
     name = "org.apache.karaf.decanter.appender.cassandra",
@@ -46,15 +51,15 @@
 )
 public class CassandraAppender implements EventHandler {
 
-    public static String KEYSPACE_PROPERTY = "keyspace.name";
-    public static String TABLE_PROPERTY = "table.name";
-    public static String CASSANDRA_HOST_PROPERTY = "cassandra.host";
-    public static String CASSANDRA_PORT_PROPERTY = "cassandra.port";
+    public static final String KEYSPACE_PROPERTY = "keyspace.name";
+    public static final String TABLE_PROPERTY = "table.name";
+    public static final String CASSANDRA_HOST_PROPERTY = "cassandra.host";
+    public static final String CASSANDRA_PORT_PROPERTY = "cassandra.port";
 
-    public static String KEYSPACE_DEFAULT = "decanter";
-    public static String TABLE_DEFAULT = "decanter";
-    public static String CASSANDRA_HOST_DEFAULT = "localhost";
-    public static String CASSANDRA_PORT_DEFAULT = "9042";
+    public static final String KEYSPACE_DEFAULT = "decanter";
+    public static final String TABLE_DEFAULT = "decanter";
+    public static final String CASSANDRA_HOST_DEFAULT = "localhost";
+    public static final String CASSANDRA_PORT_DEFAULT = "9042";
 
     private final static Logger LOGGER = LoggerFactory.getLogger(CassandraAppender.class);
 
@@ -65,9 +70,11 @@
 
     private final static String createTableTemplate = "CREATE TABLE IF NOT EXISTS %s (timeStamp timestamp PRIMARY KEY, content Text);";
 
-    private final static String insertTemplate = "INSERT INTO %s (timeStamp, content) VALUES(?,?);";
+    private CqlSession session;
 
-    private Cluster cluster;
+    private String keyspace;
+
+    private String tableName;
 
     public CassandraAppender() {
     }
@@ -83,16 +90,85 @@
         this.config = config;
         String host = getValue(config, CASSANDRA_HOST_PROPERTY, CASSANDRA_HOST_DEFAULT);
         Integer port = Integer.parseInt(getValue(config, CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT_DEFAULT));
-        Builder clusterBuilder = Cluster.builder().addContactPoint(host);
-        if (port != null) {
-            clusterBuilder.withPort(port);
-        }
-        cluster = clusterBuilder.build();
+        this.keyspace = getValue(config, KEYSPACE_PROPERTY, KEYSPACE_DEFAULT);
+        this.tableName = getValue(config, TABLE_PROPERTY, TABLE_DEFAULT);
+
+        DriverConfigLoader loader =
+                DriverConfigLoader.programmaticBuilder()
+                        .withStringList(DefaultDriverOption.CONTACT_POINTS, Arrays.asList(host + ":" + port))
+                        .withString(DefaultDriverOption.PROTOCOL_VERSION, "V3")
+                        .withString(DefaultDriverOption.PROTOCOL_MAX_FRAME_LENGTH, "256 MB")
+                        .withString(DefaultDriverOption.SESSION_NAME, "decanter")
+                        .withString(DefaultDriverOption.SESSION_KEYSPACE, keyspace)
+                        .withString(DefaultDriverOption.CONFIG_RELOAD_INTERVAL, "0")
+                        .withString(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, "500 milliseconds")
+                        .withString(DefaultDriverOption.CONNECTION_SET_KEYSPACE_TIMEOUT, "500 milliseconds")
+                        .withString(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, "200 milliseconds")
+                        .withString(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, "10 seconds")
+                        .withBoolean(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true)
+                        .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 1)
+                        .withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 1)
+                        .withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, 1024)
+                        .withInt(DefaultDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS, 24576)
+                        .withString(DefaultDriverOption.HEARTBEAT_INTERVAL, "30 seconds")
+                        .withString(DefaultDriverOption.HEARTBEAT_TIMEOUT, "500 milliseconds")
+                        .withString(DefaultDriverOption.COALESCER_INTERVAL, "10 microseconds")
+                        .withInt(DefaultDriverOption.COALESCER_MAX_RUNS, 5)
+                        .withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, "ExponentialReconnectionPolicy")
+                        .withString(DefaultDriverOption.RECONNECTION_BASE_DELAY, "1 second")
+                        .withString(DefaultDriverOption.RECONNECTION_MAX_DELAY, "60 seconds")
+                        .withBoolean(DefaultDriverOption.RECONNECT_ON_INIT, true)
+                        .withString(DefaultDriverOption.LOAD_BALANCING_POLICY, "")
+                        .withString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, "DefaultLoadBalancingPolicy")
+                        .withString(DefaultDriverOption.RETRY_POLICY, "")
+                        .withString(DefaultDriverOption.RETRY_POLICY_CLASS, "DefaultRetryPolicy")
+                        .withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY, "")
+                        .withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, "NoSpeculativeExecutionPolicy")
+                        .withString(DefaultDriverOption.ADDRESS_TRANSLATOR_CLASS, "PassThroughAddressTranslator")
+                        .withString(DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS, "NoopSchemaChangeListener")
+                        .withString(DefaultDriverOption.METADATA_NODE_STATE_LISTENER_CLASS, "NoopNodeStateListener")
+                        .withString(DefaultDriverOption.REQUEST_TRACKER_CLASS, "NoopRequestTracker")
+                        .withString(DefaultDriverOption.REQUEST_THROTTLER_CLASS, "PassThroughRequestThrottler")
+                        .withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE, false)
+                        .withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_ONE")
+                        .withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000)
+                        .withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, "SERIAL")
+                        .withString(DefaultDriverOption.TIMESTAMP_GENERATOR_CLASS, "AtomicTimestampGenerator")
+                        .withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, true)
+                        .withString(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, "500 milliseconds")
+                        .withInt(DefaultDriverOption.NETTY_IO_SIZE, 0)
+                        .withInt(DefaultDriverOption.NETTY_IO_SHUTDOWN_QUIET_PERIOD, 2)
+                        .withInt(DefaultDriverOption.NETTY_IO_SHUTDOWN_TIMEOUT, 15)
+                        .withString(DefaultDriverOption.NETTY_IO_SHUTDOWN_UNIT, "SECONDS")
+                        .withInt(DefaultDriverOption.NETTY_ADMIN_SIZE, 2)
+                        .withInt(DefaultDriverOption.NETTY_ADMIN_SHUTDOWN_QUIET_PERIOD, 2)
+                        .withInt(DefaultDriverOption.NETTY_ADMIN_SHUTDOWN_TIMEOUT, 15)
+                        .withString(DefaultDriverOption.NETTY_ADMIN_SHUTDOWN_UNIT, "SECONDS")
+                        .withString(DefaultDriverOption.NETTY_TIMER_TICK_DURATION, "100 milliseconds")
+                        .withInt(DefaultDriverOption.NETTY_TIMER_TICKS_PER_WHEEL, 2048)
+                        .withBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED, true)
+                        .withString(DefaultDriverOption.METADATA_SCHEMA_WINDOW, "1 second")
+                        .withInt(DefaultDriverOption.METADATA_SCHEMA_MAX_EVENTS, 20)
+                        .withString(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, "500 milliseconds")
+                        .withInt(DefaultDriverOption.METADATA_SCHEMA_REQUEST_PAGE_SIZE, 5000)
+                        .withBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED, true)
+                        .withString(DefaultDriverOption.METADATA_TOPOLOGY_WINDOW,"1 second")
+                        .withInt(DefaultDriverOption.METADATA_TOPOLOGY_MAX_EVENTS,20)
+                        .withStringList(DefaultDriverOption.METRICS_SESSION_ENABLED, new ArrayList<>())
+                        .withStringList(DefaultDriverOption.METRICS_NODE_ENABLED, new ArrayList<>())
+                        .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5))
+                        .build();
+        session = CqlSession.builder()
+                .withClassLoader(CqlSession.class.getClassLoader())
+                .withConfigLoader(loader)
+                .withLocalDatacenter("datacenter1").build();
+        useKeyspace(session, keyspace);
+        createTable(session, keyspace, tableName);
     }
     
     @Deactivate
     public void deactivate() {
-        cluster.close();
+        session.close();
     }
     
     private String getValue(Dictionary<String, Object> config, String key, String defaultValue) {
@@ -104,18 +180,18 @@
     public void handleEvent(Event event) {
         if (EventFilter.match(event, config)) {
             LOGGER.trace("Looking for the Cassandra datasource");
-            try (Session session = cluster.connect()) {
-                String keyspace = getValue(config, KEYSPACE_PROPERTY, KEYSPACE_DEFAULT);
-                String tableName = getValue(config, TABLE_PROPERTY, TABLE_DEFAULT);
-                useKeyspace(session, keyspace);
-                createTable(session, keyspace, tableName);
-
+            try {
                 Long timestamp = (Long) event.getProperty("timestamp");
                 if (timestamp == null) {
                     timestamp = System.currentTimeMillis();
                 }
                 String jsonSt = marshaller.marshal(event);
-                session.execute(String.format(insertTemplate, tableName), timestamp, jsonSt);
+                Statement stmt =
+                    insertInto(keyspace, tableName)
+                            .value("timestamp", literal(timestamp))
+                            .value("content", literal(jsonSt))
+                            .build();
+                session.execute(stmt);
 
                 LOGGER.trace("Data inserted into {} table", tableName);
             } catch (Exception e) {
@@ -124,7 +200,7 @@
         }
     }
 
-    private static void useKeyspace(Session session, String keyspace) {
+    private static void useKeyspace(CqlSession session, String keyspace) {
         try {
             session.execute("USE " + keyspace + ";");
         } catch (InvalidQueryException e) {
@@ -133,12 +209,12 @@
         }
     }
 
-    private static void createTable(Session session, String keyspace, String tableName) {
-        ResultSet execute = session.execute("select columnfamily_name from system.schema_columnfamilies where keyspace_name = '"+keyspace+"';");
+    private static void createTable(CqlSession session, String keyspace, String tableName) {
+        ResultSet execute = session.execute("select table_name from system_schema.tables where keyspace_name = '"+keyspace+"';");
         List<Row> all = execute.all();
         boolean found = false;
         for(Row row : all) {
-            String table = row.getString("columnfamily_name");
+            String table = row.getString("table_name");
             if (table.equalsIgnoreCase(tableName)) {
                 found = true;
                 break;
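
Note: the insertInto(...).value(...).build() call replaces the old String.format/bound-parameter INSERT template. For readers new to the 4.x query builder, a rough sketch of the statement it produces, using the default keyspace/table names and an illustrative JSON literal (QueryBuilder static imports as added above):

    SimpleStatement stmt = insertInto("decanter", "decanter")
            .value("timestamp", literal(1454428780634L))
            .value("content", literal("{\"type\":\"jmx\"}"))
            .build();
    // stmt.getQuery() is roughly:
    // INSERT INTO decanter.decanter (timestamp,content) VALUES (1454428780634,'{"type":"jmx"}')
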
diff --git a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
index 9f4bc37..f279f8f 100644
--- a/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
+++ b/appender/cassandra/src/test/java/org/apache/karaf/decanter/appender/cassandra/CassandraAppenderTest.java
@@ -16,80 +16,84 @@
  */
 package org.apache.karaf.decanter.appender.cassandra;
 
-import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
 
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
+import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
+import com.datastax.oss.driver.api.core.cql.ResultSet;
+import com.datastax.oss.driver.api.core.cql.Row;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.karaf.decanter.api.marshaller.Marshaller;
 import org.apache.karaf.decanter.appender.utils.EventFilter;
 import org.apache.karaf.decanter.marshaller.json.JsonMarshaller;
-import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.Cluster;
-import com.datastax.driver.core.Cluster.Builder;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
-import com.datastax.driver.core.Session;
-
 public class CassandraAppenderTest {
 
     private static final String KEYSPACE = "decanter";
-    private static final String CASSANDRA_PORT = "9142";
+    private static final String CASSANDRA_PORT = "9042";
     private static final String CASSANDRA_HOST = "localhost";
     private static final String TABLE_NAME = "decanter";
     private static final String TOPIC = "decanter/collect/jmx";
     private static final long TIMESTAMP = 1454428780634L;
 
     private static final Logger logger = LoggerFactory.getLogger(CassandraAppenderTest.class);
-    private CassandraDaemon cassandraDaemon;
+    private static CassandraDaemon cassandraDaemon;
     
-    @Before
-    public void setUp() throws Exception {
+    @BeforeClass
+    public static void setUp() throws Exception {
 
-        System.setProperty("cassandra-foreground", "false");
         System.setProperty("cassandra.boot_without_jna", "true");
+        System.setProperty("cassandra.storagedir", "target/data/cassandra/embedded");
 
-        cassandraDaemon = new CassandraDaemon(true);
-        logger.info("starting cassandra deamon");
-        cassandraDaemon.init(null);
-        cassandraDaemon.start();
-        
-        logger.info("cassandra up and runnign");
-        
+        cassandraDaemon = new CassandraDaemon(false);
+        logger.info("starting cassandra daemon");
+        cassandraDaemon.activate();
+        logger.info("cassandra up and running");
+        CqlSession session = getSession();
+        session.execute(
+                "CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE
+                        + " WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+        session.close();
+        logger.info("default Keyspace 'decanter' created");
     }
 
-    @After
-    public void tearDown() throws Exception {
+    @AfterClass
+    public static void tearDown() throws Exception {
         Schema.instance.clear();
         logger.info("stopping cassandra");
         cassandraDaemon.stop();
-        logger.info("destroying the cassandra deamon");
+        logger.info("destroying the cassandra daemon");
         cassandraDaemon.destroy();
         logger.info("cassandra is removed");
         cassandraDaemon = null;
     }
 
     @Test
-    public void test() throws Exception {
+    public void test() {
         Marshaller marshaller = new JsonMarshaller();
         CassandraAppender appender = new CassandraAppender();
-        Dictionary<String, Object> config = new Hashtable<String, Object>();
+        Dictionary<String, Object> config = new Hashtable<>();
-        config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_HOST);
+        config.put(CassandraAppender.CASSANDRA_HOST_PROPERTY, CASSANDRA_HOST);
         config.put(CassandraAppender.CASSANDRA_PORT_PROPERTY, CASSANDRA_PORT);
         config.put(CassandraAppender.KEYSPACE_PROPERTY, KEYSPACE);
@@ -102,21 +106,22 @@
         Event event = new Event(TOPIC, properties);
         
         appender.handleEvent(event);
-        
-        Session session = getSession();
-        
+        appender.deactivate();
+
+        CqlSession session = getSession();
+
         ResultSet execute = session.execute("SELECT * FROM "+ KEYSPACE+"."+TABLE_NAME+";");
         List<Row> all = execute.all();
         Assert.assertEquals(1, all.size());
         assertThat(all, not(nullValue()));
         
-        assertThat(all.get(0).getTimestamp("timeStamp").getTime(), is(TIMESTAMP));
+        assertThat(all.get(0).getInstant("timeStamp").toEpochMilli(), is(TIMESTAMP));
         
         session.close();
     }
 
     @Test
-    public void testWithFilter() throws Exception {
+    public void testWithFilter() {
         Marshaller marshaller = new JsonMarshaller();
         CassandraAppender appender = new CassandraAppender();
         Dictionary<String, Object> config = new Hashtable<>();
@@ -149,25 +154,90 @@
         event = new Event(TOPIC, data);
 
         appender.handleEvent(event);
+        appender.deactivate();
 
-        Session session = getSession();
+        CqlSession session = getSession();
 
         ResultSet execute = session.execute("SELECT * FROM "+ KEYSPACE+"."+TABLE_NAME+";");
         List<Row> all = execute.all();
         Assert.assertEquals(1, all.size());
         assertThat(all, not(nullValue()));
 
-        assertThat(all.get(0).getTimestamp("timeStamp").getTime(), is(TIMESTAMP));
+        assertThat(all.get(0).getInstant("timeStamp").toEpochMilli(), is(TIMESTAMP));
 
         session.close();
     }
 
-    private Session getSession() {
-        Builder clusterBuilder = Cluster.builder().addContactPoint(CASSANDRA_HOST);
-        clusterBuilder.withPort(Integer.valueOf(CASSANDRA_PORT));
+    private static CqlSession getSession() {
 
-        Cluster cluster = clusterBuilder.build();
-        return cluster.connect();
+        DriverConfigLoader loader =
+                DriverConfigLoader.programmaticBuilder()
+                        .withStringList(DefaultDriverOption.CONTACT_POINTS, Arrays.asList(CASSANDRA_HOST + ":" + CASSANDRA_PORT))
+                        .withString(DefaultDriverOption.PROTOCOL_VERSION, "V3")
+                        .withString(DefaultDriverOption.PROTOCOL_MAX_FRAME_LENGTH, "256 MB")
+                        .withString(DefaultDriverOption.SESSION_NAME, "decanter")
+                        .withString(DefaultDriverOption.CONFIG_RELOAD_INTERVAL, "0")
+                        .withString(DefaultDriverOption.CONNECTION_INIT_QUERY_TIMEOUT, "500 milliseconds")
+                        .withString(DefaultDriverOption.CONNECTION_SET_KEYSPACE_TIMEOUT, "500 milliseconds")
+                        .withString(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_INTERVAL, "200 milliseconds")
+                        .withString(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_TIMEOUT, "10 seconds")
+                        .withBoolean(DefaultDriverOption.CONTROL_CONNECTION_AGREEMENT_WARN, true)
+                        .withInt(DefaultDriverOption.CONNECTION_POOL_LOCAL_SIZE, 1)
+                        .withInt(DefaultDriverOption.CONNECTION_POOL_REMOTE_SIZE, 1)
+                        .withInt(DefaultDriverOption.CONNECTION_MAX_REQUESTS, 1024)
+                        .withInt(DefaultDriverOption.CONNECTION_MAX_ORPHAN_REQUESTS, 24576)
+                        .withString(DefaultDriverOption.HEARTBEAT_INTERVAL, "30 seconds")
+                        .withString(DefaultDriverOption.HEARTBEAT_TIMEOUT, "500 milliseconds")
+                        .withString(DefaultDriverOption.COALESCER_INTERVAL, "10 microseconds")
+                        .withInt(DefaultDriverOption.COALESCER_MAX_RUNS, 5)
+                        .withString(DefaultDriverOption.RECONNECTION_POLICY_CLASS, "ExponentialReconnectionPolicy")
+                        .withString(DefaultDriverOption.RECONNECTION_BASE_DELAY, "1 second")
+                        .withString(DefaultDriverOption.RECONNECTION_MAX_DELAY, "60 seconds")
+                        .withBoolean(DefaultDriverOption.RECONNECT_ON_INIT, true)
+                        .withString(DefaultDriverOption.LOAD_BALANCING_POLICY, "")
+                        .withString(DefaultDriverOption.LOAD_BALANCING_POLICY_CLASS, "DefaultLoadBalancingPolicy")
+                        .withString(DefaultDriverOption.RETRY_POLICY, "")
+                        .withString(DefaultDriverOption.RETRY_POLICY_CLASS, "DefaultRetryPolicy")
+                        .withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY, "")
+                        .withString(DefaultDriverOption.SPECULATIVE_EXECUTION_POLICY_CLASS, "NoSpeculativeExecutionPolicy")
+                        .withString(DefaultDriverOption.ADDRESS_TRANSLATOR_CLASS, "PassThroughAddressTranslator")
+                        .withString(DefaultDriverOption.METADATA_SCHEMA_CHANGE_LISTENER_CLASS, "NoopSchemaChangeListener")
+                        .withString(DefaultDriverOption.METADATA_NODE_STATE_LISTENER_CLASS, "NoopNodeStateListener")
+                        .withString(DefaultDriverOption.REQUEST_TRACKER_CLASS, "NoopRequestTracker")
+                        .withString(DefaultDriverOption.REQUEST_THROTTLER_CLASS, "PassThroughRequestThrottler")
+                        .withBoolean(DefaultDriverOption.REQUEST_DEFAULT_IDEMPOTENCE, false)
+                        .withString(DefaultDriverOption.REQUEST_CONSISTENCY, "LOCAL_ONE")
+                        .withInt(DefaultDriverOption.REQUEST_PAGE_SIZE, 5000)
+                        .withString(DefaultDriverOption.REQUEST_SERIAL_CONSISTENCY, "SERIAL")
+                        .withString(DefaultDriverOption.TIMESTAMP_GENERATOR_CLASS, "AtomicTimestampGenerator")
+                        .withBoolean(DefaultDriverOption.SOCKET_TCP_NODELAY, true)
+                        .withString(DefaultDriverOption.CONTROL_CONNECTION_TIMEOUT, "500 milliseconds")
+                        .withInt(DefaultDriverOption.NETTY_IO_SIZE, 0)
+                        .withInt(DefaultDriverOption.NETTY_IO_SHUTDOWN_QUIET_PERIOD, 2)
+                        .withInt(DefaultDriverOption.NETTY_IO_SHUTDOWN_TIMEOUT, 15)
+                        .withString(DefaultDriverOption.NETTY_IO_SHUTDOWN_UNIT, "SECONDS")
+                        .withInt(DefaultDriverOption.NETTY_ADMIN_SIZE, 2)
+                        .withInt(DefaultDriverOption.NETTY_ADMIN_SHUTDOWN_QUIET_PERIOD, 2)
+                        .withInt(DefaultDriverOption.NETTY_ADMIN_SHUTDOWN_TIMEOUT, 15)
+                        .withString(DefaultDriverOption.NETTY_ADMIN_SHUTDOWN_UNIT, "SECONDS")
+                        .withString(DefaultDriverOption.NETTY_TIMER_TICK_DURATION, "100 milliseconds")
+                        .withInt(DefaultDriverOption.NETTY_TIMER_TICKS_PER_WHEEL, 2048)
+                        .withBoolean(DefaultDriverOption.METADATA_SCHEMA_ENABLED, true)
+                        .withString(DefaultDriverOption.METADATA_SCHEMA_WINDOW, "1 second")
+                        .withInt(DefaultDriverOption.METADATA_SCHEMA_MAX_EVENTS, 20)
+                        .withString(DefaultDriverOption.METADATA_SCHEMA_REQUEST_TIMEOUT, "500 milliseconds")
+                        .withInt(DefaultDriverOption.METADATA_SCHEMA_REQUEST_PAGE_SIZE, 5000)
+                        .withBoolean(DefaultDriverOption.METADATA_TOKEN_MAP_ENABLED, true)
+                        .withString(DefaultDriverOption.METADATA_TOPOLOGY_WINDOW, "1 second")
+                        .withInt(DefaultDriverOption.METADATA_TOPOLOGY_MAX_EVENTS, 20)
+                        .withStringList(DefaultDriverOption.METRICS_SESSION_ENABLED, new ArrayList<>())
+                        .withStringList(DefaultDriverOption.METRICS_NODE_ENABLED, new ArrayList<>())
+                        .withDuration(DefaultDriverOption.REQUEST_TIMEOUT, Duration.ofSeconds(5))
+                        .build();
+        return CqlSession.builder()
+                .withClassLoader(CqlSession.class.getClassLoader())
+                .withConfigLoader(loader)
+                .withLocalDatacenter("datacenter1").build();
     }
     
 }
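
Note on the test fixture above: the builder chain assembles a programmatic DriverConfigLoader and hands it to CqlSession.builder(), pointing the driver at the embedded node with "datacenter1" as the local datacenter. Below is a minimal sketch of how such a session could be smoke-tested; the class and method names and the query are illustrative assumptions, not taken from this patch.

    import com.datastax.oss.driver.api.core.CqlSession;
    import com.datastax.oss.driver.api.core.cql.Row;

    // Illustrative sketch only: exercises a CqlSession built the way the helper
    // above builds it, using system.local as a cheap connectivity check.
    public class SessionSmoke {
        static void checkConnection(CqlSession session) {
            Row row = session.execute("SELECT release_version FROM system.local").one();
            System.out.println("Connected to Cassandra " + row.getString("release_version"));
        }
    }
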
diff --git a/appender/cassandra/src/test/resources/cassandra-rackdc.properties b/appender/cassandra/src/test/resources/cassandra-rackdc.properties
new file mode 100644
index 0000000..e4f17cd
--- /dev/null
+++ b/appender/cassandra/src/test/resources/cassandra-rackdc.properties
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These properties are used with GossipingPropertyFileSnitch and will
+# indicate the rack and dc for this node
+dc=datacenter1
+rack=rack1
+
+# Add a suffix to a datacenter name. Used by the Ec2Snitch and Ec2MultiRegionSnitch
+# to append a string to the EC2 region name.
+#dc_suffix=
+
+# Uncomment the following line to make this snitch prefer the internal ip when possible, as the Ec2MultiRegionSnitch does.
+# prefer_local=true
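
The dc value above has to line up with the local datacenter the driver is configured with (withLocalDatacenter("datacenter1") in the test helper earlier in this diff); otherwise the driver's DefaultLoadBalancingPolicy sees no local nodes. A hypothetical sanity check, not part of this patch (class name and failure message are illustrative):

    import java.io.InputStream;
    import java.util.Properties;

    // Illustrative sketch only: verify that cassandra-rackdc.properties declares the
    // same datacenter as the one passed to withLocalDatacenter(...) in the test.
    public class RackdcSanityCheck {
        public static void main(String[] args) throws Exception {
            Properties rackdc = new Properties();
            try (InputStream in = RackdcSanityCheck.class.getResourceAsStream("/cassandra-rackdc.properties")) {
                rackdc.load(in);
            }
            if (!"datacenter1".equals(rackdc.getProperty("dc"))) {
                throw new IllegalStateException("dc in cassandra-rackdc.properties does not match the driver's local datacenter");
            }
        }
    }
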
diff --git a/appender/cassandra/src/test/resources/cassandra.yaml b/appender/cassandra/src/test/resources/cassandra.yaml
index f5da43e..a90f5a1 100644
--- a/appender/cassandra/src/test/resources/cassandra.yaml
+++ b/appender/cassandra/src/test/resources/cassandra.yaml
@@ -26,41 +26,89 @@
 
 # The name of the cluster. This is mainly used to prevent machines in
 # one logical cluster from joining another.
-cluster_name: 'Decanter'
+cluster_name: 'Test Cluster'
 
-# You should always specify InitialToken when setting up a production
-# cluster for the first time, and often when adding capacity later.
-# The principle is that each node should be given an equal slice of
-# the token ring; see http://wiki.apache.org/cassandra/Operations
-# for more details.
+# This defines the number of tokens randomly assigned to this node on the ring
+# The more tokens, relative to other nodes, the larger the proportion of data
+# that this node will store. You probably want all nodes to have the same number
+# of tokens assuming they have equal hardware capability.
 #
-# If blank, Cassandra will request a token bisecting the range of
-# the heaviest-loaded existing node.  If there is no load information
-# available, such as is the case with a new cluster, it will pick
-# a random token, which will lead to hot spots.
-#initial_token:
+# If you leave this unspecified, Cassandra will use the default of 1 token for legacy compatibility,
+# and will use the initial_token as described below.
+#
+# Specifying initial_token will override this setting on the node's initial start,
+# on subsequent starts, this setting will apply even if initial token is set.
+#
+# If you already have a cluster with 1 token per node, and wish to migrate to 
+# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
+num_tokens: 256
+
+# Triggers automatic allocation of num_tokens tokens for this node. The allocation
+# algorithm attempts to choose tokens in a way that optimizes replicated load over
+# the nodes in the datacenter for the replication strategy used by the specified
+# keyspace.
+#
+# The load assigned to each node will be close to proportional to its number of
+# vnodes.
+#
+# Only supported with the Murmur3Partitioner.
+# allocate_tokens_for_keyspace: KEYSPACE
+
+# initial_token allows you to specify tokens manually.  While you can use it with
+# vnodes (num_tokens > 1, above) -- in which case you should provide a 
+# comma-separated list -- it's primarily used when adding nodes to legacy clusters 
+# that do not have vnodes enabled.
+# initial_token:
 
 # See http://wiki.apache.org/cassandra/HintedHandoff
+# May either be "true" or "false" to enable globally
 hinted_handoff_enabled: true
+
+# When hinted_handoff_enabled is true, a black list of data centers that will not
+# perform hinted handoff
+# hinted_handoff_disabled_datacenters:
+#    - DC1
+#    - DC2
+
 # this defines the maximum amount of time a dead host will have hints
 # generated.  After it has been dead this long, new hints for it will not be
 # created until it has been seen alive and gone down again.
 max_hint_window_in_ms: 10800000 # 3 hours
+
 # Maximum throttle in KBs per second, per delivery thread.  This will be
 # reduced proportionally to the number of nodes in the cluster.  (If there
 # are two nodes in the cluster, each delivery thread will use the maximum
 # rate; if there are three, each will throttle to half of the maximum,
 # since we expect two nodes to be delivering hints simultaneously.)
 hinted_handoff_throttle_in_kb: 1024
+
 # Number of threads with which to deliver hints;
 # Consider increasing this number when you have multi-dc deployments, since
 # cross-dc handoff tends to be slower
 max_hints_delivery_threads: 2
 
-# The following setting populates the page cache on memtable flush and compaction
-# WARNING: Enable this setting only when the whole node's data fits in memory.
-# Defaults to: false
-# populate_io_cache_on_flush: false
+# Directory where Cassandra should store hints.
+# If not set, the default directory is $CASSANDRA_HOME/data/hints.
+# hints_directory: /var/lib/cassandra/hints
+
+# How often hints should be flushed from the internal buffers to disk.
+# Will *not* trigger fsync.
+hints_flush_period_in_ms: 10000
+
+# Maximum size for a single hints file, in megabytes.
+max_hints_file_size_in_mb: 128
+
+# Compression to apply to the hint files. If omitted, hints files
+# will be written uncompressed. LZ4, Snappy, and Deflate compressors
+# are supported.
+#hints_compression:
+#   - class_name: LZ4Compressor
+#     parameters:
+#         -
+
+# Maximum throttle in KBs per second, total. This will be
+# reduced proportionally to the number of nodes in the cluster.
+batchlog_replay_throttle_in_kb: 1024
 
 # Authentication backend, implementing IAuthenticator; used to identify users
 # Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthenticator,
@@ -68,8 +116,9 @@
 #
 # - AllowAllAuthenticator performs no checks - set it to disable authentication.
 # - PasswordAuthenticator relies on username/password pairs to authenticate
-#   users. It keeps usernames and hashed passwords in system_auth.credentials table.
+#   users. It keeps usernames and hashed passwords in system_auth.roles table.
 #   Please increase system_auth keyspace replication factor if you use this authenticator.
+#   If using PasswordAuthenticator, CassandraRoleManager must also be used (see below)
 authenticator: AllowAllAuthenticator
 
 # Authorization backend, implementing IAuthorizer; used to limit access/provide permissions
@@ -77,64 +126,180 @@
 # CassandraAuthorizer}.
 #
 # - AllowAllAuthorizer allows any action to any user - set it to disable authorization.
-# - CassandraAuthorizer stores permissions in system_auth.permissions table. Please
+# - CassandraAuthorizer stores permissions in system_auth.role_permissions table. Please
 #   increase system_auth keyspace replication factor if you use this authorizer.
 authorizer: AllowAllAuthorizer
 
+# Part of the Authentication & Authorization backend, implementing IRoleManager; used
+# to maintain grants and memberships between roles.
+# Out of the box, Cassandra provides org.apache.cassandra.auth.CassandraRoleManager,
+# which stores role information in the system_auth keyspace. Most functions of the
+# IRoleManager require an authenticated login, so unless the configured IAuthenticator
+# actually implements authentication, most of this functionality will be unavailable.
+#
+# - CassandraRoleManager stores role data in the system_auth keyspace. Please
+#   increase system_auth keyspace replication factor if you use this role manager.
+role_manager: CassandraRoleManager
+
+# Validity period for roles cache (fetching granted roles can be an expensive
+# operation depending on the role manager, CassandraRoleManager is one example)
+# Granted roles are cached for authenticated sessions in AuthenticatedUser and
+# after the period specified here, become eligible for (async) reload.
+# Defaults to 2000, set to 0 to disable caching entirely.
+# Will be disabled automatically for AllowAllAuthenticator.
+roles_validity_in_ms: 2000
+
+# Refresh interval for roles cache (if enabled).
+# After this interval, cache entries become eligible for refresh. Upon next
+# access, an async reload is scheduled and the old value returned until it
+# completes. If roles_validity_in_ms is non-zero, then this must be
+# non-zero as well.
+# Defaults to the same value as roles_validity_in_ms.
+# roles_update_interval_in_ms: 2000
+
 # Validity period for permissions cache (fetching permissions can be an
 # expensive operation depending on the authorizer, CassandraAuthorizer is
 # one example). Defaults to 2000, set to 0 to disable.
 # Will be disabled automatically for AllowAllAuthorizer.
 permissions_validity_in_ms: 2000
 
+# Refresh interval for permissions cache (if enabled).
+# After this interval, cache entries become eligible for refresh. Upon next
+# access, an async reload is scheduled and the old value returned until it
+# completes. If permissions_validity_in_ms is non-zero, then this must be
+# non-zero as well.
+# Defaults to the same value as permissions_validity_in_ms.
+# permissions_update_interval_in_ms: 2000
 
-# The partitioner is responsible for distributing rows (by key) across
-# nodes in the cluster.  Any IPartitioner may be used, including your
-# own as long as it is on the classpath.  Out of the box, Cassandra
-# provides org.apache.cassandra.dht.{Murmur3Partitioner, RandomPartitioner
-# ByteOrderedPartitioner, OrderPreservingPartitioner (deprecated)}.
+# Validity period for credentials cache. This cache is tightly coupled to
+# the provided PasswordAuthenticator implementation of IAuthenticator. If
+# another IAuthenticator implementation is configured, this cache will not
+# be automatically used and so the following settings will have no effect.
+# Please note, credentials are cached in their encrypted form, so while
+# activating this cache may reduce the number of queries made to the
+# underlying table, it may not bring a significant reduction in the
+# latency of individual authentication attempts.
+# Defaults to 2000, set to 0 to disable credentials caching.
+credentials_validity_in_ms: 2000
+
+# Refresh interval for credentials cache (if enabled).
+# After this interval, cache entries become eligible for refresh. Upon next
+# access, an async reload is scheduled and the old value returned until it
+# completes. If credentials_validity_in_ms is non-zero, then this must be
+# non-zero as well.
+# Defaults to the same value as credentials_validity_in_ms.
+# credentials_update_interval_in_ms: 2000
+
+# The partitioner is responsible for distributing groups of rows (by
+# partition key) across nodes in the cluster.  You should leave this
+# alone for new clusters.  The partitioner can NOT be changed without
+# reloading all data, so when upgrading you should set this to the
+# same partitioner you were already using.
 #
-# - RandomPartitioner distributes rows across the cluster evenly by md5.
-#   This is the default prior to 1.2 and is retained for compatibility.
-# - Murmur3Partitioner is similar to RandomPartioner but uses Murmur3_128
-#   Hash Function instead of md5.  When in doubt, this is the best option.
-# - ByteOrderedPartitioner orders rows lexically by key bytes.  BOP allows
-#   scanning rows in key order, but the ordering can generate hot spots
-#   for sequential insertion workloads.
-# - OrderPreservingPartitioner is an obsolete form of BOP, that stores
-# - keys in a less-efficient format and only works with keys that are
-#   UTF8-encoded Strings.
-# - CollatingOPP collates according to EN,US rules rather than lexical byte
-#   ordering.  Use this as an example if you need custom collation.
+# Besides Murmur3Partitioner, partitioners included for backwards
+# compatibility include RandomPartitioner, ByteOrderedPartitioner, and
+# OrderPreservingPartitioner.
 #
-# See http://wiki.apache.org/cassandra/Operations for more on
-# partitioners and token selection.
 partitioner: org.apache.cassandra.dht.Murmur3Partitioner
 
-# directories where Cassandra should store data on disk.
+# Directories where Cassandra should store data on disk.  Cassandra
+# will spread data evenly across them, subject to the granularity of
+# the configured compaction strategy.
+# If not set, the default directory is $CASSANDRA_HOME/data/data.
 data_file_directories:
     - target/data/cassandra/embedded/data
 
-# commit log
+# commit log.  when running on magnetic HDD, this should be a
+# separate spindle than the data directories.
+# If not set, the default directory is $CASSANDRA_HOME/data/commitlog.
 commitlog_directory: target/data/cassandra/embedded/commitlog
 
-# policy for data disk failures:
-# stop: shut down gossip and Thrift, leaving the node effectively dead, but
-#       can still be inspected via JMX.
-# best_effort: stop using the failed disk and respond to requests based on
-#              remaining available sstables.  This means you WILL see obsolete
-#              data at CL.ONE!
-# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
+# Enable / disable CDC functionality on a per-node basis. This modifies the logic used
+# for write path allocation rejection (standard: never reject. cdc: reject Mutation
+# containing a CDC-enabled table if at space limit in cdc_raw_directory).
+cdc_enabled: false
+
+# CommitLogSegments are moved to this directory on flush if cdc_enabled: true and the
+# segment contains mutations for a CDC-enabled table. This should be placed on a
+# separate spindle than the data directories. If not set, the default directory is
+# $CASSANDRA_HOME/data/cdc_raw.
+# cdc_raw_directory: /var/lib/cassandra/cdc_raw
+
+# Policy for data disk failures:
+#
+# die
+#   shut down gossip and client transports and kill the JVM for any fs errors or
+#   single-sstable errors, so the node can be replaced.
+#
+# stop_paranoid
+#   shut down gossip and client transports even for single-sstable errors,
+#   kill the JVM for errors during startup.
+#
+# stop
+#   shut down gossip and client transports, leaving the node effectively dead, but
+#   can still be inspected via JMX, kill the JVM for errors during startup.
+#
+# best_effort
+#    stop using the failed disk and respond to requests based on
+#    remaining available sstables.  This means you WILL see obsolete
+#    data at CL.ONE!
+#
+# ignore
+#    ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
 disk_failure_policy: stop
 
+# Policy for commit disk failures:
+#
+# die
+#   shut down gossip and Thrift and kill the JVM, so the node can be replaced.
+#
+# stop
+#   shut down gossip and Thrift, leaving the node effectively dead, but
+#   can still be inspected via JMX.
+#
+# stop_commit
+#   shutdown the commit log, letting writes collect but
+#   continuing to service reads, as in pre-2.0.5 Cassandra
+#
+# ignore
+#   ignore fatal errors and let the batches fail
+commit_failure_policy: stop
+
+# Maximum size of the native protocol prepared statement cache
+#
+# Valid values are either "auto" (omitting the value) or a value greater 0.
+#
+# Note that specifying too large a value will result in long running GCs and possibly
+# out-of-memory errors. Keep the value at a small fraction of the heap.
+#
+# If you constantly see "prepared statements discarded in the last minute because
+# cache limit reached" messages, the first step is to investigate the root cause
+# of these messages and check whether prepared statements are used correctly -
+# i.e. use bind markers for variable parts.
+#
+# Only change the default value if you really have more prepared statements than
+# fit in the cache. In most cases it is not necessary to change this value.
+# Constantly re-preparing statements is a performance penalty.
+#
+# Default value ("auto") is 1/256th of the heap or 10MB, whichever is greater
+prepared_statements_cache_size_mb:
+
+# Maximum size of the Thrift prepared statement cache
+#
+# If you do not use Thrift at all, it is safe to leave this value at "auto".
+#
+# See description of 'prepared_statements_cache_size_mb' above for more information.
+#
+# Default value ("auto") is 1/256th of the heap or 10MB, whichever is greater
+thrift_prepared_statements_cache_size_mb:
 
 # Maximum size of the key cache in memory.
 #
 # Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
 # minimum, sometimes more. The key cache is fairly tiny for the amount of
 # time it saves, so it's worthwhile to use it at large numbers.
-# The row cache saves even more time, but must store the whole values of
-# its rows, so it is extremely space-intensive. It's best to only use the
+# The row cache saves even more time, but must contain the entire row,
+# so it is extremely space-intensive. It's best to only use the
 # row cache if you have hot rows or static rows.
 #
 # NOTE: if you reduce the size, you may not get your hottest keys loaded on startup.
@@ -143,7 +308,7 @@
 key_cache_size_in_mb:
 
 # Duration in seconds after which Cassandra should
-# safe the keys cache. Caches are saved to saved_caches_directory as
+# save the key cache. Caches are saved to saved_caches_directory as
 # specified in this configuration file.
 #
 # Saved caches greatly improve cold-start speeds, and is relatively cheap in
@@ -157,15 +322,28 @@
 # Disabled by default, meaning all keys are going to be saved
 # key_cache_keys_to_save: 100
 
+# Row cache implementation class name. Available implementations:
+#
+# org.apache.cassandra.cache.OHCProvider
+#   Fully off-heap row cache implementation (default).
+#
+# org.apache.cassandra.cache.SerializingCacheProvider
+#   This is the row cache implementation available
+#   in previous releases of Cassandra.
+# row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+
 # Maximum size of the row cache in memory.
-# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
+# Please note that OHC cache implementation requires some additional off-heap memory to manage
+# the map structures and some in-flight memory during operations before/after cache entries can be
+# accounted against the cache capacity. This overhead is usually small compared to the whole capacity.
+# Do not specify more memory than the system can afford in the worst usual situation and leave some
+# headroom for OS block level cache. Never allow your system to swap.
 #
 # Default value is 0, to disable row caching.
 row_cache_size_in_mb: 0
 
-# Duration in seconds after which Cassandra should
-# safe the row cache. Caches are saved to saved_caches_directory as specified
-# in this configuration file.
+# Duration in seconds after which Cassandra should save the row cache.
+# Caches are saved to saved_caches_directory as specified in this configuration file.
 #
 # Saved caches greatly improve cold-start speeds, and is relatively cheap in
 # terms of I/O for the key cache. Row cache saving is much more expensive and
@@ -174,21 +352,51 @@
 # Default is 0 to disable saving the row cache.
 row_cache_save_period: 0
 
-# Number of keys from the row cache to save
-# Disabled by default, meaning all keys are going to be saved
+# Number of keys from the row cache to save.
+# Specify 0 (which is the default), meaning all keys are going to be saved
 # row_cache_keys_to_save: 100
 
+# Maximum size of the counter cache in memory.
+#
+# Counter cache helps to reduce counter locks' contention for hot counter cells.
+# In case of RF = 1 a counter cache hit will cause Cassandra to skip the read before
+# write entirely. With RF > 1 a counter cache hit will still help to reduce the duration
+# of the lock hold, helping with hot counter cell updates, but will not allow skipping
+# the read entirely. Only the local (clock, count) tuple of a counter cell is kept
+# in memory, not the whole counter, so it's relatively cheap.
+#
+# NOTE: if you reduce the size, you may not get your hottest keys loaded on startup.
+#
+# Default value is empty to make it "auto" (min(2.5% of Heap (in MB), 50MB)). Set to 0 to disable counter cache.
+# NOTE: if you perform counter deletes and rely on low gcgs, you should disable the counter cache.
+counter_cache_size_in_mb:
+
+# Duration in seconds after which Cassandra should
+# save the counter cache (keys only). Caches are saved to saved_caches_directory as
+# specified in this configuration file.
+#
+# Default is 7200 or 2 hours.
+counter_cache_save_period: 7200
+
+# Number of keys from the counter cache to save
+# Disabled by default, meaning all keys are going to be saved
+# counter_cache_keys_to_save: 100
+
 # saved caches
+# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
 saved_caches_directory: target/data/cassandra/embedded/saved_caches
 
-# commitlog_sync may be either "periodic" or "batch."
+# commitlog_sync may be either "periodic" or "batch." 
+# 
 # When in batch mode, Cassandra won't ack writes until the commit log
-# has been fsynced to disk.  It will wait up to
-# commitlog_sync_batch_window_in_ms milliseconds for other writes, before
-# performing the sync.
+# has been fsynced to disk.  It will wait
+# commitlog_sync_batch_window_in_ms milliseconds between fsyncs.
+# This window should be kept short because the writer threads will
+# be unable to do extra work while waiting.  (You may need to increase
+# concurrent_writes for the same reason.)
 #
 # commitlog_sync: batch
-# commitlog_sync_batch_window_in_ms: 50
+# commitlog_sync_batch_window_in_ms: 2
 #
 # the other option is "periodic" where writes may be acked immediately
 # and the CommitLog is simply synced every commitlog_sync_period_in_ms
@@ -205,12 +413,27 @@
 # archiving commitlog segments (see commitlog_archiving.properties),
 # then you probably want a finer granularity of archiving; 8 or 16 MB
 # is reasonable.
+# Max mutation size is also configurable via max_mutation_size_in_kb setting in
+# cassandra.yaml. The default is half the size of commitlog_segment_size_in_mb * 1024.
+# This should be positive and less than 2048.
+#
+# NOTE: If max_mutation_size_in_kb is set explicitly then commitlog_segment_size_in_mb must
+# be set to at least twice the size of max_mutation_size_in_kb / 1024
+#
 commitlog_segment_size_in_mb: 32
 
+# Compression to apply to the commit log. If omitted, the commit log
+# will be written uncompressed.  LZ4, Snappy, and Deflate compressors
+# are supported.
+# commitlog_compression:
+#   - class_name: LZ4Compressor
+#     parameters:
+#         -
+
 # any class that implements the SeedProvider interface and has a
 # constructor that takes a Map<String, String> of parameters will do.
 seed_provider:
-    # Addresses of hosts that are deemed contact points.
+    # Addresses of hosts that are deemed contact points. 
     # Cassandra nodes use this list of hosts to find each other and learn
     # the topology of the ring.  You must change this if you are running
     # multiple nodes!
@@ -220,123 +443,310 @@
           # Ex: "<ip1>,<ip2>,<ip3>"
           - seeds: "127.0.0.1"
 
-
 # For workloads with more data than can fit in memory, Cassandra's
 # bottleneck will be reads that need to fetch data from
 # disk. "concurrent_reads" should be set to (16 * number_of_drives) in
 # order to allow the operations to enqueue low enough in the stack
-# that the OS and drives can reorder them.
+# that the OS and drives can reorder them. Same applies to
+# "concurrent_counter_writes", since counter writes read the current
+# values before incrementing and writing them back.
 #
 # On the other hand, since writes are almost never IO bound, the ideal
 # number of "concurrent_writes" is dependent on the number of cores in
 # your system; (8 * number_of_cores) is a good rule of thumb.
 concurrent_reads: 32
 concurrent_writes: 32
+concurrent_counter_writes: 32
 
-# Total memory to use for memtables.  Cassandra will flush the largest
-# memtable when this much memory is used.
-# If omitted, Cassandra will set it to 1/3 of the heap.
-# memtable_total_space_in_mb: 2048
+# For materialized view writes, as there is a read involved, this should
+# be limited by the lesser of concurrent reads or concurrent writes.
+concurrent_materialized_view_writes: 32
 
-# Total space to use for commitlogs.
-# If space gets above this value (it will round up to the next nearest
-# segment multiple), Cassandra will flush every dirty CF in the oldest
-# segment and remove it.
-# commitlog_total_space_in_mb: 4096
+# Maximum memory to use for sstable chunk cache and buffer pooling.
+# 32MB of this is reserved for pooling buffers; the rest is used as a
+# cache that holds uncompressed sstable chunks.
+# Defaults to the smaller of 1/4 of heap or 512MB. This pool is allocated off-heap,
+# so is in addition to the memory allocated for heap. The cache also has on-heap
+# overhead which is roughly 128 bytes per chunk (i.e. 0.2% of the reserved size
+# if the default 64k chunk size is used).
+# Memory is only allocated when needed.
+# file_cache_size_in_mb: 512
 
-# This sets the amount of memtable flush writer threads.  These will
-# be blocked by disk io, and each one will hold a memtable in memory
-# while blocked. If you have a large heap and many data directories,
-# you can increase this value for better flush performance.
-# By default this will be set to the amount of data directories defined.
-#memtable_flush_writers: 1
+# Flag indicating whether to allocate on or off heap when the sstable buffer
+# pool is exhausted, that is when it has exceeded the maximum memory
+# file_cache_size_in_mb, beyond which it will not cache buffers but allocate on request.
 
-# the number of full memtables to allow pending flush, that is,
-# waiting for a writer thread.  At a minimum, this should be set to
-# the maximum number of secondary indexes created on a single CF.
-#memtable_flush_queue_size: 4
+# buffer_pool_use_heap_if_exhausted: true
+
+# The strategy for optimizing disk read
+# Possible values are:
+# ssd (for solid state disks, the default)
+# spinning (for spinning disks)
+# disk_optimization_strategy: ssd
+
+# Total permitted memory to use for memtables. Cassandra will stop
+# accepting writes when the limit is exceeded until a flush completes,
+# and will trigger a flush based on memtable_cleanup_threshold
+# If omitted, Cassandra will set both to 1/4 the size of the heap.
+# memtable_heap_space_in_mb: 2048
+# memtable_offheap_space_in_mb: 2048
+
+# memtable_cleanup_threshold is deprecated. The default calculation
+# is the only reasonable choice. See the comments on memtable_flush_writers
+# for more information.
+#
+# Ratio of occupied non-flushing memtable size to total permitted size
+# that will trigger a flush of the largest memtable. Larger mct will
+# mean larger flushes and hence less compaction, but also less concurrent
+# flush activity which can make it difficult to keep your disks fed
+# under heavy write load.
+#
+# memtable_cleanup_threshold defaults to 1 / (memtable_flush_writers + 1)
+# memtable_cleanup_threshold: 0.11
+
+# Specify the way Cassandra allocates and manages memtable memory.
+# Options are:
+#
+# heap_buffers
+#   on heap nio buffers
+#
+# offheap_buffers
+#   off heap (direct) nio buffers
+#
+# offheap_objects
+#    off heap objects
+memtable_allocation_type: heap_buffers
+
+# Total space to use for commit logs on disk.
+#
+# If space gets above this value, Cassandra will flush every dirty CF
+# in the oldest segment and remove it.  So a small total commitlog space
+# will tend to cause more flush activity on less-active columnfamilies.
+#
+# The default value is the smaller of 8192, and 1/4 of the total space
+# of the commitlog volume.
+#
+# commitlog_total_space_in_mb: 8192
+
+# This sets the number of memtable flush writer threads per disk
+# as well as the total number of memtables that can be flushed concurrently.
+# These are generally a combination of compute and IO bound.
+#
+# Memtable flushing is more CPU efficient than memtable ingest and a single thread
+# can keep up with the ingest rate of a whole server on a single fast disk
+# until it temporarily becomes IO bound under contention typically with compaction.
+# At that point you need multiple flush threads. At some point in the future
+# it may become CPU bound all the time.
+#
+# You can tell if flushing is falling behind using the MemtablePool.BlockedOnAllocation
+# metric which should be 0, but will be non-zero if threads are blocked waiting on flushing
+# to free memory.
+#
+# memtable_flush_writers defaults to two for a single data directory.
+# This means that two memtables can be flushed concurrently to the single data directory.
+# If you have multiple data directories the default is one memtable flushing at a time
+# but the flush will use a thread per data directory so you will get two or more writers.
+#
+# Two is generally enough to flush on a fast disk [array] mounted as a single data directory.
+# Adding more flush writers will result in smaller more frequent flushes that introduce more
+# compaction overhead.
+#
+# There is a direct tradeoff between number of memtables that can be flushed concurrently
+# and flush size and frequency. More is not better; you just need enough flush writers
+# to never stall waiting for flushing to free memory.
+#
+#memtable_flush_writers: 2
+
+# Total space to use for change-data-capture logs on disk.
+#
+# If space gets above this value, Cassandra will throw WriteTimeoutException
+# on Mutations including tables with CDC enabled. A CDCCompactor is responsible
+# for parsing the raw CDC logs and deleting them when parsing is completed.
+#
+# The default value is the min of 4096 mb and 1/8th of the total space
+# of the drive where cdc_raw_directory resides.
+# cdc_total_space_in_mb: 4096
+
+# When we hit our cdc_raw limit and the CDCCompactor is either running behind
+# or experiencing backpressure, we check at the following interval to see if any
+# new space for cdc-tracked tables has been made available. Defaults to 250ms.
+# cdc_free_space_check_interval_ms: 250
+
+# A fixed memory pool size in MB for SSTable index summaries. If left
+# empty, this will default to 5% of the heap size. If the memory usage of
+# all index summaries exceeds this limit, SSTables with low read rates will
+# shrink their index summaries in order to meet this limit.  However, this
+# is a best-effort process. In extreme conditions Cassandra may need to use
+# more than this amount of memory.
+index_summary_capacity_in_mb:
+
+# How frequently index summaries should be resampled.  This is done
+# periodically to redistribute memory from the fixed-size pool to sstables
+# proportional to their recent read rates.  Setting to -1 will disable this
+# process, leaving existing index summaries at their current sampling level.
+index_summary_resize_interval_in_minutes: 60
 
 # Whether to, when doing sequential writing, fsync() at intervals in
 # order to force the operating system to flush the dirty
 # buffers. Enable this to avoid sudden dirty buffer flushing from
-# impacting read latencies. Almost always a good idea on SSD:s; not
+# impacting read latencies. Almost always a good idea on SSDs; not
 # necessarily on platters.
 trickle_fsync: false
 trickle_fsync_interval_in_kb: 10240
 
 # TCP port, for commands and data
-storage_port: 7010
+# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
+storage_port: 7000
 
 # SSL port, for encrypted communication.  Unused unless enabled in
 # encryption_options
-ssl_storage_port: 7011
+# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
+ssl_storage_port: 7001
 
-# Address to bind to and tell other Cassandra nodes to connect to. You
-# _must_ change this if you want multiple nodes to be able to
-# communicate!
+# Address or interface to bind to and tell other Cassandra nodes to connect to.
+# You _must_ change this if you want multiple nodes to be able to communicate!
+#
+# Set listen_address OR listen_interface, not both.
 #
 # Leaving it blank leaves it up to InetAddress.getLocalHost(). This
-# will always do the Right Thing *if* the node is properly configured
+# will always do the Right Thing _if_ the node is properly configured
 # (hostname, name resolution, etc), and the Right Thing is to use the
 # address associated with the hostname (it might not be).
 #
-# Setting this to 0.0.0.0 is always wrong.
+# Setting listen_address to 0.0.0.0 is always wrong.
+#
 listen_address: 127.0.0.1
 
+# Set listen_address OR listen_interface, not both. Interfaces must correspond
+# to a single address, IP aliasing is not supported.
+# listen_interface: eth0
+
+# If you choose to specify the interface by name and the interface has an ipv4 and an ipv6 address
+# you can specify which should be chosen using listen_interface_prefer_ipv6. If false the first ipv4
+# address will be used. If true the first ipv6 address will be used. Defaults to false preferring
+# ipv4. If there is only one address it will be selected regardless of ipv4/ipv6.
+# listen_interface_prefer_ipv6: false
+
+# Address to broadcast to other Cassandra nodes
+# Leaving this blank will set it to the same value as listen_address
+broadcast_address: 127.0.0.1
+
+# When using multiple physical network interfaces, set this
+# to true to listen on broadcast_address in addition to
+# the listen_address, allowing nodes to communicate in both
+# interfaces.
+# Ignore this property if the network configuration automatically
+# routes between the public and private networks such as EC2.
+# listen_on_broadcast_address: false
+
+# Internode authentication backend, implementing IInternodeAuthenticator;
+# used to allow/disallow connections from peer nodes.
+# internode_authenticator: org.apache.cassandra.auth.AllowAllInternodeAuthenticator
+
+# Whether to start the native transport server.
+# Please note that the address on which the native transport is bound is the
+# same as the rpc_address. The port however is different and specified below.
 start_native_transport: true
 # port for the CQL native transport to listen for clients on
-native_transport_port: 9142
+# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
+native_transport_port: 9042
+# Enabling native transport encryption in client_encryption_options allows you to either use
+# encryption for the standard port or to use a dedicated, additional port along with the unencrypted
+# standard native_transport_port.
+# Enabling client encryption and keeping native_transport_port_ssl disabled will use encryption
+# for native_transport_port. Setting native_transport_port_ssl to a different value
+# from native_transport_port will use encryption for native_transport_port_ssl while
+# keeping native_transport_port unencrypted.
+# native_transport_port_ssl: 9142
+# The maximum threads for handling requests when the native transport is used.
+# This is similar to rpc_max_threads though the default differs slightly (and
+# there is no native_transport_min_threads, idle threads will always be stopped
+# after 30 seconds).
+# native_transport_max_threads: 128
+#
+# The maximum size of allowed frame. Frame (requests) larger than this will
+# be rejected as invalid. The default is 256MB. If you're changing this parameter,
+# you may want to adjust max_value_size_in_mb accordingly. This should be positive and less than 2048.
+# native_transport_max_frame_size_in_mb: 256
+
+# The maximum number of concurrent client connections.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections: -1
+
+# The maximum number of concurrent client connections per source ip.
+# The default is -1, which means unlimited.
+# native_transport_max_concurrent_connections_per_ip: -1
 
 # Whether to start the thrift rpc server.
 start_rpc: false
 
-# Address to broadcast to other Cassandra nodes
-# Leaving this blank will set it to the same value as listen_address
-# broadcast_address: 1.2.3.4
-
-# The address to bind the Thrift RPC service to -- clients connect
-# here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if
-# you want Thrift to listen on all interfaces.
+# The address or interface to bind the Thrift RPC service and native transport
+# server to.
 #
-# Leaving this blank has the same effect it does for ListenAddress,
+# Set rpc_address OR rpc_interface, not both.
+#
+# Leaving rpc_address blank has the same effect as on listen_address
 # (i.e. it will be based on the configured hostname of the node).
-rpc_address: localhost
-# port for Thrift to listen for clients on
-rpc_port: 9171
+#
+# Note that unlike listen_address, you can specify 0.0.0.0, but you must also
+# set broadcast_rpc_address to a value other than 0.0.0.0.
+#
+# For security reasons, you should not expose this port to the internet.  Firewall it if needed.
+rpc_address: 0.0.0.0
 
-# enable or disable keepalive on rpc connections
+# Set rpc_address OR rpc_interface, not both. Interfaces must correspond
+# to a single address, IP aliasing is not supported.
+# rpc_interface: eth1
+
+# If you choose to specify the interface by name and the interface has an ipv4 and an ipv6 address
+# you can specify which should be chosen using rpc_interface_prefer_ipv6. If false the first ipv4
+# address will be used. If true the first ipv6 address will be used. Defaults to false preferring
+# ipv4. If there is only one address it will be selected regardless of ipv4/ipv6.
+# rpc_interface_prefer_ipv6: false
+
+# port for Thrift to listen for clients on
+rpc_port: 9160
+
+# RPC address to broadcast to drivers and other Cassandra nodes. This cannot
+# be set to 0.0.0.0. If left blank, this will be set to the value of
+# rpc_address. If rpc_address is set to 0.0.0.0, broadcast_rpc_address must
+# be set.
+broadcast_rpc_address: 127.0.0.1
+
+# enable or disable keepalive on rpc/native connections
 rpc_keepalive: true
 
-# Cassandra provides three options for the RPC Server:
+# Cassandra provides two out-of-the-box options for the RPC Server:
 #
-# sync  -> One connection per thread in the rpc pool (see below).
-#          For a very large number of clients, memory will be your limiting
-#          factor; on a 64 bit JVM, 128KB is the minimum stack size per thread.
-#          Connection pooling is very, very strongly recommended.
+# sync
+#   One thread per thrift connection. For a very large number of clients, memory
+#   will be your limiting factor. On a 64 bit JVM, 180KB is the minimum stack size
+#   per thread, and that will correspond to your use of virtual memory (but physical memory
+#   may be limited depending on use of stack space).
 #
-# async -> Nonblocking server implementation with one thread to serve
-#          rpc connections.  This is not recommended for high throughput use
-#          cases. Async has been tested to be about 50% slower than sync
-#          or hsha and is deprecated: it will be removed in the next major release.
-#
-# hsha  -> Stands for "half synchronous, half asynchronous." The rpc thread pool
-#          (see below) is used to manage requests, but the threads are multiplexed
-#          across the different clients.
+# hsha
+#   Stands for "half synchronous, half asynchronous." All thrift clients are handled
+#   asynchronously using a small number of threads that does not vary with the amount
+#   of thrift clients (and thus scales well to many clients). The rpc requests are still
+#   synchronous (one thread per active request). If hsha is selected then it is essential
+#   that rpc_max_threads is changed from the default value of unlimited.
 #
 # The default is sync because on Windows hsha is about 30% slower.  On Linux,
 # sync/hsha performance is about the same, with hsha of course using less memory.
+#
+# Alternatively, you can provide your own RPC server by providing the fully-qualified class name
+# of an o.a.c.t.TServerFactory that can create an instance of it.
 rpc_server_type: sync
 
-# Uncomment rpc_min|max|thread to set request pool size.
-# You would primarily set max for the sync server to safeguard against
-# misbehaved clients; if you do hit the max, Cassandra will block until one
-# disconnects before accepting more.  The defaults for sync are min of 16 and max
-# unlimited.
+# Uncomment rpc_min|max_thread to set request pool size limits.
 #
-# For the Hsha server, the min and max both default to quadruple the number of
-# CPU cores.
+# Regardless of your choice of RPC server (see above), the number of maximum requests in the
+# RPC thread pool dictates how many concurrent requests are possible (but if you are using the sync
+# RPC server, it also dictates the number of clients that can be connected at all).
 #
-# This configuration is ignored by the async server.
+# The default is unlimited and thus provides no protection against clients overwhelming the server. You are
+# encouraged to set a maximum that makes sense for you in production, but do keep in mind that
+# rpc_max_threads represents the maximum number of client requests this server may execute concurrently.
 #
 # rpc_min_threads: 16
 # rpc_max_threads: 2048
@@ -345,18 +755,28 @@
 # rpc_send_buff_size_in_bytes:
 # rpc_recv_buff_size_in_bytes:
 
-# Frame size for thrift (maximum field length).
-# 0 disables TFramedTransport in favor of TSocket. This option
-# is deprecated; we strongly recommend using Framed mode.
-thrift_framed_transport_size_in_mb: 15
+# Uncomment to set socket buffer size for internode communication
+# Note that when setting this, the buffer size is limited by net.core.wmem_max
+# and when not setting it, it is defined by net.ipv4.tcp_wmem
+# See also:
+# /proc/sys/net/core/wmem_max
+# /proc/sys/net/core/rmem_max
+# /proc/sys/net/ipv4/tcp_wmem
+# /proc/sys/net/ipv4/tcp_rmem
+# and 'man tcp'
+# internode_send_buff_size_in_bytes:
 
-# The max length of a thrift message, including all fields and
-# internal thrift overhead.
-thrift_max_message_length_in_mb: 16
+# Uncomment to set socket buffer size for internode communication
+# Note that when setting this, the buffer size is limited by net.core.wmem_max
+# and when not setting it, it is defined by net.ipv4.tcp_wmem
+# internode_recv_buff_size_in_bytes:
+
+# Frame size for thrift (maximum message length).
+thrift_framed_transport_size_in_mb: 15
 
 # Set to true to have Cassandra create a hard link to each sstable
 # flushed or streamed locally in a backups/ subdirectory of the
-# Keyspace data.  Removing these links is the operator's
+# keyspace data.  Removing these links is the operator's
 # responsibility.
 incremental_backups: false
 
@@ -367,24 +787,31 @@
 snapshot_before_compaction: false
 
 # Whether or not a snapshot is taken of the data before keyspace truncation
-# or dropping of column families. The STRONGLY advised default of true
+# or dropping of column families. The STRONGLY advised default of true 
 # should be used to provide data safety. If you set this flag to false, you will
 # lose data on truncation or drop.
-auto_snapshot: false
+auto_snapshot: true
 
-# Add column indexes to a row after its contents reach this size.
-# Increase if your column values are large, or if you have a very large
-# number of columns.  The competing causes are, Cassandra has to
-# deserialize this much of the row to read a single column, so you want
-# it to be small - at least if you do many partial-row reads - but all
-# the index data is read for each access, so you don't want to generate
-# that wastefully either.
+# Granularity of the collation index of rows within a partition.
+# Increase if your rows are large, or if you have a very large
+# number of rows per partition.  The competing goals are these:
+#
+# - a smaller granularity means more index entries are generated
+#   and looking up rows within the partition by collation column
+#   is faster
+# - but, Cassandra will keep the collation index in memory for hot
+#   rows (as part of the key cache), so a larger granularity means
+#   you can cache more hot rows
 column_index_size_in_kb: 64
 
-# Size limit for rows being compacted in memory.  Larger rows will spill
-# over to disk and use a slower two-pass compaction process.  A message
-# will be logged specifying the row key.
-#in_memory_compaction_limit_in_mb: 64
+# Per sstable indexed key cache entries (the collation index in memory
+# mentioned above) exceeding this size will not be held on heap.
+# This means that only partition information is held on heap and the
+# index entries are read from disk.
+#
+# Note that this size refers to the size of the
+# serialized index information and not the size of the partition.
+column_index_cache_size_in_kb: 2
 
 # Number of simultaneous compactions to allow, NOT including
 # validation "compactions" for anti-entropy repair.  Simultaneous
@@ -395,19 +822,13 @@
 # slowly or too fast, you should look at
 # compaction_throughput_mb_per_sec first.
 #
-# This setting has no effect on LeveledCompactionStrategy.
-#
-# concurrent_compactors defaults to the number of cores.
-# Uncomment to make compaction mono-threaded, the pre-0.8 default.
+# concurrent_compactors defaults to the smaller of (number of disks,
+# number of cores), with a minimum of 2 and a maximum of 8.
+# 
+# If your data directories are backed by SSD, you should increase this
+# to the number of cores.
 #concurrent_compactors: 1
 
-# Multi-threaded compaction. When enabled, each compaction will use
-# up to one thread per core, plus one thread per sstable being merged.
-# This is usually only useful for SSD-based hardware: otherwise,
-# your concern is usually to get compaction to do LESS i/o (see:
-# compaction_throughput_mb_per_sec), not more.
-#multithreaded_compaction: false
-
 # Throttles compaction to the given total throughput across the entire
 # system. The faster you insert data, the faster you need to compact in
 # order to keep the sstable count down, but in general, setting this to
@@ -416,10 +837,11 @@
 # of compaction, including validation compaction.
 compaction_throughput_mb_per_sec: 16
 
-# Track cached row keys during compaction, and re-cache their new
-# positions in the compacted sstable.  Disable if you use really large
-# key caches.
-#compaction_preheat_key_cache: true
+# When compacting, the replacement sstable(s) can be opened before they
+# are completely written, and used in place of the prior sstables for
+# any range that has been written. This helps to smoothly transfer reads 
+# between the sstables, reducing page cache churn and keeping hot rows hot
+sstable_preemptive_open_interval_in_mb: 50
 
 # Throttles all outbound streaming file transfers on this node to the
 # given total throughput in Mbps. This is necessary because Cassandra does
@@ -428,12 +850,21 @@
 # When unset, the default is 200 Mbps or 25 MB/s.
 # stream_throughput_outbound_megabits_per_sec: 200
 
+# Throttles all streaming file transfer between the datacenters,
+# this setting allows users to throttle inter dc stream throughput in addition
+# to throttling all network stream traffic as configured with
+# stream_throughput_outbound_megabits_per_sec
+# When unset, the default is 200 Mbps or 25 MB/s
+# inter_dc_stream_throughput_outbound_megabits_per_sec: 200
+
 # How long the coordinator should wait for read operations to complete
 read_request_timeout_in_ms: 5000
 # How long the coordinator should wait for seq or index scans to complete
 range_request_timeout_in_ms: 10000
 # How long the coordinator should wait for writes to complete
 write_request_timeout_in_ms: 2000
+# How long the coordinator should wait for counter writes to complete
+counter_write_request_timeout_in_ms: 5000
 # How long a coordinator should continue to retry a CAS operation
 # that contends with other proposals for the same row
 cas_contention_timeout_in_ms: 1000
@@ -444,22 +875,28 @@
 # The default timeout for other, miscellaneous operations
 request_timeout_in_ms: 10000
 
+# How long before a node logs slow queries. Select queries that take longer than
+# this timeout to execute will generate an aggregated log message, so that slow queries
+# can be identified. Set this value to zero to disable slow query logging.
+slow_query_log_timeout_in_ms: 500
+
 # Enable operation timeout information exchange between nodes to accurately
 # measure request timeouts.  If disabled, replicas will assume that requests
 # were forwarded to them instantly by the coordinator, which means that
-# under overload conditions we will waste that much extra time processing
+# under overload conditions we will waste that much extra time processing 
 # already-timed-out requests.
 #
 # Warning: before enabling this property make sure ntp is installed
 # and the times are synchronized between the nodes.
 cross_node_timeout: false
 
-# Enable socket timeout for streaming operation.
-# When a timeout occurs during streaming, streaming is retried from the start
-# of the current file. This _can_ involve re-streaming an important amount of
-# data, so you should avoid setting the value too low.
-# Default value is 0, which never timeout streams.
-# streaming_socket_timeout_in_ms: 0
+# Set keep-alive period for streaming
+# This node will send a keep-alive message periodically with this period.
+# If the node does not receive a keep-alive message from the peer for
+# 2 keep-alive cycles, the stream session times out and fails.
+# Default value is 300s (5 minutes), which means a stalled stream
+# times out in 10 minutes by default.
+# streaming_keep_alive_period_in_secs: 300
 
 # phi value that must be reached for a host to be marked down.
 # most users should never need to adjust this.
@@ -467,6 +904,7 @@
 
 # endpoint_snitch -- Set this to a class that implements
 # IEndpointSnitch.  The snitch has two functions:
+#
 # - it teaches Cassandra enough about your network topology to route
 #   requests efficiently
 # - it allows Cassandra to spread replicas around your cluster to avoid
@@ -475,31 +913,42 @@
 #   more than one replica on the same "rack" (which may not actually
 #   be a physical location)
 #
-# IF YOU CHANGE THE SNITCH AFTER DATA IS INSERTED INTO THE CLUSTER,
-# YOU MUST RUN A FULL REPAIR, SINCE THE SNITCH AFFECTS WHERE REPLICAS
-# ARE PLACED.
+# CASSANDRA WILL NOT ALLOW YOU TO SWITCH TO AN INCOMPATIBLE SNITCH
+# ONCE DATA IS INSERTED INTO THE CLUSTER.  This would cause data loss.
+# This means that if you start with the default SimpleSnitch, which
+# locates every node on "rack1" in "datacenter1", your only options
+# if you need to add another datacenter are GossipingPropertyFileSnitch
+# (and the older PFS).  From there, if you want to migrate to an
+# incompatible snitch like Ec2Snitch you can do it by adding new nodes
+# under Ec2Snitch (which will locate them in a new "datacenter") and
+# decommissioning the old ones.
 #
-# Out of the box, Cassandra provides
-#  - SimpleSnitch:
-#    Treats Strategy order as proximity. This improves cache locality
-#    when disabling read repair, which can further improve throughput.
-#    Only appropriate for single-datacenter deployments.
-#  - PropertyFileSnitch:
+# Out of the box, Cassandra provides:
+#
+# SimpleSnitch:
+#    Treats Strategy order as proximity. This can improve cache
+#    locality when disabling read repair.  Only appropriate for
+#    single-datacenter deployments.
+#
+# GossipingPropertyFileSnitch
+#    This should be your go-to snitch for production use.  The rack
+#    and datacenter for the local node are defined in
+#    cassandra-rackdc.properties and propagated to other nodes via
+#    gossip.  If cassandra-topology.properties exists, it is used as a
+#    fallback, allowing migration from the PropertyFileSnitch.
+#
+# PropertyFileSnitch:
 #    Proximity is determined by rack and data center, which are
 #    explicitly configured in cassandra-topology.properties.
-#  - RackInferringSnitch:
-#    Proximity is determined by rack and data center, which are
-#    assumed to correspond to the 3rd and 2nd octet of each node's
-#    IP address, respectively.  Unless this happens to match your
-#    deployment conventions (as it did Facebook's), this is best used
-#    as an example of writing a custom Snitch class.
-#  - Ec2Snitch:
-#    Appropriate for EC2 deployments in a single Region.  Loads Region
+#
+# Ec2Snitch:
+#    Appropriate for EC2 deployments in a single Region. Loads Region
 #    and Availability Zone information from the EC2 API. The Region is
-#    treated as the Datacenter, and the Availability Zone as the rack.
+#    treated as the datacenter, and the Availability Zone as the rack.
 #    Only private IPs are used, so this will not work across multiple
 #    Regions.
-#  - Ec2MultiRegionSnitch:
+#
+# Ec2MultiRegionSnitch:
 #    Uses public IPs as broadcast_address to allow cross-region
 #    connectivity.  (Thus, you should set seed addresses to the public
 #    IP as well.) You will need to open the storage_port or
@@ -507,13 +956,20 @@
 #    traffic, Cassandra will switch to the private IP after
 #    establishing a connection.)
 #
+# RackInferringSnitch:
+#    Proximity is determined by rack and data center, which are
+#    assumed to correspond to the 3rd and 2nd octet of each node's IP
+#    address, respectively.  Unless this happens to match your
+#    deployment conventions, this is best used as an example of
+#    writing a custom Snitch class and is provided in that spirit.
+#
 # You can use a custom Snitch by setting this to the full class name
 # of the snitch, which will be assumed to be on your classpath.
 endpoint_snitch: SimpleSnitch
 
 # controls how often to perform the more expensive part of host score
 # calculation
-dynamic_snitch_update_interval_in_ms: 100
+dynamic_snitch_update_interval_in_ms: 100 
 # controls how often to reset all host scores, allowing a bad host to
 # possibly recover
 dynamic_snitch_reset_interval_in_ms: 600000
@@ -540,20 +996,26 @@
 request_scheduler: org.apache.cassandra.scheduler.NoScheduler
 
 # Scheduler Options vary based on the type of scheduler
-# NoScheduler - Has no options
+#
+# NoScheduler
+#   Has no options
+#
 # RoundRobin
-#  - throttle_limit -- The throttle_limit is the number of in-flight
-#                      requests per client.  Requests beyond
-#                      that limit are queued up until
-#                      running requests can complete.
-#                      The value of 80 here is twice the number of
-#                      concurrent_reads + concurrent_writes.
-#  - default_weight -- default_weight is optional and allows for
-#                      overriding the default which is 1.
-#  - weights -- Weights are optional and will default to 1 or the
-#               overridden default_weight. The weight translates into how
-#               many requests are handled during each turn of the
-#               RoundRobin, based on the scheduler id.
+#   throttle_limit
+#     The throttle_limit is the number of in-flight requests per client.
+#     Requests beyond that limit are queued up until running requests
+#     can complete.  The value of 80 here is twice the number of
+#     concurrent_reads + concurrent_writes.
+#   default_weight
+#     default_weight is optional and allows for
+#     overriding the default which is 1.
+#   weights
+#     Weights are optional and will default to 1 or the
+#     overridden default_weight. The weight translates into how
+#     many requests are handled during each turn of the
+#     RoundRobin, based on the scheduler id.
 #
 # request_scheduler_options:
 #    throttle_limit: 80
@@ -562,27 +1024,20 @@
 #      Keyspace1: 1
 #      Keyspace2: 5
 
-# request_scheduler_id -- An identifer based on which to perform
+# request_scheduler_id -- An identifier based on which to perform
 # the request scheduling. Currently the only valid option is keyspace.
 # request_scheduler_id: keyspace
 
-# index_interval controls the sampling of entries from the primrary
-# row index in terms of space versus time.  The larger the interval,
-# the smaller and less effective the sampling will be.  In technicial
-# terms, the interval coresponds to the number of index entries that
-# are skipped between taking each sample.  All the sampled entries
-# must fit in memory.  Generally, a value between 128 and 512 here
-# coupled with a large key cache size on CFs results in the best trade
-# offs.  This value is not often changed, however if you have many
-# very small rows (many to an OS page), then increasing this will
-# often lower memory usage without a impact on performance.
-index_interval: 128
-
 # Enable or disable inter-node encryption
-# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that
-# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
-# suite for authentication, key exchange and encryption of the actual data transfers.
-# NOTE: No custom encryption options are enabled at the moment
+# JVM defaults for supported SSL socket protocols and cipher suites can
+# be replaced using custom encryption options. This is not recommended
+# unless you have policies in place that dictate certain settings, or
+# need to disable vulnerable ciphers or protocols in case the JVM cannot
+# be updated.
+# FIPS compliant settings can be configured at JVM level and should not
+# involve changing encryption settings here:
+# https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/FIPS.html
+# *NOTE* No custom encryption options are enabled at the moment
 # The available internode options are : all, none, dc, rack
 #
 # If set to dc cassandra will encrypt the traffic between the DCs
@@ -592,7 +1047,7 @@
 # the keystore and truststore.  For instructions on generating these files, see:
 # http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
 #
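+# As a rough illustration (not part of this file's defaults; the alias and
+# file names below are placeholders), a keystore/truststore pair matching the
+# paths below could be produced with keytool along these lines:
+#   keytool -genkeypair -keyalg RSA -alias node1 -validity 365 \
+#     -keystore conf/.keystore -storepass cassandra -keypass cassandra
+#   keytool -exportcert -alias node1 -keystore conf/.keystore \
+#     -file node1.cer -storepass cassandra
+#   keytool -importcert -alias node1 -file node1.cer \
+#     -keystore conf/.truststore -storepass cassandra
+#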
-encryption_options:
+server_encryption_options:
     internode_encryption: none
     keystore: conf/.keystore
     keystore_password: cassandra
@@ -602,4 +1057,200 @@
     # protocol: TLS
     # algorithm: SunX509
     # store_type: JKS
-    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
\ No newline at end of file
+    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
+    # require_client_auth: false
+    # require_endpoint_verification: false
+
+# enable or disable client/server encryption.
+client_encryption_options:
+    enabled: false
+    # If enabled and optional is set to true, encrypted and unencrypted connections are handled.
+    optional: false
+    keystore: conf/.keystore
+    keystore_password: cassandra
+    # require_client_auth: false
+    # Set truststore and truststore_password if require_client_auth is true
+    # truststore: conf/.truststore
+    # truststore_password: cassandra
+    # More advanced defaults below:
+    # protocol: TLS
+    # algorithm: SunX509
+    # store_type: JKS
+    # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA,TLS_DHE_RSA_WITH_AES_128_CBC_SHA,TLS_DHE_RSA_WITH_AES_256_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA]
+
+# internode_compression controls whether traffic between nodes is
+# compressed.
+# Can be:
+#
+# all
+#   all traffic is compressed
+#
+# dc
+#   traffic between different datacenters is compressed
+#
+# none
+#   nothing is compressed.
+internode_compression: dc
+
+# Enable or disable tcp_nodelay for inter-dc communication.
+# Disabling it will result in larger (but fewer) network packets being sent,
+# reducing overhead from the TCP protocol itself, at the cost of increasing
+# latency if you block for cross-datacenter responses.
+inter_dc_tcp_nodelay: false
+
+# TTL for different trace types used during logging of the repair process.
+tracetype_query_ttl: 86400
+tracetype_repair_ttl: 604800
+
+# By default, Cassandra logs GC Pauses greater than 200 ms at INFO level
+# This threshold can be adjusted to minimize logging if necessary
+# gc_log_threshold_in_ms: 200
+
+# If unset, all GC Pauses greater than gc_log_threshold_in_ms will log at
+# INFO level
+
+# UDFs (user defined functions) are disabled by default.
+# As of Cassandra 3.0 there is a sandbox in place that should prevent execution of evil code.
+enable_user_defined_functions: false
+
+# Enables scripted UDFs (JavaScript UDFs).
+# Java UDFs are always enabled, if enable_user_defined_functions is true.
+# Enable this option to be able to use UDFs with "language javascript" or any custom JSR-223 provider.
+# This option has no effect, if enable_user_defined_functions is false.
+enable_scripted_user_defined_functions: false
+
+# Enables materialized view creation on this node.
+# Materialized views are considered experimental and are not recommended for production use.
+enable_materialized_views: true
+
+# The default Windows kernel timer and scheduling resolution is 15.6ms for power conservation.
+# Lowering this value on Windows can provide much tighter latency and better throughput, however
+# some virtualized environments may see a negative performance impact from changing this setting
+# below their system default. The sysinternals 'clockres' tool can confirm your system's default
+# setting.
+windows_timer_interval: 1
+
+
+# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from
+# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by
+# the "key_alias" is the only key that will be used for encrypt opertaions; previously used keys
+# can still (and should!) be in the keystore and will be used on decrypt operations
+# (to handle the case of key rotation).
+#
+# It is strongly recommended to download and install Java Cryptography Extension (JCE)
+# Unlimited Strength Jurisdiction Policy Files for your version of the JDK.
+# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
+#
+# Currently, only the following file types are supported for transparent data encryption, although
+# more are coming in future cassandra releases: commitlog, hints
+transparent_data_encryption_options:
+    enabled: false
+    chunk_length_kb: 64
+    cipher: AES/CBC/PKCS5Padding
+    key_alias: testing:1
+    # CBC IV length for AES needs to be 16 bytes (which is also the default size)
+    # iv_length: 16
+    key_provider: 
+      - class_name: org.apache.cassandra.security.JKSKeyProvider
+        parameters: 
+          - keystore: conf/.keystore
+            keystore_password: cassandra
+            store_type: JCEKS
+            key_password: cassandra
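+    # As an illustration only (not shipped configuration), a matching AES
+    # secret key for the key_alias above could be created with keytool, e.g.:
+    #   keytool -genseckey -keyalg AES -keysize 128 -alias testing:1 \
+    #     -keystore conf/.keystore -storetype JCEKS \
+    #     -storepass cassandra -keypass cassandra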
+
+
+#####################
+# SAFETY THRESHOLDS #
+#####################
+
+# When executing a scan, within or across a partition, we need to keep the
+# tombstones seen in memory so we can return them to the coordinator, which
+# will use them to make sure other replicas also know about the deleted rows.
+# With workloads that generate a lot of tombstones, this can cause performance
+# problems and even exhaust the server heap.
+# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
+# Adjust the thresholds here if you understand the dangers and want to
+# scan more tombstones anyway.  These thresholds may also be adjusted at runtime
+# using the StorageService mbean.
+tombstone_warn_threshold: 1000
+tombstone_failure_threshold: 100000
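+# For example, with the defaults above a query that touches 1,500 tombstones
+# completes but logs a warning, while one that touches more than 100,000
+# tombstones fails rather than risk exhausting the heap.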
+
+# Log WARN on any multiple-partition batch size exceeding this value. 5kb per batch by default.
+# Caution should be taken on increasing the size of this threshold as it can lead to node instability.
+batch_size_warn_threshold_in_kb: 5
+
+# Fail any multiple-partition batch exceeding this value. 50kb (10x warn threshold) by default.
+batch_size_fail_threshold_in_kb: 50
+
+# Log WARN on any batches not of type LOGGED that span across more partitions than this limit
+unlogged_batch_across_partitions_warn_threshold: 10
+
+# Log a warning when compacting partitions larger than this value
+compaction_large_partition_warning_threshold_mb: 100
+
+# GC Pauses greater than gc_warn_threshold_in_ms will be logged at WARN level
+# Adjust the threshold based on your application throughput requirement
+# By default, Cassandra logs GC Pauses greater than 200 ms at INFO level
+gc_warn_threshold_in_ms: 1000
+
+# Maximum size of any value in SSTables. Safety measure to detect SSTable corruption
+# early. Any value size larger than this threshold will result in marking an SSTable
+# as corrupted. This should be positive and less than 2048.
+# max_value_size_in_mb: 256
+
+# Back-pressure settings #
+# If enabled, the coordinator will apply the back-pressure strategy specified below to each mutation
+# sent to replicas, with the aim of reducing pressure on overloaded replicas.
+back_pressure_enabled: false
+# The back-pressure strategy applied.
+# The default implementation, RateBasedBackPressure, takes three arguments:
+# high ratio, factor, and flow type, and uses the ratio between incoming mutation responses and outgoing mutation requests.
+# If below high ratio, outgoing mutations are rate limited according to the incoming rate decreased by the given factor;
+# if above high ratio, the rate limiting is increased by the given factor;
+# such factor is usually best configured between 1 and 10, use larger values for a faster recovery
+# at the expense of potentially more dropped mutations;
+# the rate limiting is applied according to the flow type: if FAST, it's rate limited at the speed of the fastest replica,
+# if SLOW at the speed of the slowest one.
+# New strategies can be added. Implementors need to implement org.apache.cassandra.net.BackpressureStrategy and
+# provide a public constructor accepting a Map<String, Object>.
+back_pressure_strategy:
+    - class_name: org.apache.cassandra.net.RateBasedBackPressure
+      parameters:
+        - high_ratio: 0.90
+          factor: 5
+          flow: FAST
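+# As a worked illustration of the defaults above (the numbers are made up):
+# with high_ratio 0.90 and factor 5, a replica that has acknowledged 450 of
+# the last 1000 mutations sent to it has a ratio of 0.45 < 0.90, so mutations
+# to it are rate limited based on its incoming response rate reduced by the
+# factor; once the ratio rises back above 0.90, the limit is raised again by
+# the same factor.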
+
+# Coalescing Strategies #
+# Coalescing multiple messages turns out to significantly boost message processing throughput (think doubling or more).
+# On bare metal, the floor for packet processing throughput is high enough that many applications won't notice, but in
+# virtualized environments, the point at which an application can be bound by network packet processing can be
+# surprisingly low compared to the throughput of task processing that is possible inside a VM. It's not that bare metal
+# doesn't benefit from coalescing messages, it's that the number of packets a bare metal network interface can process
+# is sufficient for many applications such that no load starvation is experienced even without coalescing.
+# There are other benefits to coalescing network messages that are harder to isolate with a simple metric like messages
+# per second. By coalescing multiple tasks together, a network thread can process multiple messages for the cost of one
+# trip to read from a socket, and all the task submission work can be done at the same time reducing context switching
+# and increasing cache friendliness of network message processing.
+# See CASSANDRA-8692 for details.
+
+# Strategy to use for coalescing messages in OutboundTcpConnection.
+# Can be fixed, movingaverage, timehorizon, disabled (default).
+# You can also specify a subclass of CoalescingStrategies.CoalescingStrategy by name.
+# otc_coalescing_strategy: DISABLED
+
+# How many microseconds to wait for coalescing. For fixed strategy this is the amount of time after the first
+# message is received before it will be sent with any accompanying messages. For moving average this is the
+# maximum amount of time that will be waited as well as the interval at which messages must arrive on average
+# for coalescing to be enabled.
+# otc_coalescing_window_us: 200
+
+# Do not try to coalesce messages if we have already gathered that many messages. This should be more than 2 and less than 128.
+# otc_coalescing_enough_coalesced_messages: 8
+
+# How many milliseconds to wait between two expiration runs on the backlog (queue) of the OutboundTcpConnection.
+# Expiration is done if messages are piling up in the backlog. Droppable messages are expired to free the memory
+# taken by expired messages. The interval should be between 0 and 1000, and in most installations the default value
+# will be appropriate. A smaller value could potentially expire messages slightly sooner at the expense of more CPU
+# time and queue contention while iterating the backlog of messages.
+# An interval of 0 disables any wait time, which is the behavior of former Cassandra versions.
+#
+# otc_backlog_expiration_interval_ms: 200
diff --git a/appender/cassandra/src/test/resources/logback.xml b/appender/cassandra/src/test/resources/logback.xml
new file mode 100644
index 0000000..269ebfa
--- /dev/null
+++ b/appender/cassandra/src/test/resources/logback.xml
@@ -0,0 +1,102 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+<!--
+In order to disable debug.log, comment-out the ASYNCDEBUGLOG
+appender reference in the root level section below.
+-->
+
+<configuration scan="true">
+    <jmxConfigurator />
+
+    <!-- No shutdown hook; we run it ourselves in StorageService after shutdown -->
+
+    <!-- SYSTEMLOG rolling file appender to system.log (INFO level) -->
+
+    <appender name="SYSTEMLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+        <file>${cassandra.logdir}/system.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${cassandra.logdir}/system.log.%i.zip</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>20</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>20MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- DEBUGLOG rolling file appender to debug.log (all levels) -->
+
+    <appender name="DEBUGLOG" class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${cassandra.logdir}/debug.log</file>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${cassandra.logdir}/debug.log.%i.zip</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>20</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>20MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- ASYNCLOG assynchronous appender to debug.log (all levels) -->
+
+    <appender name="ASYNCDEBUGLOG" class="ch.qos.logback.classic.AsyncAppender">
+        <queueSize>1024</queueSize>
+        <discardingThreshold>0</discardingThreshold>
+        <includeCallerData>true</includeCallerData>
+        <appender-ref ref="DEBUGLOG" />
+    </appender>
+
+    <!-- STDOUT console appender to stdout (INFO level) -->
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+            <level>INFO</level>
+        </filter>
+        <encoder>
+            <pattern>%-5level [%thread] %date{ISO8601} %F:%L - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <!-- Uncomment bellow and corresponding appender-ref to activate logback metrics
+    <appender name="LogbackMetrics" class="com.codahale.metrics.logback.InstrumentedAppender" />
+     -->
+
+    <root level="INFO">
+        <appender-ref ref="SYSTEMLOG" />
+        <appender-ref ref="STDOUT" />
+        <appender-ref ref="ASYNCDEBUGLOG" /> <!-- Comment this line to disable debug.log -->
+        <!--
+        <appender-ref ref="LogbackMetrics" />
+        -->
+    </root>
+
+    <logger name="org.apache.cassandra" level="DEBUG"/>
+    <logger name="com.thinkaurelius.thrift" level="ERROR"/>
+</configuration>
diff --git a/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java b/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
index cd0266e..4917614 100644
--- a/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
+++ b/appender/elasticsearch-jest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/jest/ElasticsearchAppender.java
@@ -68,23 +68,23 @@
 )
 public class ElasticsearchAppender implements EventHandler {
 
+    public static final String ADDRESS_PROPERTY = "address";
+    public static final String USERNAME_PROPERTY = "username";
+    public static final String PASSWORD_PROPERTY = "password";
+    public static final String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static final String INDEX_TYPE_PROPERTY = "index.type";
+    public static final String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+
+    public static final String ADDRESS_DEFAULT = "http://localhost:9200";
+    public static final String USERNAME_DEFAULT = null;
+    public static final String PASSWORD_DEFAULT = null;
+    public static final String INDEX_PREFIX_DEFAULT = "karaf";
+    public static final String INDEX_TYPE_DEFAULT = "decanter";
+    public static final String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+
     @Reference
     public Marshaller marshaller;
 
-    public static String ADDRESS_PROPERTY = "address";
-    public static String USERNAME_PROPERTY = "username";
-    public static String PASSWORD_PROPERTY = "password";
-    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
-    public static String INDEX_TYPE_PROPERTY = "index.type";
-    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
-
-    public static String ADDRESS_DEFAULT = "http://localhost:9200";
-    public static String USERNAME_DEFAULT = null;
-    public static String PASSWORD_DEFAULT = null;
-    public static String INDEX_PREFIX_DEFAULT = "karaf";
-    public static String INDEX_TYPE_DEFAULT = "decanter";
-    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
-
     private Dictionary<String, Object> config;
 
     private JestClient client;
diff --git a/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index 87fc459..159c1f1 100644
--- a/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ b/appender/elasticsearch-native-1.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -53,17 +53,17 @@
 )
 public class ElasticsearchAppender implements EventHandler {
 
-    public static String HOST_PROPERTY = "host";
-    public static String PORT_PROPERTY = "port";
-    public static String CLUSTER_NAME_PROPERTY = "clusterName";
-    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
-    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+    public static final String HOST_PROPERTY = "host";
+    public static final String PORT_PROPERTY = "port";
+    public static final String CLUSTER_NAME_PROPERTY = "clusterName";
+    public static final String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static final String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
 
-    public static String HOST_DEFAULT = "localhost";
-    public static String PORT_DEFAULT = "9300";
-    public static String CLUSTER_NAME_DEFAULT = "elasticsearch";
-    public static String INDEX_PREFIX_DEFAULT = "karaf";
-    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+    public static final String HOST_DEFAULT = "localhost";
+    public static final String PORT_DEFAULT = "9300";
+    public static final String CLUSTER_NAME_DEFAULT = "elasticsearch";
+    public static final String INDEX_PREFIX_DEFAULT = "karaf";
+    public static final String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
 
     final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
 
diff --git a/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java b/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
index 9149a92..f72971a 100644
--- a/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
+++ b/appender/elasticsearch-native-2.x/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/ElasticsearchAppender.java
@@ -52,17 +52,17 @@
 )
 public class ElasticsearchAppender implements EventHandler {
 
-    public static String HOST_PROPERTY = "host";
-    public static String PORT_PROPERTY = "port";
-    public static String CLUSTER_NAME_PROPERTY = "clusterName";
-    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
-    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+    public static final String HOST_PROPERTY = "host";
+    public static final String PORT_PROPERTY = "port";
+    public static final String CLUSTER_NAME_PROPERTY = "clusterName";
+    public static final String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static final String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
 
-    public static String HOST_DEFAULT = "localhost";
-    public static String PORT_DEFAULT = "9300";
-    public static String CLUSTER_NAME_DEFAULT = "elasticsearch";
-    public static String INDEX_PREFIX_DEFAULT = "karaf";
-    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+    public static final String HOST_DEFAULT = "localhost";
+    public static final String PORT_DEFAULT = "9300";
+    public static final String CLUSTER_NAME_DEFAULT = "elasticsearch";
+    public static final String INDEX_PREFIX_DEFAULT = "karaf";
+    public static final String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
 
     final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchAppender.class);
 
diff --git a/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java b/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
index 1d3c359..ec2a1d1 100644
--- a/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
+++ b/appender/elasticsearch-rest/src/main/java/org/apache/karaf/decanter/appender/elasticsearch/rest/ElasticsearchAppender.java
@@ -52,19 +52,19 @@
 )
 public class ElasticsearchAppender implements EventHandler {
 
-    public static String ADDRESSES_PROPERTY = "addresses";
-    public static String USERNAME_PROPERTY = "username";
-    public static String PASSWORD_PROPERTY = "password";
-    public static String INDEX_PREFIX_PROPERTY = "index.prefix";
-    public static String INDEX_TYPE_PROPERTY = "index.type";
-    public static String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
+    public static final String ADDRESSES_PROPERTY = "addresses";
+    public static final String USERNAME_PROPERTY = "username";
+    public static final String PASSWORD_PROPERTY = "password";
+    public static final String INDEX_PREFIX_PROPERTY = "index.prefix";
+    public static final String INDEX_TYPE_PROPERTY = "index.type";
+    public static final String INDEX_EVENT_TIMESTAMPED_PROPERTY = "index.event.timestamped";
 
-    public static String ADDRESSES_DEFAULT = "http://localhost:9200";
-    public static String USERNAME_DEFAULT = null;
-    public static String PASSWORD_DEFAULT = null;
-    public static String INDEX_PREFIX_DEFAULT = "karaf";
-    public static String INDEX_TYPE_DEFAULT = "decanter";
-    public static String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
+    public static final String ADDRESSES_DEFAULT = "http://localhost:9200";
+    public static final String USERNAME_DEFAULT = null;
+    public static final String PASSWORD_DEFAULT = null;
+    public static final String INDEX_PREFIX_DEFAULT = "karaf";
+    public static final String INDEX_TYPE_DEFAULT = "decanter";
+    public static final String INDEX_EVENT_TIMESTAMPED_DEFAULT = "true";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java b/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
index 4a2778e..3e341f0 100644
--- a/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
+++ b/appender/file/src/main/java/org/apache/karaf/decanter/appender/file/FileAppender.java
@@ -39,8 +39,8 @@
 )
 public class FileAppender implements EventHandler {
 
-    public static String FILENAME_PROPERTY = "filename";
-    public static String APPEND_PROPERTY = "append";
+    public static final String FILENAME_PROPERTY = "filename";
+    public static final String APPEND_PROPERTY = "append";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java b/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
index 63b627f..b1a2f26 100644
--- a/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
+++ b/appender/jdbc/src/main/java/org/apache/karaf/decanter/appender/jdbc/JdbcAppender.java
@@ -43,11 +43,11 @@
 )
 public class JdbcAppender implements EventHandler {
 
-    public static String TABLE_NAME_PROPERTY = "table.name";
-    public static String DIALECT_PROPERTY = "dialect";
+    public static final String TABLE_NAME_PROPERTY = "table.name";
+    public static final String DIALECT_PROPERTY = "dialect";
 
-    public static String TABLE_NAME_DEFAULT = "decanter";
-    public static String DIALECT_DEFAULT = "generic";
+    public static final String TABLE_NAME_DEFAULT = "decanter";
+    public static final String DIALECT_DEFAULT = "generic";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java b/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
index ae1cb00..0e6b144 100644
--- a/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
+++ b/appender/jms/src/main/java/org/apache/karaf/decanter/appender/jms/JmsAppender.java
@@ -40,17 +40,17 @@
 )
 public class JmsAppender implements EventHandler {
 
-    public static String USERNAME_PROPERTY = "username";
-    public static String PASSWORD_PROPERTY = "password";
-    public static String DESTINATION_NAME_PROPERTY = "destination.name";
-    public static String DESTINATION_TYPE_PROPERTY = "destination.type";
-    public static String MESSAGE_TYPE_PROPERTY = "message.type";
+    public static final String USERNAME_PROPERTY = "username";
+    public static final String PASSWORD_PROPERTY = "password";
+    public static final String DESTINATION_NAME_PROPERTY = "destination.name";
+    public static final String DESTINATION_TYPE_PROPERTY = "destination.type";
+    public static final String MESSAGE_TYPE_PROPERTY = "message.type";
 
-    public static String USERNAME_DEFAULT = null;
-    public static String PASSWORD_DEFAULT = null;
-    public static String DESTINATION_NAME_DEFAULT = "decanter";
-    public static String DESTINATION_TYPE_DEFAULT = "queue";
-    public static String MESSAGE_TYPE_DEFAULT = "text";
+    public static final String USERNAME_DEFAULT = null;
+    public static final String PASSWORD_DEFAULT = null;
+    public static final String DESTINATION_NAME_DEFAULT = "decanter";
+    public static final String DESTINATION_TYPE_DEFAULT = "queue";
+    public static final String MESSAGE_TYPE_DEFAULT = "text";
 
     @Reference
     public ConnectionFactory connectionFactory;
diff --git a/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java b/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
index 16fef55..7221795 100644
--- a/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
+++ b/appender/mongodb/src/main/java/org/apache/karaf/decanter/appender/mongodb/MongoDbAppender.java
@@ -41,13 +41,13 @@
 )
 public class MongoDbAppender implements EventHandler {
 
-    public static String URI_PROPERTY = "uri";
-    public static String DATABASE_PROPERTY = "database";
-    public static String COLLECTION_PROPERTY = "collection";
+    public static final String URI_PROPERTY = "uri";
+    public static final String DATABASE_PROPERTY = "database";
+    public static final String COLLECTION_PROPERTY = "collection";
 
-    public static String URI_DEFAULT = "mongodb://localhost";
-    public static String DATABASE_DEFAULT = "decanter";
-    public static String COLLECTION_DEFAULT = "decanter";
+    public static final String URI_DEFAULT = "mongodb://localhost";
+    public static final String DATABASE_DEFAULT = "decanter";
+    public static final String COLLECTION_DEFAULT = "decanter";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java b/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
index 024b53b..00ac599 100644
--- a/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
+++ b/appender/mqtt/src/main/java/org/apache/karaf/decanter/appender/mqtt/MqttAppender.java
@@ -43,13 +43,13 @@
 )
 public class MqttAppender implements EventHandler {
 
-    public static String SERVER_PROPERTY = "server";
-    public static String CLIENT_ID_PROPERTY = "clientId";
-    public static String TOPIC_PROPERTY = "topic";
+    public static final String SERVER_PROPERTY = "server";
+    public static final String CLIENT_ID_PROPERTY = "clientId";
+    public static final String TOPIC_PROPERTY = "topic";
 
-    public static String SERVER_DEFAULT = "tcp://localhost:9300";
-    public static String CLIENT_ID_DEFAULT = "decanter";
-    public static String TOPIC_DEFAULT = "decanter";
+    public static final String SERVER_DEFAULT = "tcp://localhost:9300";
+    public static final String CLIENT_ID_DEFAULT = "decanter";
+    public static final String TOPIC_DEFAULT = "decanter";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java b/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
index fd50d0f..e0302fa 100644
--- a/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
+++ b/appender/orientdb/src/main/java/org/apache/karaf/decanter/appender/orientdb/OrientDBAppender.java
@@ -39,13 +39,13 @@
 )
 public class OrientDBAppender implements EventHandler {
 
-    public static String URL_PROPERTY = "url";
-    public static String USERNAME_PROPERTY = "username";
-    public static String PASSWORD_PROPERTY = "password";
+    public static final String URL_PROPERTY = "url";
+    public static final String USERNAME_PROPERTY = "username";
+    public static final String PASSWORD_PROPERTY = "password";
 
-    public static String URL_DEFAULT = "remote:localhost/decanter";
-    public static String USERNAME_DEFAULT = "root";
-    public static String PASSWORD_DEFAULT = "decanter";
+    public static final String URL_DEFAULT = "remote:localhost/decanter";
+    public static final String USERNAME_DEFAULT = "root";
+    public static final String PASSWORD_DEFAULT = "decanter";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java b/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
index 749a320..04e55b4 100644
--- a/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
+++ b/appender/redis/src/main/java/org/apache/karaf/decanter/appender/redis/RedisAppender.java
@@ -41,19 +41,19 @@
 )
 public class RedisAppender implements EventHandler {
 
-    public static String ADDRESS_PROPERTY = "address";
-    public static String MODE_PROPERTY = "mode";
-    public static String MAP_PROPERTY = "map";
-    public static String MASTER_ADDRESS_PROPERTY = "masterAddress";
-    public static String MASTER_NAME_PROPERTY = "masterName";
-    public static String SCAN_INTERVAL_PROPERTY = "scanInterval";
+    public static final String ADDRESS_PROPERTY = "address";
+    public static final String MODE_PROPERTY = "mode";
+    public static final String MAP_PROPERTY = "map";
+    public static final String MASTER_ADDRESS_PROPERTY = "masterAddress";
+    public static final String MASTER_NAME_PROPERTY = "masterName";
+    public static final String SCAN_INTERVAL_PROPERTY = "scanInterval";
 
-    public static String ADDRESS_DEFAULT = "localhost:6379";
-    public static String MODE_DEFAULT = "Single";
-    public static String MAP_DEFAULT = "Decanter";
-    public static String MASTER_ADDRESS_DEFAULT = null;
-    public static String MASTER_NAME_DEFAULT = null;
-    public static String SCAN_INTERVAL_DEFAULT = "2000";
+    public static final String ADDRESS_DEFAULT = "localhost:6379";
+    public static final String MODE_DEFAULT = "Single";
+    public static final String MAP_DEFAULT = "Decanter";
+    public static final String MASTER_ADDRESS_DEFAULT = null;
+    public static final String MASTER_NAME_DEFAULT = null;
+    public static final String SCAN_INTERVAL_DEFAULT = "2000";
 
     private RedissonClient redissonClient;
 
diff --git a/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java b/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
index 5762465..6130bbe 100644
--- a/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
+++ b/appender/rest/src/main/java/org/apache/karaf/decanter/appender/rest/RestAppender.java
@@ -45,7 +45,7 @@
 )
 public class RestAppender implements EventHandler {
 
-    public static String URI_PROPERTY = "uri";
+    public static final String URI_PROPERTY = "uri";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
index fb5d0f7..4989206 100644
--- a/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
+++ b/appender/socket/src/main/java/org/apache/karaf/decanter/appender/socket/SocketAppender.java
@@ -38,11 +38,11 @@
 )
 public class SocketAppender implements EventHandler {
 
-    public static String HOST_PROPERTY = "host";
-    public static String PORT_PROPERTY = "port";
+    public static final String HOST_PROPERTY = "host";
+    public static final String PORT_PROPERTY = "port";
 
-    public static String HOST_DEFAULT = "localhost";
-    public static String PORT_DEFAULT = "34343";
+    public static final String HOST_DEFAULT = "localhost";
+    public static final String PORT_DEFAULT = "34343";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java b/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
index c9ef9f3..7e03d7e 100644
--- a/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
+++ b/appender/timescaledb/src/main/java/org/apache/karaf/decanter/appender/timescaledb/TimescaleDbAppender.java
@@ -41,9 +41,9 @@
 )
 public class TimescaleDbAppender implements EventHandler {
 
-    public static String TABLE_NAME_PROPERTY = "table.name";
+    public static final String TABLE_NAME_PROPERTY = "table.name";
 
-    public static String TABLE_NAME_DEFAULT = "decanter";
+    public static final String TABLE_NAME_DEFAULT = "decanter";
 
     @Reference
     public Marshaller marshaller;
diff --git a/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java b/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java
index 8f5efa0..9a40260 100644
--- a/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java
+++ b/appender/utils/src/main/java/org/apache/karaf/decanter/appender/utils/EventFilter.java
@@ -22,10 +22,10 @@
 
 public class EventFilter {
 
-    public static String PROPERTY_NAME_EXCLUDE_CONFIG = "event.property.name.exclude";
-    public static String PROPERTY_NAME_INCLUDE_CONFIG = "event.property.name.include";
-    public static String PROPERTY_VALUE_EXCLUDE_CONFIG = "event.property.value.exclude";
-    public static String PROPERTY_VALUE_INCLUDE_CONFIG = "event.property.value.include";
+    public static final String PROPERTY_NAME_EXCLUDE_CONFIG = "event.property.name.exclude";
+    public static final String PROPERTY_NAME_INCLUDE_CONFIG = "event.property.name.include";
+    public static final String PROPERTY_VALUE_EXCLUDE_CONFIG = "event.property.value.exclude";
+    public static final String PROPERTY_VALUE_INCLUDE_CONFIG = "event.property.value.include";
 
     public static boolean match(Event event, Dictionary<String, Object> config) {
         if (config == null) {
diff --git a/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java b/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
index 79cdd4a..e7a5568 100644
--- a/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
+++ b/appender/websocket-servlet/src/main/java/org/apache/karaf/decanter/appender/websocket/DecanterWebSocketAppender.java
@@ -48,9 +48,9 @@
 @WebSocket
 public class DecanterWebSocketAppender implements EventHandler {
 
-    public static String ALIAS_PROPERTY = "servlet.alias";
+    public static final String ALIAS_PROPERTY = "servlet.alias";
 
-    public static String ALIAS_DEFAULT = "/decanter-websocket";
+    public static final String ALIAS_DEFAULT = "/decanter-websocket";
 
     private static final Logger LOG = LoggerFactory.getLogger(DecanterWebSocketAppender.class);
 
diff --git a/assembly/src/main/feature/feature.xml b/assembly/src/main/feature/feature.xml
index c37855d..cc91619 100644
--- a/assembly/src/main/feature/feature.xml
+++ b/assembly/src/main/feature/feature.xml
@@ -344,19 +344,25 @@
 
     <feature name="decanter-appender-cassandra-core" version="${project.version}" description="Karaf Decanter Cassandra Appender core">
         <feature>decanter-common</feature>
-        <bundle dependency="true">mvn:com.google.guava/guava/16.0.1</bundle>
-        <bundle dependency="true">mvn:io.netty/netty-handler/4.0.37.Final</bundle>
-        <bundle dependency="true">mvn:io.netty/netty-transport-native-epoll/4.0.37.Final</bundle>
-        <bundle dependency="true">mvn:io.netty/netty-buffer/4.0.37.Final</bundle>
-        <bundle dependency="true">mvn:io.netty/netty-common/4.0.37.Final</bundle>
-        <bundle dependency="true">mvn:io.netty/netty-transport/4.0.37.Final</bundle>
-        <bundle dependency="true">mvn:io.netty/netty-codec/4.0.37.Final</bundle>
-        <bundle dependency="true">mvn:io.dropwizard.metrics/metrics-core/3.1.2</bundle>
-        <bundle dependency="true">mvn:io.dropwizard.metrics/metrics-json/3.1.2</bundle>
-        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-core/2.6.3</bundle>
-        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-databind/2.6.3</bundle>
-        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-annotations/2.6.3</bundle>
-        <bundle dependency="true">mvn:com.datastax.cassandra/cassandra-driver-core/${cassandra.driver.version}</bundle>
+        <bundle dependency="true">mvn:com.google.guava/guava/25.1-jre</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-handler/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-transport-native-epoll/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-buffer/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-common/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-transport/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-codec/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.netty/netty-resolver/4.1.34.Final</bundle>
+        <bundle dependency="true">mvn:io.dropwizard.metrics/metrics-core/4.0.5</bundle>
+        <bundle dependency="true">mvn:io.dropwizard.metrics/metrics-json/4.0.5</bundle>
+        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-core/2.9.8</bundle>
+        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-databind/2.9.8</bundle>
+        <bundle dependency="true">mvn:com.fasterxml.jackson.core/jackson-annotations/2.9.8</bundle>
+        <bundle dependency="true">mvn:com.datastax.oss/java-driver-core-shaded/${cassandra.driver.version}</bundle>
+        <bundle dependency="true">mvn:com.datastax.oss/java-driver-query-builder/${cassandra.driver.version}</bundle>
+        <bundle dependency="true">mvn:com.datastax.oss/java-driver-shaded-guava/25.1-jre</bundle>
+        <bundle dependency="true">mvn:com.datastax.oss/native-protocol/1.4.5</bundle>
+        <bundle dependency="true">mvn:com.typesafe/config/1.3.3</bundle>
+        <bundle dependency="true">mvn:org.hdrhistogram/HdrHistogram/2.1.11</bundle>
         <bundle>mvn:org.apache.karaf.decanter.appender/org.apache.karaf.decanter.appender.cassandra/${project.version}</bundle>
     </feature>
     
diff --git a/backend/elasticsearch-1.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java b/backend/elasticsearch-1.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
index 7b96587..2e0b4f9 100644
--- a/backend/elasticsearch-1.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
+++ b/backend/elasticsearch-1.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
@@ -44,21 +44,21 @@
     
     private static Node node;
     
-    public static String PLUGINS_DIRECTORY = "plugins.directory";
-    public static String ELASTIC_YAML_FILE = "elasticsearch.yaml";
+    public static final String PLUGINS_DIRECTORY = "plugins.directory";
+    public static final String ELASTIC_YAML_FILE = "elasticsearch.yaml";
 
-    public static String CLUSTER_NAME = "cluster.name";
-    public static String HTTP_ENABLED = "http.enabled";
-    public static String NODE_DATA = "node.data";
-    public static String NODE_NAME = "node.name";
-    public static String NODE_MASTER = "node.master";
-    public static String PATH_DATA = "path.data";
-    public static String NETWORK_HOST = "network.host";
-    public static String PORT = "port";
-    public static String CLUSTER_ROUTING_SCHEDULE = "cluster.routing.schedule";
-    public static String PATH_PLUGINS = "path.plugins";
-    public static String HTTP_CORS_ENABLED = "http.cors.enabled";
-    public static String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
+    public static final String CLUSTER_NAME = "cluster.name";
+    public static final String HTTP_ENABLED = "http.enabled";
+    public static final String NODE_DATA = "node.data";
+    public static final String NODE_NAME = "node.name";
+    public static final String NODE_MASTER = "node.master";
+    public static final String PATH_DATA = "path.data";
+    public static final String NETWORK_HOST = "network.host";
+    public static final String PORT = "port";
+    public static final String CLUSTER_ROUTING_SCHEDULE = "cluster.routing.schedule";
+    public static final String PATH_PLUGINS = "path.plugins";
+    public static final String HTTP_CORS_ENABLED = "http.cors.enabled";
+    public static final String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
 
     private static final boolean IS_WINDOWS = System.getProperty("os.name").contains("indow");
 
diff --git a/backend/elasticsearch-2.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java b/backend/elasticsearch-2.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
index c63673d..df300d7 100644
--- a/backend/elasticsearch-2.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
+++ b/backend/elasticsearch-2.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
@@ -42,20 +42,20 @@
     private final static Logger LOGGER = LoggerFactory.getLogger(EmbeddedNode.class);
     private static Node node;
     
-    public static String PLUGINS_DIRECTORY = "plugins.directory";
-    public static String ELASTIC_YAML_FILE = "elasticsearch.yaml";
+    public static final String PLUGINS_DIRECTORY = "plugins.directory";
+    public static final String ELASTIC_YAML_FILE = "elasticsearch.yaml";
 
-    public static String CLUSTER_NAME = "cluster.name";
-    public static String HTTP_ENABLED = "http.enabled";
-    public static String NODE_NAME = "node.name";
-    public static String PATH_DATA = "path.data";
-    public static String PATH_HOME = "path.home";
-    public static String NETWORK_HOST = "network.host";
-    public static String CLUSTER_ROUTING_SCHEDULE = "cluster.routing.schedule";
-    public static String PATH_PLUGINS = "path.plugins";
-    public static String HTTP_CORS_ENABLED = "http.cors.enabled";
-    public static String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
-    public static String INDEX_MAX_RESULT_WINDOW = "index.max_result_window";
+    public static final String CLUSTER_NAME = "cluster.name";
+    public static final String HTTP_ENABLED = "http.enabled";
+    public static final String NODE_NAME = "node.name";
+    public static final String PATH_DATA = "path.data";
+    public static final String PATH_HOME = "path.home";
+    public static final String NETWORK_HOST = "network.host";
+    public static final String CLUSTER_ROUTING_SCHEDULE = "cluster.routing.schedule";
+    public static final String PATH_PLUGINS = "path.plugins";
+    public static final String HTTP_CORS_ENABLED = "http.cors.enabled";
+    public static final String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
+    public static final String INDEX_MAX_RESULT_WINDOW = "index.max_result_window";
 
     private static final boolean IS_WINDOWS = System.getProperty("os.name").contains("indow");
 
diff --git a/backend/elasticsearch-5.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java b/backend/elasticsearch-5.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
index ebf4387..3bfdf39 100644
--- a/backend/elasticsearch-5.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
+++ b/backend/elasticsearch-5.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
@@ -46,19 +46,19 @@
     private final static Logger LOGGER = LoggerFactory.getLogger(EmbeddedNode.class);
     private static Node node;
     
-    public static String PLUGINS_DIRECTORY = "plugins.directory";
-    public static String ELASTIC_YAML_FILE = "elasticsearch.yaml";
+    public static final String PLUGINS_DIRECTORY = "plugins.directory";
+    public static final String ELASTIC_YAML_FILE = "elasticsearch.yaml";
 
-    public static String CLUSTER_NAME = "cluster.name";
-    public static String TRANSPORT_TYPE = "transport.type";
-    public static String HTTP_TYPE = "http.type";
-    public static String HTTP_ENABLED = "http.enabled";
-    public static String NODE_NAME = "node.name";
-    public static String PATH_DATA = "path.data";
-    public static String PATH_HOME = "path.home";
-    public static String NETWORK_HOST = "network.host";
-    public static String HTTP_CORS_ENABLED = "http.cors.enabled";
-    public static String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
+    public static final String CLUSTER_NAME = "cluster.name";
+    public static final String TRANSPORT_TYPE = "transport.type";
+    public static final String HTTP_TYPE = "http.type";
+    public static final String HTTP_ENABLED = "http.enabled";
+    public static final String NODE_NAME = "node.name";
+    public static final String PATH_DATA = "path.data";
+    public static final String PATH_HOME = "path.home";
+    public static final String NETWORK_HOST = "network.host";
+    public static final String HTTP_CORS_ENABLED = "http.cors.enabled";
+    public static final String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
 
     @SuppressWarnings("unchecked")
     @Activate
diff --git a/backend/elasticsearch-6.x/pom.xml b/backend/elasticsearch-6.x/pom.xml
index 3f0ce80..6093aff 100644
--- a/backend/elasticsearch-6.x/pom.xml
+++ b/backend/elasticsearch-6.x/pom.xml
@@ -131,14 +131,6 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-compiler-plugin</artifactId>
-                <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
         </plugins>
     </build>
 
diff --git a/backend/elasticsearch-6.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java b/backend/elasticsearch-6.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
index ac5a93f..3097df2 100644
--- a/backend/elasticsearch-6.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
+++ b/backend/elasticsearch-6.x/src/main/java/org/apache/karaf/decanter/elasticsearch/EmbeddedNode.java
@@ -46,18 +46,18 @@
     private final static Logger LOGGER = LoggerFactory.getLogger(EmbeddedNode.class);
     private static Node node;
     
-    public static String PLUGINS_DIRECTORY = "plugins.directory";
-    public static String ELASTIC_YAML_FILE = "elasticsearch.yaml";
+    public static final String PLUGINS_DIRECTORY = "plugins.directory";
+    public static final String ELASTIC_YAML_FILE = "elasticsearch.yaml";
 
-    public static String CLUSTER_NAME = "cluster.name";
-    public static String HTTP_TYPE = "http.type";
-    public static String HTTP_ENABLED = "http.enabled";
-    public static String NODE_NAME = "node.name";
-    public static String PATH_DATA = "path.data";
-    public static String PATH_HOME = "path.home";
-    public static String NETWORK_HOST = "network.host";
-    public static String HTTP_CORS_ENABLED = "http.cors.enabled";
-    public static String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
+    public static final String CLUSTER_NAME = "cluster.name";
+    public static final String HTTP_TYPE = "http.type";
+    public static final String HTTP_ENABLED = "http.enabled";
+    public static final String NODE_NAME = "node.name";
+    public static final String PATH_DATA = "path.data";
+    public static final String PATH_HOME = "path.home";
+    public static final String NETWORK_HOST = "network.host";
+    public static final String HTTP_CORS_ENABLED = "http.cors.enabled";
+    public static final String HTTP_CORS_ALLOW_ORIGIN = "http.cors.allow-origin";
 
     @SuppressWarnings("unchecked")
     @Activate
diff --git a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
index 190220b..8f9460d 100644
--- a/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
+++ b/collector/file/src/main/java/org/apache/karaf/decanter/collector/file/DecanterTailerListener.java
@@ -57,6 +57,7 @@
     private String type;
     private String path;
     private String regex;
+    private Pattern compiledRegex;
     
     /**
      * additional properties provided by the user
@@ -84,6 +85,9 @@
         this.path = path;
         this.regex = (String) properties.get("regex");
         thread.start();
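+        // Compile the regex once at activation instead of recompiling it for every line handled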
+        if (regex != null) {
+            compiledRegex = Pattern.compile(regex);
+        }
     }
     
     @Deactivate
@@ -99,9 +103,8 @@
         data.put("path", path);
         data.put("regex", regex);
 
-        if (regex != null) {
-            Pattern pattern = Pattern.compile(regex);
-            Matcher matcher = pattern.matcher(line);
+        if (compiledRegex != null) {
+            Matcher matcher = compiledRegex.matcher(line);
             if (matcher.matches()) {
                 data.putAll(this.parser.parse("line_" + type, line));
             } else {
diff --git a/collector/jmx/src/test/java/org/apache/karaf/decanter/collector/jmx/TestProtocol.java b/collector/jmx/src/test/java/org/apache/karaf/decanter/collector/jmx/TestProtocol.java
index 16e659e..c5a324d 100644
--- a/collector/jmx/src/test/java/org/apache/karaf/decanter/collector/jmx/TestProtocol.java
+++ b/collector/jmx/src/test/java/org/apache/karaf/decanter/collector/jmx/TestProtocol.java
@@ -29,6 +29,8 @@
 
 import javax.management.*;
 import javax.management.remote.*;
+
+import java.net.ServerSocket;
 import java.rmi.registry.LocateRegistry;
 import java.util.Dictionary;
 import java.util.LinkedList;
@@ -39,19 +41,30 @@
 
     private final static Logger LOGGER = LoggerFactory.getLogger(TestProtocol.class);
 
-    private final static String JMX_RMI_SERVICE_URL = "service:jmx:rmi:///jndi/rmi://localhost:8888/decanter";
-
+    private static String jmxRMIServiceURL;
+    private static String jmxMPServiceURL;
     private static JMXConnectorServer rmiConnectorServer;
     private static JMXConnectorServer jmxmpConnectorServer;
     private static MBeanServer mBeanServer;
 
     @BeforeClass
     public static void setup() throws Exception {
-        LOGGER.info("Using JMX service URL: {}", JMX_RMI_SERVICE_URL);
-        JMXServiceURL serviceURL = new JMXServiceURL(JMX_RMI_SERVICE_URL);
+        // Allocate random ports
+        ServerSocket serverSocket = new ServerSocket(0);
+        int rmiPort = serverSocket.getLocalPort();
+        serverSocket.close();
+        serverSocket = new ServerSocket(0);
+        int mpPort = serverSocket.getLocalPort();
+        serverSocket.close();
+
+        jmxRMIServiceURL = "service:jmx:rmi:///jndi/rmi://localhost:" + rmiPort + "/decanter";
+        jmxMPServiceURL = "service:jmx:jmxmp://localhost:" + mpPort;
+
+        LOGGER.info("Using JMX service URL: {}", jmxRMIServiceURL);
+        JMXServiceURL serviceURL = new JMXServiceURL(jmxRMIServiceURL);
 
         LOGGER.info("Creating the RMI registry");
-        LocateRegistry.createRegistry(8888);
+        LocateRegistry.createRegistry(rmiPort);
 
         LOGGER.info("Creating MBeanServer");
         mBeanServer = MBeanServerFactory.createMBeanServer();
@@ -61,7 +74,7 @@
         rmiConnectorServer.start();
 
         LOGGER.info("Creating JMXMP connector server");
-        jmxmpConnectorServer = JMXConnectorServerFactory.newJMXConnectorServer(new JMXServiceURL("jmxmp", null, 9999), null, mBeanServer);
+        jmxmpConnectorServer = JMXConnectorServerFactory.newJMXConnectorServer(new JMXServiceURL("jmxmp", null, mpPort), null, mBeanServer);
         jmxmpConnectorServer.start();
 
         ObjectName testObjectName = new ObjectName("decanter.test:type=test");
@@ -76,7 +89,7 @@
 
         ComponentContextMock componentContextMock = new ComponentContextMock();
         componentContextMock.getProperties().put("type", "jmx-test");
-        componentContextMock.getProperties().put("url", "service:jmx:rmi:///jndi/rmi://localhost:8888/decanter");
+        componentContextMock.getProperties().put("url", jmxRMIServiceURL);
 
         DispatcherMock dispatcherMock = new DispatcherMock();
         collector.dispatcher = dispatcherMock;
@@ -90,7 +103,7 @@
         Assert.assertEquals("decanter/collect/jmx/jmx-test/decanter/test", event.getTopic());
         Assert.assertEquals("Test", event.getProperty("Test"));
         Assert.assertEquals("decanter.test:type=test", event.getProperty("ObjectName"));
-        Assert.assertEquals("service:jmx:rmi:///jndi/rmi://localhost:8888/decanter", event.getProperty("url"));
+        Assert.assertEquals(jmxRMIServiceURL, event.getProperty("url"));
     }
 
     @Test
@@ -99,7 +112,7 @@
 
         ComponentContextMock componentContextMock = new ComponentContextMock();
         componentContextMock.getProperties().put("type", "jmx-test");
-        componentContextMock.getProperties().put("url", "service:jmx:jmxmp://localhost:9999");
+        componentContextMock.getProperties().put("url", jmxMPServiceURL);
 
         DispatcherMock dispatcherMock = new DispatcherMock();
         collector.dispatcher = dispatcherMock;
@@ -113,7 +126,7 @@
         Assert.assertEquals("decanter/collect/jmx/jmx-test/decanter/test", event.getTopic());
         Assert.assertEquals("Test", event.getProperty("Test"));
         Assert.assertEquals("decanter.test:type=test", event.getProperty("ObjectName"));
-        Assert.assertEquals("service:jmx:jmxmp://localhost:9999", event.getProperty("url"));
+        Assert.assertEquals(jmxMPServiceURL, event.getProperty("url"));
     }
 
     @AfterClass
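The test now picks free ports by briefly binding a ServerSocket on port 0 and reading the port assigned by the OS, instead of hard-coding 8888/9999. A small sketch of that technique (class and method names are illustrative); note there is a short window between closing the socket and the JMX connector re-binding the port, which the test accepts:

    import java.io.IOException;
    import java.net.ServerSocket;

    public class PortAllocator {

        // ask the OS for an ephemeral port, then release it so the JMX connector can bind it
        public static int findFreePort() throws IOException {
            try (ServerSocket socket = new ServerSocket(0)) {
                return socket.getLocalPort();
            }
        }
    }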
diff --git a/collector/log4j-socket/pom.xml b/collector/log4j-socket/pom.xml
index f15d252..5f8c650 100644
--- a/collector/log4j-socket/pom.xml
+++ b/collector/log4j-socket/pom.xml
@@ -93,25 +93,6 @@
                     </execution>
                 </executions>
             </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-bundle-plugin</artifactId>
-                <configuration>
-                    <instructions>
-                        <Import-Package>
-                            !com.ibm.uvm.tools,
-                            *
-                        </Import-Package>
-                        <Private-Package>
-                            org.apache.karaf.decanter.collector.log.socket,
-                            org.apache.log4j.spi,
-                            org.apache.log4j.helpers,
-                            org.apache.log4j.or,
-                            org.apache.log4j.pattern
-                        </Private-Package>
-                    </instructions>
-                </configuration>
-            </plugin>
         </plugins>
     </build> 
-</project>
\ No newline at end of file
+</project>
diff --git a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
index 738b978..fb6d6e1 100644
--- a/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
+++ b/collector/soap/src/main/java/org/apache/karaf/decanter/collector/soap/SoapCollector.java
@@ -16,6 +16,17 @@
  */
 package org.apache.karaf.decanter.collector.soap;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.time.Instant;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.karaf.decanter.collector.utils.PropertiesPreparator;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -26,20 +37,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.io.OutputStream;
-import java.io.OutputStreamWriter;
-import java.io.StringWriter;
-import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLConnection;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Map;
-
 @Component(
         service = Runnable.class,
         name = "org.apache.karaf.decanter.collector.soap",
@@ -112,13 +109,16 @@
             connection.setDoInput(true);
             connection.setRequestProperty("Content-Type", "text/xml");
             connection.setRequestProperty("Accept", "text/xml");
+            Instant startTime = Instant.now();
             try (OutputStreamWriter writer = new OutputStreamWriter(connection.getOutputStream())) {
+                // measure the elapsed time only once the response is available (see http.response.time below)
                 writer.write(soapRequest);
                 writer.flush();
                 data.put("http.response.code", connection.getResponseCode());
                 data.put("http.response.message", connection.getResponseMessage());
+                data.put("http.response.time", responseTime.toEpochMilli());
                 try (BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
-                    StringBuffer buffer = new StringBuffer();
+                    StringBuilder buffer = new StringBuilder();
                     String line;
                     while ((line = reader.readLine()) != null) {
                         buffer.append(line).append("\n");
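The new http.response.time property is only meaningful if the clock is read after the response status is available, since the HTTP exchange is not completed until the response is read. A minimal sketch of timing such a call (class name and endpoint are illustrative, not the collector's actual code):

    import java.net.HttpURLConnection;
    import java.net.URL;

    public class ResponseTimer {

        // returns the elapsed milliseconds between sending the request and receiving the status line
        public static long timeRequest(String endpoint) throws Exception {
            HttpURLConnection connection = (HttpURLConnection) new URL(endpoint).openConnection();
            long start = System.currentTimeMillis();
            int code = connection.getResponseCode(); // blocks until the response headers have been read
            long elapsed = System.currentTimeMillis() - start;
            System.out.println("HTTP " + code + " in " + elapsed + " ms");
            return elapsed;
        }
    }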
diff --git a/collector/soap/src/test/java/org/apache/karaf/decanter/collector/soap/SoapCollectorTest.java b/collector/soap/src/test/java/org/apache/karaf/decanter/collector/soap/SoapCollectorTest.java
index b4267f0..0ddeef1 100644
--- a/collector/soap/src/test/java/org/apache/karaf/decanter/collector/soap/SoapCollectorTest.java
+++ b/collector/soap/src/test/java/org/apache/karaf/decanter/collector/soap/SoapCollectorTest.java
@@ -124,6 +124,7 @@
         Assert.assertNull(event.getProperty("error"));
         Assert.assertEquals(200, event.getProperty("http.response.code"));
         Assert.assertEquals("OK", event.getProperty("http.response.message"));
+        Assert.assertTrue(Long.class.cast(event.getProperty("http.response.time")) > 0L);
         Assert.assertTrue(((String) event.getProperty("soap.response")).contains("hello This is a test"));
     }
 
diff --git a/collector/system/src/main/cfg/org.apache.karaf.decanter.collector.system.cfg b/collector/system/src/main/cfg/org.apache.karaf.decanter.collector.system.cfg
index 7ba77f6..f419416 100644
--- a/collector/system/src/main/cfg/org.apache.karaf.decanter.collector.system.cfg
+++ b/collector/system/src/main/cfg/org.apache.karaf.decanter.collector.system.cfg
@@ -24,6 +24,9 @@
 # This collector executes system commands, retrieve the exec output/err
 # sent to the appenders
 #
+# You can define the number of threads used to parallelize the command calls:
+# thread.number=1
+#
 # The format is command.key=command_to_execute
 # where command is a reserved keyword used to identify a command property
 # for instance:
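As an illustration, a configuration combining the new thread.number option with several commands could look like this (the command values below are only an example, not defaults):

    # etc/org.apache.karaf.decanter.collector.system.cfg
    thread.number=4
    command.df=df -h
    command.free=free -m
    command.uptime=uptime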
diff --git a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
index accd986..5d4fa62 100644
--- a/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
+++ b/collector/system/src/main/java/org/apache/karaf/decanter/collector/system/SystemCollector.java
@@ -16,6 +16,20 @@
  */
 package org.apache.karaf.decanter.collector.system;
 
+import java.io.ByteArrayOutputStream;
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.commons.exec.CommandLine;
 import org.apache.commons.exec.DefaultExecutor;
 import org.apache.commons.exec.PumpStreamHandler;
@@ -28,12 +42,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.ByteArrayOutputStream;
-import java.net.InetAddress;
-import java.util.Dictionary;
-import java.util.Enumeration;
-import java.util.HashMap;
-
 @Component(
     name = "org.apache.karaf.decanter.collector.system",
     immediate = true,
@@ -50,17 +58,29 @@
     private final static Logger LOGGER = LoggerFactory.getLogger(SystemCollector.class);
 
     private Dictionary<String, Object> properties;
+    private String topic;
+    private int threadNumber;
 
     @SuppressWarnings("unchecked")
     @Activate
     public void activate(ComponentContext context) {
         this.properties = context.getProperties();
+        this.topic = context.getProperties().get("topic") != null ? String.class.cast(context.getProperties().get("topic")) : "decanter/collect/system/";
+        if (!this.topic.endsWith("/")) {
+            this.topic = this.topic + "/";
+        }
+        try {
+            this.threadNumber = context.getProperties().get("thread.number") != null ? Integer.class.cast(context.getProperties().get("thread.number")) : 1;
+        } catch (Exception e) {
+            throw new IllegalArgumentException("Invalid 'thread.number' parameter: value is not a number", e);
+        }
     }
 
     @Override
     public void run() {
         if (properties != null) {
-            String karafName = System.getProperty("karaf.name");
+            final String karafName = System.getProperty("karaf.name");
+            final String topic = this.topic;
             String hostAddress = null;
             String hostName = null;
             try {
@@ -69,56 +89,77 @@
             } catch (Exception e) {
                 // nothing to do
             }
+
+            Collection<Callable<Object>> callables = new ArrayList<>();
+
             Enumeration<String> keys = properties.keys();
             while (keys.hasMoreElements()) {
-                String key = (String) keys.nextElement();
-                try {
-                    if (key.startsWith("command.")) {
-                        HashMap<String, Object> data = new HashMap<>();
-                        String command = (String) properties.get(key);
-                        LOGGER.debug("Executing {} ({})", command, key);
-                        CommandLine cmdLine = CommandLine.parse(command);
-                        DefaultExecutor executor = new DefaultExecutor();
-                        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-                        PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
-                        executor.setStreamHandler(streamHandler);
-                        data.put("timestamp", System.currentTimeMillis());
-                        data.put("type", "system");
-                        data.put("karafName", karafName);
-                        data.put("hostAddress", hostAddress);
-                        data.put("hostName", hostName);
-                        executor.execute(cmdLine);
-                        outputStream.flush();
-                        String output = outputStream.toString();
-                        if (output.endsWith("\n")) {
-                            output = output.substring(0, output.length() - 1);
-                        }
-                        output = output.trim();
-                        // try to convert to number
-                        try {
-                            if (output.contains(".")) {
-                                Double value = Double.parseDouble(output);
-                                data.put(key, value);
-                            } else {
-                                Integer value = Integer.parseInt(output);
-                                data.put(key, value);
+                String key = keys.nextElement();
+                if (key.startsWith("command.")) {
+                    String finalHostAddress = hostAddress;
+                    String finalHostName = hostName;
+                    callables.add(() -> {
+                        Event event = null;
+                        try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
+                            String command = (String) properties.get(key);
+                            LOGGER.debug("Executing {} ({})", command, key);
+                            CommandLine cmdLine = CommandLine.parse(command);
+                            DefaultExecutor executor = new DefaultExecutor();
+                            PumpStreamHandler streamHandler = new PumpStreamHandler(outputStream);
+                            executor.setStreamHandler(streamHandler);
+                            HashMap<String, Object> data = new HashMap<>();
+                            data.put("timestamp", System.currentTimeMillis());
+                            data.put("type", "system");
+                            data.put("karafName", karafName);
+                            data.put("hostAddress", finalHostAddress);
+                            data.put("hostName", finalHostName);
+                            executor.execute(cmdLine);
+                            outputStream.flush();
+                            String output = outputStream.toString();
+                            if (output.endsWith("\n")) {
+                                output = output.substring(0, output.length() - 1);
                             }
-                        } catch (NumberFormatException e) {
-                            data.put(key, output);
-                        }
-                        streamHandler.stop();
-                        Event event = new Event("decanter/collect/system/" + key.replace(".", "_"), data);
-                        dispatcher.postEvent(event);
-                        try {
-                            outputStream.close();
+                            output = output.trim();
+                            // try to convert to number
+                            try {
+                                if (output.contains(".")) {
+                                    Double value = Double.parseDouble(output);
+                                    data.put(key, value);
+                                } else {
+                                    Integer value = Integer.parseInt(output);
+                                    data.put(key, value);
+                                }
+                            } catch (NumberFormatException e) {
+                                data.put(key, output);
+                            }
+                            streamHandler.stop();
+                            event = new Event(topic + key.replace(".", "_"), data);
                         } catch (Exception e) {
-                            // nothing to do
+                            LOGGER.warn("Command {} execution failed", key, e);
                         }
-                    }
-                } catch (Exception e) {
-                    LOGGER.warn("Command {} execution failed", key, e);
+                        return event;
+                    });
                 }
             }
+
+            ExecutorService executorService = Executors.newFixedThreadPool(this.threadNumber);
+            try {
+                LOGGER.debug("Start invoking system commands...");
+                List<Future<Object>> results = executorService.invokeAll(callables);
+                results.stream().forEach(objectFuture -> {
+                    try {
+                        Event event = Event.class.cast(objectFuture.get());
+                        if (Optional.ofNullable(event).isPresent()) {
+                            dispatcher.postEvent(event);
+                        }
+                    } catch (InterruptedException | ExecutionException e) {
+                        LOGGER.warn("Thread executor for the collector system failed", e);
+                    }
+                });
+            } catch (InterruptedException e) {
+                LOGGER.warn("Thread executor for the collector system failed", e);
+            } finally {
+                // release the executor threads once all commands have been processed
+                executorService.shutdown();
+            }
+            LOGGER.debug("Invoking system commands done");
         }
     }
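The refactored run() method follows a common pattern: wrap each command in a Callable, submit the whole batch with ExecutorService.invokeAll, then read back the Futures. A self-contained sketch of that pattern, assuming simple string results instead of OSGi events (all names are illustrative):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelCommands {

        public static void main(String[] args) throws Exception {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String command : new String[]{"df -h", "uptime", "free -m"}) {
                tasks.add(() -> "executed: " + command);
            }
            ExecutorService executor = Executors.newFixedThreadPool(2);
            try {
                // invokeAll blocks until every task has completed (or failed)
                List<Future<String>> results = executor.invokeAll(tasks);
                for (Future<String> result : results) {
                    System.out.println(result.get());
                }
            } finally {
                // always release the pool threads, as the collector should do after each run
                executor.shutdown();
            }
        }
    }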
 
diff --git a/collector/system/src/test/java/org/apache/karaf/decanter/collector/system/SystemCollectorTest.java b/collector/system/src/test/java/org/apache/karaf/decanter/collector/system/SystemCollectorTest.java
new file mode 100644
index 0000000..0f14d0e
--- /dev/null
+++ b/collector/system/src/test/java/org/apache/karaf/decanter/collector/system/SystemCollectorTest.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.karaf.decanter.collector.system;
+
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.List;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+
+public class SystemCollectorTest {
+
+    private ComponentContext componentContext;
+    private EventAdminStub eventAdmin;
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testBadConfiguration() {
+        SystemCollector collector = new SystemCollector();
+        this.eventAdmin = new EventAdminStub();
+        collector.dispatcher = eventAdmin;
+        componentContext = new ComponentContextStub();
+        componentContext.getProperties().put("thread.number", "A");
+        componentContext.getProperties().put("command.df1", "df -h");
+        collector.activate(componentContext);
+    }
+
+    @Test
+    public void testWithThreads() throws Exception {
+        SystemCollector collector = new SystemCollector();
+        this.eventAdmin = new EventAdminStub();
+        collector.dispatcher = eventAdmin;
+        componentContext = new ComponentContextStub();
+        componentContext.getProperties().put("thread.number", 5);
+        componentContext.getProperties().put("command.df1", "df -h");
+        componentContext.getProperties().put("command.df2", "df -h");
+        componentContext.getProperties().put("command.df3", "df -h");
+        componentContext.getProperties().put("command.df4", "df -h");
+        componentContext.getProperties().put("command.df5", "df -h");
+        collector.activate(componentContext);
+        collector.run();
+        waitUntilEventCountHandled(5);
+        Assert.assertEquals(5, eventAdmin.getPostEvents().size());
+    }
+
+    @Test
+    public void testBulkWithThreads() throws Exception {
+        SystemCollector collector = new SystemCollector();
+        this.eventAdmin = new EventAdminStub();
+        collector.dispatcher = eventAdmin;
+        componentContext = new ComponentContextStub();
+        componentContext.getProperties().put("thread.number", 5);
+        for (int cpt = 0; cpt < 1000; cpt++) {
+            componentContext.getProperties().put("command.df" + cpt, "df -h");
+        }
+        collector.activate(componentContext);
+        collector.run();
+        waitUntilEventCountHandled(1000);
+        Assert.assertEquals(1000, eventAdmin.getPostEvents().size());
+    }
+
+    @After
+    public void tearDown() {
+        this.eventAdmin.reset();
+    }
+
+    private void waitUntilEventCountHandled(int eventCount) throws InterruptedException {
+        long timeout = 20000L;
+        long start = System.currentTimeMillis();
+        boolean hasTimeoutReached = false;
+        do {
+            hasTimeoutReached = ((System.currentTimeMillis() - start) > timeout);
+            Thread.sleep(10L);
+        } while (eventAdmin.getPostEvents().size() < eventCount && !hasTimeoutReached);
+    }
+
+    /**
+     * Stub used only for this unit test
+     */
+    private static class ComponentContextStub implements ComponentContext {
+
+        private Dictionary properties = new Hashtable<>();
+
+        @Override
+        public Dictionary getProperties() {
+            return properties;
+        }
+
+        @Override
+        public Object locateService(String name) {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public Object locateService(String name, ServiceReference reference) {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public Object[] locateServices(String name) {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public void enableComponent(String name) {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public void disableComponent(String name) {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+
+        @Override
+        public ServiceReference getServiceReference() {
+            throw new NoSuchMethodError("Unimplemented method");
+        }
+    }
+
+    private static class EventAdminStub implements EventAdmin {
+        private List<Event> postEvents = new ArrayList<>();
+        private List<Event> sendEvents = new ArrayList<>();
+
+        @Override
+        public void postEvent(Event event) {
+            postEvents.add(event);
+        }
+
+        @Override
+        public void sendEvent(Event event) {
+            sendEvents.add(event);
+        }
+
+        public List<Event> getPostEvents() {
+            return postEvents;
+        }
+
+        public List<Event> getSendEvents() {
+            return sendEvents;
+        }
+
+        public void reset() {
+            postEvents.clear();
+            sendEvents.clear();
+        }
+
+    }
+
+}
diff --git a/manual/src/main/asciidoc/dev-guide/custom-alerter.adoc b/manual/src/main/asciidoc/dev-guide/custom-alerter.adoc
index 1db4aa4..83f3e7f 100644
--- a/manual/src/main/asciidoc/dev-guide/custom-alerter.adoc
+++ b/manual/src/main/asciidoc/dev-guide/custom-alerter.adoc
@@ -16,7 +16,7 @@
 
 A Decanter Alerter is basically a special kind of appender.
 
-It's an OSGi EventAdmin EventHandler: it's listening of `decanter/alert/*` EventAdmin topics, and
+It's an OSGi EventAdmin EventHandler: it listens to `decanter/alert/*` EventAdmin topics, and
 receives the alerting data coming from the checker.
 
 To enable a new Decanter Alerter, you just have to register an EventHandler OSGi service, like we do for an appender.
@@ -134,4 +134,4 @@
 </project>
 ----
 
-Once built, you can enable this alerter by deploying the bundle in Karaf (using the deploy folder or the `bundle:install` command).
\ No newline at end of file
+Once built, you can enable this alerter by deploying the bundle in Karaf (using the deploy folder or the `bundle:install` command).
diff --git a/manual/src/main/asciidoc/dev-guide/custom-appender.adoc b/manual/src/main/asciidoc/dev-guide/custom-appender.adoc
index 864f8c0..c65994e 100644
--- a/manual/src/main/asciidoc/dev-guide/custom-appender.adoc
+++ b/manual/src/main/asciidoc/dev-guide/custom-appender.adoc
@@ -14,7 +14,7 @@
 
 === Custom Appender
 
-A Decanter Appender is an OSGi EventAdmin EventHandler: it's listening of `decanter/collect/*` EventAdmin topics, and
+A Decanter Appender is an OSGi EventAdmin EventHandler: it listens to `decanter/collect/*` EventAdmin topics, and
 receives the monitoring data coming from the collectors.
 
 It's responsible to store the data into a target backend.
@@ -45,7 +45,7 @@
 
 ----
 
-Now, we create a BundleActivator that register our SystemOutAppender as an EventHandler OSGi service:
+Now, we create a BundleActivator that registers our SystemOutAppender as an EventHandler OSGi service:
 
 ----
 package org.apache.karaf.decanter.sample.appender.systemout;
diff --git a/manual/src/main/asciidoc/user-guide/alerting.adoc b/manual/src/main/asciidoc/user-guide/alerting.adoc
index 58267a7..8b02a7e 100644
--- a/manual/src/main/asciidoc/user-guide/alerting.adoc
+++ b/manual/src/main/asciidoc/user-guide/alerting.adoc
@@ -34,7 +34,7 @@
 where:
 
 * `type` is optional. It allows you to filter the check for a given type of collected data. It's particulary interesting
-when Decanter collects multiple JMX object names or servers. You may want to perform different checks depending of the type
+when Decanter collects multiple JMX object names or servers. You may want to perform different checks depending on the type
 or source of the collected data.
 * `propertyName` is the data property key. For instance, `loggerName`, `message`, `HeapMemoryUsage.used`, etc.
 * `alertLevel` is the alerting level for this check. The only two possible values are `error` (critical alert), or
@@ -42,7 +42,7 @@
 * `checkType` is the check type. Possible values are `range`, `equal`, `notequal`, `match`, and `notmatch`.
 * `value` is the check value, where the data property value has to verify.
 
-The Decanter Checker supports numeric or string check.
+The Decanter Checker supports numeric or string checks.
 
 To verify a numeric value, you can use:
 
@@ -70,7 +70,7 @@
 myValue.warn=equal:10
 ----
 
-If myValue is not equal to 10, Decanter will create a warn alert send to the alerters.
+If myValue is not equal to 10, Decanter will create a warn alert sent to the alerters.
 
 To verify a string value, you can use:
 
@@ -91,13 +91,13 @@
 
 ==== Alerters
 
-When the value doesn't verify the check in the checker configuration, an alert is created an sent to the alerters.
+When the value doesn't verify the check in the checker configuration, an alert is created and sent to the alerters.
 
 Apache Karaf Decanter provides ready to use alerters.
 
 ===== Log
 
-The Decanter Log alerter log a message for each alert.
+The Decanter Log alerter logs a message for each alert.
 
 The `decanter-alerting-log` feature installs the log alerter:
 
@@ -161,6 +161,21 @@
 * the `username` property is optional and specifies the username to connect to the SMTP server
 * the `password` property is optional and specifies the password to connect to the SMTP server
 
+Optionally, you can add:
+
+* `cc` to add carbon copy (CC) recipients
+* `bcc` to add blind carbon copy (BCC) recipients
+* `subject` if you want to customize the email subject sent by the alerter
+* `body` if you want to customize the email body sent by the alerter
+* `body.type` if you want to customize the email body type sent by the alerter
+
+The email alerter can also use collected data properties, including placeholder replacement.
+
+For instance, `subject` can look like `This is my ${property}` where `${property}` is replaced by the `property` value.
+
+In addition, the alerter looks for the `body`, `subject`, `alert.email.body`, and `alert.email.subject` collected data
+properties to use as the email subject or body.
+
 ===== Camel
 
 The Decanter Camel alerter sends each alert to a Camel endpoint.
@@ -214,4 +229,4 @@
   </camelContext>
 
 </blueprint>
-----
\ No newline at end of file
+----
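For illustration, the optional email alerter properties documented above could be combined in the alerter configuration; the file name, addresses, and placeholder names below are assumptions used only as an example:

    # etc/org.apache.karaf.decanter.alerting.email.cfg
    host=smtp.example.com
    from=decanter@example.com
    to=ops@example.com
    cc=team@example.com
    bcc=audit@example.com
    subject=Decanter alert on ${hostName}
    body=Alert details: ${message}
    body.type=text/plain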
diff --git a/manual/src/main/asciidoc/user-guide/appenders.adoc b/manual/src/main/asciidoc/user-guide/appenders.adoc
index 455289a..7a59030 100644
--- a/manual/src/main/asciidoc/user-guide/appenders.adoc
+++ b/manual/src/main/asciidoc/user-guide/appenders.adoc
@@ -38,17 +38,17 @@
 
 Decanter provides three appenders for Elasticsearch:
 
-* decanter-appender-elasticsearch-rest (recommanded) is an appender which directly uses the Elasticsearch HTTP REST API. It's compliant with any Elasticsearch version (1.x, 2.x, 5.x, 6.x).
+* decanter-appender-elasticsearch-rest (recommended) is an appender which directly uses the Elasticsearch HTTP REST API. It's compliant with any Elasticsearch version (1.x, 2.x, 5.x, 6.x).
 * decanter-appender-elasticsearch-jest (deprecated) is an appender which directly uses the Elasticsearch HTTP REST API, working with any Elasticsearch version (1.x, 2.x, 5.x, 6.x).
 * decanter-appender-elasticsearch-native-1.x is an appender which uses the Elasticsearch 1.x Java Client API. It's compliant only with Elasticsearch 1.x versions.
 * decanter-appender-elasticsearch-native-2.x is an appender which uses the Elasticsearch 2.x Java Client API. It's compliant only with Elasticsearch 2.x versions.
 
 These appenders store the data (coming from the collectors) into an Elasticsearch node.
-They transformm the data as a json document, stored into Elasticsearch.
+They transform the data into a json document, stored in Elasticsearch.
 
 ===== Elasticsearch 5.x/6.x Rest Appender
 
-The Decanter Elasticsearch Rest appender uses the Elasticsearch Rest client provided since Elasticsearch 5.x. It can be use with Elasticsearch 5.x or 6.x versions.
+The Decanter Elasticsearch Rest appender uses the Elasticsearch Rest client provided since Elasticsearch 5.x. It can be used with Elasticsearch 5.x or 6.x versions.
 
 The `decanter-appender-elasticsearch-rest` feature installs this appender:
 
@@ -670,7 +670,7 @@
 
 The Decanter elasticsearch node also supports loading and override of the settings using a
 `etc/org.apache.karaf.decanter.elasticsearch.cfg` configuration file.
-This file is not provided by default, as it's used for override of the default settings.
+This file is not provided by default, as it's used to override the default settings.
 
 You can override the following elasticsearch properties in this configuration file:
 
@@ -750,9 +750,9 @@
 
 ===== Kibana 6.x
 
-The `kibana` 6.x feature doesn't really embeds Kibana like Kibana 3 or 4 features.
+The `kibana` 6.x feature doesn't really embed Kibana like the Kibana 3 or 4 features.
 
-However, it's a convenient feature that download and starts a Kibana instance for you.
+However, it's a convenient feature that downloads and starts a Kibana instance for you.
 
 ----
 karaf@root()> feature:install kibana/6.1.1
@@ -771,7 +771,7 @@
 
 ===== Elasticsearch Head console
 
-In addition of the embedded elasticsearch instance, Decanter also provides a web console allowing you to monitor and
+In addition to the embedded elasticsearch instance, Decanter also provides a web console allowing you to monitor and
 manage your elasticsearch cluster. It's a ready to use elastisearch-head console, directly embedded in Karaf.
 
 The `elasticsearch-head` feature installs the embedded elasticsearch-head web console, corresponding to the
@@ -809,7 +809,7 @@
 
 ==== JDBC
 
-The Decanter JDBC appender allows your to store the data (coming from the collectors) into a database.
+The Decanter JDBC appender allows you to store the data (coming from the collectors) into a database.
 
 The Decanter JDBC appender transforms the data as a json string. The appender stores the json string and the timestamp
 into the database.
@@ -845,7 +845,7 @@
 * the `datasource.name` property contains the name of the JDBC datasource to use to connect to the database. You can
 create this datasource using the Karaf `jdbc:create` command (provided by the `jdbc` feature).
 * the `table.name` property contains the table name in the database. The Decanter JDBC appender automatically creates
-the table for you, but you can create the table by yourself. The table is simple and contains just two column:
+the table for you, but you can create the table by yourself. The table is simple and contains just two columns:
 ** timestamp as INTEGER
 ** content as VARCHAR or CLOB
 * the `dialect` property allows you to specify the database type (generic, mysql, derby). This property is only used for
@@ -926,7 +926,7 @@
 
 * the `destination.uri` property specifies the URI of the Camel endpoint where to send the data.
 
-The Camel appender send an exchange. The "in" message body contains a Map of the harvested data.
+The Camel appender sends an exchange. The "in" message body contains a Map of the harvested data.
 
 For instance, in this configuration file, you can specify:
 
@@ -1048,18 +1048,18 @@
 
 This file allows you to define how the messages are sent to the Kafka broker:
 
-* the `bootstrap.servers` contains a lit of host:port of the Kafka brokers. Default value is `localhost:9092`.
+* the `bootstrap.servers` contains a list of host:port pairs for the Kafka brokers. Default value is `localhost:9092`.
 * the `client.id` is optional. It identifies the client on the Kafka broker.
 * the `compression.type` defines if the messages have to be compressed on the Kafka broker. Default value is `none` meaning no compression.
 * the `acks` defines the acknowledgement policy. Default value is `all`. Possible values are:
-** `0` means the appender doesn't wait acknowledge from the Kafka broker. Basically, it means there's no guarantee that messages have been received completely by the broker.
-** `1` means the appender waits the acknowledge only from the leader. If the leader falls down, it's possible messages are lost if the replicas are not yet be created on the followers.
-** `all` means the appender waits the acknowledge from the leader and all followers. This mode is the most reliable as the appender will receive the acknowledge only when all replicas have been created. NB: this mode doesn't make sense if you have a single node Kafka broker or a replication factor set to 1.
+** `0` means the appender doesn't wait for an acknowledge from the Kafka broker. Basically, it means there's no guarantee that messages have been received completely by the broker.
+** `1` means the appender waits for the acknowledge only from the leader. If the leader falls down, it's possible that messages are lost if the replicas have not yet been created on the followers.
+** `all` means the appender waits for the acknowledge from the leader and all followers. This mode is the most reliable as the appender will receive the acknowledge only when all replicas have been created. NB: this mode doesn't make sense if you have a single node Kafka broker or a replication factor set to 1.
 * the `retries` defines the number of retries performed by the appender in case of error. The default value is `0` meaning no retry at all.
 * the `batch.size` defines the size of the batch records. The appender will attempt to batch records together into fewer requests whenever multiple records are being sent to the same Kafka partition. The default value is 16384.
 * the `buffer.memory` defines the size of the buffer the appender uses to send to the Kafka broker. The default value is 33554432.
-* the `key.serializer` defines the full qualified class name of the Serializer used to serializer the keys. The default is a String serializer (`org.apache.kafka.common.serialization.StringSerializer`).
-* the `value.serializer` defines the full qualified class name of the Serializer used to serializer the values. The default is a String serializer (`org.apache.kafka.common.serialization.StringSerializer`).
+* the `key.serializer` defines the fully qualified class name of the Serializer used to serialize the keys. The default is a String serializer (`org.apache.kafka.common.serialization.StringSerializer`).
+* the `value.serializer` defines the fully qualified class name of the Serializer used to serialize the values. The default is a String serializer (`org.apache.kafka.common.serialization.StringSerializer`).
 * the `request.timeout.ms` is the time the producer wait before considering the message production on the broker fails (default is 5s).
 * the `max.request.size` is the max size of the request sent to the broker (default is 2097152 bytes).
 * the `topic` defines the name of the topic where to send data on the Kafka broker.
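To make these settings concrete, a hypothetical Kafka appender configuration could look like the following; the file name and values are only an example, not defaults shipped with Decanter:

    # etc/org.apache.karaf.decanter.appender.kafka.cfg
    bootstrap.servers=kafka1:9092,kafka2:9092
    client.id=decanter
    compression.type=gzip
    acks=all
    retries=2
    topic=decanter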
@@ -1260,7 +1260,7 @@
 
 ==== Network socket
 
-The Decanter network socket appender send the collected data to a remote Decanter network socket collector.
+The Decanter network socket appender sends the collected data to a remote Decanter network socket collector.
 
 The use case could be to dedicate a Karaf instance as a central monitoring platform, receiving collected data from
 the other nodes.
@@ -1396,7 +1396,7 @@
 
 ==== Dropwizard Metrics
 
-The Dropwizard Metrics appender receives the harvested data from the dispatcher and push in a Dropwizard Metrics
+The Dropwizard Metrics appender receives the harvested data from the dispatcher and pushes it to a Dropwizard Metrics
 `MetricRegistry`. You can register this `MetricRegistry` in your own application or use a Dropwizard Metrics Reporter
 to "push" these metrics to some backend.
 
@@ -1457,7 +1457,7 @@
 
 ===== WebSocket Servlet
 
-The `decanter-appender-websocket-servlet` feature expose a websocket on wich client can register. Then, Decanter will send the collected data to the connected clients.
+The `decanter-appender-websocket-servlet` feature exposes a websocket on which clients can register. Then, Decanter will send the collected data to the connected clients.
 
 It's very easy to use. First install the feature:
 
@@ -1465,7 +1465,7 @@
 karaf@root()> feature:install decanter-appender-websocket-servlet
 ```
 
-The feature register the WebSocket endpoint on `http://localhost:8181/decanter-websocket` by default:
+The feature registers the WebSocket endpoint on `http://localhost:8181/decanter-websocket` by default:
 
 ```
 karaf@root()> http:list
@@ -1488,4 +1488,4 @@
      --header "Sec-WebSocket-Key: SGVsbG8sIHdvcmxkIQ==" \
      --header "Sec-WebSocket-Version: 13" \
      http://localhost:8181/decanter-websocket
-```
\ No newline at end of file
+```
diff --git a/manual/src/main/asciidoc/user-guide/collectors.adoc b/manual/src/main/asciidoc/user-guide/collectors.adoc
index f10b6a9..72ac320 100644
--- a/manual/src/main/asciidoc/user-guide/collectors.adoc
+++ b/manual/src/main/asciidoc/user-guide/collectors.adoc
@@ -25,7 +25,7 @@
 ==== Log
 
 The Decanter Log Collector is an event driven collector. It automatically reacts when a log occurs, and
-send the log details (level, logger name, message, etc) to the appenders.
+sends the log details (level, logger name, message, etc) to the appenders.
 
 The `decanter-collector-log` feature installs the log collector:
 
@@ -98,9 +98,9 @@
 
 The Decanter File Collector is an event driven collector. It automatically reacts when new lines are appended into
 a file (especially a log file). It acts like the tail Unix command. Basically, it's an alternative to the log collector.
-The log collector reacts for local Karaf log messages, whereas the file collector can react to any files, included log
-file from other system than Karaf. It means that you can monitor and send collected data for any system (even not Java
-base, or whatever).
+The log collector reacts to local Karaf log messages, whereas the file collector can react to any file, including log
+files from systems other than Karaf. It means that you can monitor and send collected data for any system (even if it is
+not Java based).
 
 The file collector deals with file rotation, file not found.
 
@@ -123,7 +123,7 @@
 ----
 
 * `type` is an ID (mandatory) that allows you to easily identify the monitored file
-* `path` is the location of the file that you want to monitore
+* `path` is the location of the file that you want to monitor
 * all other values (like `any`) will be part of the collected data. It means that you can add your own custom data, and
 easily create queries bases on this data.
 
@@ -156,11 +156,11 @@
 
 ====== Identity parser
 
-The identity parser doesn't actually parse the line, it just pass through. It's the default parser used by the file collector.
+The identity parser doesn't actually parse the line, it just passes it through. It's the default parser used by the file collector.
 
 ====== Split parser
 
-The split parser split the line using a separator (`,` by default). Optionally, it can take `keys` used a property name in the event.
+The split parser splits the line using a separator (`,` by default). Optionally, it can take `keys` used as property names in the event.
 
 For instance, you can have the following `etc/org.apache.karaf.decanter.parser.split.cfg` configuration file:
 
@@ -169,7 +169,7 @@
 keys=first,second,third,fourth
 ----
 
-If the parser gets a line (collected by the file collector) like `this,is,a,test`, the line will be parsed as follow (the file collector will send the following data to the dispatcher):
+If the parser gets a line (collected by the file collector) like `this,is,a,test`, the line will be parsed as follows (the file collector will send the following data to the dispatcher):
 
 ----
 first->this
@@ -196,7 +196,7 @@
 regex=(t.*t)
 ----
 
-If the parser gets a line (collected by the file collector) like `a test here`, the linbe will be parsed as follow (the file collector will send the following data to the dispatcher):
+If the parser gets a line (collected by the file collector) like `a test here`, the line will be parsed as follows (the file collector will send the following data to the dispatcher):
 
 ----
 key-0->test
@@ -312,7 +312,7 @@
 This file harvests the data of the local MBeanServer:
 
 * the `type` property is a name (of your choice) allowing you to easily identify the harvested data
-* the `url` property is the MBeanServer to connect. "local" is reserved keyword to specify the local MBeanServer.
+* the `url` property is the MBeanServer to connect to. "local" is a reserved keyword to specify the local MBeanServer.
 Instead of "local", you can use the JMX service URL. For instance, for Karaf version 3.0.0, 3.0.1, 3.0.2, and 3.0.3,
 as the local MBeanServer is secured, you can specify `service:jmx:rmi:///jndi/rmi://localhost:1099/karaf-root`. You
 can also polled any remote MBean server (Karaf based or not) providing the service URL.
@@ -322,7 +322,7 @@
 is secured.
 * the `object.name` prefix is optional. If this property is not specified, the collector will retrieve the attributes
 of all MBeans. You can filter to consider only some MBeans. This property contains the ObjectName filter to retrieve
-the attributes only to some MBeans. Several object names can be listed, provided the property prefix is `object.name.`.
+the attributes only of some MBeans. Several object names can be listed, provided the property prefix is `object.name.`.
 * any other values will be part of the collected data. It means that you can add your own property if you want to add
 additional data, and create queries based on this data.
 * the `operation.name` prefix is also optional. You can use it to execute an operation. The value format is `objectName|operation|arguments|signatures`.
@@ -334,7 +334,7 @@
 
 The Karaf Decanter JMX collector by default uses RMI protocol for JMX. But it also supports JMXMP protocol.
 
-The features to install are the sames: `decanter-collector-jmx`.
+The feature to install is the same: `decanter-collector-jmx`.
 
 However, you have to enable the `jmxmp` protocol support in the Apache Karaf instance hosting Karaf Decanter.
 
@@ -389,7 +389,7 @@
 karaf@root()> feature:install decanter-collector-jmx-activemq
 ----
 
-This feature installs the same collector as the `decanter-collector-jmx`, but also add the
+This feature installs the same collector as the `decanter-collector-jmx`, but also adds the
 `etc/org.apache.karaf.decanter.collector.jmx-activemq.cfg` configuration file.
 
 This file contains:
@@ -430,7 +430,7 @@
 karaf@root()> feature:install decanter-collector-jmx-camel
 ----
 
-This feature installs the same collector as the `decanter-collector-jmx`, but also add the
+This feature installs the same collector as the `decanter-collector-jmx`, but also adds the
 `etc/org.apache.karaf.decanter.collector.jmx-camel.cfg` configuration file.
 
 This file contains:
@@ -467,7 +467,7 @@
 
 ===== Camel Tracer
 
-If you enable the tracer on a Camel route, all tracer events (exchanges on each step of the route) are send to the
+If you enable the tracer on a Camel route, all tracer events (exchanges on each step of the route) are sent to the
 appenders.
 
 The `decanter-collector-camel` feature provides the Camel Tracer Handler:
@@ -526,7 +526,7 @@
 
 Decanter also provides `DecanterEventNotifier` implementing a Camel event notifier: http://camel.apache.org/eventnotifier-to-log-details-about-all-sent-exchanges.html
 
-It's very similar to the Decanter Camel Tracer. You can control the camel contexts and routes to which you want to trap event.
+It's very similar to the Decanter Camel Tracer. You can control the camel contexts and routes for which you want to trap events.
 
 ==== System
 
@@ -550,6 +550,9 @@
 # This collector executes system commands, retrieve the exec output/err
 # sent to the appenders
 #
+# You can define the number of threads used to parallelize the command calls:
+# thread.number=1
+#
 # The format is command.key=command_to_execute
 # where command is a reserved keyword used to identify a command property
 # for instance:
@@ -616,15 +619,15 @@
 ----
 
 * the `port` property contains the port number where the network socket collector is listening
-* the `workers` property contains the number of worker thread the socket collector is using for connection
+* the `workers` property contains the number of worker threads the socket collector uses to handle connections
 * the `protocol` property contains the protocol used by the collector for transferring data with the client
 * the `unmarshaller.target` property contains the unmarshaller used by the collector to transform the data
-sended by the client.
+sent by the client.
 
 ==== JMS
 
 The Decanter JMS collector consumes the data from a JMS queue or topic. It's a way to aggregate collected data coming
-from remote and several machines.
+from (several) remote machines.
 
 The `decanter-collector-jms` feature installs the JMS collector:
 
@@ -663,7 +666,7 @@
 ==== MQTT
 
 The Decanter MQTT collector receives collected messages from a MQTT broker. It's a way to aggregate collected data coming
-from remote and several machines.
+from (several) remote machines.
 
 The `decanter-collector-mqtt` feature installs the MQTT collector:
 
@@ -695,7 +698,7 @@
 ==== Kafka
 
 The Decanter Kafka collector receives collected messages from a Kafka broker. It's a way to aggregate collected data coming
-from remote and several machines.
+from (several) remote machines.
 
 The `decanter-collector-kafka` feature installs the Kafka collector:
 
@@ -765,7 +768,7 @@
 # For SASL, you have to configure Java System property as explained in http://kafka.apache.org/documentation.html#security_ssl
 ----
 
-The configuration is similar to the Decanter Kafka appender. Please, see Kafka collector for details.
+The configuration is similar to the Decanter Kafka appender. Please see the Kafka appender section for details.
 
 ==== Rest Servlet
 
@@ -800,7 +803,7 @@
 soap.request=
 ----
 
-The collector send several collected properties to the dispatcher, especially:
+The collector sends several collected properties to the dispatcher, especially:
 
 * `soap.response` property contains the actual SOAP response
 * `error` is only populated when the service request failed, containing the error detail
@@ -821,7 +824,7 @@
 
 ==== JDBC
 
-The Decanter JDBC collector periodically executes a query on a database and send the query result to the dispatcher.
+The Decanter JDBC collector periodically executes a query on a database and sends the query result to the dispatcher.
 
 The `decanter-collector-jdbc` installs the JDBC collector:
 
@@ -876,4 +879,4 @@
 
     ----
     fields.rename.helo=hello
-    ----
\ No newline at end of file
+    ----
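Earlier in this file, the JMX collector options (type, url, object.name) are documented; a hypothetical configuration polling a remote JMXMP server could look like the following. The file name, URL, and object name are assumptions used only as an example:

    # etc/org.apache.karaf.decanter.collector.jmx-remote.cfg
    type=activemq-remote
    url=service:jmx:jmxmp://broker.example.com:9999
    object.name=org.apache.activemq:type=Broker,brokerName=*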
diff --git a/manual/src/main/asciidoc/user-guide/introduction.adoc b/manual/src/main/asciidoc/user-guide/introduction.adoc
index e6f3379..da976a7 100644
--- a/manual/src/main/asciidoc/user-guide/introduction.adoc
+++ b/manual/src/main/asciidoc/user-guide/introduction.adoc
@@ -14,19 +14,18 @@
 
 === Introduction
 
-Apache Karaf Decanter is monitoring solution running in Apache Karaf.
+Apache Karaf Decanter is a monitoring solution running in Apache Karaf.
 
-It's composed in three parts:
+It's composed of three parts:
 
-* Collectors are responsible of harvesting monitoring data. Decanter provides collectors to harvest different kind
+* Collectors are responsible for harvesting monitoring data. Decanter provides collectors to harvest different kinds
 of data. We have two kinds of collectors:
 ** Event Driven Collectors automatically react to events and send the event data to the Decanter appenders.
 ** Polled Collectors are periodically called by the Decanter Scheduler. They harvest data and send it to the Decanter
 appenders
 * Appenders receive the data from the collectors and are responsible to store the data into a given backend. Decanter
-provides appenders depending of the backend storage that you want to use.
-* Alerters is a special kind of appender. It receives all harvested data and checks on it. If a check fails, an alert event
-is created and sent to alerters. Decanter provides alerters depending of the kind of notification that you want.
+provides appenders depending on the backend storage that you want to use.
+* Alerters are a special kind of appender. A check is performed on all harvested data. If a check fails, an alert event is created and sent to the alerters. Decanter provides alerters depending on the kind of notification that you want.
 
 Apache Karaf Decanter provides Karaf features for each collector, appender, alerter.
 
diff --git a/pom.xml b/pom.xml
index 8161c81..6a31304 100644
--- a/pom.xml
+++ b/pom.xml
@@ -40,8 +40,8 @@
 
         <activemq.version>5.13.3</activemq.version>
         <camel.version>2.16.2</camel.version>
-        <cassandra.version>2.2.4</cassandra.version>
-        <cassandra.driver.version>2.2.0-rc1</cassandra.driver.version>
+        <cassandra.version>3.11.4</cassandra.version>
+        <cassandra.driver.version>4.1.0</cassandra.driver.version>
         <elasticsearch1.version>1.7.4</elasticsearch1.version>
         <elasticsearch1.bundle.version>1.7.4_1</elasticsearch1.bundle.version>
         <elasticsearch22.version>2.2.0</elasticsearch22.version>
@@ -148,7 +148,7 @@
                 <plugin>
                     <groupId>org.apache.felix</groupId>
                     <artifactId>maven-bundle-plugin</artifactId>
-                    <version>4.2.0</version>
+                    <version>4.2.1</version>
                 </plugin>
                 <plugin>
                     <groupId>org.apache.maven.plugins</groupId>
@@ -222,8 +222,8 @@
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <configuration>
-                    <source>1.7</source>
-                    <target>1.7</target>
+                    <source>1.8</source>
+                    <target>1.8</target>
                 </configuration>
             </plugin>
             <plugin>
@@ -503,7 +503,7 @@
                     <plugin>
                         <groupId>org.apache.rat</groupId>
                         <artifactId>apache-rat-plugin</artifactId>
-                        <version>0.12</version>
+                        <version>0.13</version>
                         <executions>
                             <execution>
                                 <phase>verify</phase>
@@ -534,6 +534,9 @@
                                 <exclude>**/META-INF/decanter.bundles.default</exclude>
                                 <exclude>**/bundle8/**/*</exclude>
                                 <exclude>**/test.cfg</exclude>
+                                <exclude>**/.classpath</exclude>
+                                <exclude>**/.settings/**/*</exclude>
+                                <exclude>**/.project</exclude>
                             </excludes>
                             <consoleOutput>true</consoleOutput>
                         </configuration>