notification plugin framework
diff --git a/eagle-assembly/src/assembly/eagle-bin.xml b/eagle-assembly/src/assembly/eagle-bin.xml
index 1c1a00f..2a04fdc 100644
--- a/eagle-assembly/src/assembly/eagle-bin.xml
+++ b/eagle-assembly/src/assembly/eagle-bin.xml
@@ -218,14 +218,6 @@
             </excludes>
         </fileSet>
 
-        <!--<fileSet>-->
-            <!--<directory>${project.basedir}/../eagle-samples/eagle-persist-sample/target/</directory>-->
-            <!--<outputDirectory>lib/topology</outputDirectory>-->
-            <!--<includes>-->
-                <!--<include>eagle-persist-sample-*-assembly.jar</include>-->
-            <!--</includes>-->
-        <!--</fileSet>-->
-
         <fileSet>
             <directory>${project.basedir}/../eagle-external/eagle-ambari</directory>
             <outputDirectory>lib/ambari</outputDirectory>
diff --git a/eagle-assembly/src/main/bin/eagle-create-table.rb b/eagle-assembly/src/main/bin/eagle-create-table.rb
index 21dc030..cbef6f4 100644
--- a/eagle-assembly/src/main/bin/eagle-create-table.rb
+++ b/eagle-assembly/src/main/bin/eagle-create-table.rb
@@ -56,5 +56,6 @@
 createEagleTable(admin, 'appDefinition')
 createEagleTable(admin, 'serviceAudit')
 createEagleTable(admin, 'aggregatedef')
+createEagleTable(admin, 'alertNotifications')
 
 exit
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml b/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
new file mode 100644
index 0000000..b73d43b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
@@ -0,0 +1,90 @@
+<!--
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>eagle</groupId>
+    <artifactId>eagle-alert-parent</artifactId>
+    <version>0.3.0</version>
+  </parent>
+  <groupId>eagle</groupId>
+  <artifactId>eagle-alert-notification-plugin</artifactId>
+  <name>eagle-alert-notification-plugin</name>
+  <description>Apache Eagle Notification Plugin to  enable services to use custom or default notification </description>
+  <dependencies>
+  	<dependency>
+          <groupId>eagle</groupId>
+          <artifactId>eagle-policy-base</artifactId>
+          <version>${project.version}</version>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>log4j-over-slf4j</artifactId>
+              </exclusion>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-simple</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+  	<dependency>
+	    <groupId>org.apache.kafka</groupId>
+	    <artifactId>kafka-clients</artifactId>
+	</dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-core</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-databind</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>com.fasterxml.jackson.module</groupId>
+          <artifactId>jackson-module-scala_${scala.version}</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-mapper-asl</artifactId>
+      </dependency>
+      <dependency>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>jackson-core-asl</artifactId>
+      </dependency>
+
+      <dependency>
+          <groupId>org.reflections</groupId>
+          <artifactId>reflections</artifactId>
+          <exclusions>
+              <exclusion>
+                  <groupId>org.slf4j</groupId>
+                  <artifactId>slf4j-simple</artifactId>
+              </exclusion>
+          </exclusions>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+      </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
new file mode 100644
index 0000000..98d6b5c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.base;
+
+public class NotificationConstants {
+    public static final String NOTIFICATION_TYPE = "notificationType";
+    public static final String EMAIL_NOTIFICATION = "email";
+    public static final String KAFKA_STORE = "kafka";
+    public static final String EAGLE_STORE = "eagleStore";
+
+    // email specific constants
+    public static final String SUBJECT = "subject";
+    public static final String SENDER = "sender";
+    public static final String RECIPIENTS = "recipients";
+    public static final String TPL_FILE_NAME = "tplFileName";
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
similarity index 69%
copy from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
index d608104..d57dbf9 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
@@ -14,18 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.persist.test;
 
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+package org.apache.eagle.notification.base;
 
 /**
- * Created on 1/4/16.
+ * Object that holds the status of Notification Posted to Notification Plugin  
  */
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
-    @Override
-    public Object deserialize(byte[] arg0) {
-        String logLine = new String(arg0);
-
-        return null;
-    }
+public class NotificationStatus {
+	public boolean successful;
+	public String errorMessage;
 }
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
similarity index 64%
copy from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
index d608104..bf2e75e 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
@@ -14,18 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.persist.test;
 
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+package org.apache.eagle.notification.dao;
+
+
+
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+
+import java.util.List;
 
 /**
- * Created on 1/4/16.
+ * Alert Notification Data Access Obj Interface
  */
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
-    @Override
-    public Object deserialize(byte[] arg0) {
-        String logLine = new String(arg0);
-
-        return null;
-    }
+public interface AlertNotificationDAO {
+    /**
+     * find the Alert Notification Types by querying alertNotifications Table
+     * @return
+     * @throws Exception
+     */
+    List<AlertNotificationEntity> findAlertNotificationTypes() throws Exception;
 }
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
new file mode 100644
index 0000000..7251da3
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.dao;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Notification Service API implementation which Provides Read/Write API's of Hbase AlertNotifications Table
+ */
+public class AlertNotificationDAOImpl implements  AlertNotificationDAO {
+
+    private final Logger LOG = LoggerFactory.getLogger(AlertNotificationDAOImpl.class);
+    private final EagleServiceConnector connector;
+
+    public AlertNotificationDAOImpl(EagleServiceConnector connector){
+        this.connector = connector;
+    }
+
+    /**
+     * Find the Alerts by NotificationType
+     * @return
+     * @throws Exception
+     */
+    @Override
+    public List<AlertNotificationEntity> findAlertNotificationTypes() throws Exception {
+        try{
+            IEagleServiceClient client = new EagleServiceClientImpl(connector);
+            String query = Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME+"[@enabled=\"true\"]{*}";
+            GenericServiceAPIResponseEntity response = client.search(query).startTime(0)
+                    .endTime(10 * DateUtils.MILLIS_PER_DAY)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .send();
+            client.close();
+            if (response.getException() != null) {
+                throw new Exception("Got an exception when query eagle service: " + response.getException());
+            }
+            return response.getObj();
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception when query alert notification service ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
similarity index 64%
copy from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
index d608104..3fe55bf 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
@@ -14,18 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.persist.test;
+package org.apache.eagle.notification.email;
 
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+import org.apache.eagle.common.metric.AlertContext;
 
 /**
- * Created on 1/4/16.
+ * Alert email component is one part of an email, which could be an individual alert
  */
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
-    @Override
-    public Object deserialize(byte[] arg0) {
-        String logLine = new String(arg0);
-
-        return null;
+public class AlertEmailComponent {
+    private AlertContext alertContext;
+    public AlertContext getAlertContext() {
+        return alertContext;
     }
-}
+    public void setAlertContext(AlertContext alertContext) {
+        this.alertContext = alertContext;
+    }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
new file mode 100644
index 0000000..f1642be
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.List;
+
+/**
+ * alert email bean
+ * one email consists of a list of email component
+ */
+public class AlertEmailContext {
+    private List<AlertEmailComponent> components;
+    private String sender;
+    private String subject;
+    private String recipients;
+    private String velocityTplFile;
+    private String cc;
+
+    public List<AlertEmailComponent> getComponents() {
+        return components;
+    }
+    public void setComponents(List<AlertEmailComponent> components) {
+        this.components = components;
+    }
+    public String getVelocityTplFile() {
+        return velocityTplFile;
+    }
+    public void setVelocityTplFile(String velocityTplFile) {
+        this.velocityTplFile = velocityTplFile;
+    }
+    public String getRecipients() {
+        return recipients;
+    }
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+    public String getSender() {
+        return sender;
+    }
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+    public String getSubject() {
+        return subject;
+    }
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+    public String getCc() {
+        return cc;
+    }
+    public void setCc(String cc) {
+        this.cc = cc;
+    }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
new file mode 100644
index 0000000..ddd3c96
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.ConfigObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailGenerator{
+    private String tplFile;
+    private String sender;
+    private String recipients;
+    private String subject;
+    private ConfigObject eagleProps;
+
+    private ThreadPoolExecutor executorPool;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+    private final static long MAX_TIMEOUT_MS =60000;
+
+    public boolean sendAlertEmail(AlertAPIEntity entity) {
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertAPIEntity entity, String recipients) {
+        return sendAlertEmail(entity, recipients, null);
+    }
+
+    public boolean sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
+        boolean sentSuccessfully = false;
+        AlertEmailContext email = new AlertEmailContext();
+
+        AlertEmailComponent component = new AlertEmailComponent();
+        component.setAlertContext(entity.getAlertContext());
+        List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
+        components.add(component);
+        email.setComponents(components);
+        if (entity.getAlertContext().getProperty(Constants.SUBJECT) != null) {
+            email.setSubject(entity.getAlertContext().getProperty(Constants.SUBJECT));
+        }
+        else email.setSubject(subject);
+        email.setVelocityTplFile(tplFile);
+        email.setRecipients(recipients);
+        email.setCc(cc);
+        email.setSender(sender);
+
+        /** asynchronized email sending */
+        @SuppressWarnings("rawtypes")
+        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
+
+        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+
+        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
+        Future future = this.executorPool.submit(thread);
+        try {
+            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email to %s", recipients));
+        } catch (InterruptedException | ExecutionException  e) {
+            sentSuccessfully = false;
+            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
+        } catch (TimeoutException e) {
+            sentSuccessfully = false;
+            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
+        }
+        return sentSuccessfully;
+    }
+
+    public String getTplFile() {
+        return tplFile;
+    }
+
+    public void setTplFile(String tplFile) {
+        this.tplFile = tplFile;
+    }
+
+    public String getSender() {
+        return sender;
+    }
+
+    public void setSender(String sender) {
+        this.sender = sender;
+    }
+
+    public String getRecipients() {
+        return recipients;
+    }
+
+    public void setRecipients(String recipients) {
+        this.recipients = recipients;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public ConfigObject getEagleProps() {
+        return eagleProps;
+    }
+
+    public void setEagleProps(ConfigObject eagleProps) {
+        this.eagleProps = eagleProps;
+    }
+
+    public void setExecutorPool(ThreadPoolExecutor executorPool) {
+        this.executorPool = executorPool;
+    }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..2e63dab
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.typesafe.config.ConfigObject;
+
+public class AlertEmailGeneratorBuilder {
+    private AlertEmailGenerator generator;
+    private AlertEmailGeneratorBuilder(){
+        generator = new AlertEmailGenerator();
+    }
+    public static AlertEmailGeneratorBuilder newBuilder(){
+        return new AlertEmailGeneratorBuilder();
+    }
+    public AlertEmailGeneratorBuilder withSubject(String subject){
+        generator.setSubject(subject);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withSender(String sender){
+        generator.setSender(sender);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withRecipients(String recipients){
+        generator.setRecipients(recipients);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withTplFile(String tplFile){
+        generator.setTplFile(tplFile);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
+        generator.setEagleProps(eagleProps);
+        return this;
+    }
+    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+        generator.setExecutorPool(threadPoolExecutor);
+        return this;
+    }
+
+    public AlertEmailGenerator build(){
+        return this.generator;
+    }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
new file mode 100644
index 0000000..2b18f1e
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.velocity.VelocityContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.email.EagleMailClient;
+import com.netflix.config.ConcurrentMapConfiguration;
+import com.typesafe.config.ConfigObject;
+
+public class AlertEmailSender implements Runnable {
+
+    protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
+    protected final String configFileName;
+    protected final String subject;
+    protected final String sender;
+    protected final String recipents;
+    protected final String cc;
+    protected final String origin;
+    protected boolean sentSuccessfully = false;
+
+    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+    private final static int MAX_RETRY_COUNT = 3;
+
+    private static final String MAIL_HOST = "mail.host";
+    private static final String MAIL_PORT = "mail.smtp.port";
+    private static final String MAIL_DEBUG = "mail.debug";
+
+    private static final String CONF_KEY_MAIL_HOST = "mailHost";
+    private static final String CONF_KEY_MAIL_PORT = "mailSmtpPort";
+    private static final String CONF_KEY_MAIL_DEBUG = "mailDebug";
+
+    private ConfigObject eagleProps;
+
+
+    private String threadName;
+    /**
+     * Derived class may have some additional context properties to add
+     * @param context velocity context
+     * @param env environment
+     */
+    protected void additionalContext(VelocityContext context, String env) {
+        // By default there's no additional context added
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail){
+        this.recipents = alertEmail.getRecipients();
+        this.configFileName = alertEmail.getVelocityTplFile();
+        this.subject = alertEmail.getSubject();
+        this.sender = alertEmail.getSender();
+        this.cc = alertEmail.getCc();
+        for(AlertEmailComponent bean : alertEmail.getComponents()){
+            this.alertContexts.add(bean.getAlertContext().getProperties());
+        }
+        String tmp = ManagementFactory.getRuntimeMXBean().getName();
+        this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
+        threadName = Thread.currentThread().getName();
+        LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipents+", velocity TPL file: " + this.configFileName);
+    }
+
+    public AlertEmailSender(AlertEmailContext alertEmail, ConfigObject eagleProps){
+        this(alertEmail);
+        this.eagleProps = eagleProps;
+    }
+
+    @Override
+    public void run() {
+        int count = 0;
+        boolean success = false;
+        while(count++ < MAX_RETRY_COUNT && !success){
+            LOG.info("Sending email, tried: " + count+", max: "+MAX_RETRY_COUNT);
+            try {
+                final EagleMailClient client;
+                if (eagleProps != null) {
+                    ConcurrentMapConfiguration con = new ConcurrentMapConfiguration();
+                    con.addProperty(MAIL_HOST, eagleProps.get(CONF_KEY_MAIL_HOST).unwrapped());
+                    con.addProperty(MAIL_PORT, eagleProps.get(CONF_KEY_MAIL_PORT).unwrapped());
+                    if (eagleProps.get(CONF_KEY_MAIL_DEBUG) != null) {
+                        con.addProperty(MAIL_DEBUG, eagleProps.get(CONF_KEY_MAIL_DEBUG).unwrapped());
+                    }
+                    client = new EagleMailClient(con);
+                }
+                else {
+                    client = new EagleMailClient();
+                }
+                String env = "prod";
+                if (eagleProps != null && eagleProps.get("env") != null) {
+                    env = (String) eagleProps.get("env").unwrapped();
+                }
+                LOG.info("Env is: " + env);
+                final VelocityContext context = new VelocityContext();
+                generateCommonContext(context);
+                LOG.info("After calling generateCommonContext...");
+                additionalContext(context, env);
+
+                if (recipents == null || recipents.equals("")) {
+                    LOG.error("Recipients is null, skip sending emails ");
+                    return;
+                }
+                String title = subject;
+                if (!env.trim().equals("prod")) {
+                    title = "[" + env + "]" + title;
+                }
+                success = client.send(sender, recipents, cc, title, configFileName, context, null);
+                LOG.info("Success of sending email: " + success);
+                if(!success && count < MAX_RETRY_COUNT) {
+                    LOG.info("Sleep for a while before retrying");
+                    Thread.sleep(10*1000);
+                }
+            }
+            catch (Exception e){
+                LOG.warn("Sending mail exception", e);
+            }
+        }
+
+        if(success){
+            sentSuccessfully = true;
+            LOG.info(String.format("Successfully send email, thread: %s",threadName));
+        }else{
+            LOG.warn(String.format("Fail sending email after tries %s times, thread: %s",MAX_RETRY_COUNT,threadName));
+        }
+    }
+
+    private void generateCommonContext(VelocityContext context) {
+        context.put(Constants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds( System.currentTimeMillis() ));
+        context.put(Constants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size());
+        context.put(Constants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts);
+        context.put(Constants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
+    }
+
+    public boolean sentSuccessfully(){
+        return this.sentSuccessfully;
+    }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
new file mode 100644
index 0000000..e098256
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Alert API entity Persistor
+ */
+public class AlertEagleStorePersister {
+	private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
+	private String eagleServiceHost;
+	private int eagleServicePort;
+	private String username;
+	private String password;
+
+
+	public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort) {
+		this(eagleServiceHost, eagleServicePort, null, null);
+	}
+
+	public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort, String username, String password) {
+		this.eagleServiceHost = eagleServiceHost;
+		this.eagleServicePort = eagleServicePort;
+		this.username = username;
+		this.password = password;
+	}
+
+	public AlertEagleStorePersister(Config config ) {
+		this.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+		this.eagleServicePort = config.getInt("eagleProps.eagleService.port");
+		this.username = config.getString("eagleProps.eagleService.username");
+		this.password =config.getString("eagleProps.eagleService.password");
+	}
+
+	/**
+	 * Persist passes list of Entities
+	 * @param list
+	 * @return
+     */
+	public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
+		if (list.isEmpty()) return false;
+		LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+			GenericServiceAPIResponseEntity<String> response = client.create(list);
+			client.close();
+			if (response.isSuccess()) {
+				LOG.info("Successfully create entities " + list.toString());
+				return true;
+			}
+			else {
+				LOG.error("Fail to create entities with exception " + response.getException());
+				return false;
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception in persisting entities", ex);
+			return false;
+		}
+	}
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
new file mode 100644
index 0000000..cb0df70
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plugin to persist alerts to Eagle Storage
+ */
+public class AlertEagleStorePlugin implements NotificationPlugin {
+    private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
+    private NotificationStatus status;
+    private AlertEagleStorePersister persist;
+
+    @Override
+    public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+        this.persist = new AlertEagleStorePersister(config);
+        this.status = new NotificationStatus();
+        LOG.info("initialized plugin for EagleStorePlugin");
+    }
+
+    @Override
+    public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+        if( isPolicyDelete ){
+            LOG.info("Deleted policy ...");
+            return;
+        }
+        LOG.info("created/updated plugin ...");
+    }
+
+    @Override
+    public NotificationStatus getStatus() {
+        return this.status;
+    }
+
+    /**
+     * Persist AlertEntity to alert_details table
+     * @param alertEntity
+     */
+    @Override
+    public void onAlert(AlertAPIEntity alertEntity) {
+        LOG.info("write alert to eagle storage " + alertEntity);
+        try{
+            List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
+            list.add(alertEntity);
+            boolean result = persist.doPersist( list );
+            if(result) {
+                status.successful = true;
+                status.errorMessage = "";
+            }else{
+                status.successful = false;
+                status.errorMessage = "";
+            }
+        }catch (Exception ex ){
+            status.successful = false;
+            status.errorMessage = ex.getMessage();
+            LOG.error("Fail writing alert entity to Eagle Store", ex);
+        }
+    }
+
+    @Override
+    public int hashCode(){
+        return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+    }
+
+    @Override
+    public boolean equals(Object o){
+        if(o == this)
+            return true;
+        if(!(o instanceof AlertEagleStorePlugin))
+            return false;
+        return true;
+    }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
new file mode 100644
index 0000000..0577f5c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.email.AlertEmailGenerator;
+import org.apache.eagle.notification.email.AlertEmailGeneratorBuilder;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ *  Send alert to email
+ */
+public class AlertEmailPlugin implements NotificationPlugin {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPlugin.class);
+	private Map<String, AlertEmailGenerator> emailGenerators = new ConcurrentHashMap<>();
+	private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+	private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+	private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+	private transient ThreadPoolExecutor executorPool;
+	private NotificationStatus status = new NotificationStatus();
+
+	@Override
+	public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+		executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+		LOG.info(" Creating Email Generator... ");
+		for( AlertDefinitionAPIEntity  entity : initAlertDefs ){
+			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+			for( Map<String,String> notificationConfigMap : configMaps ){
+				String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+				// for backward compatibility, default notification is email
+				if(notificationType == null || notificationType.equalsIgnoreCase(NotificationConstants.EMAIL_NOTIFICATION)){
+					AlertEmailGenerator generator = createEmailGenerator(notificationConfigMap);
+						this.emailGenerators.put(entity.getTags().get(Constants.POLICY_ID), generator);
+						LOG.info("Successfully initialized email notification for policy " + entity.getTags().get(Constants.POLICY_ID) + ",with " + notificationConfigMap);
+				}
+			}
+		}
+	}
+
+	/**
+	 * @param notificationConf
+	 * @throws Exception
+     */
+	@Override
+	public void update(String policyId, Map<String,String> notificationConf  , boolean isPolicyDelete ) throws Exception {
+		if( isPolicyDelete ){
+			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+			this.emailGenerators.remove(policyId);
+			return;
+		}
+		AlertEmailGenerator generator = createEmailGenerator(notificationConf);
+		this.emailGenerators.put(policyId , generator );
+		LOG.info("created/updated email generator for updated policy " + policyId);
+	}
+
+	/**
+	 * API to send email
+	 * @param alertEntity
+	 * @throws Exception
+     */
+	@Override
+	public void onAlert(AlertAPIEntity alertEntity) throws  Exception {
+		String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
+		AlertEmailGenerator generator = this.emailGenerators.get(policyId);
+		boolean isSuccess = generator.sendAlertEmail(alertEntity);
+		if( !isSuccess ) {
+			status.errorMessage = "Failed to send email";
+			status.successful = false;
+		}else {
+			status.errorMessage = "";
+			status.successful = true;
+		}
+	}
+
+	@Override
+	public NotificationStatus getStatus() {
+		return this.status;
+	}
+
+	/**
+	 * @param notificationConfig
+	 * @return
+     */
+	private AlertEmailGenerator createEmailGenerator( Map<String,String> notificationConfig ) {
+		String tplFileName = notificationConfig.get(NotificationConstants.TPL_FILE_NAME);
+		if (tplFileName == null || tplFileName.equals("")) {
+			tplFileName = "ALERT_DEFAULT.vm";
+		}
+		AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+				withEagleProps(EagleConfigFactory.load().getConfig().getObject("eagleProps")).
+				withSubject(notificationConfig.get(NotificationConstants.SUBJECT)).
+				withSender(notificationConfig.get(NotificationConstants.SENDER)).
+				withRecipients(notificationConfig.get(NotificationConstants.RECIPIENTS)).
+				withTplFile(tplFileName).
+				withExecutorPool(this.executorPool).build();
+		return gen;
+	}
+
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+	}
+
+	@Override
+	public boolean equals(Object o){
+		if(o == this)
+			return true;
+		if(!(o instanceof AlertEmailPlugin))
+			return false;
+		return true;
+	}
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
new file mode 100644
index 0000000..cc4f89d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *  send alert to Kafka bus
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AlertKafkaPlugin implements NotificationPlugin {
+	private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPlugin.class);
+	private NotificationStatus status = new NotificationStatus();
+	private Map<String, Map<String, String>> kafaConfigs = new ConcurrentHashMap<>();
+	private Config config;
+
+	@Override
+	public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+		this.config = config;
+		for( AlertDefinitionAPIEntity entity : initAlertDefs ) {
+			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+			for( Map<String,String> notificationConfigMap : configMaps ){
+				String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+				if(notificationType == null){
+					LOG.error("no notificationType field for this notification, ignoring and continue " + notificationConfigMap);
+					continue;
+				}else {
+					// single policy can have multiple configs , only load Kafka Config's
+					if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
+						kafaConfigs.put(entity.getTags().get(Constants.POLICY_ID), notificationConfigMap);
+						break;
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Update API to update policy delete/create/update in Notification Plug-ins
+	 * @param  notificationConf
+	 * @param isPolicyDelete
+	 * @throws Exception
+     */
+	@Override
+	public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+		if( isPolicyDelete ){
+			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+			this.kafaConfigs.remove(policyId);
+			return;
+		}
+		kafaConfigs.put(policyId, notificationConf );
+	}
+
+	/**
+	 * Post Notification to KafkaTopic
+	 * @param alertEntity
+     */
+	@Override
+	public void onAlert(AlertAPIEntity alertEntity) {
+		try{
+			KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(config);
+			producer.send(createRecord(alertEntity));
+			status.successful = true;
+			status.errorMessage = "";
+		}catch(Exception ex ){
+			LOG.error("fail writing alert to Kafka bus", ex);
+			status.successful = false;
+			status.errorMessage = ex.getMessage();
+		}
+	}
+
+	/**
+	 * To Create  KafkaProducer Record 
+	 * @param entity
+	 * @return
+	 * @throws Exception
+	 */
+	private ProducerRecord  createRecord(AlertAPIEntity entity ) throws Exception {
+		String policyId = entity.getTags().get(Constants.POLICY_ID);
+		ProducerRecord  record  = new ProducerRecord( this.kafaConfigs.get(policyId).get("topic"), NotificationPluginUtils.objectToStr(entity));
+		return record;
+	}	
+	
+	@Override
+	public NotificationStatus getStatus() {
+		return status;
+	}
+
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+	}
+
+	@Override
+	public boolean equals(Object o){
+		if(o == this)
+			return true;
+		if(!(o instanceof AlertKafkaPlugin))
+			return false;
+		return true;
+	}
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
new file mode 100644
index 0000000..53f3bb0
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances. 
+ */
+public enum KafkaProducerSingleton {
+	INSTANCE;	
+
+	public KafkaProducer<String, Object>  getProducer(Config config) throws Exception{
+		Properties configMap = new Properties();
+		configMap.put("bootstrap.servers", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+		configMap.put("metadata.broker.list", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+		configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+		configMap.put("request.required.acks", "1");	     
+		configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+		KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+		return producer;
+	}
+	
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
new file mode 100644
index 0000000..5780176
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 2/10/16.
+ * Notification Plug-in interface which provide abstraction layer to notify to different system
+ */
+public interface NotificationPlugin {
+    /**
+     * for initialization
+     * @throws Exception
+     */
+    public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws  Exception;
+
+    /**
+     * Update Plugin if any change in Policy Definition
+     * @param policy to be impacted
+     * @param  notificationConf
+     * @throws Exception
+     */
+    public void update(String policy, Map<String,String> notificationConf , boolean isPolicyDelete ) throws  Exception;
+
+    /**
+     * Post a notification for the given alertEntity
+     * @param alertEntity
+     * @throws Exception
+     */
+
+    public void onAlert( AlertAPIEntity alertEntity ) throws  Exception;
+
+    /**
+     * Returns Status of Notification Post
+     * @return
+     */
+    public NotificationStatus getStatus();
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
new file mode 100644
index 0000000..4aa90c5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ * don't support dynamic discovery as of 2/10
+ */
+public class NotificationPluginLoader {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginLoader.class);
+    private static NotificationPluginLoader instance = new NotificationPluginLoader();
+    private static Map<String,NotificationPlugin> notificationMapping = new ConcurrentHashMap<>();
+
+    private Config config;
+    private boolean initialized = false;
+
+    public static NotificationPluginLoader getInstance(){
+        return instance;
+    }
+
+    public void init(Config config){
+        if(!initialized){
+            synchronized (this){
+                if(!initialized){
+                    internalInit(config);
+                    initialized = true;
+                }
+            }
+        }
+    }
+
+    private void internalInit(Config config){
+        this.config = config;
+        loadPlugins();
+    }
+
+    /**
+     * Scan & Load Plugins
+     */
+    private void loadPlugins(){
+        try {
+            LOG.info("Start loading Plugins from eagle service ...");
+            AlertNotificationDAO dao = new AlertNotificationDAOImpl(new EagleServiceConnector(config));
+            List<AlertNotificationEntity> activeNotificationPlugins = dao.findAlertNotificationTypes();
+            for(AlertNotificationEntity plugin : activeNotificationPlugins){
+                notificationMapping.put(plugin.getTags().get(NotificationConstants.NOTIFICATION_TYPE),
+                        (NotificationPlugin) Class.forName(plugin.getClassName()).newInstance());
+            }
+            LOG.info("successfully loaded Plugins from eagle service " + activeNotificationPlugins);
+        }catch ( Exception ex ){
+            LOG.error("Error in loading Notification Plugins: ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    public Map<String, NotificationPlugin> getNotificationMapping() {
+        ensureInitialized();
+        return notificationMapping;
+    }
+
+    private void ensureInitialized(){
+        if(!initialized)
+            throw new IllegalStateException("Plugin loader not initialized");
+    }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
new file mode 100644
index 0000000..fdf62d1
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+/**
+ * Created on 2/10/16.
+ */
+public interface NotificationPluginManager {
+    /**
+     * notify alerts to plugins for one specific alert entity
+     * @param entity
+     */
+    void notifyAlert( AlertAPIEntity entity );
+
+    /**
+     * responds to changes of alert notification definition
+     * @param entity
+     * @param isDelete
+     */
+    void updateNotificationPlugins(AlertDefinitionAPIEntity entity , boolean isDelete );
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
new file mode 100644
index 0000000..887a6a9
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ */
+public class NotificationPluginManagerImpl implements NotificationPluginManager {
+    private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginManagerImpl.class);
+    // mapping from policy Id to NotificationPlugin instance
+    private Map<String, Collection<NotificationPlugin>> policyNotificationMapping = new ConcurrentHashMap<>(1); //only one write thread
+    private Config config;
+
+    public NotificationPluginManagerImpl(Config config){
+        this.config = config;
+        internalInit();
+    }
+
+    private void internalInit(){
+        // iterate all policy ids, keep those notification which belong to plugins
+        PolicyDefinitionDAO policyDefinitionDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector( config ) , Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME);
+        String site = config.getString("eagleProps.site");
+        String dataSource = config.getString("eagleProps.dataSource");
+        try{
+            List<AlertDefinitionAPIEntity> activeAlertDefs = policyDefinitionDao.findActivePolicies( site , dataSource );
+            // initialize all loaded plugins
+            NotificationPluginLoader.getInstance().init(config);
+            for(NotificationPlugin plugin : NotificationPluginLoader.getInstance().getNotificationMapping().values()){
+                plugin.init(config, activeAlertDefs);
+            }
+            // build policy and plugin mapping
+            for( AlertDefinitionAPIEntity entity : activeAlertDefs ){
+                Map<String, NotificationPlugin> plugins = pluginsForPolicy(entity);
+                policyNotificationMapping.put(entity.getTags().get(Constants.POLICY_ID) , plugins.values());
+            }
+        }catch (Exception ex ){
+            LOG.error("Error initializing poliy/notification mapping ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void notifyAlert(AlertAPIEntity entity) {
+        String policyId = entity.getTags().get(Constants.POLICY_ID);
+        Collection<NotificationPlugin> plugins = policyNotificationMapping.get(policyId);
+        if(plugins == null || plugins.size() == 0) {
+            LOG.debug("no plugin found for policy " + policyId);
+            return;
+        }
+        for(NotificationPlugin plugin : plugins){
+            try {
+                LOG.info("execute notification plugin " + plugin);
+                plugin.onAlert(entity);
+            }catch(Exception ex){
+                LOG.error("fail invoking plugin's onAlert, continue ", ex);
+            }
+        }
+    }
+
+    @Override
+    public void updateNotificationPlugins(AlertDefinitionAPIEntity alertDef, boolean isDelete) {
+        try {
+            // Update Notification Plugin about the change in AlertDefinition
+            String policyId = alertDef.getTags().get(Constants.POLICY_ID);
+            if(isDelete){
+                // iterate all plugins and delete this policy
+                for(NotificationPlugin plugin : policyNotificationMapping.get(policyId)){
+                    plugin.update(policyId, null, true);
+                }
+                policyNotificationMapping.remove(policyId);
+                LOG.info("Deleted notifications for policy " + policyId);
+                return;
+            }
+
+            Map<String, NotificationPlugin> plugins = pluginsForPolicy(alertDef);
+            // calculate difference between current plugins and previous plugin
+            Collection<NotificationPlugin> previousPlugins = policyNotificationMapping.get(policyId);
+            if(previousPlugins != null) {
+                Collection<NotificationPlugin> deletedPlugins = CollectionUtils.subtract(previousPlugins, plugins.values());
+                LOG.info("Going to delete plugins " + deletedPlugins + ", for policy " + policyId);
+                for (NotificationPlugin plugin : deletedPlugins) {
+                    plugin.update(policyId, null, true);
+                }
+            }
+
+            // iterate current notifications and update it individually
+            List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(alertDef.getNotificationDef());
+            for( Map<String,String> notificationConf : notificationConfigCollection ) {
+                String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+                // for backward compatibility, use email for default notification type
+                if(notificationType == null){
+                    notificationType = NotificationConstants.EMAIL_NOTIFICATION;
+                }
+                NotificationPlugin plugin = plugins.get(notificationType);
+                if(plugin != null){
+                    plugin.update(policyId, notificationConf, false);
+                }
+            }
+
+            policyNotificationMapping.put(policyId, plugins.values());// update policy - notification types map
+            LOG.info("Successfully broadcasted policy updates to all Notification Plugins ...");
+        } catch (Exception e) {
+            LOG.error("Error broadcasting policy notification changes ", e);
+        }
+    }
+
+    private Map<String, NotificationPlugin> pluginsForPolicy(AlertDefinitionAPIEntity policy) throws Exception{
+        NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+        loader.init(config);
+        Map<String, NotificationPlugin> plugins = loader.getNotificationMapping();
+        // mapping from notificationType to plugin
+        Map<String, NotificationPlugin>  notifications = new HashMap<>();
+        List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(policy.getNotificationDef());
+        for( Map<String,String> notificationConf : notificationConfigCollection ){
+            String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+            // for backward compatibility, by default notification type is email if notification type is not specified
+            if(notificationType == null){
+                LOG.warn("notificationType is null so use default notification type email for this policy  " + policy);
+                notifications.put(NotificationConstants.EMAIL_NOTIFICATION, plugins.get(NotificationConstants.EMAIL_NOTIFICATION));
+            }else if(!plugins.containsKey(notificationType)){
+                LOG.warn("No NotificationPlugin supports this notificationType " + notificationType);
+            }else {
+                notifications.put(notificationType, plugins.get(notificationType));
+            }
+        }
+        return notifications;
+    }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
new file mode 100644
index 0000000..350ecb5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.utils;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common methods for Notification Plugin
+ */
+public class NotificationPluginUtils {
+	/**
+	 * Fetch Notification specific property value
+	 * @param key
+	 * @return
+	 * @throws Exception
+     */
+	public static String getPropValue(Config config, String key ) throws Exception {
+		if( config.getObject("eagleNotificationProps") == null )
+			throw new Exception("Eagle Notification Properties not found in application.conf ");
+		ConfigObject notificationConf = config.getObject("eagleNotificationProps");
+		return notificationConf.get(key).unwrapped().toString();
+	}
+
+	/**
+	 * Deserialize Notification Definition and convert all config to Key Value Pairs
+	 * @param notificationDef
+	 * @return
+	 * @throws Exception
+     */
+	public static List<Map<String,String>> deserializeNotificationConfig( String notificationDef ) throws Exception {
+		ObjectMapper mapper = new ObjectMapper();
+		CollectionType mapCollectionType = mapper.getTypeFactory().constructCollectionType(List.class, Map.class);
+		return mapper.readValue( notificationDef , mapCollectionType);
+	}
+
+	/**
+	 * Object to JSON String
+	 * @param obj
+	 * @return
+	 * @throws Exception
+     */
+	public static String objectToStr( Object obj ) throws  Exception {
+		ObjectMapper mapper = new ObjectMapper();
+		return mapper.writeValueAsString(obj);
+	}
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
new file mode 100644
index 0000000..4ceabad
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
@@ -0,0 +1,275 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+	<head>
+		<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+		<meta name="viewport" content="width=device-width"/>
+		<style>
+			body {
+				width:100% !important;
+				min-width: 100%;
+				-webkit-text-size-adjust:100%;
+				-ms-text-size-adjust:100%;
+				margin:0;
+				padding:0;
+			}
+
+			table {
+				border-spacing: 0;
+				border-collapse: collapse;
+			}
+
+			table th,
+			table td {
+				padding: 3px 0 3px 0;
+			}
+
+			.body {
+				width: 100%;
+			}
+
+			p,a,h1,h2,h3,ul,ol,li {
+				font-family: Helvetica, Arial, sans-serif;
+				font-weight: normal;
+				margin: 0;
+				padding: 0;
+			}
+			p {
+				font-size: 14px;
+				line-height: 19px;
+			}
+			a {
+				color: #3294b1;
+			}
+			h1 {
+				font-size: 36px;
+				margin: 15px 0 5px 0;
+			}
+			h2 {
+				font-size: 32px;
+			}
+			h3 {
+				font-size: 28px;
+			}
+
+			ul,ol {
+				margin: 0 0 0 25px;
+				padding: 0;
+			}
+
+			.btn {
+				background: #2ba6cb !important;
+				border: 1px solid #2284a1;
+				padding: 10px 20px 10px 20px;
+				text-align: center;
+			}
+			.btn:hover {
+				background: #2795b6 !important;
+			}
+			.btn a {
+				color: #FFFFFF;
+				text-decoration: none;
+				font-weight: bold;
+				padding: 10px 20px 10px 20px;
+			}
+
+			.tableBordered {
+				border-top: 1px solid #b9e5ff;
+			}
+			.tableBordered th {
+				background: #ECF8FF;
+			}
+			.tableBordered th p {
+				font-weight: bold;
+				color: #3294b1;
+			}
+			.tableBordered th,
+			.tableBordered td {
+				color: #333333;
+				border-bottom: 1px solid #b9e5ff;
+				text-align: center;
+				padding-bottom: 5px;
+			}
+
+			.panel {
+				height: 100px;
+			}
+		</style>
+	</head>
+	<body>
+		#set ( $elem = $alertList[0] )
+		#set ( $alertUrl = $elem["alertDetailUrl"] )
+		#set ( $policyUrl = $elem["policyDetailUrl"] )
+		<table class="body">
+			<tr>
+				<td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+					<!-- Eagle Header -->
+					<table width="580">
+						<tr>
+							<td style="padding: 0 0 0 0;" align="left" >
+								<p style="color:#FFFFFF;font-weight: bold; font-size: 24px">Eagle</p>
+							</td>
+							<td style="padding: 0 0 0 0;" align="right">
+								<p style="color:#FFFFFF;font-weight: bold;">DAM Alert</p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+
+			<tr>
+				<td align="center" valign="top">
+					<!-- Eagle Body -->
+					<table width="580">
+						<tr>
+							<!-- Title -->
+							<td align="center">
+								<h1>Malicious Data Operation Detected</h1>
+							</td>
+						</tr>
+						<tr>
+							<!-- Time -->
+							<td>
+								<table width="580">
+									<tr>
+										<td>
+											<p><b>Detected Time: $elem["alertTimestamp"]</b></p>
+										</td>
+										#set ( $severity = $elem["severity"] )
+										#if (!$severity || ("$severity" == ""))
+											#set ( $elem["severity"] = "WARNING")
+										#end
+										<td align="right">
+											<p><b>
+												Severity:
+									            #if ($elem["severity"] == "WARNING")
+													<span>$elem["severity"]</span>												
+    											#else
+													<span style="color: #FF0000;">$elem["severity"]</span>
+    											#end
+											</b></p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Description -->
+							<td valign="top" style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
+								<p>$elem["alertMessage"]</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$alertUrl">View Alert Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Basic Information:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Basic Information Content -->
+							<td>
+								<table class="tableBordered" width="580">
+									<tr>
+										<th>
+											<p>Site</p>
+										</th>
+										<th>
+											<p>Data Source</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>$elem["site"]</p>
+										</td>
+										<td>
+											<p>$elem["dataSource"]</p>
+										</td>
+									</tr>
+									<tr>
+										<th>
+											<p>Alert Type</p>
+										</th>
+										<th>
+											<p>Policy Name</p>
+										</th>
+										<th>
+											<p>Severity</p>
+										</th>
+									</tr>
+									<tr>
+										<td>
+											<p>DAM Alert</p>
+										</td>									
+										<td>
+											<p>$elem["policyId"]</p>
+										</td>
+										<td>
+											<p>$elem["severity"]</p>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>
+						<tr>
+							<!-- View Detail -->
+							<td align="center" style="padding: 10px 0 0 0;">
+								<table width="580">
+									<tr>
+										<td class="btn">
+											<a href="$policyUrl">View Policy Details on Eagle Web</a>
+										</td>
+									</tr>
+								</table>
+							</td>
+						</tr>						
+						<tr>
+							<!-- Actions Required -->
+							<td style="padding: 20px 0 0 0;">
+								<p><b>Actions Required:</b></p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Possible Root Causes Content -->
+							<td class="panel" valign="top" style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+								<p> Malicious data operation found, please check.</p>
+							</td>
+						</tr>
+						<tr>
+							<!-- Copyright -->
+							<td align="center">
+								<p><a href="http://123.xyz.com/alerts/alertlist.html">Copyright 2014 @ Hadoop Eagle</a></p>
+							</td>
+						</tr>
+					</table>
+				</td>
+			</tr>
+		</table>
+	</body>
+</html>
\ No newline at end of file
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/application.conf
similarity index 63%
copy from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/application.conf
index 6c4d261..d57172b 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/application.conf
@@ -16,71 +16,54 @@
 {
   "envContextConfig" : {
     "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "persistTestTopology",
-    "stormConfigFile" : "persist-test-storm.yaml",
+    "mode" : "cluster",
+    "topologyName" : "sandbox-hdfsAuditLog-topology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
     "parallelismConfig" : {
       "kafkaMsgConsumer" : 1,
       "hdfsAuditLogAlertExecutor*" : 1
     }
   },
   "dataSourceConfig": {
-    "topic" : "persist_test_log",
-    "zkConnection" : "localhost:2181",
+    "topic" : "sandbox_hdfs_audit_log",
+    "zkConnection" : "127.0.0.1:2181",
+    "brokerZkPath" : "/brokers",
     "zkConnectionTimeoutMS" : 15000,
-    "consumerGroupId" : "EagleConsumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
-    "transactionZKServers" : "localhost",
+    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "transactionZKServers" : "127.0.0.1",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",
+    "consumerGroupId" : "eagle.hdfsaudit.consumer",
     "transactionStateUpdateMS" : 2000
   },
   "alertExecutorConfigs" : {
      "hdfsAuditLogAlertExecutor" : {
        "parallelism" : 1,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
        "needValidation" : "true"
      }
   },
-  "persistExecutorConfigs" {
-    "persistExecutor1" : {
-      "kafka": {
-        "bootstrap_servers" : "www.xyz.com",
-        "topics" : {
-          "defaultOutput" : "downSampleOutput"
-        }
-      }
-    }
-  },
-  "aggregateExecutorConfigs" : {
-    "aggregateExecutor1" : {
-      "parallelism" : 1,
-      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-      "needValidation" : "true"
-    }
-  },
   "eagleProps" : {
     "site" : "sandbox",
-    "dataSource": "persistTest",
+    "dataSource": "hdfsAuditLog",
   	"dataJoinPollIntervalSec" : 30,
-    "mailHost" : "mailHost.com",
+    "mailHost" : "mailhost.com",
     "mailSmtpPort":"25",
     "mailDebug" : "true",
-    "balancePartitionEnabled" : true,
-    #"partitionRefreshIntervalInMin" : 60,
-    #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
       "port": 9099,
       "username": "admin",
       "password": "secret"
-    },
-    "readHdfsUserCommandPatternFrom" : "file"
+    }
   },
   "dynamicConfigSource" : {
   	"enabled" : true,
   	"initDelayMillis" : 0,
   	"delayMillis" : 30000
+  },
+  "eagleNotificationProps" : {
+    "kafka_broker":"192.168.56.101:6667"
   }
 }
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
new file mode 100644
index 0000000..0293f9d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+#####################################################################
+#     Import notification plugin configuration into Eagle Service   #
+#####################################################################
+
+## AlertNotificationService : schema for notifcation plugin configuration
+echo ""
+echo "Importing notification plugin configurations ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertNotificationService" \
+ -d '
+ [
+     {
+       "prefix": "alertNotifications",
+       "tags": {
+         "notificationType": "email"
+       },
+       "className": "org.apache.eagle.notification.plugin.AlertEmailPlugin",
+       "description": "send alert to email",
+       "enabled":true
+     },
+     {
+       "prefix": "alertNotifications",
+       "tags": {
+         "notificationType": "kafka"
+       },
+       "className": "org.apache.eagle.notification.plugin.AlertKafkaPlugin",
+       "description": "send alert to kafka bus",
+       "enabled":true
+     },
+     {
+       "prefix": "alertNotifications",
+       "tags": {
+         "notificationType": "eagleStore"
+       },
+       "className": "org.apache.eagle.notification.plugin.AlertEagleStorePlugin",
+       "description": "send alert to eagle store",
+       "enabled":true
+     }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for alert notification plugins"
+
+exit 0
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
new file mode 100644
index 0000000..74c8e40
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.plugin.AlertEagleStorePlugin;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Created on 2/11/16.
+ */
+public class TestAlertEagleStorePlugin {
+    @Ignore // only work when eagle service is up
+    @Test
+    public void testEagleStorePlugin() throws Exception{
+        AlertEagleStorePlugin plugin = new AlertEagleStorePlugin();
+        Config config = ConfigFactory.load();
+        AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+        def.setNotificationDef("");
+        plugin.init(config, Arrays.asList(def));
+
+        AlertAPIEntity alert = new AlertAPIEntity();
+        alert.setDescription("");
+        plugin.onAlert(alert);
+        Assert.assertTrue(plugin.getStatus().successful);
+    }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
new file mode 100644
index 0000000..b44c238
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertEmailPlugin;
+import org.apache.eagle.policy.common.Constants;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * Created on 2/11/16.
+ */
+public class TestAlertEmailPlugin {
+    @Ignore // only works when there is correct email setup and eagle service
+    @Test
+    public void testAlertEmailPlugin() throws Exception{
+        AlertEmailPlugin plugin = new AlertEmailPlugin();
+        Config config = ConfigFactory.load();
+        AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+        def.setTags(new HashMap<String, String>());
+        def.getTags().put(Constants.POLICY_ID, "testPolicyId");
+        def.setNotificationDef("[{\"notificationType\":\"email\",\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]");
+        plugin.init(config, Arrays.asList(def));
+
+        AlertAPIEntity alert = new AlertAPIEntity();
+        alert.setTags(new HashMap<String, String>());
+        alert.getTags().put(Constants.POLICY_ID, "testPolicyId");
+        alert.setDescription("");
+        alert.setAlertContext(new AlertContext());
+        plugin.onAlert(alert);
+        Assert.assertTrue(plugin.getStatus().successful);
+    }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
new file mode 100644
index 0000000..b5ed63e
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertKafkaPlugin;
+import org.apache.eagle.notification.plugin.KafkaProducerSingleton;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestAlertKafkaPlugin {
+	@Ignore // only work when kafka is ready for use
+	@Test
+	public void testAlertToKafkaBus() throws Exception
+	{
+		AlertKafkaPlugin plugin = new AlertKafkaPlugin();
+		Config config = ConfigFactory.load();
+		AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+		def.setTags(new HashMap<String, String>());
+		def.getTags().put(Constants.POLICY_ID, "testPolicyId");
+		def.setNotificationDef("[{\"notificationType\":\"kafka\",\"topic\":\"testTopic\"}]");
+		plugin.init(config, Arrays.asList(def));
+
+		AlertAPIEntity alert = new AlertAPIEntity();
+		alert.setTags(new HashMap<String, String>());
+		alert.getTags().put(Constants.POLICY_ID, "testPolicyId");
+		alert.setDescription("");
+		alert.setAlertContext(new AlertContext());
+		plugin.onAlert(alert);
+		Thread.sleep(1000); // wait for message sent out
+		Assert.assertTrue(plugin.getStatus().successful);
+	}
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
new file mode 100644
index 0000000..2749648
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestGetAllNotifications {
+    @Ignore
+    @Test
+    public void  getAllNotification() throws Exception {
+        Config config = EagleConfigFactory.load().getConfig();
+        AlertNotificationDAO dao = new AlertNotificationDAOImpl( new EagleServiceConnector(config));
+        List<AlertNotificationEntity> list = dao.findAlertNotificationTypes();
+        System.out.println(" Fetch all Notifications : "+list);
+    }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
new file mode 100644
index 0000000..624343b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.plugin.NotificationPluginLoader;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created on 2/10/16.
+ */
+public class TestNotificationPluginLoader {
+    @Ignore //only work when connected to eagle service
+    @Test
+    public void testLoader(){
+        Config config = ConfigFactory.load();
+        NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+        loader.init(config);
+        Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.EAGLE_STORE));
+        Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.KAFKA_STORE));
+        Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.EMAIL_NOTIFICATION));
+    }
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/application.conf
similarity index 62%
copy from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/application.conf
index 6c4d261..d57172b 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/application.conf
@@ -16,71 +16,54 @@
 {
   "envContextConfig" : {
     "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "persistTestTopology",
-    "stormConfigFile" : "persist-test-storm.yaml",
+    "mode" : "cluster",
+    "topologyName" : "sandbox-hdfsAuditLog-topology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
     "parallelismConfig" : {
       "kafkaMsgConsumer" : 1,
       "hdfsAuditLogAlertExecutor*" : 1
     }
   },
   "dataSourceConfig": {
-    "topic" : "persist_test_log",
-    "zkConnection" : "localhost:2181",
+    "topic" : "sandbox_hdfs_audit_log",
+    "zkConnection" : "127.0.0.1:2181",
+    "brokerZkPath" : "/brokers",
     "zkConnectionTimeoutMS" : 15000,
-    "consumerGroupId" : "EagleConsumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
-    "transactionZKServers" : "localhost",
+    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+    "transactionZKServers" : "127.0.0.1",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",
+    "consumerGroupId" : "eagle.hdfsaudit.consumer",
     "transactionStateUpdateMS" : 2000
   },
   "alertExecutorConfigs" : {
      "hdfsAuditLogAlertExecutor" : {
        "parallelism" : 1,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
        "needValidation" : "true"
      }
   },
-  "persistExecutorConfigs" {
-    "persistExecutor1" : {
-      "kafka": {
-        "bootstrap_servers" : "www.xyz.com",
-        "topics" : {
-          "defaultOutput" : "downSampleOutput"
-        }
-      }
-    }
-  },
-  "aggregateExecutorConfigs" : {
-    "aggregateExecutor1" : {
-      "parallelism" : 1,
-      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-      "needValidation" : "true"
-    }
-  },
   "eagleProps" : {
     "site" : "sandbox",
-    "dataSource": "persistTest",
+    "dataSource": "hdfsAuditLog",
   	"dataJoinPollIntervalSec" : 30,
-    "mailHost" : "mailHost.com",
+    "mailHost" : "mailhost.com",
     "mailSmtpPort":"25",
     "mailDebug" : "true",
-    "balancePartitionEnabled" : true,
-    #"partitionRefreshIntervalInMin" : 60,
-    #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
       "port": 9099,
       "username": "admin",
       "password": "secret"
-    },
-    "readHdfsUserCommandPatternFrom" : "file"
+    }
   },
   "dynamicConfigSource" : {
   	"enabled" : true,
   	"initDelayMillis" : 0,
   	"delayMillis" : 30000
+  },
+  "eagleNotificationProps" : {
+    "kafka_broker":"192.168.56.101:6667"
   }
 }
\ No newline at end of file
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/log4j.properties
similarity index 66%
copy from eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/log4j.properties
index 4a22987..3499c46 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/log4j.properties
@@ -13,28 +13,23 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=DEBUG, stdout, DRFA
+log4j.rootLogger=INFO, stdout
 
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
+ eagle.log.dir=./logs
+ eagle.log.file=eagle.log
 
-
-#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
-#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
 
 # Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
 ## 30-day backup
 # log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
 
 # Pattern format: Date LogLevel LoggerName LogMessage
 log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-process/pom.xml b/eagle-core/eagle-alert/eagle-alert-process/pom.xml
index 012cc64..fdf059d 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/pom.xml
+++ b/eagle-core/eagle-alert/eagle-alert-process/pom.xml
@@ -113,6 +113,11 @@
             <artifactId>eagle-stream-process-base</artifactId>
             <version>${project.version}</version>
         </dependency>
+	<dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-alert-notification-plugin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 	</dependencies>
 
     <build>
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index 1147328..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * 
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.common.AlertEmailSender;
-import org.apache.eagle.alert.email.AlertEmailComponent;
-import org.apache.eagle.alert.email.AlertEmailContext;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailGenerator{
-	private String tplFile;
-	private String sender;
-	private String recipients;
-	private String subject;
-	private ConfigObject eagleProps;
-
-    private ThreadPoolExecutor executorPool;
-
-    private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
-
-    private final static long MAX_TIMEOUT_MS =60000;
-
-    public void sendAlertEmail(AlertAPIEntity entity) {
-		sendAlertEmail(entity, recipients, null);
-	}
-	
-	public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
-		sendAlertEmail(entity, recipients, null);	
-	}
-	
-	public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
-		AlertEmailContext email = new AlertEmailContext();
-		
-		AlertEmailComponent component = new AlertEmailComponent();
-		component.setAlertContext(entity.getAlertContext());
-		List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
-		components.add(component);		
-		email.setComponents(components);
-		if (entity.getAlertContext().getProperty(Constants.SUBJECT) != null) {
-			email.setSubject(entity.getAlertContext().getProperty(Constants.SUBJECT));
-		}
-		else email.setSubject(subject);
-		email.setVelocityTplFile(tplFile);
-		email.setRecipients(recipients);
-		email.setCc(cc);
-		email.setSender(sender);
-		
-		/** asynchronized email sending */
-		@SuppressWarnings("rawtypes")
-        AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
-
-        if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
-        LOG.info("Sending email  in asynchronous to: "+recipients+", cc: "+cc);
-        Future future = this.executorPool.submit(thread);
-        try {
-            future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            LOG.info(String.format("Successfully send email to %s", recipients));
-        } catch (InterruptedException | ExecutionException  e) {
-            LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
-        } catch (TimeoutException e) {
-            LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
-        }
-    }
-	
-	public String getTplFile() {
-		return tplFile;
-	}
-	
-	public void setTplFile(String tplFile) {
-		this.tplFile = tplFile;
-	}
-
-	public String getSender() {
-		return sender;
-	}
-
-	public void setSender(String sender) {
-		this.sender = sender;
-	}
-
-	public String getRecipients() {
-		return recipients;
-	}
-
-	public void setRecipients(String recipients) {
-		this.recipients = recipients;
-	}
-
-	public String getSubject() {
-		return subject;
-	}
-
-	public void setSubject(String subject) {
-		this.subject = subject;
-	}
-
-	public ConfigObject getEagleProps() {
-		return eagleProps;
-	}
-
-	public void setEagleProps(ConfigObject eagleProps) {
-		this.eagleProps = eagleProps;
-	}
-
-    public void setExecutorPool(ThreadPoolExecutor executorPool) {
-        this.executorPool = executorPool;
-    }
-}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
deleted file mode 100644
index 0303476..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.typesafe.config.ConfigObject;
-
-public class AlertEmailGeneratorBuilder {
-	private AlertEmailGenerator generator;
-	private AlertEmailGeneratorBuilder(){
-		generator = new AlertEmailGenerator();
-	}
-	public static AlertEmailGeneratorBuilder newBuilder(){
-		return new AlertEmailGeneratorBuilder();
-	}
-	public AlertEmailGeneratorBuilder withSubject(String subject){
-		generator.setSubject(subject);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withSender(String sender){
-		generator.setSender(sender);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withRecipients(String recipients){
-		generator.setRecipients(recipients);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withTplFile(String tplFile){
-		generator.setTplFile(tplFile);
-		return this;
-	}
-	public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
-		generator.setEagleProps(eagleProps);
-		return this;
-	}
-    public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
-        generator.setExecutorPool(threadPoolExecutor);
-        return this;
-    }
-
-    public AlertEmailGenerator build(){
-		return this.generator;
-	}
-}
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
index 7200865..06ecb0d 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
@@ -16,129 +16,65 @@
  */
 package org.apache.eagle.alert.notification;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.config.EmailNotificationConfig;
+import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
 import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
 import org.apache.eagle.policy.DynamicPolicyLoader;
 import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor1;
 import org.apache.eagle.datastream.Tuple1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
 import com.typesafe.config.Config;
 
 /**
- * notify alert by email, sms or other means
- * currently we only implements email notification
+ * notify alert by email, kafka message, storage or other means
  */
 public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
-
 	private static final long serialVersionUID = 1690354365435407034L;
 	private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
 	private Config config;
+	/** Notification Manager - Responsible for forward and invoke configured Notification Plugin **/
+	private NotificationPluginManagerImpl notificationManager;
 
 	private List<String> alertExecutorIdList;
-	private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
 	private PolicyDefinitionDAO dao;
 
-    private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
-    private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
-    private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
-
-    private transient ThreadPoolExecutor executorPool;
 
     public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
 		this.alertExecutorIdList = alertExecutorIdList;
 		this.dao = dao;
 	}
-	
-	public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {
-		Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
-		EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0];
-		try {			
-			emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module));
+
+	@Override
+	public void init() {
+		String site = config.getString("eagleProps.site");
+		String dataSource = config.getString("eagleProps.dataSource");
+		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
+		try {
+			initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId( site, dataSource );
 		}
 		catch (Exception ex) {
-			LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage());
+			LOG.error("fail to initialize initialAlertDefs: ", ex);
+			throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
 		}
-		List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>();
-		if (emailConfigs == null) {
-			return gens;		
-		}
-		for(EmailNotificationConfig emailConfig : emailConfigs) {
-			String tplFileName = emailConfig.getTplFileName();			
-			if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name
-				tplFileName = "ALERT_DEFAULT.vm";
-			}
-			AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
-																withEagleProps(config.getObject("eagleProps")).
-																withSubject(emailConfig.getSubject()).
-																withSender(emailConfig.getSender()).
-																withRecipients(emailConfig.getRecipients()).
-																withTplFile(tplFileName).
-                                                                withExecutorPool(executorPool).
-																build();
-			gens.add(gen);
-		}
-		return gens;
-	}
-	
-	/**
-	 * 1. register both file and database configuration
-	 * 2. create email generator from configuration
-	 */
-    @Override
-	public void init(){
-        executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
 
-		Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> ();
-		
-        String site = config.getString("eagleProps.site");
-        String dataSource = config.getString("eagleProps.dataSource");
-	    Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
-	    try {
-	 		initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
-	    }
-	    catch (Exception ex) {
- 			LOG.error("fail to initialize initialAlertDefs: ", ex);
-	        throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
-        }
- 	   
-        if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
-            LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
-        }
-        else {
-		    for (String alertExecutorId: alertExecutorIdList) {
-                if(initialAlertDefs.containsKey(alertExecutorId)) {
-                    for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
-                        List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
-                        tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
-                    }
-                }else{
-                    LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
-                }
-		    }
-        }
-		
-		alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
-		alertEmailGeneratorsMap.putAll(tmpEmailGenerators);				
+		if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
+			LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
+		}
+		try{
+			notificationManager = new NotificationPluginManagerImpl(config);
+		}catch (Exception ex ){
+			LOG.error("Fail to initialize NotificationManager: ", ex);
+			throw new IllegalStateException("Fail to initialize NotificationManager: ", ex);
+		}
+
 		DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
 		policyLoader.init(initialAlertDefs, dao, config);
 		for (String alertExecutorId : alertExecutorIdList) {
@@ -146,32 +82,20 @@
 		}
 	}
 
-    @Override
+	@Override
 	public void prepareConfig(Config config) {
 		this.config = config;
 	}
 
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
-        String policyId = (String) input.get(0);
-        AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
-        processAlerts(policyId, Arrays.asList(alertEntity));
-    }
-	
-	//TODO: add a thread pool for email sender?
-	private void processAlerts(String policyId, List<AlertAPIEntity> list) {
-		List<AlertEmailGenerator> generators;
-		synchronized(alertEmailGeneratorsMap) {		
-			generators = alertEmailGeneratorsMap.get(policyId);
-		}
-		if (generators == null) {
-			LOG.warn("Notification config of policyId " + policyId + " has been deleted");
-			return;
-		}
+	@Override
+	public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
+		AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
+		processAlerts(Arrays.asList(alertEntity));
+	}
+
+	private void processAlerts(List<AlertAPIEntity> list) {
 		for (AlertAPIEntity entity : list) {
-			for(AlertEmailGenerator generator : generators){
-				generator.sendAlertEmail(entity);
-			}
+			notificationManager.notifyAlert(entity);
 		}
 	}
 
@@ -180,11 +104,8 @@
 		if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
 		for(AlertDefinitionAPIEntity alertDef : added.values()){
 			LOG.info("alert notification config really changed " + alertDef);
-			List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
-			synchronized(alertEmailGeneratorsMap) {		
-				alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
-			}
-		}		
+			notificationManager.updateNotificationPlugins( alertDef , false );
+		}
 	}
 
 	@Override
@@ -192,11 +113,8 @@
 		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
 		for(AlertDefinitionAPIEntity alertDef : changed.values()){
 			LOG.info("alert notification config really added " + alertDef);
-			List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
-			synchronized(alertEmailGeneratorsMap) {					
-				alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
-			}
-		}			
+			notificationManager.updateNotificationPlugins( alertDef , false );
+		}
 	}
 
 	@Override
@@ -204,9 +122,7 @@
 		if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
 		for(AlertDefinitionAPIEntity alertDef : deleted.values()){
 			LOG.info("alert notification config really deleted " + alertDef);
-			synchronized(alertEmailGeneratorsMap) {		
-				alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
-			}
-		}		
+			notificationManager.updateNotificationPlugins( alertDef , true );
+		}
 	}
-}
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/pom.xml b/eagle-core/eagle-alert/pom.xml
index 6bd9bdf..43f6675 100644
--- a/eagle-core/eagle-alert/pom.xml
+++ b/eagle-core/eagle-alert/pom.xml
@@ -33,5 +33,6 @@
 		<module>eagle-alert-base</module>
 		<module>eagle-alert-process</module>
 		<module>eagle-alert-service</module>
-    </modules>
+		<module>eagle-alert-notification-plugin</module>
+	</modules>
 </project>
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
index 0c2732b..0dc9acc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
@@ -70,11 +70,16 @@
             <artifactId>eagle-storage-base</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
+	<dependency>
+       		<groupId>log4j</groupId>
+		<artifactId>log4j</artifactId>
+      	</dependency>
+<!--        <dependency>
             <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-simple</artifactId>
+            <artifactId>slf4j-api</artifactId>
             <scope>test</scope>
         </dependency>
+-->
         <!--<dependency>-->
             <!--<groupId>org.scala-lang</groupId>-->
             <!--<artifactId>scala-reflect</artifactId>-->
@@ -150,4 +155,4 @@
             </plugin>
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
index 6933a5f..5df7e55 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
@@ -50,4 +50,5 @@
 		}
 		return mapper.readValue(value, cls);	
 	}
+
 }
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
index f1aa065..36bcaec 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
@@ -153,6 +153,7 @@
 		return evalContext.policyId;
 	}
 	
+	@Override
 	public Map<String, String> getAdditionalContext() {
 		return this.context;
 	}
@@ -166,4 +167,4 @@
 
 	@Override
 	public String getMarkdownReason() { return null; }
-}
\ No newline at end of file
+}
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
index ad518e9..0525126 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
@@ -28,5 +28,6 @@
 		entitySet.add(AlertStreamEntity.class);
 		entitySet.add(AlertDataSourceEntity.class);
         entitySet.add(AlertExecutorEntity.class);
+		entitySet.add(AlertNotificationEntity.class);
 	}
 }
\ No newline at end of file
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
new file mode 100644
index 0000000..6bc6318
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertNotifications")
+@ColumnFamily("f")
+@Prefix("alertNotifications")
+@Service(Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"notificationType"})
+public class AlertNotificationEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String className;
+    public String getClassName(){
+        return className;
+    }
+    public void setClassName(String className){
+        this.className = className;
+        valueChanged("className");
+    }
+
+    @Column("b")
+    private String description;
+    public String getDescription(){
+        return description;
+    }
+    public void setDescription(String description){
+        this.description = description;
+        valueChanged("description");
+    }
+
+    @Column("c")
+    private boolean enabled;
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+        valueChanged("enabled");
+    }
+}
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
index c5d8fea..256e46b 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -16,8 +16,6 @@
  */
 package org.apache.eagle.policy.common;
 
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-
 public class Constants {
 	public final static String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
 	public final static String ALERT_DEFINITION_SERVICE_ENDPOINT_NAME = "AlertDefinitionService";
@@ -25,6 +23,7 @@
 	public final static String ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME = "AlertDataSourceService";
 	public final static String ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME = "AlertExecutorService";
 	public final static String ALERT_STREAM_SERVICE_ENDPOINT_NAME = "AlertStreamService";
+	public final static String ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME = "AlertNotificationService";
 	public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
 	public static final String ALERT_TIMESTAMP_PROPERTY = "alertTimestamp";
 	
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
index 982d8c2..6c580f6 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
@@ -228,18 +228,15 @@
         boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
 
 		AbstractPolicyDefinition policyDef = null;
-		try {
-			policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, 
-					PolicyManager.getInstance().getPolicyModules(policyType));
-		} catch (Exception ex) {
-			LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
-		}
 		PolicyEvaluator<T> pe;
-		PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
-		context.policyId = alertDef.getTags().get("policyId");
-		context.alertExecutor = this;
-		context.resultRender = this.getResultRender();
 		try {
+			policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
+					PolicyManager.getInstance().getPolicyModules(policyType));
+
+			PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
+			context.policyId = alertDef.getTags().get("policyId");
+			context.alertExecutor = this;
+			context.resultRender = this.getResultRender();
 			// create evaluator instance
 			pe = (PolicyEvaluator<T>) evalCls
 					.getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
diff --git a/eagle-samples/eagle-persist-sample/pom.xml b/eagle-examples/eagle-topology-example/pom.xml
similarity index 89%
rename from eagle-samples/eagle-persist-sample/pom.xml
rename to eagle-examples/eagle-topology-example/pom.xml
index cbfe6d1..8b75d72 100644
--- a/eagle-samples/eagle-persist-sample/pom.xml
+++ b/eagle-examples/eagle-topology-example/pom.xml
@@ -19,13 +19,13 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>eagle-samples</artifactId>
+        <artifactId>eagle-examples</artifactId>
         <groupId>eagle</groupId>
         <version>0.3.0</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>eagle-persist-sample</artifactId>
+    <artifactId>eagle-topology-example</artifactId>
     <dependencies>
         <dependency>
             <groupId>eagle</groupId>
@@ -49,8 +49,8 @@
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
                 <configuration>
-                    <descriptor>src/assembly/eagle-persist-sample-assembly.xml</descriptor>
-                    <finalName>eagle-persist-sample-${project.version}</finalName>
+                    <descriptor>src/assembly/eagle-topology-example-assembly.xml</descriptor>
+                    <finalName>eagle-topology-example-${project.version}</finalName>
                 </configuration>
                 <executions>
                     <execution>
diff --git a/eagle-samples/eagle-persist-sample/src/assembly/eagle-persist-sample-assembly.xml b/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
similarity index 100%
rename from eagle-samples/eagle-persist-sample/src/assembly/eagle-persist-sample-assembly.xml
rename to eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
new file mode 100644
index 0000000..a5e39a5
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.example.notificationplugin;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Created on 2/16/16.
+ */
+public class NotificationPluginTestMain {
+    public static void main(String[] args){
+        System.setProperty("config.resource", "/application-plugintest.conf");
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
+        env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).nameAs("testSpout").alertWithConsumer("testStream", "testExecutor");
+        env.execute();
+    }
+
+    public static StormSpoutProvider createProvider(Config config) {
+        return new StormSpoutProvider(){
+
+            @Override
+            public BaseRichSpout getSpout(Config context) {
+                return new TestSpout();
+            }
+        };
+    }
+
+    public static class TestSpout extends BaseRichSpout {
+        private static final Logger LOG = LoggerFactory.getLogger(TestSpout.class);
+        private SpoutOutputCollector collector;
+        public TestSpout() {
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(5000);
+            LOG.info("emitted tuple ...");
+            Map<String, Object> map = new TreeMap<>();
+            map.put("testAttribute", "testValue");
+            collector.emit(new Values("testStream", map));
+        }
+    }
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
similarity index 95%
rename from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
rename to eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
index d608104..58dfe48 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.persist.test;
+package org.apache.eagle.example.persist;
 
 import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
 
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
similarity index 98%
rename from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain.java
rename to eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
index c552576..8f105fc 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain.java
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.persist.test;
+package org.apache.eagle.example.persist;
 
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain2.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
similarity index 97%
rename from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain2.java
rename to eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
index 7768385..d9a7bbb 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain2.java
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.eagle.persist.test;
+package org.apache.eagle.example.persist;
 
 import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
 import org.apache.eagle.datastream.ExecutionEnvironments;
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
new file mode 100644
index 0000000..e9288fa
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+##### delete email notification ##########
+echo ""
+echo "Importing policy ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+     {
+       "prefix": "alertdef",
+       "tags": {
+         "site": "sandbox",
+         "dataSource": "testSpout",
+         "policyId": "pluginTestPolicy",
+         "alertExecutorId": "testExecutor",
+         "policyType": "siddhiCEPEngine"
+       },
+       "description": "pluginTest",
+       "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
+       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
+       "remediationDef":"",
+       "enabled":true
+     }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for NotificationPluginTest"
+
+exit 0
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-examples/eagle-topology-example/src/main/resources/application-plugintest.conf
similarity index 62%
copy from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
copy to eagle-examples/eagle-topology-example/src/main/resources/application-plugintest.conf
index 6c4d261..f73e7c0 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-examples/eagle-topology-example/src/main/resources/application-plugintest.conf
@@ -17,52 +17,25 @@
   "envContextConfig" : {
     "env" : "storm",
     "mode" : "local",
-    "topologyName" : "persistTestTopology",
+    "topologyName" : "pluginTest",
     "stormConfigFile" : "persist-test-storm.yaml",
     "parallelismConfig" : {
-      "kafkaMsgConsumer" : 1,
-      "hdfsAuditLogAlertExecutor*" : 1
+      "testSpout" : 1,
+      "testExecutor*" : 1
     }
   },
   "dataSourceConfig": {
-    "topic" : "persist_test_log",
-    "zkConnection" : "localhost:2181",
-    "zkConnectionTimeoutMS" : 15000,
-    "consumerGroupId" : "EagleConsumer",
-    "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
-    "transactionZKServers" : "localhost",
-    "transactionZKPort" : 2181,
-    "transactionZKRoot" : "/consumers",
-    "transactionStateUpdateMS" : 2000
   },
   "alertExecutorConfigs" : {
-     "hdfsAuditLogAlertExecutor" : {
+     "testExecutor" : {
        "parallelism" : 1,
        "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
        "needValidation" : "true"
      }
   },
-  "persistExecutorConfigs" {
-    "persistExecutor1" : {
-      "kafka": {
-        "bootstrap_servers" : "www.xyz.com",
-        "topics" : {
-          "defaultOutput" : "downSampleOutput"
-        }
-      }
-    }
-  },
-  "aggregateExecutorConfigs" : {
-    "aggregateExecutor1" : {
-      "parallelism" : 1,
-      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-      "needValidation" : "true"
-    }
-  },
   "eagleProps" : {
     "site" : "sandbox",
-    "dataSource": "persistTest",
+    "dataSource": "testSpout",
   	"dataJoinPollIntervalSec" : 30,
     "mailHost" : "mailHost.com",
     "mailSmtpPort":"25",
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-examples/eagle-topology-example/src/main/resources/application.conf
similarity index 96%
rename from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
rename to eagle-examples/eagle-topology-example/src/main/resources/application.conf
index 6c4d261..aad74b3 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-examples/eagle-topology-example/src/main/resources/application.conf
@@ -30,7 +30,7 @@
     "zkConnectionTimeoutMS" : 15000,
     "consumerGroupId" : "EagleConsumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
+    "deserializerClass" : "org.apache.eagle.example.persist.MetricSerializer",
     "transactionZKServers" : "localhost",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/create-policy-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/create-policy-for-plugin-test.sh
new file mode 100644
index 0000000..3fd2752
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/resources/create-policy-for-plugin-test.sh
@@ -0,0 +1,99 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+#####################################################################
+#            Import stream metadata
+#####################################################################
+
+## AlertDataSource: data sources bound to sites
+echo "Importing AlertDataSourceService for persist... "
+
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" \
+  -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"testSpout"}, "enabled": "true", "config" : " just some description", "desc":"pluginTest"}]'
+
+
+## AlertStreamService: alert streams generated from data source
+echo ""
+echo "Importing AlertStreamService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
+ -d '[{"prefix":"alertStream","tags":{"dataSource":"testSpout","streamName":"testStream"},"desc":"pluginTest"}]'
+
+## AlertExecutorService: what alert streams are consumed by alert executor
+echo ""
+echo "Importing AlertExecutorService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
+ -d '[{"prefix":"alertExecutor","tags":{"dataSource":"testSpout","alertExecutorId":"testExecutor","streamName":"testStream"},"desc":"testStream->testExecutor"}]'
+
+## AlertStreamSchemaService: schema for event from alert stream
+echo ""
+echo "Importing AlertStreamSchemaService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
+ -d '
+ [
+    {
+       "prefix": "alertStreamSchema",
+       "tags": {
+          "dataSource": "testSpout",
+          "streamName": "testStream",
+          "attrName": "testAttribute"
+       },
+       "attrDescription": "testAttribute",
+       "attrType": "string",
+       "category": "",
+       "attrValueResolver": ""
+    }
+ ]
+ '
+
+##### add policies ##########
+echo ""
+echo "Importing policy ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+     {
+       "prefix": "alertdef",
+       "tags": {
+         "site": "sandbox",
+         "dataSource": "testSpout",
+         "policyId": "pluginTestPolicy",
+         "alertExecutorId": "testExecutor",
+         "policyType": "siddhiCEPEngine"
+       },
+       "description": "pluginTest",
+       "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
+       "dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}",
+       "notificationDef": "[{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
+       "remediationDef":"",
+       "enabled":true
+     }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for NotificationPluginTest"
+
+exit 0
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/delete-email-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/delete-email-for-plugin-test.sh
new file mode 100644
index 0000000..30d277f
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/resources/delete-email-for-plugin-test.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+##### delete email notification ##########
+echo ""
+echo "Importing policy ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+     {
+       "prefix": "alertdef",
+       "tags": {
+         "site": "sandbox",
+         "dataSource": "testSpout",
+         "policyId": "pluginTestPolicy",
+         "alertExecutorId": "testExecutor",
+         "policyType": "siddhiCEPEngine"
+       },
+       "description": "pluginTest",
+       "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
+       "notificationDef": "[{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
+       "remediationDef":"",
+       "enabled":true
+     }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for NotificationPluginTest"
+
+exit 0
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
similarity index 97%
rename from eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
rename to eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
index 4a22987..07f8402 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
+++ b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=DEBUG, stdout, DRFA
+log4j.rootLogger=INFO, stdout, DRFA
 
 eagle.log.dir=./logs
 eagle.log.file=eagle.log
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/persist-test-topo-init.sh b/eagle-examples/eagle-topology-example/src/main/resources/persist-test-topo-init.sh
similarity index 100%
rename from eagle-samples/eagle-persist-sample/src/main/resources/persist-test-topo-init.sh
rename to eagle-examples/eagle-topology-example/src/main/resources/persist-test-topo-init.sh
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/persit-test-storm.yaml b/eagle-examples/eagle-topology-example/src/main/resources/persit-test-storm.yaml
similarity index 100%
rename from eagle-samples/eagle-persist-sample/src/main/resources/persit-test-storm.yaml
rename to eagle-examples/eagle-topology-example/src/main/resources/persit-test-storm.yaml
diff --git a/eagle-samples/pom.xml b/eagle-examples/pom.xml
similarity index 92%
rename from eagle-samples/pom.xml
rename to eagle-examples/pom.xml
index f9fedc1..486ab2d 100644
--- a/eagle-samples/pom.xml
+++ b/eagle-examples/pom.xml
@@ -1,6 +1,5 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-<<<<<<< HEAD
   ~ Licensed to the Apache Software Foundation (ASF) under one or more
   ~ contributor license agreements.  See the NOTICE file distributed with
   ~ this work for additional information regarding copyright ownership.
@@ -26,10 +25,10 @@
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>eagle-samples</artifactId>
+    <artifactId>eagle-examples</artifactId>
     <packaging>pom</packaging>
     <modules>
-        <module>eagle-persist-sample</module>
+        <module>eagle-topology-example</module>
     </modules>
 
 
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 45adfb4..e1ecac3 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -69,6 +69,11 @@
             <artifactId>eagle-hadoop-metric</artifactId>
             <version>${project.version}</version>
         </dependency>
+<dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-alert-notification-plugin</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/pom.xml b/pom.xml
index f2e0128..6bf7abc 100755
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
         <module>eagle-external</module>
         <module>eagle-assembly</module>
         <module>eagle-topology-assembly</module>
-        <module>eagle-samples</module>
+        <module>eagle-examples</module>
         <module>eagle-gc</module>
         <module>eagle-hadoop-metric</module>
     </modules>
@@ -165,6 +165,12 @@
         <hive.version>1.2.1</hive.version>
         <spark.core.version>1.4.0</spark.core.version>
 
+        <!--  Client -->
+        <kafka-client.version>0.9.0.0</kafka-client.version>
+
+        <!-- Reflection -->
+        <reflections.version>0.9.8</reflections.version>
+
         <!-- Common Versions -->
         <commons-cli.version>1.2</commons-cli.version>
         <commons-lang.version>2.6</commons-lang.version>
@@ -193,10 +199,9 @@
         <!-- Utility -->
         <joda-time.version>2.7</joda-time.version>
         <joda-convert.version>1.7</joda-convert.version>
-        <!--<log4j.version>1.2.17</log4j.version>-->
         <log4j.version>1.2.17</log4j.version>
-        <slf4j.version>1.6.5</slf4j.version>
-        <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
+        <slf4j.version>1.7.5</slf4j.version>
+        <log4j-over-slf4j.version>1.7.2</log4j-over-slf4j.version>
         <quartz.version>2.2.1</quartz.version>
         <scopt.version>3.3.0</scopt.version>
         <akka.actor.version>2.3.14</akka.actor.version>
@@ -336,22 +341,17 @@
                 <version>${slf4j.version}</version>
                 <scope>compile</scope>
             </dependency>
-            <!--<dependency>-->
-                <!--<groupId>org.slf4j</groupId>-->
-                <!--<artifactId>slf4j-log4j12</artifactId>-->
-                <!--<version>${slf4j.version}</version>-->
-                <!--<scope>compile</scope>-->
-            <!--</dependency>-->
             <dependency>
                 <groupId>org.slf4j</groupId>
-                <artifactId>log4j-over-slf4j</artifactId>
-                <version>${log4j-over-slf4j.version}</version>
+                <artifactId>slf4j-log4j12</artifactId>
+                <version>${slf4j.version}</version>
+                <scope>compile</scope>
             </dependency>
             <dependency>
                 <groupId>org.slf4j</groupId>
-                <artifactId>slf4j-simple</artifactId>
+                <artifactId>log4j-over-slf4j</artifactId>
                 <version>${slf4j.version}</version>
-                <scope>test</scope>
+                <scope>compile</scope>
             </dependency>
             <dependency>
                 <groupId>log4j</groupId>
@@ -898,8 +898,6 @@
                                 <exclude>**/.metadata/</exclude>
                                 <!-- Maven working directory -->
                                 <exclude>**/target/**</exclude>
-                                <exclude>**/*.json</exclude>
-                                <exclude>**/*.json.*</exclude>
                                 <!-- Patch files which can be lying around -->
                                 <exclude>**/*.patch</exclude>
                                 <exclude>**/*.rej</exclude>
@@ -909,7 +907,9 @@
                                 <exclude>README*</exclude>
                                 <exclude>**/*.log</exclude>
                                 <exclude>**/eagle.log*</exclude>
-                                <exclude>**/resources/**/*.json</exclude>
+                                <exclude>**/velocity.log*</exclude>
+                                <!-- all json files should be excluded -->
+                                <exclude>**/*.json</exclude>
                                 <exclude>**/resources/eagle.siddhiext</exclude>
                                 <exclude>**/test/resources/securityAuditLog</exclude>
                                 <exclude>**/resources/**/ml-policyDef-UserProfile.txt</exclude>
@@ -918,7 +918,8 @@
                                 <exclude>**/dev-supports/**/*.json</exclude>
                                 <exclude>**/dev-supports/**/useractivity-agg-json.txt</exclude>
                                 <exclude>**/conf/sandbox-userprofile-topology.conf</exclude>
-                                <exclude>**/hadoop_jmx_collector/**</exclude>
+                                <exclude>**/kafka-python/**</exclude>
+                                <exclude>**/six/**</exclude>
                                 <!-- Fonts and Images -->
                                 <exclude>**/fonts/**</exclude>
                                 <exclude>**/images/**</exclude>