| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p/> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p/> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.eagle.jpm.analyzer.publisher; |
| |
| import com.typesafe.config.Config; |
| import com.typesafe.config.ConfigFactory; |
| import org.apache.eagle.alert.engine.publisher.PublishConstants; |
| import org.apache.eagle.app.service.ApplicationEmailService; |
| import org.apache.eagle.common.DateTimeUtil; |
| import org.apache.eagle.common.config.EagleConfigConstants; |
| import org.apache.eagle.common.mail.AlertEmailConstants; |
| import org.apache.eagle.common.mail.AlertEmailContext; |
| import org.apache.eagle.jpm.analyzer.meta.model.AnalyzerEntity; |
| import org.apache.eagle.jpm.analyzer.meta.model.UserEmailEntity; |
| import org.apache.eagle.jpm.analyzer.mr.sla.SLAJobEvaluator; |
| import org.apache.eagle.jpm.analyzer.mr.suggestion.JobSuggestionEvaluator; |
| import org.apache.eagle.jpm.analyzer.util.Constants; |
| import org.apache.eagle.jpm.analyzer.util.Utils; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.Serializable; |
| import java.util.*; |
| |
| public class EmailPublisher implements Publisher, Serializable { |
| private static final Logger LOG = LoggerFactory.getLogger(EmailPublisher.class); |
| private static final String ALERT_LEVEL_PATH = "application.analyzerReport.alertLevel"; |
| |
| private Config config; |
| private List<Class<?>> publishResult4Evaluators = new ArrayList<>(); |
| private TimeZone timeZone; |
| |
| public EmailPublisher(Config config) { |
| this.config = config; |
| //TODO |
| publishResult4Evaluators.add(SLAJobEvaluator.class); |
| publishResult4Evaluators.add(JobSuggestionEvaluator.class); |
| |
| timeZone = TimeZone.getTimeZone((config.hasPath(EagleConfigConstants.EAGLE_TIME_ZONE) |
| ? config.getString(EagleConfigConstants.EAGLE_TIME_ZONE) |
| : EagleConfigConstants.DEFAULT_EAGLE_TIME_ZONE)); |
| } |
| |
| @Override |
| //will refactor, just work now |
| public void publish(AnalyzerEntity analyzerJobEntity, Result result) { |
| if (!config.hasPath(Constants.ANALYZER_REPORT_CONFIG_PATH)) { |
| LOG.warn("no email configuration, skip send email"); |
| return; |
| } |
| |
| Map<String, List<Result.ProcessorResult>> targetResult = new HashMap<>(); |
| publishResult4Evaluators |
| .stream() |
| .filter(item -> result.getAlertMessages().containsKey(item.getName())) |
| .forEach(item -> targetResult.put(item.getName(), result.getAlertMessages().get(item.getName()))); |
| |
| Map<String, List<Result.ProcessorResult>> extend = new HashMap<>(); |
| |
| Result.ResultLevel serverAlertLevel = getServerAlertLevel(); |
| Result.ResultLevel jobAlertLevel = serverAlertLevel; |
| for (String evaluator : targetResult.keySet()) { |
| List<Result.ProcessorResult> alertMessages = new ArrayList<>(); |
| for (Result.ProcessorResult message : targetResult.get(evaluator)) { |
| if (isHigherAlertLevel(message.getResultLevel(), serverAlertLevel)) { |
| alertMessages.add(message); |
| |
| if (isHigherAlertLevel(message.getResultLevel(), jobAlertLevel)) { |
| jobAlertLevel = message.getResultLevel(); |
| } |
| |
| LOG.info("Job [{}] Got Message [{}], Level [{}] By Evaluator [{}]", |
| analyzerJobEntity.getJobId(), message.getMessage(), message.getResultLevel(), evaluator); |
| } |
| } |
| if (!alertMessages.isEmpty()) { |
| extend.put(evaluator, alertMessages); |
| } |
| } |
| |
| if (extend.size() == 0) { |
| return; |
| } |
| |
| LOG.info("EmailPublisher gets job {}", analyzerJobEntity.getJobDefId()); |
| |
| Map<String, String> basic = new HashMap<>(); |
| basic.put("site", analyzerJobEntity.getSiteId()); |
| basic.put("name", analyzerJobEntity.getJobDefId()); |
| basic.put("user", analyzerJobEntity.getUserId()); |
| basic.put("status", analyzerJobEntity.getCurrentState()); |
| basic.put("duration", analyzerJobEntity.getDurationTime() * 1.0 / 1000 + "s"); |
| basic.put("start", DateTimeUtil.secondsToHumanDate(analyzerJobEntity.getStartTime() / 1000, timeZone)); |
| basic.put("end", analyzerJobEntity.getEndTime() == 0 |
| ? "0" |
| : DateTimeUtil.secondsToHumanDate(analyzerJobEntity.getEndTime() / 1000, timeZone)); |
| double progress = org.apache.eagle.jpm.util.Constants.JobState.RUNNING.toString().equalsIgnoreCase(analyzerJobEntity.getCurrentState()) ? analyzerJobEntity.getProgress() : 100; |
| basic.put("progress", progress + "%"); |
| basic.put("detail", getJobLink(analyzerJobEntity)); |
| |
| Map<String, Object> alertData = new HashMap<>(); |
| alertData.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, jobAlertLevel.toString()); |
| alertData.put(Constants.ANALYZER_REPORT_DATA_BASIC_KEY, basic); |
| alertData.put(Constants.ANALYZER_REPORT_DATA_EXTEND_KEY, extend); |
| Config cloneConfig = ConfigFactory.empty().withFallback(config); |
| if (analyzerJobEntity.getUserId() != null) { |
| List<UserEmailEntity> users = Utils.getUserMail(config, analyzerJobEntity.getSiteId(), analyzerJobEntity.getUserId()); |
| if (users != null && users.size() > 0) { |
| Map<String, String> additionalConfig = new HashMap<>(); |
| additionalConfig.put(Constants.ANALYZER_REPORT_CONFIG_PATH + "." + AlertEmailConstants.RECIPIENTS, users.get(0).getMailAddress()); |
| cloneConfig = ConfigFactory.parseMap(additionalConfig).withFallback(cloneConfig); |
| } |
| } |
| ApplicationEmailService emailService = new ApplicationEmailService(cloneConfig, Constants.ANALYZER_REPORT_CONFIG_PATH); |
| String subject = String.format(Constants.ANALYZER_REPORT_SUBJECT, analyzerJobEntity.getJobDefId()); |
| alertData.put(PublishConstants.ALERT_EMAIL_SUBJECT, subject); |
| AlertEmailContext alertContext = emailService.buildEmailContext(subject); |
| emailService.onAlert(alertContext, alertData); |
| } |
| |
| private String getJobLink(AnalyzerEntity analyzerJobEntity) { |
| return "http://" |
| + config.getString(Constants.HOST_PATH) |
| + ":" |
| + config.getInt(Constants.PORT_PATH) |
| + "/#/site/" |
| + analyzerJobEntity.getSiteId() |
| + "/jpm/detail/" |
| + analyzerJobEntity.getJobId(); |
| } |
| |
| private Result.ResultLevel getServerAlertLevel() { |
| Result.ResultLevel alertLevel = Result.ResultLevel.WARNING; |
| if (config.hasPath(ALERT_LEVEL_PATH)) { |
| try { |
| alertLevel = Result.ResultLevel.fromString(config.getString(ALERT_LEVEL_PATH)); |
| } catch (Exception e) { |
| LOG.warn("invalid alert level config: {}, using WARNING level instead", config.getString(ALERT_LEVEL_PATH)); |
| } |
| } |
| return alertLevel; |
| } |
| |
| private boolean isHigherAlertLevel(Result.ResultLevel level, Result.ResultLevel serverLevel) { |
| return level.ordinal() >= serverLevel.ordinal(); |
| } |
| } |