notification plugin framework
diff --git a/eagle-assembly/src/assembly/eagle-bin.xml b/eagle-assembly/src/assembly/eagle-bin.xml
index 1c1a00f..2a04fdc 100644
--- a/eagle-assembly/src/assembly/eagle-bin.xml
+++ b/eagle-assembly/src/assembly/eagle-bin.xml
@@ -218,14 +218,6 @@
</excludes>
</fileSet>
- <!--<fileSet>-->
- <!--<directory>${project.basedir}/../eagle-samples/eagle-persist-sample/target/</directory>-->
- <!--<outputDirectory>lib/topology</outputDirectory>-->
- <!--<includes>-->
- <!--<include>eagle-persist-sample-*-assembly.jar</include>-->
- <!--</includes>-->
- <!--</fileSet>-->
-
<fileSet>
<directory>${project.basedir}/../eagle-external/eagle-ambari</directory>
<outputDirectory>lib/ambari</outputDirectory>
diff --git a/eagle-assembly/src/main/bin/eagle-create-table.rb b/eagle-assembly/src/main/bin/eagle-create-table.rb
index 21dc030..cbef6f4 100644
--- a/eagle-assembly/src/main/bin/eagle-create-table.rb
+++ b/eagle-assembly/src/main/bin/eagle-create-table.rb
@@ -56,5 +56,6 @@
createEagleTable(admin, 'appDefinition')
createEagleTable(admin, 'serviceAudit')
createEagleTable(admin, 'aggregatedef')
+createEagleTable(admin, 'alertNotifications')
exit
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml b/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
new file mode 100644
index 0000000..b73d43b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/pom.xml
@@ -0,0 +1,90 @@
+<!--
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-alert-parent</artifactId>
+ <version>0.3.0</version>
+ </parent>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-alert-notification-plugin</artifactId>
+ <name>eagle-alert-notification-plugin</name>
+ <description>Apache Eagle Notification Plugin to enable services to use custom or default notification </description>
+ <dependencies>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-policy-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_${scala.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
new file mode 100644
index 0000000..98d6b5c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.base;
+
+public class NotificationConstants {
+ public static final String NOTIFICATION_TYPE = "notificationType";
+ public static final String EMAIL_NOTIFICATION = "email";
+ public static final String KAFKA_STORE = "kafka";
+ public static final String EAGLE_STORE = "eagleStore";
+
+ // email specific constants
+ public static final String SUBJECT = "subject";
+ public static final String SENDER = "sender";
+ public static final String RECIPIENTS = "recipients";
+ public static final String TPL_FILE_NAME = "tplFileName";
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
similarity index 69%
copy from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
index d608104..d57dbf9 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationStatus.java
@@ -14,18 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.persist.test;
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+package org.apache.eagle.notification.base;
/**
- * Created on 1/4/16.
+ * Object that holds the status of Notification Posted to Notification Plugin
*/
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
- @Override
- public Object deserialize(byte[] arg0) {
- String logLine = new String(arg0);
-
- return null;
- }
+public class NotificationStatus {
+ public boolean successful;
+ public String errorMessage;
}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
similarity index 64%
copy from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
index d608104..bf2e75e 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAO.java
@@ -14,18 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.persist.test;
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+package org.apache.eagle.notification.dao;
+
+
+
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+
+import java.util.List;
/**
- * Created on 1/4/16.
+ * Alert Notification Data Access Obj Interface
*/
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
- @Override
- public Object deserialize(byte[] arg0) {
- String logLine = new String(arg0);
-
- return null;
- }
+public interface AlertNotificationDAO {
+ /**
+ * find the Alert Notification Types by querying alertNotifications Table
+ * @return
+ * @throws Exception
+ */
+ List<AlertNotificationEntity> findAlertNotificationTypes() throws Exception;
}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
new file mode 100644
index 0000000..7251da3
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/dao/AlertNotificationDAOImpl.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.dao;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Notification Service API implementation which Provides Read/Write API's of Hbase AlertNotifications Table
+ */
+public class AlertNotificationDAOImpl implements AlertNotificationDAO {
+
+ private final Logger LOG = LoggerFactory.getLogger(AlertNotificationDAOImpl.class);
+ private final EagleServiceConnector connector;
+
+ public AlertNotificationDAOImpl(EagleServiceConnector connector){
+ this.connector = connector;
+ }
+
+ /**
+ * Find the Alerts by NotificationType
+ * @return
+ * @throws Exception
+ */
+ @Override
+ public List<AlertNotificationEntity> findAlertNotificationTypes() throws Exception {
+ try{
+ IEagleServiceClient client = new EagleServiceClientImpl(connector);
+ String query = Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME+"[@enabled=\"true\"]{*}";
+ GenericServiceAPIResponseEntity response = client.search(query).startTime(0)
+ .endTime(10 * DateUtils.MILLIS_PER_DAY)
+ .pageSize(Integer.MAX_VALUE)
+ .query(query)
+ .send();
+ client.close();
+ if (response.getException() != null) {
+ throw new Exception("Got an exception when query eagle service: " + response.getException());
+ }
+ return response.getObj();
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception when query alert notification service ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
similarity index 64%
copy from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
index d608104..3fe55bf 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailComponent.java
@@ -14,18 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.persist.test;
+package org.apache.eagle.notification.email;
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+import org.apache.eagle.common.metric.AlertContext;
/**
- * Created on 1/4/16.
+ * Alert email component is one part of an email, which could be an individual alert
*/
-public class MetricSerializer implements SpoutKafkaMessageDeserializer {
- @Override
- public Object deserialize(byte[] arg0) {
- String logLine = new String(arg0);
-
- return null;
+public class AlertEmailComponent {
+ private AlertContext alertContext;
+ public AlertContext getAlertContext() {
+ return alertContext;
}
-}
+ public void setAlertContext(AlertContext alertContext) {
+ this.alertContext = alertContext;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
new file mode 100644
index 0000000..f1642be
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.List;
+
+/**
+ * alert email bean
+ * one email consists of a list of email component
+ */
+public class AlertEmailContext {
+ private List<AlertEmailComponent> components;
+ private String sender;
+ private String subject;
+ private String recipients;
+ private String velocityTplFile;
+ private String cc;
+
+ public List<AlertEmailComponent> getComponents() {
+ return components;
+ }
+ public void setComponents(List<AlertEmailComponent> components) {
+ this.components = components;
+ }
+ public String getVelocityTplFile() {
+ return velocityTplFile;
+ }
+ public void setVelocityTplFile(String velocityTplFile) {
+ this.velocityTplFile = velocityTplFile;
+ }
+ public String getRecipients() {
+ return recipients;
+ }
+ public void setRecipients(String recipients) {
+ this.recipients = recipients;
+ }
+ public String getSender() {
+ return sender;
+ }
+ public void setSender(String sender) {
+ this.sender = sender;
+ }
+ public String getSubject() {
+ return subject;
+ }
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+ public String getCc() {
+ return cc;
+ }
+ public void setCc(String cc) {
+ this.cc = cc;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
new file mode 100644
index 0000000..ddd3c96
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGenerator.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import com.typesafe.config.ConfigObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertEmailGenerator{
+ private String tplFile;
+ private String sender;
+ private String recipients;
+ private String subject;
+ private ConfigObject eagleProps;
+
+ private ThreadPoolExecutor executorPool;
+
+ private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
+
+ private final static long MAX_TIMEOUT_MS =60000;
+
+ public boolean sendAlertEmail(AlertAPIEntity entity) {
+ return sendAlertEmail(entity, recipients, null);
+ }
+
+ public boolean sendAlertEmail(AlertAPIEntity entity, String recipients) {
+ return sendAlertEmail(entity, recipients, null);
+ }
+
+ public boolean sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
+ boolean sentSuccessfully = false;
+ AlertEmailContext email = new AlertEmailContext();
+
+ AlertEmailComponent component = new AlertEmailComponent();
+ component.setAlertContext(entity.getAlertContext());
+ List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
+ components.add(component);
+ email.setComponents(components);
+ if (entity.getAlertContext().getProperty(Constants.SUBJECT) != null) {
+ email.setSubject(entity.getAlertContext().getProperty(Constants.SUBJECT));
+ }
+ else email.setSubject(subject);
+ email.setVelocityTplFile(tplFile);
+ email.setRecipients(recipients);
+ email.setCc(cc);
+ email.setSender(sender);
+
+ /** asynchronized email sending */
+ @SuppressWarnings("rawtypes")
+ AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
+
+ if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
+
+ LOG.info("Sending email in asynchronous to: "+recipients+", cc: "+cc);
+ Future future = this.executorPool.submit(thread);
+ try {
+ future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ sentSuccessfully = true;
+ LOG.info(String.format("Successfully send email to %s", recipients));
+ } catch (InterruptedException | ExecutionException e) {
+ sentSuccessfully = false;
+ LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
+ } catch (TimeoutException e) {
+ sentSuccessfully = false;
+ LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
+ }
+ return sentSuccessfully;
+ }
+
+ public String getTplFile() {
+ return tplFile;
+ }
+
+ public void setTplFile(String tplFile) {
+ this.tplFile = tplFile;
+ }
+
+ public String getSender() {
+ return sender;
+ }
+
+ public void setSender(String sender) {
+ this.sender = sender;
+ }
+
+ public String getRecipients() {
+ return recipients;
+ }
+
+ public void setRecipients(String recipients) {
+ this.recipients = recipients;
+ }
+
+ public String getSubject() {
+ return subject;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ public ConfigObject getEagleProps() {
+ return eagleProps;
+ }
+
+ public void setEagleProps(ConfigObject eagleProps) {
+ this.eagleProps = eagleProps;
+ }
+
+ public void setExecutorPool(ThreadPoolExecutor executorPool) {
+ this.executorPool = executorPool;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
new file mode 100644
index 0000000..2e63dab
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailGeneratorBuilder.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+import com.typesafe.config.ConfigObject;
+
+public class AlertEmailGeneratorBuilder {
+ private AlertEmailGenerator generator;
+ private AlertEmailGeneratorBuilder(){
+ generator = new AlertEmailGenerator();
+ }
+ public static AlertEmailGeneratorBuilder newBuilder(){
+ return new AlertEmailGeneratorBuilder();
+ }
+ public AlertEmailGeneratorBuilder withSubject(String subject){
+ generator.setSubject(subject);
+ return this;
+ }
+ public AlertEmailGeneratorBuilder withSender(String sender){
+ generator.setSender(sender);
+ return this;
+ }
+ public AlertEmailGeneratorBuilder withRecipients(String recipients){
+ generator.setRecipients(recipients);
+ return this;
+ }
+ public AlertEmailGeneratorBuilder withTplFile(String tplFile){
+ generator.setTplFile(tplFile);
+ return this;
+ }
+ public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
+ generator.setEagleProps(eagleProps);
+ return this;
+ }
+ public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
+ generator.setExecutorPool(threadPoolExecutor);
+ return this;
+ }
+
+ public AlertEmailGenerator build(){
+ return this.generator;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
new file mode 100644
index 0000000..2b18f1e
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/email/AlertEmailSender.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.email;
+
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.velocity.VelocityContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.email.EagleMailClient;
+import com.netflix.config.ConcurrentMapConfiguration;
+import com.typesafe.config.ConfigObject;
+
+public class AlertEmailSender implements Runnable {
+
+ protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();
+ protected final String configFileName;
+ protected final String subject;
+ protected final String sender;
+ protected final String recipents;
+ protected final String cc;
+ protected final String origin;
+ protected boolean sentSuccessfully = false;
+
+ private final static Logger LOG = LoggerFactory.getLogger(AlertEmailSender.class);
+ private final static int MAX_RETRY_COUNT = 3;
+
+ private static final String MAIL_HOST = "mail.host";
+ private static final String MAIL_PORT = "mail.smtp.port";
+ private static final String MAIL_DEBUG = "mail.debug";
+
+ private static final String CONF_KEY_MAIL_HOST = "mailHost";
+ private static final String CONF_KEY_MAIL_PORT = "mailSmtpPort";
+ private static final String CONF_KEY_MAIL_DEBUG = "mailDebug";
+
+ private ConfigObject eagleProps;
+
+
+ private String threadName;
+ /**
+ * Derived class may have some additional context properties to add
+ * @param context velocity context
+ * @param env environment
+ */
+ protected void additionalContext(VelocityContext context, String env) {
+ // By default there's no additional context added
+ }
+
+ public AlertEmailSender(AlertEmailContext alertEmail){
+ this.recipents = alertEmail.getRecipients();
+ this.configFileName = alertEmail.getVelocityTplFile();
+ this.subject = alertEmail.getSubject();
+ this.sender = alertEmail.getSender();
+ this.cc = alertEmail.getCc();
+ for(AlertEmailComponent bean : alertEmail.getComponents()){
+ this.alertContexts.add(bean.getAlertContext().getProperties());
+ }
+ String tmp = ManagementFactory.getRuntimeMXBean().getName();
+ this.origin = tmp.split("@")[1] + "(pid:" + tmp.split("@")[0] + ")";
+ threadName = Thread.currentThread().getName();
+ LOG.info("Initialized "+threadName+": origin is : " + this.origin+", recipient of the email: " + this.recipents+", velocity TPL file: " + this.configFileName);
+ }
+
+ public AlertEmailSender(AlertEmailContext alertEmail, ConfigObject eagleProps){
+ this(alertEmail);
+ this.eagleProps = eagleProps;
+ }
+
+ @Override
+ public void run() {
+ int count = 0;
+ boolean success = false;
+ while(count++ < MAX_RETRY_COUNT && !success){
+ LOG.info("Sending email, tried: " + count+", max: "+MAX_RETRY_COUNT);
+ try {
+ final EagleMailClient client;
+ if (eagleProps != null) {
+ ConcurrentMapConfiguration con = new ConcurrentMapConfiguration();
+ con.addProperty(MAIL_HOST, eagleProps.get(CONF_KEY_MAIL_HOST).unwrapped());
+ con.addProperty(MAIL_PORT, eagleProps.get(CONF_KEY_MAIL_PORT).unwrapped());
+ if (eagleProps.get(CONF_KEY_MAIL_DEBUG) != null) {
+ con.addProperty(MAIL_DEBUG, eagleProps.get(CONF_KEY_MAIL_DEBUG).unwrapped());
+ }
+ client = new EagleMailClient(con);
+ }
+ else {
+ client = new EagleMailClient();
+ }
+ String env = "prod";
+ if (eagleProps != null && eagleProps.get("env") != null) {
+ env = (String) eagleProps.get("env").unwrapped();
+ }
+ LOG.info("Env is: " + env);
+ final VelocityContext context = new VelocityContext();
+ generateCommonContext(context);
+ LOG.info("After calling generateCommonContext...");
+ additionalContext(context, env);
+
+ if (recipents == null || recipents.equals("")) {
+ LOG.error("Recipients is null, skip sending emails ");
+ return;
+ }
+ String title = subject;
+ if (!env.trim().equals("prod")) {
+ title = "[" + env + "]" + title;
+ }
+ success = client.send(sender, recipents, cc, title, configFileName, context, null);
+ LOG.info("Success of sending email: " + success);
+ if(!success && count < MAX_RETRY_COUNT) {
+ LOG.info("Sleep for a while before retrying");
+ Thread.sleep(10*1000);
+ }
+ }
+ catch (Exception e){
+ LOG.warn("Sending mail exception", e);
+ }
+ }
+
+ if(success){
+ sentSuccessfully = true;
+ LOG.info(String.format("Successfully send email, thread: %s",threadName));
+ }else{
+ LOG.warn(String.format("Fail sending email after tries %s times, thread: %s",MAX_RETRY_COUNT,threadName));
+ }
+ }
+
+ private void generateCommonContext(VelocityContext context) {
+ context.put(Constants.ALERT_EMAIL_TIME_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds( System.currentTimeMillis() ));
+ context.put(Constants.ALERT_EMAIL_COUNT_PROPERTY, alertContexts.size());
+ context.put(Constants.ALERT_EMAIL_ALERTLIST_PROPERTY, alertContexts);
+ context.put(Constants.ALERT_EMAIL_ORIGIN_PROPERTY, origin);
+ }
+
+ public boolean sentSuccessfully(){
+ return this.sentSuccessfully;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
new file mode 100644
index 0000000..e098256
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePersister.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Alert API entity Persistor
+ */
+public class AlertEagleStorePersister {
+ private static Logger LOG = LoggerFactory.getLogger(AlertEagleStorePersister.class);
+ private String eagleServiceHost;
+ private int eagleServicePort;
+ private String username;
+ private String password;
+
+
+ public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort) {
+ this(eagleServiceHost, eagleServicePort, null, null);
+ }
+
+ public AlertEagleStorePersister(String eagleServiceHost, int eagleServicePort, String username, String password) {
+ this.eagleServiceHost = eagleServiceHost;
+ this.eagleServicePort = eagleServicePort;
+ this.username = username;
+ this.password = password;
+ }
+
+ public AlertEagleStorePersister(Config config ) {
+ this.eagleServiceHost = config.getString("eagleProps.eagleService.host");
+ this.eagleServicePort = config.getInt("eagleProps.eagleService.port");
+ this.username = config.getString("eagleProps.eagleService.username");
+ this.password =config.getString("eagleProps.eagleService.password");
+ }
+
+ /**
+ * Persist passes list of Entities
+ * @param list
+ * @return
+ */
+ public boolean doPersist(List<? extends TaggedLogAPIEntity> list) {
+ if (list.isEmpty()) return false;
+ LOG.info("Going to persist entities, type: " + " " + list.get(0).getClass().getSimpleName() + ", list size: " + list.size());
+ try {
+ IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+ GenericServiceAPIResponseEntity<String> response = client.create(list);
+ client.close();
+ if (response.isSuccess()) {
+ LOG.info("Successfully create entities " + list.toString());
+ return true;
+ }
+ else {
+ LOG.error("Fail to create entities with exception " + response.getException());
+ return false;
+ }
+ }
+ catch (Exception ex) {
+ LOG.error("Got an exception in persisting entities", ex);
+ return false;
+ }
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
new file mode 100644
index 0000000..cb0df70
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Plugin to persist alerts to Eagle Storage
+ */
+public class AlertEagleStorePlugin implements NotificationPlugin {
+ private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
+ private NotificationStatus status;
+ private AlertEagleStorePersister persist;
+
+ @Override
+ public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+ this.persist = new AlertEagleStorePersister(config);
+ this.status = new NotificationStatus();
+ LOG.info("initialized plugin for EagleStorePlugin");
+ }
+
+ @Override
+ public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+ if( isPolicyDelete ){
+ LOG.info("Deleted policy ...");
+ return;
+ }
+ LOG.info("created/updated plugin ...");
+ }
+
+ @Override
+ public NotificationStatus getStatus() {
+ return this.status;
+ }
+
+ /**
+ * Persist AlertEntity to alert_details table
+ * @param alertEntity
+ */
+ @Override
+ public void onAlert(AlertAPIEntity alertEntity) {
+ LOG.info("write alert to eagle storage " + alertEntity);
+ try{
+ List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
+ list.add(alertEntity);
+ boolean result = persist.doPersist( list );
+ if(result) {
+ status.successful = true;
+ status.errorMessage = "";
+ }else{
+ status.successful = false;
+ status.errorMessage = "";
+ }
+ }catch (Exception ex ){
+ status.successful = false;
+ status.errorMessage = ex.getMessage();
+ LOG.error("Fail writing alert entity to Eagle Store", ex);
+ }
+ }
+
+ @Override
+ public int hashCode(){
+ return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if(o == this)
+ return true;
+ if(!(o instanceof AlertEagleStorePlugin))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
new file mode 100644
index 0000000..0577f5c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.email.AlertEmailGenerator;
+import org.apache.eagle.notification.email.AlertEmailGeneratorBuilder;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Send alert to email
+ */
+public class AlertEmailPlugin implements NotificationPlugin {
+ private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPlugin.class);
+ private Map<String, AlertEmailGenerator> emailGenerators = new ConcurrentHashMap<>();
+ private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
+ private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
+ private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
+ private transient ThreadPoolExecutor executorPool;
+ private NotificationStatus status = new NotificationStatus();
+
+ @Override
+ public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+ executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
+ LOG.info(" Creating Email Generator... ");
+ for( AlertDefinitionAPIEntity entity : initAlertDefs ){
+ List<Map<String,String>> configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+ for( Map<String,String> notificationConfigMap : configMaps ){
+ String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+ // for backward compatibility, default notification is email
+ if(notificationType == null || notificationType.equalsIgnoreCase(NotificationConstants.EMAIL_NOTIFICATION)){
+ AlertEmailGenerator generator = createEmailGenerator(notificationConfigMap);
+ this.emailGenerators.put(entity.getTags().get(Constants.POLICY_ID), generator);
+ LOG.info("Successfully initialized email notification for policy " + entity.getTags().get(Constants.POLICY_ID) + ",with " + notificationConfigMap);
+ }
+ }
+ }
+ }
+
+ /**
+ * @param notificationConf
+ * @throws Exception
+ */
+ @Override
+ public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+ if( isPolicyDelete ){
+ LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+ this.emailGenerators.remove(policyId);
+ return;
+ }
+ AlertEmailGenerator generator = createEmailGenerator(notificationConf);
+ this.emailGenerators.put(policyId , generator );
+ LOG.info("created/updated email generator for updated policy " + policyId);
+ }
+
+ /**
+ * API to send email
+ * @param alertEntity
+ * @throws Exception
+ */
+ @Override
+ public void onAlert(AlertAPIEntity alertEntity) throws Exception {
+ String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
+ AlertEmailGenerator generator = this.emailGenerators.get(policyId);
+ boolean isSuccess = generator.sendAlertEmail(alertEntity);
+ if( !isSuccess ) {
+ status.errorMessage = "Failed to send email";
+ status.successful = false;
+ }else {
+ status.errorMessage = "";
+ status.successful = true;
+ }
+ }
+
+ @Override
+ public NotificationStatus getStatus() {
+ return this.status;
+ }
+
+ /**
+ * @param notificationConfig
+ * @return
+ */
+ private AlertEmailGenerator createEmailGenerator( Map<String,String> notificationConfig ) {
+ String tplFileName = notificationConfig.get(NotificationConstants.TPL_FILE_NAME);
+ if (tplFileName == null || tplFileName.equals("")) {
+ tplFileName = "ALERT_DEFAULT.vm";
+ }
+ AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
+ withEagleProps(EagleConfigFactory.load().getConfig().getObject("eagleProps")).
+ withSubject(notificationConfig.get(NotificationConstants.SUBJECT)).
+ withSender(notificationConfig.get(NotificationConstants.SENDER)).
+ withRecipients(notificationConfig.get(NotificationConstants.RECIPIENTS)).
+ withTplFile(tplFileName).
+ withExecutorPool(this.executorPool).build();
+ return gen;
+ }
+
+ @Override
+ public int hashCode(){
+ return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if(o == this)
+ return true;
+ if(!(o instanceof AlertEmailPlugin))
+ return false;
+ return true;
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
new file mode 100644
index 0000000..cc4f89d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.base.NotificationStatus;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * send alert to Kafka bus
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AlertKafkaPlugin implements NotificationPlugin {
+ private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPlugin.class);
+ private NotificationStatus status = new NotificationStatus();
+ private Map<String, Map<String, String>> kafaConfigs = new ConcurrentHashMap<>();
+ private Config config;
+
+ @Override
+ public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
+ this.config = config;
+ for( AlertDefinitionAPIEntity entity : initAlertDefs ) {
+ List<Map<String,String>> configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
+ for( Map<String,String> notificationConfigMap : configMaps ){
+ String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+ if(notificationType == null){
+ LOG.error("no notificationType field for this notification, ignoring and continue " + notificationConfigMap);
+ continue;
+ }else {
+ // single policy can have multiple configs , only load Kafka Config's
+ if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
+ kafaConfigs.put(entity.getTags().get(Constants.POLICY_ID), notificationConfigMap);
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Update API to update policy delete/create/update in Notification Plug-ins
+ * @param notificationConf
+ * @param isPolicyDelete
+ * @throws Exception
+ */
+ @Override
+ public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+ if( isPolicyDelete ){
+ LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
+ this.kafaConfigs.remove(policyId);
+ return;
+ }
+ kafaConfigs.put(policyId, notificationConf );
+ }
+
+ /**
+ * Post Notification to KafkaTopic
+ * @param alertEntity
+ */
+ @Override
+ public void onAlert(AlertAPIEntity alertEntity) {
+ try{
+ KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(config);
+ producer.send(createRecord(alertEntity));
+ status.successful = true;
+ status.errorMessage = "";
+ }catch(Exception ex ){
+ LOG.error("fail writing alert to Kafka bus", ex);
+ status.successful = false;
+ status.errorMessage = ex.getMessage();
+ }
+ }
+
+ /**
+ * To Create KafkaProducer Record
+ * @param entity
+ * @return
+ * @throws Exception
+ */
+ private ProducerRecord createRecord(AlertAPIEntity entity ) throws Exception {
+ String policyId = entity.getTags().get(Constants.POLICY_ID);
+ ProducerRecord record = new ProducerRecord( this.kafaConfigs.get(policyId).get("topic"), NotificationPluginUtils.objectToStr(entity));
+ return record;
+ }
+
+ @Override
+ public NotificationStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public int hashCode(){
+ return new HashCodeBuilder().append(getClass().getCanonicalName()).toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object o){
+ if(o == this)
+ return true;
+ if(!(o instanceof AlertKafkaPlugin))
+ return false;
+ return true;
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
new file mode 100644
index 0000000..53f3bb0
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import java.util.Properties;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * The producer is thread safe and sharing a single producer instance across threads will generally be faster than having multiple instances.
+ */
+public enum KafkaProducerSingleton {
+ INSTANCE;
+
+ public KafkaProducer<String, Object> getProducer(Config config) throws Exception{
+ Properties configMap = new Properties();
+ configMap.put("bootstrap.servers", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+ configMap.put("metadata.broker.list", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+ configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ configMap.put("request.required.acks", "1");
+ configMap.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+ configMap.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
+ KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);
+ return producer;
+ }
+
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
new file mode 100644
index 0000000..5780176
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationStatus;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created on 2/10/16.
+ * Notification Plug-in interface which provide abstraction layer to notify to different system
+ */
+public interface NotificationPlugin {
+ /**
+ * for initialization
+ * @throws Exception
+ */
+ public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception;
+
+ /**
+ * Update Plugin if any change in Policy Definition
+ * @param policy to be impacted
+ * @param notificationConf
+ * @throws Exception
+ */
+ public void update(String policy, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception;
+
+ /**
+ * Post a notification for the given alertEntity
+ * @param alertEntity
+ * @throws Exception
+ */
+
+ public void onAlert( AlertAPIEntity alertEntity ) throws Exception;
+
+ /**
+ * Returns Status of Notification Post
+ * @return
+ */
+ public NotificationStatus getStatus();
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
new file mode 100644
index 0000000..4aa90c5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginLoader.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ * don't support dynamic discovery as of 2/10
+ */
+public class NotificationPluginLoader {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginLoader.class);
+ private static NotificationPluginLoader instance = new NotificationPluginLoader();
+ private static Map<String,NotificationPlugin> notificationMapping = new ConcurrentHashMap<>();
+
+ private Config config;
+ private boolean initialized = false;
+
+ public static NotificationPluginLoader getInstance(){
+ return instance;
+ }
+
+ public void init(Config config){
+ if(!initialized){
+ synchronized (this){
+ if(!initialized){
+ internalInit(config);
+ initialized = true;
+ }
+ }
+ }
+ }
+
+ private void internalInit(Config config){
+ this.config = config;
+ loadPlugins();
+ }
+
+ /**
+ * Scan & Load Plugins
+ */
+ private void loadPlugins(){
+ try {
+ LOG.info("Start loading Plugins from eagle service ...");
+ AlertNotificationDAO dao = new AlertNotificationDAOImpl(new EagleServiceConnector(config));
+ List<AlertNotificationEntity> activeNotificationPlugins = dao.findAlertNotificationTypes();
+ for(AlertNotificationEntity plugin : activeNotificationPlugins){
+ notificationMapping.put(plugin.getTags().get(NotificationConstants.NOTIFICATION_TYPE),
+ (NotificationPlugin) Class.forName(plugin.getClassName()).newInstance());
+ }
+ LOG.info("successfully loaded Plugins from eagle service " + activeNotificationPlugins);
+ }catch ( Exception ex ){
+ LOG.error("Error in loading Notification Plugins: ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ public Map<String, NotificationPlugin> getNotificationMapping() {
+ ensureInitialized();
+ return notificationMapping;
+ }
+
+ private void ensureInitialized(){
+ if(!initialized)
+ throw new IllegalStateException("Plugin loader not initialized");
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
new file mode 100644
index 0000000..fdf62d1
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManager.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.plugin;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+/**
+ * Created on 2/10/16.
+ */
+public interface NotificationPluginManager {
+ /**
+ * notify alerts to plugins for one specific alert entity
+ * @param entity
+ */
+ void notifyAlert( AlertAPIEntity entity );
+
+ /**
+ * responds to changes of alert notification definition
+ * @param entity
+ * @param isDelete
+ */
+ void updateNotificationPlugins(AlertDefinitionAPIEntity entity , boolean isDelete );
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
new file mode 100644
index 0000000..887a6a9
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notification.plugin;
+
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Created on 2/10/16.
+ */
+public class NotificationPluginManagerImpl implements NotificationPluginManager {
+ private static final Logger LOG = LoggerFactory.getLogger(NotificationPluginManagerImpl.class);
+ // mapping from policy Id to NotificationPlugin instance
+ private Map<String, Collection<NotificationPlugin>> policyNotificationMapping = new ConcurrentHashMap<>(1); //only one write thread
+ private Config config;
+
+ public NotificationPluginManagerImpl(Config config){
+ this.config = config;
+ internalInit();
+ }
+
+ private void internalInit(){
+ // iterate all policy ids, keep those notification which belong to plugins
+ PolicyDefinitionDAO policyDefinitionDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector( config ) , Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME);
+ String site = config.getString("eagleProps.site");
+ String dataSource = config.getString("eagleProps.dataSource");
+ try{
+ List<AlertDefinitionAPIEntity> activeAlertDefs = policyDefinitionDao.findActivePolicies( site , dataSource );
+ // initialize all loaded plugins
+ NotificationPluginLoader.getInstance().init(config);
+ for(NotificationPlugin plugin : NotificationPluginLoader.getInstance().getNotificationMapping().values()){
+ plugin.init(config, activeAlertDefs);
+ }
+ // build policy and plugin mapping
+ for( AlertDefinitionAPIEntity entity : activeAlertDefs ){
+ Map<String, NotificationPlugin> plugins = pluginsForPolicy(entity);
+ policyNotificationMapping.put(entity.getTags().get(Constants.POLICY_ID) , plugins.values());
+ }
+ }catch (Exception ex ){
+ LOG.error("Error initializing poliy/notification mapping ", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public void notifyAlert(AlertAPIEntity entity) {
+ String policyId = entity.getTags().get(Constants.POLICY_ID);
+ Collection<NotificationPlugin> plugins = policyNotificationMapping.get(policyId);
+ if(plugins == null || plugins.size() == 0) {
+ LOG.debug("no plugin found for policy " + policyId);
+ return;
+ }
+ for(NotificationPlugin plugin : plugins){
+ try {
+ LOG.info("execute notification plugin " + plugin);
+ plugin.onAlert(entity);
+ }catch(Exception ex){
+ LOG.error("fail invoking plugin's onAlert, continue ", ex);
+ }
+ }
+ }
+
+ @Override
+ public void updateNotificationPlugins(AlertDefinitionAPIEntity alertDef, boolean isDelete) {
+ try {
+ // Update Notification Plugin about the change in AlertDefinition
+ String policyId = alertDef.getTags().get(Constants.POLICY_ID);
+ if(isDelete){
+ // iterate all plugins and delete this policy
+ for(NotificationPlugin plugin : policyNotificationMapping.get(policyId)){
+ plugin.update(policyId, null, true);
+ }
+ policyNotificationMapping.remove(policyId);
+ LOG.info("Deleted notifications for policy " + policyId);
+ return;
+ }
+
+ Map<String, NotificationPlugin> plugins = pluginsForPolicy(alertDef);
+ // calculate difference between current plugins and previous plugin
+ Collection<NotificationPlugin> previousPlugins = policyNotificationMapping.get(policyId);
+ if(previousPlugins != null) {
+ Collection<NotificationPlugin> deletedPlugins = CollectionUtils.subtract(previousPlugins, plugins.values());
+ LOG.info("Going to delete plugins " + deletedPlugins + ", for policy " + policyId);
+ for (NotificationPlugin plugin : deletedPlugins) {
+ plugin.update(policyId, null, true);
+ }
+ }
+
+ // iterate current notifications and update it individually
+ List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(alertDef.getNotificationDef());
+ for( Map<String,String> notificationConf : notificationConfigCollection ) {
+ String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+ // for backward compatibility, use email for default notification type
+ if(notificationType == null){
+ notificationType = NotificationConstants.EMAIL_NOTIFICATION;
+ }
+ NotificationPlugin plugin = plugins.get(notificationType);
+ if(plugin != null){
+ plugin.update(policyId, notificationConf, false);
+ }
+ }
+
+ policyNotificationMapping.put(policyId, plugins.values());// update policy - notification types map
+ LOG.info("Successfully broadcasted policy updates to all Notification Plugins ...");
+ } catch (Exception e) {
+ LOG.error("Error broadcasting policy notification changes ", e);
+ }
+ }
+
+ private Map<String, NotificationPlugin> pluginsForPolicy(AlertDefinitionAPIEntity policy) throws Exception{
+ NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+ loader.init(config);
+ Map<String, NotificationPlugin> plugins = loader.getNotificationMapping();
+ // mapping from notificationType to plugin
+ Map<String, NotificationPlugin> notifications = new HashMap<>();
+ List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(policy.getNotificationDef());
+ for( Map<String,String> notificationConf : notificationConfigCollection ){
+ String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+ // for backward compatibility, by default notification type is email if notification type is not specified
+ if(notificationType == null){
+ LOG.warn("notificationType is null so use default notification type email for this policy " + policy);
+ notifications.put(NotificationConstants.EMAIL_NOTIFICATION, plugins.get(NotificationConstants.EMAIL_NOTIFICATION));
+ }else if(!plugins.containsKey(notificationType)){
+ LOG.warn("No NotificationPlugin supports this notificationType " + notificationType);
+ }else {
+ notifications.put(notificationType, plugins.get(notificationType));
+ }
+ }
+ return notifications;
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
new file mode 100644
index 0000000..350ecb5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notification.utils;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigObject;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Common methods for Notification Plugin
+ */
+public class NotificationPluginUtils {
+ /**
+ * Fetch Notification specific property value
+ * @param key
+ * @return
+ * @throws Exception
+ */
+ public static String getPropValue(Config config, String key ) throws Exception {
+ if( config.getObject("eagleNotificationProps") == null )
+ throw new Exception("Eagle Notification Properties not found in application.conf ");
+ ConfigObject notificationConf = config.getObject("eagleNotificationProps");
+ return notificationConf.get(key).unwrapped().toString();
+ }
+
+ /**
+ * Deserialize Notification Definition and convert all config to Key Value Pairs
+ * @param notificationDef
+ * @return
+ * @throws Exception
+ */
+ public static List<Map<String,String>> deserializeNotificationConfig( String notificationDef ) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ CollectionType mapCollectionType = mapper.getTypeFactory().constructCollectionType(List.class, Map.class);
+ return mapper.readValue( notificationDef , mapCollectionType);
+ }
+
+ /**
+ * Object to JSON String
+ * @param obj
+ * @return
+ * @throws Exception
+ */
+ public static String objectToStr( Object obj ) throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.writeValueAsString(obj);
+ }
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
new file mode 100644
index 0000000..4ceabad
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/ALERT_DEFAULT.vm
@@ -0,0 +1,275 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+ <head>
+ <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
+ <meta name="viewport" content="width=device-width"/>
+ <style>
+ body {
+ width:100% !important;
+ min-width: 100%;
+ -webkit-text-size-adjust:100%;
+ -ms-text-size-adjust:100%;
+ margin:0;
+ padding:0;
+ }
+
+ table {
+ border-spacing: 0;
+ border-collapse: collapse;
+ }
+
+ table th,
+ table td {
+ padding: 3px 0 3px 0;
+ }
+
+ .body {
+ width: 100%;
+ }
+
+ p,a,h1,h2,h3,ul,ol,li {
+ font-family: Helvetica, Arial, sans-serif;
+ font-weight: normal;
+ margin: 0;
+ padding: 0;
+ }
+ p {
+ font-size: 14px;
+ line-height: 19px;
+ }
+ a {
+ color: #3294b1;
+ }
+ h1 {
+ font-size: 36px;
+ margin: 15px 0 5px 0;
+ }
+ h2 {
+ font-size: 32px;
+ }
+ h3 {
+ font-size: 28px;
+ }
+
+ ul,ol {
+ margin: 0 0 0 25px;
+ padding: 0;
+ }
+
+ .btn {
+ background: #2ba6cb !important;
+ border: 1px solid #2284a1;
+ padding: 10px 20px 10px 20px;
+ text-align: center;
+ }
+ .btn:hover {
+ background: #2795b6 !important;
+ }
+ .btn a {
+ color: #FFFFFF;
+ text-decoration: none;
+ font-weight: bold;
+ padding: 10px 20px 10px 20px;
+ }
+
+ .tableBordered {
+ border-top: 1px solid #b9e5ff;
+ }
+ .tableBordered th {
+ background: #ECF8FF;
+ }
+ .tableBordered th p {
+ font-weight: bold;
+ color: #3294b1;
+ }
+ .tableBordered th,
+ .tableBordered td {
+ color: #333333;
+ border-bottom: 1px solid #b9e5ff;
+ text-align: center;
+ padding-bottom: 5px;
+ }
+
+ .panel {
+ height: 100px;
+ }
+ </style>
+ </head>
+ <body>
+ #set ( $elem = $alertList[0] )
+ #set ( $alertUrl = $elem["alertDetailUrl"] )
+ #set ( $policyUrl = $elem["policyDetailUrl"] )
+ <table class="body">
+ <tr>
+ <td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+ <!-- Eagle Header -->
+ <table width="580">
+ <tr>
+ <td style="padding: 0 0 0 0;" align="left" >
+ <p style="color:#FFFFFF;font-weight: bold; font-size: 24px">Eagle</p>
+ </td>
+ <td style="padding: 0 0 0 0;" align="right">
+ <p style="color:#FFFFFF;font-weight: bold;">DAM Alert</p>
+ </td>
+ </tr>
+ </table>
+ </td>
+ </tr>
+
+ <tr>
+ <td align="center" valign="top">
+ <!-- Eagle Body -->
+ <table width="580">
+ <tr>
+ <!-- Title -->
+ <td align="center">
+ <h1>Malicious Data Operation Detected</h1>
+ </td>
+ </tr>
+ <tr>
+ <!-- Time -->
+ <td>
+ <table width="580">
+ <tr>
+ <td>
+ <p><b>Detected Time: $elem["alertTimestamp"]</b></p>
+ </td>
+ #set ( $severity = $elem["severity"] )
+ #if (!$severity || ("$severity" == ""))
+ #set ( $elem["severity"] = "WARNING")
+ #end
+ <td align="right">
+ <p><b>
+ Severity:
+ #if ($elem["severity"] == "WARNING")
+ <span>$elem["severity"]</span>
+ #else
+ <span style="color: #FF0000;">$elem["severity"]</span>
+ #end
+ </b></p>
+ </td>
+ </tr>
+ </table>
+ </td>
+ </tr>
+ <tr>
+ <!-- Description -->
+ <td valign="top" style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 10px;">
+ <p>$elem["alertMessage"]</p>
+ </td>
+ </tr>
+ <tr>
+ <!-- View Detail -->
+ <td align="center" style="padding: 10px 0 0 0;">
+ <table width="580">
+ <tr>
+ <td class="btn">
+ <a href="$alertUrl">View Alert Details on Eagle Web</a>
+ </td>
+ </tr>
+ </table>
+ </td>
+ </tr>
+ <tr>
+ <!-- Basic Information -->
+ <td style="padding: 20px 0 0 0;">
+ <p><b>Basic Information:</b></p>
+ </td>
+ </tr>
+ <tr>
+ <!-- Basic Information Content -->
+ <td>
+ <table class="tableBordered" width="580">
+ <tr>
+ <th>
+ <p>Site</p>
+ </th>
+ <th>
+ <p>Data Source</p>
+ </th>
+ </tr>
+ <tr>
+ <td>
+ <p>$elem["site"]</p>
+ </td>
+ <td>
+ <p>$elem["dataSource"]</p>
+ </td>
+ </tr>
+ <tr>
+ <th>
+ <p>Alert Type</p>
+ </th>
+ <th>
+ <p>Policy Name</p>
+ </th>
+ <th>
+ <p>Severity</p>
+ </th>
+ </tr>
+ <tr>
+ <td>
+ <p>DAM Alert</p>
+ </td>
+ <td>
+ <p>$elem["policyId"]</p>
+ </td>
+ <td>
+ <p>$elem["severity"]</p>
+ </td>
+ </tr>
+ </table>
+ </td>
+ </tr>
+ <tr>
+ <!-- View Detail -->
+ <td align="center" style="padding: 10px 0 0 0;">
+ <table width="580">
+ <tr>
+ <td class="btn">
+ <a href="$policyUrl">View Policy Details on Eagle Web</a>
+ </td>
+ </tr>
+ </table>
+ </td>
+ </tr>
+ <tr>
+ <!-- Actions Required -->
+ <td style="padding: 20px 0 0 0;">
+ <p><b>Actions Required:</b></p>
+ </td>
+ </tr>
+ <tr>
+ <!-- Possible Root Causes Content -->
+ <td class="panel" valign="top" style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+ <p> Malicious data operation found, please check.</p>
+ </td>
+ </tr>
+ <tr>
+ <!-- Copyright -->
+ <td align="center">
+ <p><a href="http://123.xyz.com/alerts/alertlist.html">Copyright 2014 @ Hadoop Eagle</a></p>
+ </td>
+ </tr>
+ </table>
+ </td>
+ </tr>
+ </table>
+ </body>
+</html>
\ No newline at end of file
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/application.conf
similarity index 63%
copy from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/application.conf
index 6c4d261..d57172b 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/application.conf
@@ -16,71 +16,54 @@
{
"envContextConfig" : {
"env" : "storm",
- "mode" : "local",
- "topologyName" : "persistTestTopology",
- "stormConfigFile" : "persist-test-storm.yaml",
+ "mode" : "cluster",
+ "topologyName" : "sandbox-hdfsAuditLog-topology",
+ "stormConfigFile" : "security-auditlog-storm.yaml",
"parallelismConfig" : {
"kafkaMsgConsumer" : 1,
"hdfsAuditLogAlertExecutor*" : 1
}
},
"dataSourceConfig": {
- "topic" : "persist_test_log",
- "zkConnection" : "localhost:2181",
+ "topic" : "sandbox_hdfs_audit_log",
+ "zkConnection" : "127.0.0.1:2181",
+ "brokerZkPath" : "/brokers",
"zkConnectionTimeoutMS" : 15000,
- "consumerGroupId" : "EagleConsumer",
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
- "transactionZKServers" : "localhost",
+ "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+ "transactionZKServers" : "127.0.0.1",
"transactionZKPort" : 2181,
"transactionZKRoot" : "/consumers",
+ "consumerGroupId" : "eagle.hdfsaudit.consumer",
"transactionStateUpdateMS" : 2000
},
"alertExecutorConfigs" : {
"hdfsAuditLogAlertExecutor" : {
"parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+ "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
"needValidation" : "true"
}
},
- "persistExecutorConfigs" {
- "persistExecutor1" : {
- "kafka": {
- "bootstrap_servers" : "www.xyz.com",
- "topics" : {
- "defaultOutput" : "downSampleOutput"
- }
- }
- }
- },
- "aggregateExecutorConfigs" : {
- "aggregateExecutor1" : {
- "parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- },
"eagleProps" : {
"site" : "sandbox",
- "dataSource": "persistTest",
+ "dataSource": "hdfsAuditLog",
"dataJoinPollIntervalSec" : 30,
- "mailHost" : "mailHost.com",
+ "mailHost" : "mailhost.com",
"mailSmtpPort":"25",
"mailDebug" : "true",
- "balancePartitionEnabled" : true,
- #"partitionRefreshIntervalInMin" : 60,
- #"kafkaStatisticRangeInMin" : 60,
"eagleService": {
"host": "localhost",
"port": 9099,
"username": "admin",
"password": "secret"
- },
- "readHdfsUserCommandPatternFrom" : "file"
+ }
},
"dynamicConfigSource" : {
"enabled" : true,
"initDelayMillis" : 0,
"delayMillis" : 30000
+ },
+ "eagleNotificationProps" : {
+ "kafka_broker":"192.168.56.101:6667"
}
}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
new file mode 100644
index 0000000..0293f9d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/resources/notification-plugins-init.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+#####################################################################
+# Import notification plugin configuration into Eagle Service #
+#####################################################################
+
+## AlertNotificationService : schema for notifcation plugin configuration
+echo ""
+echo "Importing notification plugin configurations ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertNotificationService" \
+ -d '
+ [
+ {
+ "prefix": "alertNotifications",
+ "tags": {
+ "notificationType": "email"
+ },
+ "className": "org.apache.eagle.notification.plugin.AlertEmailPlugin",
+ "description": "send alert to email",
+ "enabled":true
+ },
+ {
+ "prefix": "alertNotifications",
+ "tags": {
+ "notificationType": "kafka"
+ },
+ "className": "org.apache.eagle.notification.plugin.AlertKafkaPlugin",
+ "description": "send alert to kafka bus",
+ "enabled":true
+ },
+ {
+ "prefix": "alertNotifications",
+ "tags": {
+ "notificationType": "eagleStore"
+ },
+ "className": "org.apache.eagle.notification.plugin.AlertEagleStorePlugin",
+ "description": "send alert to eagle store",
+ "enabled":true
+ }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for alert notification plugins"
+
+exit 0
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
new file mode 100644
index 0000000..74c8e40
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.notification.plugin.AlertEagleStorePlugin;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Created on 2/11/16.
+ */
+public class TestAlertEagleStorePlugin {
+ @Ignore // only work when eagle service is up
+ @Test
+ public void testEagleStorePlugin() throws Exception{
+ AlertEagleStorePlugin plugin = new AlertEagleStorePlugin();
+ Config config = ConfigFactory.load();
+ AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+ def.setNotificationDef("");
+ plugin.init(config, Arrays.asList(def));
+
+ AlertAPIEntity alert = new AlertAPIEntity();
+ alert.setDescription("");
+ plugin.onAlert(alert);
+ Assert.assertTrue(plugin.getStatus().successful);
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
new file mode 100644
index 0000000..b44c238
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertEmailPlugin;
+import org.apache.eagle.policy.common.Constants;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+/**
+ * Created on 2/11/16.
+ */
+public class TestAlertEmailPlugin {
+ @Ignore // only works when there is correct email setup and eagle service
+ @Test
+ public void testAlertEmailPlugin() throws Exception{
+ AlertEmailPlugin plugin = new AlertEmailPlugin();
+ Config config = ConfigFactory.load();
+ AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+ def.setTags(new HashMap<String, String>());
+ def.getTags().put(Constants.POLICY_ID, "testPolicyId");
+ def.setNotificationDef("[{\"notificationType\":\"email\",\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]");
+ plugin.init(config, Arrays.asList(def));
+
+ AlertAPIEntity alert = new AlertAPIEntity();
+ alert.setTags(new HashMap<String, String>());
+ alert.getTags().put(Constants.POLICY_ID, "testPolicyId");
+ alert.setDescription("");
+ alert.setAlertContext(new AlertContext());
+ plugin.onAlert(alert);
+ Assert.assertTrue(plugin.getStatus().successful);
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
new file mode 100644
index 0000000..b5ed63e
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.AlertKafkaPlugin;
+import org.apache.eagle.notification.plugin.KafkaProducerSingleton;
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class TestAlertKafkaPlugin {
+ @Ignore // only work when kafka is ready for use
+ @Test
+ public void testAlertToKafkaBus() throws Exception
+ {
+ AlertKafkaPlugin plugin = new AlertKafkaPlugin();
+ Config config = ConfigFactory.load();
+ AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
+ def.setTags(new HashMap<String, String>());
+ def.getTags().put(Constants.POLICY_ID, "testPolicyId");
+ def.setNotificationDef("[{\"notificationType\":\"kafka\",\"topic\":\"testTopic\"}]");
+ plugin.init(config, Arrays.asList(def));
+
+ AlertAPIEntity alert = new AlertAPIEntity();
+ alert.setTags(new HashMap<String, String>());
+ alert.getTags().put(Constants.POLICY_ID, "testPolicyId");
+ alert.setDescription("");
+ alert.setAlertContext(new AlertContext());
+ plugin.onAlert(alert);
+ Thread.sleep(1000); // wait for message sent out
+ Assert.assertTrue(plugin.getStatus().successful);
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
new file mode 100644
index 0000000..2749648
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestGetAllNotifications.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AlertNotificationEntity;
+import org.apache.eagle.common.config.EagleConfigFactory;
+import org.apache.eagle.notification.dao.AlertNotificationDAO;
+import org.apache.eagle.notification.dao.AlertNotificationDAOImpl;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+
+public class TestGetAllNotifications {
+ @Ignore
+ @Test
+ public void getAllNotification() throws Exception {
+ Config config = EagleConfigFactory.load().getConfig();
+ AlertNotificationDAO dao = new AlertNotificationDAOImpl( new EagleServiceConnector(config));
+ List<AlertNotificationEntity> list = dao.findAlertNotificationTypes();
+ System.out.println(" Fetch all Notifications : "+list);
+ }
+}
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
new file mode 100644
index 0000000..624343b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginLoader.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.notifications.testcases;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.notification.base.NotificationConstants;
+import org.apache.eagle.notification.plugin.NotificationPluginLoader;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Created on 2/10/16.
+ */
+public class TestNotificationPluginLoader {
+ @Ignore //only work when connected to eagle service
+ @Test
+ public void testLoader(){
+ Config config = ConfigFactory.load();
+ NotificationPluginLoader loader = NotificationPluginLoader.getInstance();
+ loader.init(config);
+ Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.EAGLE_STORE));
+ Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.KAFKA_STORE));
+ Assert.assertTrue(loader.getNotificationMapping().keySet().contains(NotificationConstants.EMAIL_NOTIFICATION));
+ }
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/application.conf
similarity index 62%
copy from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/application.conf
index 6c4d261..d57172b 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/application.conf
@@ -16,71 +16,54 @@
{
"envContextConfig" : {
"env" : "storm",
- "mode" : "local",
- "topologyName" : "persistTestTopology",
- "stormConfigFile" : "persist-test-storm.yaml",
+ "mode" : "cluster",
+ "topologyName" : "sandbox-hdfsAuditLog-topology",
+ "stormConfigFile" : "security-auditlog-storm.yaml",
"parallelismConfig" : {
"kafkaMsgConsumer" : 1,
"hdfsAuditLogAlertExecutor*" : 1
}
},
"dataSourceConfig": {
- "topic" : "persist_test_log",
- "zkConnection" : "localhost:2181",
+ "topic" : "sandbox_hdfs_audit_log",
+ "zkConnection" : "127.0.0.1:2181",
+ "brokerZkPath" : "/brokers",
"zkConnectionTimeoutMS" : 15000,
- "consumerGroupId" : "EagleConsumer",
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
- "transactionZKServers" : "localhost",
+ "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
+ "transactionZKServers" : "127.0.0.1",
"transactionZKPort" : 2181,
"transactionZKRoot" : "/consumers",
+ "consumerGroupId" : "eagle.hdfsaudit.consumer",
"transactionStateUpdateMS" : 2000
},
"alertExecutorConfigs" : {
"hdfsAuditLogAlertExecutor" : {
"parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+ "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
"needValidation" : "true"
}
},
- "persistExecutorConfigs" {
- "persistExecutor1" : {
- "kafka": {
- "bootstrap_servers" : "www.xyz.com",
- "topics" : {
- "defaultOutput" : "downSampleOutput"
- }
- }
- }
- },
- "aggregateExecutorConfigs" : {
- "aggregateExecutor1" : {
- "parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- },
"eagleProps" : {
"site" : "sandbox",
- "dataSource": "persistTest",
+ "dataSource": "hdfsAuditLog",
"dataJoinPollIntervalSec" : 30,
- "mailHost" : "mailHost.com",
+ "mailHost" : "mailhost.com",
"mailSmtpPort":"25",
"mailDebug" : "true",
- "balancePartitionEnabled" : true,
- #"partitionRefreshIntervalInMin" : 60,
- #"kafkaStatisticRangeInMin" : 60,
"eagleService": {
"host": "localhost",
"port": 9099,
"username": "admin",
"password": "secret"
- },
- "readHdfsUserCommandPatternFrom" : "file"
+ }
},
"dynamicConfigSource" : {
"enabled" : true,
"initDelayMillis" : 0,
"delayMillis" : 30000
+ },
+ "eagleNotificationProps" : {
+ "kafka_broker":"192.168.56.101:6667"
}
}
\ No newline at end of file
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/log4j.properties
similarity index 66%
copy from eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
copy to eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/log4j.properties
index 4a22987..3499c46 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/resources/log4j.properties
@@ -13,28 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=DEBUG, stdout, DRFA
+log4j.rootLogger=INFO, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
+ eagle.log.dir=./logs
+ eagle.log.file=eagle.log
-
-#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
-#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
# standard output
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=yyyy-MM-dd
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
## 30-day backup
# log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-process/pom.xml b/eagle-core/eagle-alert/eagle-alert-process/pom.xml
index 012cc64..fdf059d 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/pom.xml
+++ b/eagle-core/eagle-alert/eagle-alert-process/pom.xml
@@ -113,6 +113,11 @@
<artifactId>eagle-stream-process-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-alert-notification-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
deleted file mode 100644
index 1147328..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- *
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.*;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.common.AlertEmailSender;
-import org.apache.eagle.alert.email.AlertEmailComponent;
-import org.apache.eagle.alert.email.AlertEmailContext;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import com.typesafe.config.ConfigObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AlertEmailGenerator{
- private String tplFile;
- private String sender;
- private String recipients;
- private String subject;
- private ConfigObject eagleProps;
-
- private ThreadPoolExecutor executorPool;
-
- private final static Logger LOG = LoggerFactory.getLogger(AlertEmailGenerator.class);
-
- private final static long MAX_TIMEOUT_MS =60000;
-
- public void sendAlertEmail(AlertAPIEntity entity) {
- sendAlertEmail(entity, recipients, null);
- }
-
- public void sendAlertEmail(AlertAPIEntity entity, String recipients) {
- sendAlertEmail(entity, recipients, null);
- }
-
- public void sendAlertEmail(AlertAPIEntity entity, String recipients, String cc) {
- AlertEmailContext email = new AlertEmailContext();
-
- AlertEmailComponent component = new AlertEmailComponent();
- component.setAlertContext(entity.getAlertContext());
- List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
- components.add(component);
- email.setComponents(components);
- if (entity.getAlertContext().getProperty(Constants.SUBJECT) != null) {
- email.setSubject(entity.getAlertContext().getProperty(Constants.SUBJECT));
- }
- else email.setSubject(subject);
- email.setVelocityTplFile(tplFile);
- email.setRecipients(recipients);
- email.setCc(cc);
- email.setSender(sender);
-
- /** asynchronized email sending */
- @SuppressWarnings("rawtypes")
- AlertEmailSender thread = new AlertEmailSender(email, eagleProps);
-
- if(this.executorPool == null) throw new IllegalStateException("Invoking thread executor pool but it's is not set yet");
-
- LOG.info("Sending email in asynchronous to: "+recipients+", cc: "+cc);
- Future future = this.executorPool.submit(thread);
- try {
- future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
- LOG.info(String.format("Successfully send email to %s", recipients));
- } catch (InterruptedException | ExecutionException e) {
- LOG.error(String.format("Failed to send email to %s, due to:%s",recipients,e),e);
- } catch (TimeoutException e) {
- LOG.error(String.format("Failed to send email to %s due to timeout exception, max timeout: %s ms ",recipients, MAX_TIMEOUT_MS),e);
- }
- }
-
- public String getTplFile() {
- return tplFile;
- }
-
- public void setTplFile(String tplFile) {
- this.tplFile = tplFile;
- }
-
- public String getSender() {
- return sender;
- }
-
- public void setSender(String sender) {
- this.sender = sender;
- }
-
- public String getRecipients() {
- return recipients;
- }
-
- public void setRecipients(String recipients) {
- this.recipients = recipients;
- }
-
- public String getSubject() {
- return subject;
- }
-
- public void setSubject(String subject) {
- this.subject = subject;
- }
-
- public ConfigObject getEagleProps() {
- return eagleProps;
- }
-
- public void setEagleProps(ConfigObject eagleProps) {
- this.eagleProps = eagleProps;
- }
-
- public void setExecutorPool(ThreadPoolExecutor executorPool) {
- this.executorPool = executorPool;
- }
-}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
deleted file mode 100644
index 0303476..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGeneratorBuilder.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.notification;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-import com.typesafe.config.ConfigObject;
-
-public class AlertEmailGeneratorBuilder {
- private AlertEmailGenerator generator;
- private AlertEmailGeneratorBuilder(){
- generator = new AlertEmailGenerator();
- }
- public static AlertEmailGeneratorBuilder newBuilder(){
- return new AlertEmailGeneratorBuilder();
- }
- public AlertEmailGeneratorBuilder withSubject(String subject){
- generator.setSubject(subject);
- return this;
- }
- public AlertEmailGeneratorBuilder withSender(String sender){
- generator.setSender(sender);
- return this;
- }
- public AlertEmailGeneratorBuilder withRecipients(String recipients){
- generator.setRecipients(recipients);
- return this;
- }
- public AlertEmailGeneratorBuilder withTplFile(String tplFile){
- generator.setTplFile(tplFile);
- return this;
- }
- public AlertEmailGeneratorBuilder withEagleProps(ConfigObject eagleProps) {
- generator.setEagleProps(eagleProps);
- return this;
- }
- public AlertEmailGeneratorBuilder withExecutorPool(ThreadPoolExecutor threadPoolExecutor) {
- generator.setExecutorPool(threadPoolExecutor);
- return this;
- }
-
- public AlertEmailGenerator build(){
- return this.generator;
- }
-}
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
index 7200865..06ecb0d 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
@@ -16,129 +16,65 @@
*/
package org.apache.eagle.alert.notification;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.config.EmailNotificationConfig;
+import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
import org.apache.eagle.alert.entity.AlertAPIEntity;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.policy.DynamicPolicyLoader;
import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor1;
import org.apache.eagle.datastream.Tuple1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
import com.typesafe.config.Config;
/**
- * notify alert by email, sms or other means
- * currently we only implements email notification
+ * notify alert by email, kafka message, storage or other means
*/
public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
-
private static final long serialVersionUID = 1690354365435407034L;
private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
private Config config;
+ /** Notification Manager - Responsible for forward and invoke configured Notification Plugin **/
+ private NotificationPluginManagerImpl notificationManager;
private List<String> alertExecutorIdList;
- private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
private PolicyDefinitionDAO dao;
- private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
- private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
- private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
-
- private transient ThreadPoolExecutor executorPool;
public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
this.alertExecutorIdList = alertExecutorIdList;
this.dao = dao;
}
-
- public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {
- Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
- EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0];
- try {
- emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module));
+
+ @Override
+ public void init() {
+ String site = config.getString("eagleProps.site");
+ String dataSource = config.getString("eagleProps.dataSource");
+ Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
+ try {
+ initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId( site, dataSource );
}
catch (Exception ex) {
- LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage());
+ LOG.error("fail to initialize initialAlertDefs: ", ex);
+ throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
}
- List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>();
- if (emailConfigs == null) {
- return gens;
- }
- for(EmailNotificationConfig emailConfig : emailConfigs) {
- String tplFileName = emailConfig.getTplFileName();
- if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name
- tplFileName = "ALERT_DEFAULT.vm";
- }
- AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
- withEagleProps(config.getObject("eagleProps")).
- withSubject(emailConfig.getSubject()).
- withSender(emailConfig.getSender()).
- withRecipients(emailConfig.getRecipients()).
- withTplFile(tplFileName).
- withExecutorPool(executorPool).
- build();
- gens.add(gen);
- }
- return gens;
- }
-
- /**
- * 1. register both file and database configuration
- * 2. create email generator from configuration
- */
- @Override
- public void init(){
- executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
- Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> ();
-
- String site = config.getString("eagleProps.site");
- String dataSource = config.getString("eagleProps.dataSource");
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
- try {
- initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
- }
- catch (Exception ex) {
- LOG.error("fail to initialize initialAlertDefs: ", ex);
- throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
- }
-
- if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
- LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
- }
- else {
- for (String alertExecutorId: alertExecutorIdList) {
- if(initialAlertDefs.containsKey(alertExecutorId)) {
- for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
- List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
- tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
- }
- }else{
- LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
- }
- }
- }
-
- alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
- alertEmailGeneratorsMap.putAll(tmpEmailGenerators);
+ if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
+ LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
+ }
+ try{
+ notificationManager = new NotificationPluginManagerImpl(config);
+ }catch (Exception ex ){
+ LOG.error("Fail to initialize NotificationManager: ", ex);
+ throw new IllegalStateException("Fail to initialize NotificationManager: ", ex);
+ }
+
DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
policyLoader.init(initialAlertDefs, dao, config);
for (String alertExecutorId : alertExecutorIdList) {
@@ -146,32 +82,20 @@
}
}
- @Override
+ @Override
public void prepareConfig(Config config) {
this.config = config;
}
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
- String policyId = (String) input.get(0);
- AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
- processAlerts(policyId, Arrays.asList(alertEntity));
- }
-
- //TODO: add a thread pool for email sender?
- private void processAlerts(String policyId, List<AlertAPIEntity> list) {
- List<AlertEmailGenerator> generators;
- synchronized(alertEmailGeneratorsMap) {
- generators = alertEmailGeneratorsMap.get(policyId);
- }
- if (generators == null) {
- LOG.warn("Notification config of policyId " + policyId + " has been deleted");
- return;
- }
+ @Override
+ public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
+ AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
+ processAlerts(Arrays.asList(alertEntity));
+ }
+
+ private void processAlerts(List<AlertAPIEntity> list) {
for (AlertAPIEntity entity : list) {
- for(AlertEmailGenerator generator : generators){
- generator.sendAlertEmail(entity);
- }
+ notificationManager.notifyAlert(entity);
}
}
@@ -180,11 +104,8 @@
if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
for(AlertDefinitionAPIEntity alertDef : added.values()){
LOG.info("alert notification config really changed " + alertDef);
- List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
- synchronized(alertEmailGeneratorsMap) {
- alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
- }
- }
+ notificationManager.updateNotificationPlugins( alertDef , false );
+ }
}
@Override
@@ -192,11 +113,8 @@
if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
for(AlertDefinitionAPIEntity alertDef : changed.values()){
LOG.info("alert notification config really added " + alertDef);
- List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
- synchronized(alertEmailGeneratorsMap) {
- alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
- }
- }
+ notificationManager.updateNotificationPlugins( alertDef , false );
+ }
}
@Override
@@ -204,9 +122,7 @@
if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
for(AlertDefinitionAPIEntity alertDef : deleted.values()){
LOG.info("alert notification config really deleted " + alertDef);
- synchronized(alertEmailGeneratorsMap) {
- alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
- }
- }
+ notificationManager.updateNotificationPlugins( alertDef , true );
+ }
}
-}
+}
\ No newline at end of file
diff --git a/eagle-core/eagle-alert/pom.xml b/eagle-core/eagle-alert/pom.xml
index 6bd9bdf..43f6675 100644
--- a/eagle-core/eagle-alert/pom.xml
+++ b/eagle-core/eagle-alert/pom.xml
@@ -33,5 +33,6 @@
<module>eagle-alert-base</module>
<module>eagle-alert-process</module>
<module>eagle-alert-service</module>
- </modules>
+ <module>eagle-alert-notification-plugin</module>
+ </modules>
</project>
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
index 0c2732b..0dc9acc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
@@ -70,11 +70,16 @@
<artifactId>eagle-storage-base</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </dependency>
+<!-- <dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
+ <artifactId>slf4j-api</artifactId>
<scope>test</scope>
</dependency>
+-->
<!--<dependency>-->
<!--<groupId>org.scala-lang</groupId>-->
<!--<artifactId>scala-reflect</artifactId>-->
@@ -150,4 +155,4 @@
</plugin>
</plugins>
</build>
-</project>
\ No newline at end of file
+</project>
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
index 6933a5f..5df7e55 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/core/JsonSerDeserUtils.java
@@ -50,4 +50,5 @@
}
return mapper.readValue(value, cls);
}
+
}
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
index f1aa065..36bcaec 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
@@ -153,6 +153,7 @@
return evalContext.policyId;
}
+ @Override
public Map<String, String> getAdditionalContext() {
return this.context;
}
@@ -166,4 +167,4 @@
@Override
public String getMarkdownReason() { return null; }
-}
\ No newline at end of file
+}
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
index ad518e9..0525126 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
@@ -28,5 +28,6 @@
entitySet.add(AlertStreamEntity.class);
entitySet.add(AlertDataSourceEntity.class);
entitySet.add(AlertExecutorEntity.class);
+ entitySet.add(AlertNotificationEntity.class);
}
}
\ No newline at end of file
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
new file mode 100644
index 0000000..6bc6318
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertNotifications")
+@ColumnFamily("f")
+@Prefix("alertNotifications")
+@Service(Constants.ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"notificationType"})
+public class AlertNotificationEntity extends TaggedLogAPIEntity {
+ @Column("a")
+ private String className;
+ public String getClassName(){
+ return className;
+ }
+ public void setClassName(String className){
+ this.className = className;
+ valueChanged("className");
+ }
+
+ @Column("b")
+ private String description;
+ public String getDescription(){
+ return description;
+ }
+ public void setDescription(String description){
+ this.description = description;
+ valueChanged("description");
+ }
+
+ @Column("c")
+ private boolean enabled;
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ valueChanged("enabled");
+ }
+}
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
index c5d8fea..256e46b 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -16,8 +16,6 @@
*/
package org.apache.eagle.policy.common;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-
public class Constants {
public final static String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
public final static String ALERT_DEFINITION_SERVICE_ENDPOINT_NAME = "AlertDefinitionService";
@@ -25,6 +23,7 @@
public final static String ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME = "AlertDataSourceService";
public final static String ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME = "AlertExecutorService";
public final static String ALERT_STREAM_SERVICE_ENDPOINT_NAME = "AlertStreamService";
+ public final static String ALERT_NOTIFICATION_SERVICE_ENDPOINT_NAME = "AlertNotificationService";
public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
public static final String ALERT_TIMESTAMP_PROPERTY = "alertTimestamp";
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
index 982d8c2..6c580f6 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/PolicyProcessExecutor.java
@@ -228,18 +228,15 @@
boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
AbstractPolicyDefinition policyDef = null;
- try {
- policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
- PolicyManager.getInstance().getPolicyModules(policyType));
- } catch (Exception ex) {
- LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
- }
PolicyEvaluator<T> pe;
- PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
- context.policyId = alertDef.getTags().get("policyId");
- context.alertExecutor = this;
- context.resultRender = this.getResultRender();
try {
+ policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class,
+ PolicyManager.getInstance().getPolicyModules(policyType));
+
+ PolicyEvaluationContext<T, K> context = new PolicyEvaluationContext<>();
+ context.policyId = alertDef.getTags().get("policyId");
+ context.alertExecutor = this;
+ context.resultRender = this.getResultRender();
// create evaluator instance
pe = (PolicyEvaluator<T>) evalCls
.getConstructor(Config.class, PolicyEvaluationContext.class, AbstractPolicyDefinition.class, String[].class, boolean.class)
diff --git a/eagle-samples/eagle-persist-sample/pom.xml b/eagle-examples/eagle-topology-example/pom.xml
similarity index 89%
rename from eagle-samples/eagle-persist-sample/pom.xml
rename to eagle-examples/eagle-topology-example/pom.xml
index cbfe6d1..8b75d72 100644
--- a/eagle-samples/eagle-persist-sample/pom.xml
+++ b/eagle-examples/eagle-topology-example/pom.xml
@@ -19,13 +19,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>eagle-samples</artifactId>
+ <artifactId>eagle-examples</artifactId>
<groupId>eagle</groupId>
<version>0.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>eagle-persist-sample</artifactId>
+ <artifactId>eagle-topology-example</artifactId>
<dependencies>
<dependency>
<groupId>eagle</groupId>
@@ -49,8 +49,8 @@
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
- <descriptor>src/assembly/eagle-persist-sample-assembly.xml</descriptor>
- <finalName>eagle-persist-sample-${project.version}</finalName>
+ <descriptor>src/assembly/eagle-topology-example-assembly.xml</descriptor>
+ <finalName>eagle-topology-example-${project.version}</finalName>
</configuration>
<executions>
<execution>
diff --git a/eagle-samples/eagle-persist-sample/src/assembly/eagle-persist-sample-assembly.xml b/eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
similarity index 100%
rename from eagle-samples/eagle-persist-sample/src/assembly/eagle-persist-sample-assembly.xml
rename to eagle-examples/eagle-topology-example/src/assembly/eagle-topology-example-assembly.xml
diff --git a/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
new file mode 100644
index 0000000..a5e39a5
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/notificationplugin/NotificationPluginTestMain.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.example.notificationplugin;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Created on 2/16/16.
+ */
+public class NotificationPluginTestMain {
+ public static void main(String[] args){
+ System.setProperty("config.resource", "/application-plugintest.conf");
+ StormExecutionEnvironment env = ExecutionEnvironments.getStorm();
+ env.fromSpout(createProvider(env.getConfig())).withOutputFields(2).nameAs("testSpout").alertWithConsumer("testStream", "testExecutor");
+ env.execute();
+ }
+
+ public static StormSpoutProvider createProvider(Config config) {
+ return new StormSpoutProvider(){
+
+ @Override
+ public BaseRichSpout getSpout(Config context) {
+ return new TestSpout();
+ }
+ };
+ }
+
+ public static class TestSpout extends BaseRichSpout {
+ private static final Logger LOG = LoggerFactory.getLogger(TestSpout.class);
+ private SpoutOutputCollector collector;
+ public TestSpout() {
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ Utils.sleep(5000);
+ LOG.info("emitted tuple ...");
+ Map<String, Object> map = new TreeMap<>();
+ map.put("testAttribute", "testValue");
+ collector.emit(new Values("testStream", map));
+ }
+ }
+}
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
similarity index 95%
rename from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
rename to eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
index d608104..58dfe48 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/MetricSerializer.java
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/MetricSerializer.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.persist.test;
+package org.apache.eagle.example.persist;
import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
similarity index 98%
rename from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain.java
rename to eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
index c552576..8f105fc 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain.java
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.persist.test;
+package org.apache.eagle.example.persist;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
diff --git a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain2.java b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
similarity index 97%
rename from eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain2.java
rename to eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
index 7768385..d9a7bbb 100644
--- a/eagle-samples/eagle-persist-sample/src/main/java/org/apache/eagle/persist/test/PersistTopoTestMain2.java
+++ b/eagle-examples/eagle-topology-example/src/main/java/org/apache/eagle/example/persist/PersistTopoTestMain2.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.eagle.persist.test;
+package org.apache.eagle.example.persist;
import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
import org.apache.eagle.datastream.ExecutionEnvironments;
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
new file mode 100644
index 0000000..e9288fa
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/resources/add-notification-for-plugin-test.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+##### delete email notification ##########
+echo ""
+echo "Importing policy ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+ {
+ "prefix": "alertdef",
+ "tags": {
+ "site": "sandbox",
+ "dataSource": "testSpout",
+ "policyId": "pluginTestPolicy",
+ "alertExecutorId": "testExecutor",
+ "policyType": "siddhiCEPEngine"
+ },
+ "description": "pluginTest",
+ "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
+ "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
+ "remediationDef":"",
+ "enabled":true
+ }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for NotificationPluginTest"
+
+exit 0
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-examples/eagle-topology-example/src/main/resources/application-plugintest.conf
similarity index 62%
copy from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
copy to eagle-examples/eagle-topology-example/src/main/resources/application-plugintest.conf
index 6c4d261..f73e7c0 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-examples/eagle-topology-example/src/main/resources/application-plugintest.conf
@@ -17,52 +17,25 @@
"envContextConfig" : {
"env" : "storm",
"mode" : "local",
- "topologyName" : "persistTestTopology",
+ "topologyName" : "pluginTest",
"stormConfigFile" : "persist-test-storm.yaml",
"parallelismConfig" : {
- "kafkaMsgConsumer" : 1,
- "hdfsAuditLogAlertExecutor*" : 1
+ "testSpout" : 1,
+ "testExecutor*" : 1
}
},
"dataSourceConfig": {
- "topic" : "persist_test_log",
- "zkConnection" : "localhost:2181",
- "zkConnectionTimeoutMS" : 15000,
- "consumerGroupId" : "EagleConsumer",
- "fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
- "transactionZKServers" : "localhost",
- "transactionZKPort" : 2181,
- "transactionZKRoot" : "/consumers",
- "transactionStateUpdateMS" : 2000
},
"alertExecutorConfigs" : {
- "hdfsAuditLogAlertExecutor" : {
+ "testExecutor" : {
"parallelism" : 1,
"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
"needValidation" : "true"
}
},
- "persistExecutorConfigs" {
- "persistExecutor1" : {
- "kafka": {
- "bootstrap_servers" : "www.xyz.com",
- "topics" : {
- "defaultOutput" : "downSampleOutput"
- }
- }
- }
- },
- "aggregateExecutorConfigs" : {
- "aggregateExecutor1" : {
- "parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- },
"eagleProps" : {
"site" : "sandbox",
- "dataSource": "persistTest",
+ "dataSource": "testSpout",
"dataJoinPollIntervalSec" : 30,
"mailHost" : "mailHost.com",
"mailSmtpPort":"25",
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf b/eagle-examples/eagle-topology-example/src/main/resources/application.conf
similarity index 96%
rename from eagle-samples/eagle-persist-sample/src/main/resources/application.conf
rename to eagle-examples/eagle-topology-example/src/main/resources/application.conf
index 6c4d261..aad74b3 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/application.conf
+++ b/eagle-examples/eagle-topology-example/src/main/resources/application.conf
@@ -30,7 +30,7 @@
"zkConnectionTimeoutMS" : 15000,
"consumerGroupId" : "EagleConsumer",
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.persist.test.MetricSerializer",
+ "deserializerClass" : "org.apache.eagle.example.persist.MetricSerializer",
"transactionZKServers" : "localhost",
"transactionZKPort" : 2181,
"transactionZKRoot" : "/consumers",
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/create-policy-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/create-policy-for-plugin-test.sh
new file mode 100644
index 0000000..3fd2752
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/resources/create-policy-for-plugin-test.sh
@@ -0,0 +1,99 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+#####################################################################
+# Import stream metadata
+#####################################################################
+
+## AlertDataSource: data sources bound to sites
+echo "Importing AlertDataSourceService for persist... "
+
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDataSourceService" \
+ -d '[{"prefix":"alertDataSource","tags":{"site" : "sandbox", "dataSource":"testSpout"}, "enabled": "true", "config" : " just some description", "desc":"pluginTest"}]'
+
+
+## AlertStreamService: alert streams generated from data source
+echo ""
+echo "Importing AlertStreamService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
+ -d '[{"prefix":"alertStream","tags":{"dataSource":"testSpout","streamName":"testStream"},"desc":"pluginTest"}]'
+
+## AlertExecutorService: what alert streams are consumed by alert executor
+echo ""
+echo "Importing AlertExecutorService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
+ -d '[{"prefix":"alertExecutor","tags":{"dataSource":"testSpout","alertExecutorId":"testExecutor","streamName":"testStream"},"desc":"testStream->testExecutor"}]'
+
+## AlertStreamSchemaService: schema for event from alert stream
+echo ""
+echo "Importing AlertStreamSchemaService for HDFS... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
+ -d '
+ [
+ {
+ "prefix": "alertStreamSchema",
+ "tags": {
+ "dataSource": "testSpout",
+ "streamName": "testStream",
+ "attrName": "testAttribute"
+ },
+ "attrDescription": "testAttribute",
+ "attrType": "string",
+ "category": "",
+ "attrValueResolver": ""
+ }
+ ]
+ '
+
+##### add policies ##########
+echo ""
+echo "Importing policy ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+ {
+ "prefix": "alertdef",
+ "tags": {
+ "site": "sandbox",
+ "dataSource": "testSpout",
+ "policyId": "pluginTestPolicy",
+ "alertExecutorId": "testExecutor",
+ "policyType": "siddhiCEPEngine"
+ },
+ "description": "pluginTest",
+ "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
+ "dedupeDef": "{\"alertDedupIntervalMin\":0,\"emailDedupIntervalMin\":1440}",
+ "notificationDef": "[{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
+ "remediationDef":"",
+ "enabled":true
+ }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for NotificationPluginTest"
+
+exit 0
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/delete-email-for-plugin-test.sh b/eagle-examples/eagle-topology-example/src/main/resources/delete-email-for-plugin-test.sh
new file mode 100644
index 0000000..30d277f
--- /dev/null
+++ b/eagle-examples/eagle-topology-example/src/main/resources/delete-email-for-plugin-test.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with`
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+CUR_DIR=$(dirname $0)
+source $CUR_DIR/../../../../../eagle-assembly/src/main/bin/eagle-env.sh
+
+##### delete email notification ##########
+echo ""
+echo "Importing policy ... "
+curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
+ "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
+ -d '
+ [
+ {
+ "prefix": "alertdef",
+ "tags": {
+ "site": "sandbox",
+ "dataSource": "testSpout",
+ "policyId": "pluginTestPolicy",
+ "alertExecutorId": "testExecutor",
+ "policyType": "siddhiCEPEngine"
+ },
+ "description": "pluginTest",
+ "policyDef": "{\"expression\":\"from testStream[(testAttribute == \\\"testValue\\\")] select * insert into outputStream;\",\"type\":\"siddhiCEPEngine\"}",
+ "notificationDef": "[{\"subject\":\"just for test\",\"sender\":\"nobody@test.com\",\"recipients\":\"nobody@test.com\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]",
+ "remediationDef":"",
+ "enabled":true
+ }
+ ]
+ '
+
+## Finished
+echo ""
+echo "Finished initialization for NotificationPluginTest"
+
+exit 0
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
similarity index 97%
rename from eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
rename to eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
index 4a22987..07f8402 100644
--- a/eagle-samples/eagle-persist-sample/src/main/resources/log4j.properties
+++ b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=DEBUG, stdout, DRFA
+log4j.rootLogger=INFO, stdout, DRFA
eagle.log.dir=./logs
eagle.log.file=eagle.log
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/persist-test-topo-init.sh b/eagle-examples/eagle-topology-example/src/main/resources/persist-test-topo-init.sh
similarity index 100%
rename from eagle-samples/eagle-persist-sample/src/main/resources/persist-test-topo-init.sh
rename to eagle-examples/eagle-topology-example/src/main/resources/persist-test-topo-init.sh
diff --git a/eagle-samples/eagle-persist-sample/src/main/resources/persit-test-storm.yaml b/eagle-examples/eagle-topology-example/src/main/resources/persit-test-storm.yaml
similarity index 100%
rename from eagle-samples/eagle-persist-sample/src/main/resources/persit-test-storm.yaml
rename to eagle-examples/eagle-topology-example/src/main/resources/persit-test-storm.yaml
diff --git a/eagle-samples/pom.xml b/eagle-examples/pom.xml
similarity index 92%
rename from eagle-samples/pom.xml
rename to eagle-examples/pom.xml
index f9fedc1..486ab2d 100644
--- a/eagle-samples/pom.xml
+++ b/eagle-examples/pom.xml
@@ -1,6 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
-<<<<<<< HEAD
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
@@ -26,10 +25,10 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>eagle-samples</artifactId>
+ <artifactId>eagle-examples</artifactId>
<packaging>pom</packaging>
<modules>
- <module>eagle-persist-sample</module>
+ <module>eagle-topology-example</module>
</modules>
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 45adfb4..e1ecac3 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -69,6 +69,11 @@
<artifactId>eagle-hadoop-metric</artifactId>
<version>${project.version}</version>
</dependency>
+<dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-alert-notification-plugin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/pom.xml b/pom.xml
index f2e0128..6bf7abc 100755
--- a/pom.xml
+++ b/pom.xml
@@ -131,7 +131,7 @@
<module>eagle-external</module>
<module>eagle-assembly</module>
<module>eagle-topology-assembly</module>
- <module>eagle-samples</module>
+ <module>eagle-examples</module>
<module>eagle-gc</module>
<module>eagle-hadoop-metric</module>
</modules>
@@ -165,6 +165,12 @@
<hive.version>1.2.1</hive.version>
<spark.core.version>1.4.0</spark.core.version>
+ <!-- Client -->
+ <kafka-client.version>0.9.0.0</kafka-client.version>
+
+ <!-- Reflection -->
+ <reflections.version>0.9.8</reflections.version>
+
<!-- Common Versions -->
<commons-cli.version>1.2</commons-cli.version>
<commons-lang.version>2.6</commons-lang.version>
@@ -193,10 +199,9 @@
<!-- Utility -->
<joda-time.version>2.7</joda-time.version>
<joda-convert.version>1.7</joda-convert.version>
- <!--<log4j.version>1.2.17</log4j.version>-->
<log4j.version>1.2.17</log4j.version>
- <slf4j.version>1.6.5</slf4j.version>
- <log4j-over-slf4j.version>1.6.6</log4j-over-slf4j.version>
+ <slf4j.version>1.7.5</slf4j.version>
+ <log4j-over-slf4j.version>1.7.2</log4j-over-slf4j.version>
<quartz.version>2.2.1</quartz.version>
<scopt.version>3.3.0</scopt.version>
<akka.actor.version>2.3.14</akka.actor.version>
@@ -336,22 +341,17 @@
<version>${slf4j.version}</version>
<scope>compile</scope>
</dependency>
- <!--<dependency>-->
- <!--<groupId>org.slf4j</groupId>-->
- <!--<artifactId>slf4j-log4j12</artifactId>-->
- <!--<version>${slf4j.version}</version>-->
- <!--<scope>compile</scope>-->
- <!--</dependency>-->
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>${log4j-over-slf4j.version}</version>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
- <artifactId>slf4j-simple</artifactId>
+ <artifactId>log4j-over-slf4j</artifactId>
<version>${slf4j.version}</version>
- <scope>test</scope>
+ <scope>compile</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
@@ -898,8 +898,6 @@
<exclude>**/.metadata/</exclude>
<!-- Maven working directory -->
<exclude>**/target/**</exclude>
- <exclude>**/*.json</exclude>
- <exclude>**/*.json.*</exclude>
<!-- Patch files which can be lying around -->
<exclude>**/*.patch</exclude>
<exclude>**/*.rej</exclude>
@@ -909,7 +907,9 @@
<exclude>README*</exclude>
<exclude>**/*.log</exclude>
<exclude>**/eagle.log*</exclude>
- <exclude>**/resources/**/*.json</exclude>
+ <exclude>**/velocity.log*</exclude>
+ <!-- all json files should be excluded -->
+ <exclude>**/*.json</exclude>
<exclude>**/resources/eagle.siddhiext</exclude>
<exclude>**/test/resources/securityAuditLog</exclude>
<exclude>**/resources/**/ml-policyDef-UserProfile.txt</exclude>
@@ -918,7 +918,8 @@
<exclude>**/dev-supports/**/*.json</exclude>
<exclude>**/dev-supports/**/useractivity-agg-json.txt</exclude>
<exclude>**/conf/sandbox-userprofile-topology.conf</exclude>
- <exclude>**/hadoop_jmx_collector/**</exclude>
+ <exclude>**/kafka-python/**</exclude>
+ <exclude>**/six/**</exclude>
<!-- Fonts and Images -->
<exclude>**/fonts/**</exclude>
<exclude>**/images/**</exclude>