blob: 646fe292c0d4a77bafabde796d14145db14dc94e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.sla.service;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.SchedulerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLACalculator;
import org.apache.oozie.sla.SLACalculatorMemory;
import org.apache.oozie.sla.SLARegistrationBean;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.XLog;
import com.google.common.annotations.VisibleForTesting;
/**
 * Service that manages SLA tracking.
 * <p>
 * {@link #init(Services)} instantiates the configured {@link SLACalculator} (a {@link SLACalculatorMemory} when
 * the configured class resolves to {@code null}). It requires {@link EventHandlerService} to already be
 * available, adds the {@code workflow_action} app type to that handler, and schedules a periodic worker that
 * calls {@link SLACalculator#updateAllSlaStatus()}. The remaining public methods delegate to the calculator.
 */
public class SLAService implements Service {
    public static final String CONF_PREFIX = "oozie.sla.service.SLAService.";
    public static final String CONF_CALCULATOR_IMPL = CONF_PREFIX + "calculator.impl";
    public static final String CONF_CAPACITY = CONF_PREFIX + "capacity";
    public static final String CONF_ALERT_EVENTS = CONF_PREFIX + "alert.events";
    public static final String CONF_EVENTS_MODIFIED_AFTER = CONF_PREFIX + "events.modified.after";
    public static final String CONF_JOB_EVENT_LATENCY = CONF_PREFIX + "job.event.latency";
    // Time interval, in seconds, at which the SLA worker is scheduled to run.
    public static final String CONF_SLA_CHECK_INTERVAL = CONF_PREFIX + "check.interval";
    // Delay, in seconds, before the first scheduled SLA worker run.
    public static final String CONF_SLA_CHECK_INITIAL_DELAY = CONF_PREFIX + "check.initial.delay";
    // NOTE(review): this key repeats "oozie." after CONF_PREFIX, yielding
    // "oozie.sla.service.SLAService.oozie.sla.calc.default.lock.timeout". It is left unchanged because
    // existing configurations may already use it.
    public static final String CONF_SLA_CALC_LOCK_TIMEOUT = CONF_PREFIX + "oozie.sla.calc.default.lock.timeout";
    public static final String CONF_SLA_HISTORY_PURGE_INTERVAL = CONF_PREFIX + "history.purge.interval";
    public static final String CONF_MAXIMUM_RETRY_COUNT = CONF_PREFIX + "maximum.retry.count";

    // Shared by all instances; assigned in init().
    private static SLACalculator calcImpl;
    private static boolean slaEnabled = false;
    private EventHandlerService eventHandler;
    // Assigned at the start of init(); null until the service has been initialized.
    public static XLog LOG;

    /**
     * Creates and initializes the SLA calculator, wires the event handler and schedules the SLA worker.
     *
     * @param services the services container providing the configuration and the scheduler
     * @throws ServiceException (error code E0102) wrapping any failure, including a missing
     *         {@link EventHandlerService}
     */
    @Override
    public void init(Services services) throws ServiceException {
        // Assign the logger first so it is set even if a later initialization step fails.
        LOG = XLog.getLog(getClass());
        try {
            Configuration conf = services.getConf();
            @SuppressWarnings("unchecked")
            Class<? extends SLACalculator> calcClazz = (Class<? extends SLACalculator>) ConfigurationService.getClass(
                    conf, CONF_CALCULATOR_IMPL);
            calcImpl = calcClazz == null ? new SLACalculatorMemory() : calcClazz.newInstance();
            calcImpl.init(conf);
            eventHandler = Services.get().get(EventHandlerService.class);
            if (eventHandler == null) {
                throw new ServiceException(ErrorCode.E0103, "EventHandlerService", "Add it under config "
                        + Services.CONF_SERVICE_EXT_CLASSES + " or declare it BEFORE SLAService");
            }
            // Add "workflow_action" to the app types the event handler already accepts.
            java.util.Set<String> appTypes = eventHandler.getAppTypes();
            appTypes.add("workflow_action");
            eventHandler.setAppTypes(appTypes);

            // Both values are read from configuration and interpreted in seconds.
            Runnable slaThread = new SLAWorker(calcImpl);
            int slaCheckInterval = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INTERVAL);
            int slaCheckInitialDelay = ConfigurationService.getInt(conf, CONF_SLA_CHECK_INITIAL_DELAY);
            services.get(SchedulerService.class).schedule(slaThread, slaCheckInitialDelay, slaCheckInterval,
                    SchedulerService.Unit.SEC);
            slaEnabled = true;
            LOG.info("SLAService initialized with impl [{0}] capacity [{1}]", calcImpl.getClass().getName(),
                    conf.get(SLAService.CONF_CAPACITY));
        }
        catch (Exception ex) {
            throw new ServiceException(ErrorCode.E0102, ex.getMessage(), ex);
        }
    }

    /**
     * Marks the SLA service as disabled. Only the enabled flag is cleared; the calculator and the
     * scheduled worker are not touched here.
     */
    @Override
    public void destroy() {
        slaEnabled = false;
    }

    @Override
    public Class<? extends Service> getInterface() {
        return SLAService.class;
    }

    /**
     * @return true once {@link #init(Services)} has completed successfully and until {@link #destroy()}
     */
    public static boolean isEnabled() {
        return slaEnabled;
    }

    /**
     * @return the shared SLA calculator created by {@link #init(Services)}
     */
    @VisibleForTesting
    public SLACalculator getSLACalculator() {
        return calcImpl;
    }

    /**
     * Runs one SLA status update pass synchronously on the calling thread.
     */
    public void runSLAWorker() {
        new SLAWorker(calcImpl).run();
    }

    /**
     * Runs one SLA status update pass on a newly started thread.
     */
    @VisibleForTesting
    public void startSLAWorker() {
        new Thread(new SLAWorker(calcImpl)).start();
    }

    /**
     * Task that asks the calculator to refresh the status of all SLAs it tracks.
     * Static because it needs no reference to the enclosing service instance.
     */
    private static class SLAWorker implements Runnable {
        private final SLACalculator calc;

        SLAWorker(SLACalculator calc) {
            this.calc = calc;
        }

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            try {
                calc.updateAllSlaStatus();
            }
            catch (Throwable error) {
                // Deliberately swallowed, presumably so one failed pass does not cancel later scheduled
                // runs. Logged at WARN so the failure is visible at default log levels.
                XLog.getLog(SLAService.class).warn("Throwable in SLAWorker thread run : ", error);
            }
        }
    }

    /**
     * Adds a new SLA registration to the calculator.
     *
     * @param reg the registration to add
     * @return true if the calculator accepted it; false if it was rejected (logged as queue full) or if a
     *         {@link JPAExecutorException} occurred (logged)
     * @throws ServiceException declared for API compatibility; not thrown by this implementation
     */
    public boolean addRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
        try {
            if (calcImpl.addRegistration(reg.getId(), reg)) {
                return true;
            }
            else {
                LOG.warn("SLA queue full. Unable to add new SLA entry for job [{0}]", reg.getId());
            }
        }
        catch (JPAExecutorException ex) {
            LOG.warn("Could not add new SLA entry for job [{0}]", reg.getId(), ex);
        }
        return false;
    }

    /**
     * Updates an existing SLA registration in the calculator.
     *
     * @param reg the registration carrying the new values
     * @return true if the calculator accepted the update; false if it was rejected (logged as queue full) or
     *         if a {@link JPAExecutorException} occurred (logged)
     * @throws ServiceException declared for API compatibility; not thrown by this implementation
     */
    public boolean updateRegistrationEvent(SLARegistrationBean reg) throws ServiceException {
        try {
            if (calcImpl.updateRegistration(reg.getId(), reg)) {
                return true;
            }
            else {
                LOG.warn("SLA queue full. Unable to update the SLA entry for job [{0}]", reg.getId());
            }
        }
        catch (JPAExecutorException ex) {
            LOG.warn("Could not update SLA entry for job [{0}]", reg.getId(), ex);
        }
        return false;
    }

    /**
     * Forwards a job status change to the calculator.
     *
     * @param jobId the job id
     * @param status the job status string
     * @param eventStatus the event status
     * @param startTime the job start time
     * @param endTime the job end time
     * @return the calculator's result, or false if a {@link JPAExecutorException} occurred (logged with its cause)
     * @throws ServiceException declared for API compatibility; not thrown by this implementation
     */
    public boolean addStatusEvent(String jobId, String status, EventStatus eventStatus, Date startTime, Date endTime)
            throws ServiceException {
        try {
            if (calcImpl.addJobStatus(jobId, status, eventStatus, startTime, endTime)) {
                return true;
            }
        }
        catch (JPAExecutorException jpe) {
            // Pass the exception so its cause and stack trace are not lost.
            LOG.error("Exception while adding SLA Status event for Job [{0}]", jobId, jpe);
        }
        return false;
    }

    /**
     * Removes the SLA registration of the given job from the calculator.
     *
     * @param jobId the job id
     */
    public void removeRegistration(String jobId) {
        calcImpl.removeRegistration(jobId);
    }

    /**
     * Enable jobs sla alert.
     *
     * @param jobIds the job ids
     * @return true, if successful
     * @throws ServiceException wrapping a {@link JPAExecutorException} from the calculator
     */
    public boolean enableAlert(List<String> jobIds) throws ServiceException {
        try {
            return calcImpl.enableAlert(jobIds);
        }
        catch (JPAExecutorException jpe) {
            throw alertUpdateFailure(jobIds, jpe);
        }
    }

    /**
     * Enable child jobs sla alert.
     *
     * @param parentJobIds the parent job ids
     * @return true, if successful
     * @throws ServiceException wrapping a {@link JPAExecutorException} from the calculator
     */
    public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException {
        try {
            return calcImpl.enableChildJobAlert(parentJobIds);
        }
        catch (JPAExecutorException jpe) {
            throw alertUpdateFailure(parentJobIds, jpe);
        }
    }

    /**
     * Disable jobs Sla alert.
     *
     * @param jobIds the job ids
     * @return true, if successful
     * @throws ServiceException wrapping a {@link JPAExecutorException} from the calculator
     */
    public boolean disableAlert(List<String> jobIds) throws ServiceException {
        try {
            return calcImpl.disableAlert(jobIds);
        }
        catch (JPAExecutorException jpe) {
            throw alertUpdateFailure(jobIds, jpe);
        }
    }

    /**
     * Disable child jobs Sla alert.
     *
     * @param parentJobIds the parent job ids
     * @return true, if successful
     * @throws ServiceException wrapping a {@link JPAExecutorException} from the calculator
     */
    public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException {
        try {
            return calcImpl.disableChildJobAlert(parentJobIds);
        }
        catch (JPAExecutorException jpe) {
            throw alertUpdateFailure(parentJobIds, jpe);
        }
    }

    /**
     * Logs a failed SLA-alert update and wraps the cause for rethrowing.
     * The whole id list is logged rather than its first element, so an empty or null list cannot throw
     * inside the catch block and hide the original failure.
     *
     * @param jobIds the job ids whose alert update failed (may be empty or null)
     * @param jpe the failure reported by the calculator
     * @return a {@link ServiceException} wrapping {@code jpe}
     */
    private static ServiceException alertUpdateFailure(List<String> jobIds, JPAExecutorException jpe) {
        LOG.error("Exception while updating SLA alerting for Jobs {0}", jobIds);
        return new ServiceException(jpe);
    }

    /**
     * Change jobs Sla definitions.
     * It takes a list of pairs of job id and key/value pairs of the EL-evaluated SLA definition.
     * Supported definitions are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration.
     *
     * @param idSlaDefinitionList the job ids sla pair
     * @return true, if successful
     * @throws ServiceException wrapping a {@link JPAExecutorException} from the calculator
     */
    public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList)
            throws ServiceException {
        try {
            return calcImpl.changeDefinition(idSlaDefinitionList);
        }
        catch (JPAExecutorException jpe) {
            throw new ServiceException(jpe);
        }
    }
}