blob: f495cf8a1cc5074cdf0635ee9de752e21e169dbf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.eagle.jpm.analyzer.publisher;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.alert.engine.publisher.PublishConstants;
import org.apache.eagle.app.service.ApplicationEmailService;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.common.config.EagleConfigConstants;
import org.apache.eagle.common.mail.AlertEmailConstants;
import org.apache.eagle.common.mail.AlertEmailContext;
import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity;
import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity;
import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator;
import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator;
import org.apache.eagle.jpm.analyzer.util.Constants;
import org.apache.eagle.jpm.analyzer.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
import java.util.*;
public class EmailPublisher implements Publisher, Serializable {
private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class);
private static final String ALERT_LEVEL_PATH = "application.analyzerReport.alertLevel";
private Config config;
private List<Class<?>> publishResult4Evaluators = new ArrayList<>();
private TimeZone timeZone;
public EmailPublisher(Config config) {
this.config = config;
//TODO
publishResult4Evaluators.add(SLAJobEvaluator.class);
publishResult4Evaluators.add(JobSuggestionEvaluator.class);
timeZone = TimeZone.getTimeZone((config.hasPath(EagleConfigConstants.EAGLE_TIME_ZONE)
? config.getString(EagleConfigConstants.EAGLE_TIME_ZONE)
: EagleConfigConstants.DEFAULT_EAGLE_TIME_ZONE));
}
@Override
//will refactor, just work now
public void publish(AnalyzerEntity analyzerJobEntity, Result result) {
if (!config.hasPath(Constants.ANALYZER_REPORT_CONFIG_PATH)) {
LOG.warn("no email configuration, skip send email");
return;
}
Map<String, List<Result.ProcessorResult>> targetResult = new HashMap<>();
publishResult4Evaluators
.stream()
.filter(item -> result.getAlertMessages().containsKey(item.getName()))
.forEach(item -> targetResult.put(item.getName(), result.getAlertMessages().get(item.getName())));
Map<String, List<Result.ProcessorResult>> extend = new HashMap<>();
Result.ResultLevel serverAlertLevel = getServerAlertLevel();
Result.ResultLevel jobAlertLevel = serverAlertLevel;
for (String evaluator : targetResult.keySet()) {
List<Result.ProcessorResult> alertMessages = new ArrayList<>();
for (Result.ProcessorResult message : targetResult.get(evaluator)) {
if (isHigherAlertLevel(message.getResultLevel(), serverAlertLevel)) {
alertMessages.add(message);
if (isHigherAlertLevel(message.getResultLevel(), jobAlertLevel)) {
jobAlertLevel = message.getResultLevel();
}
LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]",
analyzerJobEntity.getJobId(), message.getMessage(), message.getResultLevel(), evaluator);
}
}
if (!alertMessages.isEmpty()) {
extend.put(evaluator, alertMessages);
}
}
if (extend.size() == 0) {
return;
}
LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId());
Map<String, String> basic = new HashMap<>();
basic.put("site", analyzerJobEntity.getSiteId());
basic.put("name", analyzerJobEntity.getJobDefId());
basic.put("user", analyzerJobEntity.getUserId());
basic.put("status", analyzerJobEntity.getCurrentState());
basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s");
basic.put("start", DateTimeUtil.secondsToHumanDate(analyzerJobEntity.getStartTime() / 1000, timeZone));
basic.put("end", analyzerJobEntity.getEndTime() == 0
? "0"
: DateTimeUtil.secondsToHumanDate(analyzerJobEntity.getEndTime() / 1000, timeZone));
double progress = org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString().equalsIgnoreCase(analyzerJobEntity.getCurrentState()) ? analyzerJobEntity.getProgress() : 100;
basic.put("progress", progress + "%");
basic.put("detail", getJobLink(analyzerJobEntity));
Map<String, Object> alertData = new HashMap<>();
alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, jobAlertLevel.toString());
alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic);
alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend);
Config cloneConfig = ConfigFactory.empty().withFallback(config);
if (analyzerJobEntity.getUserId() != null) {
List<UserEmailEntity> users = Utils.getUserMail(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getUserId());
if (users != null && users.size() > 0) {
Map<String, String> additionalConfig = new HashMap<>();
additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.RECIPIENTS, users.get(0).getMailAddress());
cloneConfig = ConfigFactory.parseMap(additionalConfig).withFallback(cloneConfig);
}
}
ApplicationEmailService emailService = new ApplicationEmailService(cloneConfig, Constants.ANALYZER_REPORT_CONFIG_PATH);
String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId());
alertData.put(PublishConstants.ALERT_EMAIL_SUBJECT, subject);
AlertEmailContext alertContext = emailService.buildEmailContext(subject);
emailService.onAlert(alertContext, alertData);
}
private String getJobLink(AnalyzerEntity analyzerJobEntity) {
return "http://"
+ config.getString(Constants.HOST_PATH)
+ ":"
+ config.getInt(Constants.PORT_PATH)
+ "/#/site/"
+ analyzerJobEntity.getSiteId()
+ "/jpm/detail/"
+ analyzerJobEntity.getJobId();
}
private Result.ResultLevel getServerAlertLevel() {
Result.ResultLevel alertLevel = Result.ResultLevel.WARNING;
if (config.hasPath(ALERT_LEVEL_PATH)) {
try {
alertLevel = Result.ResultLevel.fromString(config.getString(ALERT_LEVEL_PATH));
} catch (Exception e) {
LOG.warn("invalid alert level config: {}, using WARNING level instead", config.getString(ALERT_LEVEL_PATH));
}
}
return alertLevel;
}
private boolean isHigherAlertLevel(Result.ResultLevel level, Result.ResultLevel serverLevel) {
return level.ordinal() >= serverLevel.ordinal();
}
}