blob: 9b18606948ea145e3183f62780fecf408c22ecf5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.command.sla;
import java.util.Date;
import org.apache.oozie.XException;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.event.SLAEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.XCommand;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.SLACalcStatus;
import org.apache.oozie.sla.SLASummaryBean;
/**
 * Base command that evaluates the SLA state held in a {@link SLACalcStatus} for one job,
 * queues the resulting SLA events and persists the outcome to the SLA summary table.
 * <p>
 * {@link #execute()} first calls {@link #updateJobInfo()}, then runs either the "ended" or the
 * "running" evaluation depending on {@link #isEnded()}, and finally writes the summary to the DB.
 * The command locks on {@code "sla_" + slaCalc.getId()} and is not re-queued.
 * <p>
 * Progress is tracked in the event-processed flag of the {@link SLACalcStatus}: 1 is added once
 * the start SLA has been handled, 2 once the duration SLA has been handled, 4 once the end SLA
 * has been handled, and the value 8 marks a job for which no further processing is done.
 */
public abstract class SLAJobEventXCommand extends XCommand<Void> {
    /** Added to the event-processed flag once the start SLA has been handled. */
    private static final int START_PROCESSED = 1;
    /** Added to the event-processed flag once the duration SLA has been handled. */
    private static final int DURATION_PROCESSED = 2;
    /** Added to the event-processed flag once the end SLA has been handled. */
    private static final int END_PROCESSED = 4;
    /** Event-processed value meaning that no more SLA processing is needed for the job. */
    private static final int ALL_PROCESSED = 8;

    private long lockTimeOut = 0;
    JPAService jpaService = Services.get().get(JPAService.class);
    SLACalcStatus slaCalc;
    final static String SLA_LOCK_PREFIX = "sla_";
    private boolean isEnded = false;
    private boolean isEndMiss = false;

    /**
     * Creates the command.
     *
     * @param slaCalc the SLA calculation state to evaluate and persist
     * @param lockTimeOut value returned by {@link #getLockTimeOut()}
     */
    public SLAJobEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) {
        super("SLA.job.event", "SLA.job.event", 1);
        this.slaCalc = slaCalc;
        this.lockTimeOut = lockTimeOut;
    }

    @Override
    protected boolean isLockRequired() {
        return true;
    }

    @Override
    protected boolean isReQueueRequired() {
        return false;
    }

    /**
     * @return the lock key: {@link #SLA_LOCK_PREFIX} followed by the id of the SLA object
     */
    @Override
    public String getEntityKey() {
        return SLA_LOCK_PREFIX + slaCalc.getId();
    }

    /**
     * @return the lock timeout handed to the constructor
     */
    protected long getLockTimeOut() {
        return lockTimeOut;
    }

    /**
     * No preconditions are checked for this command.
     */
    @Override
    protected void verifyPrecondition() throws CommandException, PreconditionException {
    }

    /**
     * Refreshes the job information, evaluates the SLA and stores the result.
     *
     * @return always {@code null}
     * @throws CommandException if writing the SLA summary to the DB fails
     */
    @Override
    protected Void execute() throws CommandException {
        updateJobInfo();
        if (isEnded) {
            processForEnd();
        }
        else {
            processForRunning();
        }
        try {
            writeToDB();
        }
        catch (XException e) {
            throw new CommandException(e);
        }
        return null;
    }

    /**
     * Hook invoked at the very beginning of {@link #execute()}, before the ended flag is read.
     * NOTE(review): implementations are presumably expected to refresh {@code slaCalc} and call
     * {@link #setEnded(boolean)} / {@link #setEndMiss(boolean)} - confirm against the subclasses.
     */
    protected abstract void updateJobInfo();

    /**
     * Tells whether alerts are enabled for the given SLA object.
     *
     * @param slaObj the SLA object to inspect
     * @return {@code true} unless the SLA config map contains {@link OozieClient#SLA_DISABLE_ALERT}
     */
    private boolean shouldAlert(SLACalcStatus slaObj) {
        return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
    }

    /**
     * Hands the event to the {@link EventHandlerService}.
     *
     * @param event the event to queue
     */
    private void queueEvent(SLACalcStatus event) {
        Services.get().get(EventHandlerService.class).queueEvent(event);
    }

    /**
     * Queues a copy of the given SLA object as an event, unless alerts are disabled for it.
     *
     * @param status the SLA object whose current state is to be published
     */
    private void alertIfEnabled(SLACalcStatus status) {
        if (shouldAlert(status)) {
            // A copy is queued so later changes to the SLA object do not alter the queued event.
            queueEvent(new SLACalcStatus(status));
        }
    }

    /**
     * Sets START_MISS or START_MET by comparing expected and actual start, then alerts.
     * Callers must make sure that both the expected and the actual start are non-null.
     */
    private void processStartSLA() {
        boolean startedLate = slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime();
        slaCalc.setEventStatus(startedLate ? EventStatus.START_MISS : EventStatus.START_MET);
        alertIfEnabled(slaCalc);
    }

    /**
     * Sets DURATION_MISS or DURATION_MET and alerts; does nothing when no duration is expected.
     *
     * @param expected the expected duration, or -1 when no duration SLA is configured
     * @param actual the actual duration
     * @param slaCalc the SLA object to update
     */
    private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) {
        if (expected == -1) {
            return;
        }
        slaCalc.setEventStatus(actual > expected ? EventStatus.DURATION_MISS : EventStatus.DURATION_MET);
        alertIfEnabled(slaCalc);
    }

    /**
     * Writes the current state of the SLA object to the SLA summary table.
     *
     * @throws JPAExecutorException if the update query fails
     */
    private void writeToDB() throws JPAExecutorException {
        // no more processing, no transfer to history set
        if (slaCalc.getEventProcessed() >= ALL_PROCESSED) {
            slaCalc.setEventProcessed(ALL_PROCESSED);
        }
        // Read the flag only after clamping so the stored value matches the in-memory state
        // and the value reported in the debug message below.
        byte eventProc = slaCalc.getEventProcessed();
        SLASummaryBean slaSummaryBean = new SLASummaryBean();
        slaSummaryBean.setId(slaCalc.getId());
        slaSummaryBean.setEventProcessed(eventProc);
        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
        slaSummaryBean.setActualStart(slaCalc.getActualStart());
        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
        slaSummaryBean.setLastModifiedTime(new Date());
        SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
                slaSummaryBean);
        LOG.debug(" Stored SLA SummaryBean Job [{0}] eventProc = [{1}], status = [{2}]", slaCalc.getId(),
                slaCalc.getEventProcessed(), slaCalc.getJobStatus());
    }

    /**
     * Evaluates the SLA of a job that has ended: sets the final SLA status, emits the start,
     * duration and end events that have not been processed yet and marks the job as fully
     * processed.
     */
    private void processForEnd() {
        byte eventProc = slaCalc.getEventProcessed();
        LOG.debug("Job {0} has ended. endtime = [{1}]", slaCalc.getId(), slaCalc.getActualEnd());
        slaCalc.setSLAStatus(isEndMiss() ? SLAStatus.MISS : SLAStatus.MET);
        if (eventProc != ALL_PROCESSED && slaCalc.getActualStart() != null) {
            if ((eventProc & START_PROCESSED) == 0 && slaCalc.getExpectedStart() != null) {
                processStartSLA();
            }
            // NOTE(review): assumes the actual end is set whenever the actual start is - confirm.
            slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime());
            if ((eventProc & DURATION_PROCESSED) == 0) {
                processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc);
            }
        }
        // A flag below 4 means the end SLA has not been handled yet (this also excludes 8).
        if (eventProc < END_PROCESSED) {
            slaCalc.setEventStatus(isEndMiss() ? EventStatus.END_MISS : EventStatus.END_MET);
            alertIfEnabled(slaCalc);
        }
        slaCalc.setEventProcessed(ALL_PROCESSED);
    }

    /**
     * Evaluates the SLA of a job that has not ended: emits start, duration and end events as
     * soon as their outcome can be decided against the current time and records in the
     * event-processed flag which of them have been handled.
     */
    private void processForRunning() {
        byte eventProc = slaCalc.getEventProcessed();
        if (eventProc != ALL_PROCESSED && slaCalc.getActualStart() != null) {
            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
        }
        // Start SLA.
        if (eventProc != ALL_PROCESSED && (eventProc & START_PROCESSED) == 0) {
            if (slaCalc.getExpectedStart() == null) {
                // Nothing to check, mark the start SLA as handled.
                eventProc += START_PROCESSED;
            }
            else if (slaCalc.getActualStart() != null) {
                processStartSLA();
                eventProc += START_PROCESSED;
            }
            else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) {
                // Not started yet although the expected start has already passed.
                slaCalc.setEventStatus(EventStatus.START_MISS);
                alertIfEnabled(slaCalc);
                eventProc += START_PROCESSED;
            }
        }
        // Duration SLA.
        if (eventProc != ALL_PROCESSED && (eventProc & DURATION_PROCESSED) == 0) {
            if (slaCalc.getExpectedDuration() == -1) {
                // Nothing to check, mark the duration SLA as handled.
                eventProc += DURATION_PROCESSED;
            }
            else if (slaCalc.getActualStart() != null
                    && System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc
                            .getExpectedDuration()) {
                // Still running and already over the expected duration.
                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
                alertIfEnabled(slaCalc);
                eventProc += DURATION_PROCESSED;
            }
        }
        // End SLA.
        if (eventProc < END_PROCESSED) {
            if (slaCalc.getExpectedEnd() == null) {
                // Nothing to check, mark the end SLA as handled.
                eventProc += END_PROCESSED;
            }
            else if (slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
                // Still running although the expected end has already passed.
                slaCalc.setEventStatus(EventStatus.END_MISS);
                slaCalc.setSLAStatus(SLAStatus.MISS);
                alertIfEnabled(slaCalc);
                eventProc += END_PROCESSED;
            }
        }
        slaCalc.setEventProcessed(eventProc);
    }

    /**
     * @return {@code true} if the job is treated as ended by {@link #execute()}
     */
    public boolean isEnded() {
        return isEnded;
    }

    /**
     * @param isEnded whether {@link #execute()} should run the evaluation for an ended job
     */
    public void setEnded(boolean isEnded) {
        this.isEnded = isEnded;
    }

    /**
     * @return {@code true} if an ended job is to be reported as having missed its end SLA
     */
    public boolean isEndMiss() {
        return isEndMiss;
    }

    /**
     * @param isEndMiss whether an ended job is to be reported as having missed its end SLA
     */
    public void setEndMiss(boolean isEndMiss) {
        this.isEndMiss = isEndMiss;
    }
}