blob: c350f8e2a2be30a7370db86783f1c2f9a42dd372 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.sql.Timestamp;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.action.email.EmailActionExecutor;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordKillXCommand;
import org.apache.oozie.command.wf.JobXCommand;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.XLog;
/**
* The Abandoned Coord Checker Service finds the abandoned coord jobs in the system, reports them by email and, when
* oozie.service.AbandonedCoordCheckerService.kill.jobs is enabled, kills them. A job is considered to be
* abandoned/faulty if the total number of its actions in failed/timedout/suspended state is >= the failure limit, it
* has no succeeded actions, and its start time is older than job.older.than (in minutes). Email will not be sent if
* oozie.service.AbandonedCoordCheckerService.email.address is not configured.
*/
public class AbandonedCoordCheckerService implements Service {
    private static final String CONF_PREFIX = Service.CONF_PREFIX + "AbandonedCoordCheckerService.";
    /** Configuration key holding the recipient address(es) of the report email. */
    public static final String TO_ADDRESS = CONF_PREFIX + "email.address";
    private static final String CONTENT_TYPE = "text/html";
    private static final String SUBJECT = "Abandoned Coordinators report";
    /** Configuration key for the period between two checks, in minutes. */
    public static final String CONF_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
    /** Configuration key for the delay before the first check, in minutes. */
    public static final String CONF_CHECK_DELAY = CONF_PREFIX + "check.delay";
    /** Configuration key for the failure limit handed to the abandoned-coordinator query. */
    public static final String CONF_FAILURE_LEN = CONF_PREFIX + "failure.limit";
    /** Configuration key for the minimum job age, in minutes, before a job is checked. */
    public static final String CONF_JOB_OLDER_THAN = CONF_PREFIX + "job.older.than";
    /** Configuration key deciding whether abandoned coordinators are killed or only reported. */
    public static final String CONF_JOB_KILL = CONF_PREFIX + "kill.jobs";
    /** Configuration key of the server URL quoted in the email subject. */
    public static final String OOZIE_BASE_URL = "oozie.base.url";

    /** Milliseconds per minute, as a long so that age arithmetic cannot overflow an int. */
    private static final long MILLIS_PER_MINUTE = 60L * 1000L;

    // Both values are read from the configuration in init() and used by the runnable when mailing.
    private static String[] to;
    private static String serverURL;

    /**
     * Periodic task that looks up abandoned coordinator jobs, optionally kills them and mails an HTML report.
     */
    public static class AbandonedCoordCheckerRunnable implements Runnable {
        final int failureLimit;
        XLog LOG = XLog.getLog(getClass());
        private final boolean shouldKill;

        /**
         * Creates a checker that only reports abandoned coordinators and never kills them.
         *
         * @param failureLimit failure limit passed to the abandoned-coordinator query
         */
        public AbandonedCoordCheckerRunnable(int failureLimit) {
            this(failureLimit, false);
        }

        /**
         * Creates a checker.
         *
         * @param failureLimit failure limit passed to the abandoned-coordinator query
         * @param shouldKill whether the abandoned coordinators that are found should be killed
         */
        public AbandonedCoordCheckerRunnable(int failureLimit, boolean shouldKill) {
            this.failureLimit = failureLimit;
            this.shouldKill = shouldKill;
        }

        /**
         * Runs one check. Nothing is done unless {@link JobsConcurrencyService} reports this server as the leader.
         * Any exception raised by the check is logged and not propagated.
         */
        @Override
        public void run() {
            if (!Services.get().get(JobsConcurrencyService.class).isLeader()) {
                LOG.info("Server is not primary server. Skipping run");
                return;
            }
            XLog.Info.get().clear();
            try {
                checkCoordJobs();
            }
            catch (Exception e) {
                LOG.error("Error running AbandonedCoordChecker", e);
            }
        }

        /**
         * Queries the abandoned coordinator jobs, processes each of them and, when at least one was found, mails the
         * report.
         *
         * @throws CommandException if the query fails with a {@link JPAExecutorException}
         * @throws Exception if the report email cannot be sent
         */
        private void checkCoordJobs() throws Exception {
            StringBuilder msg = new StringBuilder();
            addTableHeader(msg);
            List<CoordinatorJobBean> jobs;
            try {
                // Multiply as long: with int arithmetic any age above 35791 minutes (~24.8 days) overflows, and a
                // negative product moves the cut-off timestamp into the future.
                long olderThanMillis = ConfigurationService.getInt(CONF_JOB_OLDER_THAN) * MILLIS_PER_MINUTE;
                Timestamp createdTS = new Timestamp(System.currentTimeMillis() - olderThanMillis);
                jobs = CoordJobQueryExecutor.getInstance().getList(CoordJobQuery.GET_COORD_FOR_ABANDONEDCHECK,
                        failureLimit, createdTS);
                for (CoordinatorJobBean job : jobs) {
                    processJob(job, msg);
                }
                if (!jobs.isEmpty()) {
                    addTableTail(msg);
                    sendMail(msg.toString());
                }
            }
            catch (JPAExecutorException je) {
                throw new CommandException(je);
            }
        }

        /**
         * Logs the abandoned job, kills it when killing is enabled, and appends a report row for it. A failed kill
         * is logged and recorded in the row; it does not stop the processing of the remaining jobs.
         *
         * @param job the abandoned coordinator job
         * @param msg the report being built
         */
        private void processJob(CoordinatorJobBean job, StringBuilder msg) {
            String killStatus = "Coord kill is disabled";
            LOG.info("Abandoned Coord found : " + job.getId());
            if (shouldKill) {
                try {
                    new CoordKillXCommand(job.getId()).call();
                    LOG.info("Killed abandoned coord : " + job.getId());
                    killStatus = "Successful";
                }
                catch (Exception e) {
                    LOG.error("Can't kill abandoned coord : " + job.getId(), e);
                    killStatus = " Failed : " + e.getMessage();
                }
            }
            addCoordToMessage(job, killStatus, msg);
        }

        /**
         * Appends one table row describing the job: a link to its console page, application name, user, group and
         * kill status. Every value is HTML-escaped, because the application name, user, group and the failure
         * message are not produced by this class and must not be able to inject markup into the email.
         *
         * @param job the coordinator job to describe
         * @param killStatus text for the "Kill Status" column
         * @param msg the report being built
         */
        public void addCoordToMessage(CoordinatorJobBean job, String killStatus, StringBuilder msg) {
            String jobId = job.getId();
            msg.append("<tr>");
            msg.append("<td><a href=\"").append(escapeHtml(JobXCommand.getJobConsoleUrl(jobId))).append("\">")
                    .append(escapeHtml(jobId)).append("</a></td>");
            msg.append("<td>").append(escapeHtml(job.getAppName())).append("</td>");
            msg.append("<td>").append(escapeHtml(job.getUser())).append("</td>");
            msg.append("<td>").append(escapeHtml(job.getGroup())).append("</td>");
            msg.append("<td>").append(escapeHtml(killStatus)).append("</td>");
            msg.append("</tr>");
        }

        /**
         * Appends the opening of the HTML document, the table start and the row of column titles.
         *
         * @param msg the report being built
         */
        public void addTableHeader(StringBuilder msg) {
            msg.append("<!DOCTYPE html><html><head><style>table,th,td{border:1px solid black;border-collapse:collapse;}</style>"
                    + "</head><body><table>");
            msg.append("<tr>");
            msg.append("<td>").append("Coordinator id").append("</td>");
            msg.append("<td>").append("Coordinator name").append("</td>");
            msg.append("<td>").append("User name").append("</td>");
            msg.append("<td>").append("Group").append("</td>");
            msg.append("<td>").append("Kill Status").append("</td>");
            msg.append("</tr>");
        }

        /**
         * Appends the closing tags of the table and of the HTML document.
         *
         * @param msg the report being built
         */
        public void addTableTail(StringBuilder msg) {
            msg.append("</table></body></html>");
        }

        /**
         * Mails the report to the configured recipients. When no recipient is configured this is logged and no
         * email is sent.
         *
         * @param body HTML body of the email
         * @throws Exception if sending the email fails
         */
        public void sendMail(String body) throws Exception {
            if (to == null || to.length == 0 || (to.length == 1 && StringUtils.isEmpty(to[0]))) {
                LOG.info(TO_ADDRESS + " is not configured. Not sending email");
                return;
            }
            EmailActionExecutor email = new EmailActionExecutor();
            String subject = SUBJECT + " for " + serverURL + " at " + DateUtils.formatDateOozieTZ(new Date());
            email.email(to, new String[0], new String[0], subject, body, null, CONTENT_TYPE, null);
        }

        /**
         * Returns the text form of the value with the HTML special characters {@code & < > " '} replaced by
         * entities. A {@code null} value yields the text {@code "null"}, the same text a {@link StringBuilder}
         * appends for a null reference.
         */
        private static String escapeHtml(Object value) {
            String text = String.valueOf(value);
            StringBuilder escaped = new StringBuilder(text.length() + 16);
            for (int i = 0; i < text.length(); i++) {
                char c = text.charAt(i);
                switch (c) {
                    case '&':
                        escaped.append("&amp;");
                        break;
                    case '<':
                        escaped.append("&lt;");
                        break;
                    case '>':
                        escaped.append("&gt;");
                        break;
                    case '"':
                        escaped.append("&quot;");
                        break;
                    case '\'':
                        escaped.append("&#39;");
                        break;
                    default:
                        escaped.append(c);
                        break;
                }
            }
            return escaped.toString();
        }
    }

    /**
     * Reads the service configuration and schedules the periodic abandoned-coordinator check with the
     * {@link SchedulerService}; the initial delay and the interval are both given in minutes.
     *
     * @param services the services instance used to look up the {@link SchedulerService}
     */
    @Override
    public void init(Services services) {
        to = ConfigurationService.getStrings(TO_ADDRESS);
        int failureLen = ConfigurationService.getInt(CONF_FAILURE_LEN);
        boolean shouldKill = ConfigurationService.getBoolean(CONF_JOB_KILL);
        serverURL = ConfigurationService.get(OOZIE_BASE_URL);
        int delay = ConfigurationService.getInt(CONF_CHECK_DELAY);
        int interval = ConfigurationService.getInt(CONF_CHECK_INTERVAL);
        Runnable actionCheckRunnable = new AbandonedCoordCheckerRunnable(failureLen, shouldKill);
        services.get(SchedulerService.class).schedule(actionCheckRunnable, delay, interval,
                SchedulerService.Unit.MIN);
    }

    /**
     * Does nothing; this class holds no resources of its own to release.
     */
    @Override
    public void destroy() {
    }

    /**
     * Returns the interface under which this service is registered.
     *
     * @return {@code AbandonedCoordCheckerService.class}
     */
    @Override
    public Class<? extends Service> getInterface() {
        return AbandonedCoordCheckerService.class;
    }
}