blob: 7200865f0dd75031328a151a5937a4c68c0d76cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.alert.notification;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.eagle.alert.config.EmailNotificationConfig;
import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
import org.apache.eagle.alert.entity.AlertAPIEntity;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
import org.apache.eagle.policy.DynamicPolicyLoader;
import org.apache.eagle.policy.PolicyLifecycleMethods;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.datastream.Collector;
import org.apache.eagle.datastream.JavaStormStreamExecutor1;
import org.apache.eagle.datastream.Tuple1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.sun.jersey.client.impl.CopyOnWriteHashMap;
import com.typesafe.config.Config;
/**
* notify alert by email, sms or other means
* currently we only implements email notification
*/
public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
private static final long serialVersionUID = 1690354365435407034L;
private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
private Config config;
private List<String> alertExecutorIdList;
private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
private PolicyDefinitionDAO dao;
private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
private transient ThreadPoolExecutor executorPool;
public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
this.alertExecutorIdList = alertExecutorIdList;
this.dao = dao;
}
public List<AlertEmailGenerator> createAlertEmailGenerator(AlertDefinitionAPIEntity alertDef) {
Module module = new SimpleModule("notification").registerSubtypes(new NamedType(EmailNotificationConfig.class, "email"));
EmailNotificationConfig[] emailConfigs = new EmailNotificationConfig[0];
try {
emailConfigs = JsonSerDeserUtils.deserialize(alertDef.getNotificationDef(), EmailNotificationConfig[].class, Arrays.asList(module));
}
catch (Exception ex) {
LOG.warn("Initial emailConfig error, wrong format or it's error " + ex.getMessage());
}
List<AlertEmailGenerator> gens = new ArrayList<AlertEmailGenerator>();
if (emailConfigs == null) {
return gens;
}
for(EmailNotificationConfig emailConfig : emailConfigs) {
String tplFileName = emailConfig.getTplFileName();
if (tplFileName == null || tplFileName.equals("")) { // empty tplFileName, use default tpl file name
tplFileName = "ALERT_DEFAULT.vm";
}
AlertEmailGenerator gen = AlertEmailGeneratorBuilder.newBuilder().
withEagleProps(config.getObject("eagleProps")).
withSubject(emailConfig.getSubject()).
withSender(emailConfig.getSender()).
withRecipients(emailConfig.getRecipients()).
withTplFile(tplFileName).
withExecutorPool(executorPool).
build();
gens.add(gen);
}
return gens;
}
/**
* 1. register both file and database configuration
* 2. create email generator from configuration
*/
@Override
public void init(){
executorPool = new ThreadPoolExecutor(DEFAULT_THREAD_POOL_CORE_SIZE, DEFAULT_THREAD_POOL_MAX_SIZE, DEFAULT_THREAD_POOL_SHRINK_TIME, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
Map<String, List<AlertEmailGenerator>> tmpEmailGenerators = new HashMap<String, List<AlertEmailGenerator>> ();
String site = config.getString("eagleProps.site");
String dataSource = config.getString("eagleProps.dataSource");
Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
try {
initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
}
catch (Exception ex) {
LOG.error("fail to initialize initialAlertDefs: ", ex);
throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
}
if(initialAlertDefs == null || initialAlertDefs.isEmpty()){
LOG.warn("No alert definitions found for site: "+site+", dataSource: "+dataSource);
}
else {
for (String alertExecutorId: alertExecutorIdList) {
if(initialAlertDefs.containsKey(alertExecutorId)) {
for (AlertDefinitionAPIEntity alertDef : initialAlertDefs.get(alertExecutorId).values()) {
List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
tmpEmailGenerators.put(alertDef.getTags().get("policyId"), gens);
}
}else{
LOG.info(String.format("No alert definitions found for site: %s, dataSource: %s, alertExecutorId: %s",site,dataSource,alertExecutorId));
}
}
}
alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
alertEmailGeneratorsMap.putAll(tmpEmailGenerators);
DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
policyLoader.init(initialAlertDefs, dao, config);
for (String alertExecutorId : alertExecutorIdList) {
policyLoader.addPolicyChangeListener(alertExecutorId, this);
}
}
@Override
public void prepareConfig(Config config) {
this.config = config;
}
@Override
public void flatMap(java.util.List<Object> input, Collector<Tuple1<String>> outputCollector){
String policyId = (String) input.get(0);
AlertAPIEntity alertEntity = (AlertAPIEntity) input.get(1);
processAlerts(policyId, Arrays.asList(alertEntity));
}
//TODO: add a thread pool for email sender?
private void processAlerts(String policyId, List<AlertAPIEntity> list) {
List<AlertEmailGenerator> generators;
synchronized(alertEmailGeneratorsMap) {
generators = alertEmailGeneratorsMap.get(policyId);
}
if (generators == null) {
LOG.warn("Notification config of policyId " + policyId + " has been deleted");
return;
}
for (AlertAPIEntity entity : list) {
for(AlertEmailGenerator generator : generators){
generator.sendAlertEmail(entity);
}
}
}
@Override
public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
if(LOG.isDebugEnabled()) LOG.debug(" alert notification config changed : " + added);
for(AlertDefinitionAPIEntity alertDef : added.values()){
LOG.info("alert notification config really changed " + alertDef);
List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
synchronized(alertEmailGeneratorsMap) {
alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
}
}
}
@Override
public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be added : " + changed);
for(AlertDefinitionAPIEntity alertDef : changed.values()){
LOG.info("alert notification config really added " + alertDef);
List<AlertEmailGenerator> gens = createAlertEmailGenerator(alertDef);
synchronized(alertEmailGeneratorsMap) {
alertEmailGeneratorsMap.put(alertDef.getTags().get("policyId"), gens);
}
}
}
@Override
public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
if(LOG.isDebugEnabled()) LOG.debug("alert notification config to be deleted : " + deleted);
for(AlertDefinitionAPIEntity alertDef : deleted.values()){
LOG.info("alert notification config really deleted " + alertDef);
synchronized(alertEmailGeneratorsMap) {
alertEmailGeneratorsMap.remove(alertDef.getTags().get("policyId"));
}
}
}
}