blob: 3285153b608661085eb246b1052b43d0771b6736 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.sla;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.executor.jpa.BatchQueryExecutor;
import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.ServiceException;
import org.apache.oozie.service.Services;
import org.apache.oozie.sla.service.SLAService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.JobUtils;
import org.apache.oozie.util.Pair;
import org.apache.oozie.util.db.AlwaysFailingHSQLDriverMapper;
import org.apache.oozie.util.db.FailingHSQLDBDriverWrapper;
import org.apache.oozie.util.db.FailingDBHelperForTest;
import org.apache.oozie.workflow.WorkflowInstance;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
public class TestSLACalculatorMemory extends XDataTestCase {
private Services services;
private JPAService jpaService;
private Instrumentation instrumentation;
@Override
@Before
protected void setUp() throws Exception {
super.setUp();
services = new Services();
Configuration conf = services.get(ConfigurationService.class).getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService," +
"org.apache.oozie.sla.service.SLAService," +
"org.apache.oozie.service.InstrumentationService");
conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 600);
services.init();
jpaService = services.get(JPAService.class);
instrumentation = services.get(InstrumentationService.class).get();
}
@Override
@After
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
private void createWorkflow(List<String> idList) throws Exception {
List<JsonBean> insertList = new ArrayList<JsonBean>();
for (String id : idList) {
WorkflowJobBean workflow = new WorkflowJobBean();
workflow.setId(id);
workflow.setStatusStr("PREP");
workflow.setStartTime(new Date());
insertList.add(workflow);
}
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null);
}
@Test
public void testLoadOnRestart() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
String jobId1 = slaRegBean1.getId();
SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
String jobId2 = slaRegBean2.getId();
SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", AppType.WORKFLOW_JOB);
String jobId3 = slaRegBean3.getId();
List<String> idList = new ArrayList<String>();
idList.add(slaRegBean1.getId());
idList.add(slaRegBean2.getId());
idList.add(slaRegBean3.getId());
createWorkflow(idList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
slaRegBean1.setAppName("app-name");
slaRegBean1.setExpectedDuration(123);
slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07"));
slaRegBean1.setExpectedStart(sdf.parse("2011-02-07"));
slaRegBean1.setNominalTime(sdf.parse("2012-01-06"));
slaRegBean1.setUser("user");
slaRegBean1.setParentId("parentId");
slaRegBean1.setUpstreamApps("upstreamApps");
slaRegBean1.setNotificationMsg("notificationMsg");
slaRegBean1.setAlertContact("a@abc.com");
slaRegBean1.setAlertEvents("MISS");
slaRegBean1.setJobData("jobData");
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000); // 1 hour back
slaRegBean3.setExpectedStart(startTime);
slaRegBean3.setExpectedEnd(endTime);
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
slaCalcMemory.addRegistration(jobId2, slaRegBean2);
slaCalcMemory.addRegistration(jobId3, slaRegBean3);
slaCalcMemory.updateAllSlaStatus();
SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
SLACalcStatus calc3 = slaCalcMemory.get(jobId3);
calc1.setEventStatus(SLAEvent.EventStatus.END_MISS);
calc1.setSLAStatus(SLAEvent.SLAStatus.MISS);
calc1.setJobStatus(WorkflowJob.Status.FAILED.toString());
// set last modified time 5 days back
Date lastModifiedTime = new Date(System.currentTimeMillis() - 5*24*60*60*1000);
calc1.setLastModifiedTime(lastModifiedTime);
List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
WorkflowJobBean wf1 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId1);
wf1.setId(jobId1);
wf1.setStatus(WorkflowJob.Status.SUCCEEDED);
wf1.setStartTime(sdf.parse("2011-03-09"));
wf1.setEndTime(sdf.parse("2011-03-10"));
wf1.setLastModifiedTime(new Date());
WorkflowJobBean wf2 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId2);
wf2.setId(jobId2);
wf2.setStatus(WorkflowJob.Status.RUNNING);
wf2.setStartTime(sdf.parse("2011-03-09"));
wf2.setEndTime(null);
wf2.setLastModifiedTime(new Date());
WorkflowJobBean wf3 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3);
wf3.setId(jobId3);
wf3.setStatus(WorkflowJob.Status.RUNNING);
wf3.setStartTime(startTime);
wf3.setEndTime(null);
wf3.setLastModifiedTime(new Date());
updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf1));
updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf2));
updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf3));
updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
new SLASummaryBean(calc2)));
updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,
new SLASummaryBean(calc3)));
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null);
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
slaCalcMemory.updateAllSlaStatus();
SLACalcStatus calc = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
SLASummaryQuery.GET_SLA_SUMMARY, jobId1), SLARegistrationQueryExecutor.getInstance().get(
SLARegQuery.GET_SLA_REG_ON_RESTART, jobId1));
assertEquals("job-1-W", calc.getId());
assertEquals(AppType.WORKFLOW_JOB, calc.getAppType());
assertEquals("app-name", calc.getAppName());
assertEquals(123, calc.getExpectedDuration());
assertEquals(sdf.parse("2012-02-07"), calc.getExpectedEnd());
assertEquals(sdf.parse("2011-02-07"), calc.getExpectedStart());
assertEquals(sdf.parse("2012-01-06"), calc.getNominalTime());
assertEquals("user", calc.getUser());
assertEquals("parentId", calc.getParentId());
assertEquals("upstreamApps", calc.getUpstreamApps());
assertEquals("notificationMsg", calc.getNotificationMsg());
assertEquals("a@abc.com", calc.getAlertContact());
assertEquals("MISS", calc.getAlertEvents());
assertEquals("jobData", calc.getJobData());
assertEquals(sdf.parse("2011-03-09"), calc.getActualStart());
assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd());
assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus());
assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus());
assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus());
assertEquals(lastModifiedTime, calc1.getLastModifiedTime());
calc2 = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(
SLASummaryQuery.GET_SLA_SUMMARY, jobId2), SLARegistrationQueryExecutor.getInstance().get(
SLARegQuery.GET_SLA_REG_ON_RESTART, jobId2));
assertEquals(8, calc.getEventProcessed());
assertEquals(7, calc2.getEventProcessed());
// jobId2 should be in history set as eventprocessed is 7 (111)
//job3 will be in slamap
assertEquals(1, slaCalcMemory.size()); // 1 out of 3 jobs in map
WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3);
wf.setId(jobId3);
wf.setStatus(WorkflowJob.Status.SUCCEEDED);
wf.setEndTime(endTime);
wf.setStartTime(startTime);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf);
slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
startTime, endTime);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId3);
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(startTime, slaSummary.getActualStart());
assertEquals(endTime, slaSummary.getActualEnd());
assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
}
@Test
public void testWorkflowJobSLAStatusOnRestart() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
final String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
calc1.setEventProcessed(1);
calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
calc1.setJobStatus(WorkflowJob.Status.RUNNING.name());
calc1.setLastModifiedTime(new Date());
SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL,slaSummaryBean);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
// Simulate a lost success event
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId(jobId1);
wjb.setStatus(WorkflowJob.Status.SUCCEEDED);
wjb.setStartTime(sdf.parse("2012-02-07"));
wjb.setEndTime(sdf.parse("2013-02-07"));
wjb.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().insert(wjb);
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
slaCalcMemory.updateAllSlaStatus();
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("job-1-W", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType());
assertEquals("SUCCEEDED", slaSummary.getJobStatus());
assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus());
assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
slaSummary.getActualDuration());
// Simulate a lost failed event
wjb.setStatus(WorkflowJob.Status.FAILED);
wjb.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wjb);
// Reset the summary Bean
calc1.setEventProcessed(1);
calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
calc1.setJobStatus(WorkflowJob.Status.RUNNING.name());
slaSummaryBean = new SLASummaryBean(calc1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
slaCalcMemory.updateAllSlaStatus();
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("FAILED", slaSummary.getJobStatus());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
// Simulate a lost RUNNING event
wjb.setStatus(WorkflowJob.Status.RUNNING);
wjb.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wjb);
// Reset the summary Bean
calc1.setEventProcessed(0);
calc1.setSLAStatus(SLAEvent.SLAStatus.NOT_STARTED);
calc1.setJobStatus(null);
slaSummaryBean = new SLASummaryBean(calc1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
SLAService slaService = Services.get().get(SLAService.class);
slaService.startSLAWorker();
slaService.addStatusEvent(jobId1, "RUNNING", null, null, null);
waitFor(60 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
return SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1)
.getEventProcessed() == 7;
}
});
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean.getId());
//since job is already running and it's a old job
assertEquals(7, slaSummary.getEventProcessed());
assertEquals("RUNNING", slaSummary.getJobStatus());
assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
assertNull(slaSummary.getActualEnd());
assertEquals(-1, slaSummary.getActualDuration());
assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
}
@Test
public void testWorkflowActionSLAStatusOnRestart() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean1 = _createSLARegistration("job-W@1", AppType.WORKFLOW_ACTION);
String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
calc1.setEventProcessed(1);
calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
calc1.setLastModifiedTime(new Date());
SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
// Simulate a lost success event
WorkflowActionBean wab = new WorkflowActionBean();
wab.setId(jobId1);
wab.setStatus(WorkflowAction.Status.OK);
wab.setStartTime(sdf.parse("2012-02-07"));
wab.setEndTime(sdf.parse("2013-02-07"));
WorkflowActionInsertJPAExecutor wfInsertCmd = new WorkflowActionInsertJPAExecutor(wab);
jpaService.execute(wfInsertCmd);
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
slaCalcMemory.updateAllSlaStatus();
assertEquals(0, slaCalcMemory.size());
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("job-W@1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType());
assertEquals("OK", slaSummary.getJobStatus());
assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus());
assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
slaSummary.getActualDuration());
}
@Test
public void testCoordinatorActionSLAStatusOnRestart() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean1 = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION);
String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
calc1.setEventProcessed(1);
calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
calc1.setLastModifiedTime(new Date());
SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
// Simulate a lost failed event
CoordinatorActionBean cab = new CoordinatorActionBean();
cab.setId(jobId1);
cab.setStatus(CoordinatorAction.Status.FAILED);
cab.setLastModifiedTime(sdf.parse("2013-02-07"));
cab.setExternalId("wf_job-W");
CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab);
jpaService.execute(caInsertCmd);
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId("wf_job-W");
wjb.setStartTime(sdf.parse("2012-02-07"));
wjb.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().insert(wjb);
slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
slaCalcMemory.updateAllSlaStatus();
// As job succeeded, it should not be in memory
assertEquals(0, slaCalcMemory.size());
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
assertEquals("job-C@1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType());
assertEquals("FAILED", slaSummary.getJobStatus());
assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus());
assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart());
assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd());
assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(),
slaSummary.getActualDuration());
}
@Test
public void testEventMissOnRestart() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
CoordinatorActionBean coordAction = new CoordinatorActionBean();
coordAction.setId("coordActionId-C@1");
coordAction.setStatus(CoordinatorAction.Status.RUNNING);
coordAction.setLastModifiedTime(sdf.parse("2013-02-07"));
CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction);
jpaService.execute(caInsertCmd);
CoordinatorActionBean coordAction2 = new CoordinatorActionBean();
coordAction2.setId("coordActionId-C@2");
coordAction2.setStatus(CoordinatorAction.Status.RUNNING);
coordAction2.setLastModifiedTime(sdf.parse("2013-02-07"));
caInsertCmd = new CoordActionInsertJPAExecutor(coordAction2);
jpaService.execute(caInsertCmd);
SLARegistrationBean slaRegBean1 = _createSLARegistration("coordActionId-C@1", AppType.COORDINATOR_ACTION);
String jobId1 = slaRegBean1.getId();
slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean1.setExpectedStart(sdf.parse("2012-03-07"));
slaRegBean1.setExpectedDuration(100000); // long duration;
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
slaCalcMemory.updateAllSlaStatus();
SLARegistrationBean slaRegBean2 = _createSLARegistration("coordActionId-C@2", AppType.COORDINATOR_ACTION);
String jobId2 = slaRegBean2.getId();
slaRegBean2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour
slaRegBean2.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); // 2 hour
slaRegBean2.setExpectedDuration(100000); // long duration;
slaCalcMemory.addRegistration(jobId2, slaRegBean2);
slaCalcMemory.updateAllSlaStatus();
assertEquals(2, slaCalcMemory.size());
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
SLASummaryBean slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
assertEquals("coordActionId-C@1", slaSummary.getId());
assertEquals(5, slaSummary.getEventProcessed());
assertEquals(-1, slaSummary.getActualDuration());
assertEquals("coordActionId-C@2", slaSummary2.getId());
assertEquals(0, slaSummary2.getEventProcessed());
assertEquals(-1, slaSummary2.getActualDuration());
coordAction.setStatusStr("FAILED");
coordAction2.setStatusStr("FAILED");
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction2);
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
slaCalcMemory.updateAllSlaStatus();
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1);
slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2);
assertEquals("coordActionId-C@1", slaSummary.getId());
assertEquals(8, slaSummary.getEventProcessed());
assertEquals("coordActionId-C@2", slaSummary2.getId());
assertEquals(8, slaSummary2.getEventProcessed());
}
@Test
public void testSLAEvents1() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour
slaRegBean.setExpectedDuration(2 * 3600 * 1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
assertEquals("PREP", slaSummary.getJobStatus());
slaCalcMemory.updateJobSla(jobId);
assertEquals(2, ehs.getEventQueue().size());
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// both start miss and end miss (101)
assertEquals(5, slaSummary.getEventProcessed());
assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
job1.setStatusStr(WorkflowJob.Status.SUSPENDED.toString());
job1.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND,
sdf.parse("2012-01-01"), null);
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus());
assertEquals(5, slaSummary.getEventProcessed());
job1.setStatusStr(WorkflowJob.Status.SUCCEEDED.toString());
job1.setLastModifiedTime(new Date());
job1.setStartTime(sdf.parse("2012-01-01"));
job1.setEndTime(sdf.parse("2012-01-02"));
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
assertEquals(3, ehs.getEventQueue().size());
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// All events processed and actual times stored (1000)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MET, slaSummary.getSLAStatus());
assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
assertEquals(SLAEvent.EventStatus.DURATION_MISS, slaSummary.getEventStatus());
assertEquals(sdf.parse("2012-01-01").getTime(), slaSummary.getActualStart().getTime());
assertEquals(sdf.parse("2012-01-02").getTime(), slaSummary.getActualEnd().getTime());
assertEquals(sdf.parse("2012-01-02").getTime() - sdf.parse("2012-01-01").getTime(),
slaSummary.getActualDuration());
assertEquals(0, slaCalcMemory.size());
}
@Test
public void testSLAEvents2() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Duration bit should be processed as expected duration is not set
assertEquals(3, slaSummary.getEventProcessed());
// check only start event in queue
assertEquals(1, ehs.getEventQueue().size());
ehs.getEventQueue().clear();
// set back to 1, to make duration event not processed
slaSummary.setEventProcessed(1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
job1.setStatus(WorkflowJob.Status.SUCCEEDED);
job1.setStartTime(sdf.parse("2012-01-01"));
job1.setEndTime(sdf.parse("2012-01-02"));
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// all should be processed
assertEquals(8, slaSummary.getEventProcessed());
// check only end event is in queue
assertEquals(1, ehs.getEventQueue().size());
ehs.getEventQueue().clear();
slaSummary.setEventProcessed(1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary);
WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
slaRegBean = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB);
slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000));
jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(jobId, slaRegBean);
assertEquals(1, slaCalcMemory.size());
job2.setStatus(WorkflowJob.Status.KILLED);
job2.setEndTime(sdf.parse("2012-01-02"));
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2);
slaCalcMemory.addJobStatus(job2.getId(), WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null,
sdf.parse("2012-01-02"));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Actual start null, so all events processed
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(1, ehs.getEventQueue().size());
assertNull(slaSummary.getActualStart());
assertEquals(sdf.parse("2012-01-02"), slaSummary.getActualEnd());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
}
@Test
public void testDuplicateStartMiss() throws Exception {
// test start-miss
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
slaRegBean.setExpectedStart(startTime);
slaRegBean.setExpectedDuration(2* 3600 * 1000); //to avoid duration miss
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour ahead
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
job1.setStatus(WorkflowJob.Status.RUNNING);
job1.setStartTime(startTime);
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
new Date(System.currentTimeMillis()), null);
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(1, slaSummary.getEventProcessed());
assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
assertEquals(1, ehs.getEventQueue().size());
}
@Test
public void testDuplicateEndMiss() throws Exception {
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead
slaRegBean.setExpectedStart(startTime);
slaRegBean.setExpectedDuration(3600 * 1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour back
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
assertNotNull(slaRegBean.getCreatedTimestamp());
assertEquals(slaRegBean.getCreatedTimestamp(), slaSummary.getCreatedTimestamp());
// Only end sla should be processed (100)
assertEquals(4, slaSummary.getEventProcessed());
slaCalcMemory.updateJobSla(jobId);
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(4, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
job1.setId(job1.getId());
job1.setStatus(WorkflowJob.Status.SUCCEEDED);
job1.setStartTime(new Date(System.currentTimeMillis()));
job1.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Only Duration sla should be processed as end is already processed
// (110)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
// Recieve start event
assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// Start event received so all bits should be processed (111)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
assertEquals(0, slaCalcMemory.size());
assertEquals(3, ehs.getEventQueue().size());
}
public void testSLAHistorySet() throws Exception {
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
slaRegBean.setExpectedStart(startTime); // 1 hour back
slaRegBean.setExpectedDuration(1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
job1.setId(job1.getId());
job1.setStatus(WorkflowJob.Status.RUNNING);
job1.setStartTime(new Date(System.currentTimeMillis() - 3600 * 1000));
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
System.currentTimeMillis() - 3600 * 1000), null);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual end times are not stored, but sla's processed so (111)
assertEquals(7, slaSummary.getEventProcessed());
// Moved from map to history set
assertEquals(0, slaCalcMemory.size());
// Add terminal state event so actual end time is stored
job1.setId(job1.getId());
job1.setStatus(WorkflowJob.Status.SUCCEEDED);
job1.setEndTime(new Date(System.currentTimeMillis() - 3600 * 1000));
job1.setStartTime(new Date(System.currentTimeMillis()));
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
WorkflowJobQueryExecutor.getInstance().executeUpdate(
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(
System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis()));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual times are stored, so event processed(1000)
assertEquals(8, slaSummary.getEventProcessed());
assertEquals(3, ehs.getEventQueue().size());
}
private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
SLARegistrationBean bean = new SLARegistrationBean();
bean.setId(jobId);
bean.setAppType(appType);
return bean;
}
public void testHistoryPurge() throws Exception{
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
slaRegBean.setExpectedStart(startTime); // 1 hour back
slaRegBean.setExpectedDuration(1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
job1.setStatusStr("RUNNING");
job1.setLastModifiedTime(new Date());
job1.setStartTime(startTime);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(
System.currentTimeMillis() - 3600 * 1000), null);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
// The actual end times are not stored, but sla's processed so (111)
assertEquals(7, slaSummary.getEventProcessed());
assertTrue(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
job1.setStatusStr("SUCCEEDED");
job1.setLastModifiedTime(new Date());
job1.setStartTime(startTime);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1);
slaCalcMemory.new HistoryPurgeWorker().run();
assertFalse(slaCalcMemory.isJobIdInHistorySet(job1.getId()));
}
public void testErrorLoggingWithJobIdPrefix() throws Exception {
SLAService slaService = Services.get().get(SLAService.class);
SLACalculator slaCalculator = slaService.getSLACalculator();
slaCalculator.addRegistration("dummy-id", _createSLARegistration("dummy-id", AppType.WORKFLOW_JOB));
TestLogAppender appender = getTestLogAppender();
Logger logger = Logger.getLogger(SLACalculatorMemory.class);
logger.addAppender(appender);
logger.setLevel(Level.ERROR);
try {
slaService.runSLAWorker();
}
finally {
logger.removeAppender(appender);
}
List<LoggingEvent> log = appender.getLog();
LoggingEvent firstLogEntry = log.get(0);
assertEquals(Level.ERROR, firstLogEntry.getLevel());
assertTrue(firstLogEntry.getMessage().toString().contains("JOB[dummy-id]"));
assertEquals("org.apache.oozie.sla.SLACalculatorMemory", firstLogEntry.getLoggerName());
}
@SuppressWarnings("serial")
public void testDisablingAlertsEvents() throws Exception {
SLAService slaService = Services.get().get(SLAService.class);
EventHandlerService ehs = Services.get().get(EventHandlerService.class);
SLACalculator slaCalculator = slaService.getSLACalculator();
// create dummy sla records and coord action records
String id1 = _setupSlaMap(slaCalculator, "00020-1234567-wrkf-C", 1);
String id2 = _setupSlaMap(slaCalculator, "00020-1234567-wrkf-C", 2);
SLACalcStatus slaCalcObj1 = slaCalculator.get(id1);
assertFalse(slaCalcObj1.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT));
SLACalcStatus slaCalcObj2 = slaCalculator.get(id2);
assertFalse(slaCalcObj2.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT));
slaCalculator.updateAllSlaStatus();
assertTrue(ehs.getEventQueue().size() > 0);
// check that SLACalculator sends no event
ehs.getEventQueue().clear();
SLASummaryBean persistentSla = new SLASummaryBean(slaCalcObj1);
// reset eventProcessed for the sla calc objects
persistentSla.setEventProcessed(0);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_EVENTPROCESSED,
persistentSla);
persistentSla = new SLASummaryBean(slaCalcObj2);
persistentSla.setEventProcessed(0);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_EVENTPROCESSED,
persistentSla);
// CASE I : list of sla ids, no new params
slaService.enableChildJobAlert(Arrays.asList(id1, id2));
slaCalculator.updateAllSlaStatus();
assertTrue(ehs.getEventQueue().isEmpty());
// CASE II : ALL
_setupSlaMap(slaCalculator, "00020-1234567-wrkf-C", 3);
_setupSlaMap(slaCalculator, "00020-1234567-wrkf-C", 4);
slaCalculator.enableChildJobAlert(Arrays.asList("00020-1234567-wrkf-C"));
slaCalculator.updateAllSlaStatus();
assertFalse(ehs.getEventQueue().isEmpty());
// CASE III : resume w/ new params
final String id5 = _setupSlaMap(slaCalculator, "00020-1234567-wrkf-C", 5);
Date now = new Date();
now.setTime(now.getTime() - 10 * 60 * 1000);
final String newParams = RestConstants.SLA_NOMINAL_TIME + "=" + DateUtils.formatDateOozieTZ(now) + ";"
+ RestConstants.SLA_SHOULD_END + "=5";
slaCalculator.changeDefinition(new ArrayList<Pair<String,Map<String,String>>>(){
{
add(new Pair<String,Map<String,String>>(id5, JobUtils.parseChangeValue(newParams)));
}
});
slaCalculator.updateAllSlaStatus();
assertTrue(ehs.getEventQueue().size() > 0);
}
private String _setupSlaMap(SLACalculator slaCalculator, String id, int actionNum) throws Exception {
CoordinatorActionBean action = addRecordToCoordActionTable(id, actionNum,
CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0);
action.setExternalId(null);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, action);
SLARegistrationBean slaRegBean = _createSLARegistration(action.getId(), AppType.COORDINATOR_ACTION);
Date startTime = new Date(System.currentTimeMillis() - 2 * 3600 * 1000);
slaRegBean.setExpectedStart(startTime); // 2 hours back
slaRegBean.setExpectedDuration(1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr back
slaRegBean.setParentId(id);
slaCalculator.addRegistration(slaRegBean.getId(), slaRegBean);
return action.getId();
}
@Test
public void testEventOutOfOrder() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
SLARegistrationBean slaRegBean = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead
slaRegBean.setExpectedStart(startTime);
slaRegBean.setExpectedDuration(3600 * 1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour back
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
slaCalcMemory.updateJobSla(jobId);
SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId);
assertEquals(slaSummary.getJobStatus(), WorkflowInstance.Status.RUNNING.toString());
wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
wfJob.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob);
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(slaSummary.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString());
slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.SUCCESS,
new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000));
slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId);
assertEquals(slaSummary.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString());
}
public void testWFEndNotCoord() throws Exception {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION);
String coordActionId = slaRegBean.getId();
slaRegBean.setExpectedEnd(sdf.parse("2013-03-07"));
slaRegBean.setExpectedStart(sdf.parse("2012-03-07"));
slaCalcMemory.addRegistration(coordActionId, slaRegBean);
SLACalcStatus calc1 = slaCalcMemory.get(coordActionId);
calc1.setEventProcessed(1);
calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS);
calc1.setJobStatus(WorkflowAction.Status.RUNNING.name());
calc1.setLastModifiedTime(new Date());
SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1);
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean);
// Simulate a lost failed event
CoordinatorActionBean coordAction = new CoordinatorActionBean();
coordAction.setId(coordActionId);
coordAction.setStatus(CoordinatorAction.Status.RUNNING);
coordAction.setLastModifiedTime(sdf.parse("2013-02-07"));
coordAction.setExternalId("wf_job-W");
CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction);
jpaService.execute(caInsertCmd);
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId("wf_job-W");
wjb.setStartTime(sdf.parse("2012-02-07"));
wjb.setLastModifiedTime(new Date());
wjb.setStatus(WorkflowJob.Status.SUCCEEDED);
WorkflowJobQueryExecutor.getInstance().insert(wjb);
calc1 = slaCalcMemory.get(coordActionId);
slaCalcMemory.updateJobSla(coordActionId);
slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId);
//cord action is running and wf job is completed
assertEquals(slaSummaryBean.getJobStatus(), WorkflowInstance.Status.RUNNING.name());
coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction);
slaCalcMemory.addJobStatus(coordActionId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
sdf.parse("2012-02-07"), sdf.parse("2012-03-07"));
slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId);
assertEquals(slaSummaryBean.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString());
}
public void testSingleAddUpdateRemoveInstrumentedCorrectly() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB);
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000);
slaRegBean.setExpectedStart(startTime); // 1 hour back
slaRegBean.setExpectedDuration(1000);
slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000));
String jobId = slaRegBean.getId();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
long slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
get(SLACalculatorMemory.SLA_MAP).getValue();
assertEquals("SLA map size after add should be 1", 1, slaMapSize);
slaCalcMemory.updateJobSla(jobId);
slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
get(SLACalculatorMemory.SLA_MAP).getValue();
assertEquals("SLA map size after update should be 1", 1, slaMapSize);
slaCalcMemory.removeRegistration(jobId);
slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
get(SLACalculatorMemory.SLA_MAP).getValue();
assertEquals("SLA map size after remove should be 0", 0, slaMapSize);
}
public void testAddMultipleRestartRemoveMultipleInstrumentedCorrectly() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
String jobId1 = slaRegBean1.getId();
SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
String jobId2 = slaRegBean2.getId();
SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", AppType.WORKFLOW_JOB);
String jobId3 = slaRegBean3.getId();
List<String> idList = new ArrayList<String>();
idList.add(slaRegBean1.getId());
idList.add(slaRegBean2.getId());
idList.add(slaRegBean3.getId());
createWorkflow(idList);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd");
slaRegBean1.setAppName("app-name");
slaRegBean1.setExpectedDuration(10000);
slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07"));
slaRegBean1.setExpectedStart(sdf.parse("2011-02-07"));
slaRegBean1.setNominalTime(sdf.parse("2012-01-06"));
slaRegBean1.setUser("user");
slaRegBean1.setParentId("parentId");
slaRegBean1.setUpstreamApps("upstreamApps");
slaRegBean1.setNotificationMsg("notificationMsg");
slaRegBean1.setAlertContact("a@abc.com");
slaRegBean1.setAlertEvents("MISS");
slaRegBean1.setJobData("jobData");
Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back
Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000); // 1 hour back
slaRegBean3.setExpectedStart(startTime);
slaRegBean3.setExpectedEnd(endTime);
slaCalcMemory.addRegistration(jobId1, slaRegBean1);
slaCalcMemory.addRegistration(jobId2, slaRegBean2);
slaCalcMemory.addRegistration(jobId3, slaRegBean3);
long slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
get(SLACalculatorMemory.SLA_MAP).getValue();
assertEquals("SLA map size after add all should be 3", 3, slaMapSize);
slaCalcMemory.updateAllSlaStatus();
slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
get(SLACalculatorMemory.SLA_MAP).getValue();
assertEquals("SLA map size after update all should be 2. An instance of SLACalcStatus was removed", 2, slaMapSize);
slaCalcMemory.removeRegistration(jobId1);
slaCalcMemory.removeRegistration(jobId2);
slaCalcMemory.removeRegistration(jobId3);
slaMapSize = instrumentation.getCounters().get(SLACalculatorMemory.INSTRUMENTATION_GROUP).
get(SLACalculatorMemory.SLA_MAP).getValue();
assertEquals("SLA map size after remove all should be 0", 0, slaMapSize);
}
public void testWhenSLARegistrationIsAddedBeanIsStoredCorrectly() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
Assert.assertNotNull(slaCalcMemory.get(slaRegBean.getId()));
Assert.assertEquals(slaRegBean, slaCalcMemory.get(slaRegBean.getId()).getSLARegistrationBean());
}
public void testWhenSLARegistrationIsAddedAndAllDBCallsAreDisruptedBeanIsNotStored() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
SLARegistrationBean slaRegBean = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
try {
FailingDBHelperForTest.setDbPredicate(new SLARegistrationInsertUpdatePredicate());
prepareFailingDB();
slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
fail("Expected JPAExecutorException not thrown");
} catch (JPAExecutorException ex) {
Assert.assertNull(slaCalcMemory.get(slaRegBean.getId()));
} finally {
FailingDBHelperForTest.resetDbPredicate();
System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
}
}
public void testWhenSLARegistrationIsUpdatedBeanIsStoredCorrectly() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
final String jobId = "job-1-W";
SLARegistrationBean slaRegBean = _createSLARegistration(jobId, AppType.WORKFLOW_JOB);
SLARegistrationBean slaRegBean2 = _createSLARegistration(jobId, AppType.WORKFLOW_JOB);
addAndUpdateRegistration(slaCalcMemory, jobId, slaRegBean, slaRegBean2);
Assert.assertNotNull(slaCalcMemory.get(jobId));
Assert.assertEquals("The updated SLA registration bean should be in the cache",
slaRegBean2, slaCalcMemory.get(jobId).getSLARegistrationBean());
}
public void testWhenSLARegistrationIsUpdatedAndAllDBCallsAreDisruptedBeanIsNotStored() throws Exception {
SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf());
final String jobId = "job-1-W";
SLARegistrationBean slaRegBean = _createSLARegistration(jobId, AppType.WORKFLOW_JOB);
SLARegistrationBean slaRegBean2 = _createSLARegistration(jobId, AppType.WORKFLOW_JOB);
final int expectedDuration = 1000;
slaRegBean2.setExpectedDuration(expectedDuration);
try {
addAndUpdateRegistrationWithDBCrushSimulation(slaCalcMemory, jobId, slaRegBean, slaRegBean2);
fail("Expected JPAExecutorException not thrown");
} catch (JPAExecutorException ex) {
Assert.assertNotNull(slaCalcMemory.get(slaRegBean.getId()));
// the update failed
Assert.assertEquals(slaRegBean, slaCalcMemory.get(jobId).getSLARegistrationBean());
} finally {
FailingDBHelperForTest.resetDbPredicate();
System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
}
}
public void testWhenSLARegistrationExistsWithoutSLASummaryUpdateSLARetries() throws Exception {
final SLACalculatorMemory slaCalculatorMemory = new SLACalculatorMemory();
slaCalculatorMemory.init(Services.get().get(ConfigurationService.class).getConf());
final String jobId = "job-1-W";
final SLARegistrationBean slaRegistration = _createSLARegistration(jobId, AppType.WORKFLOW_JOB);
slaCalculatorMemory.addRegistration(jobId, slaRegistration);
updateJobSlaFailing(slaCalculatorMemory, jobId,
new Runnable() {
@Override
public void run() {
try {
Assert.assertNotNull("after first update, SLACalcStatus should still be present",
slaCalculatorMemory.get(slaRegistration.getId()));
Assert.assertEquals("updating SLA_REGISTRATION should have been failed",
slaRegistration,
slaCalculatorMemory.get(jobId).getSLARegistrationBean());
Assert.assertEquals("SLACalcStatus.retryCount should have been increased",
1, slaCalculatorMemory.get(jobId).getRetryCount());
} catch (JPAExecutorException ignored) {
}
}
});
updateJobSlaFailing(slaCalculatorMemory, jobId,
new Runnable() {
@Override
public void run() {
try {
Assert.assertNotNull("after second update, SLACalcStatus should still be present",
slaCalculatorMemory.get(slaRegistration.getId()));
Assert.assertEquals("updating SLA_REGISTRATION should have been failed",
slaRegistration,
slaCalculatorMemory.get(jobId).getSLARegistrationBean());
Assert.assertEquals("SLACalcStatus.retryCount should have been increased",
2, slaCalculatorMemory.get(jobId).getRetryCount());
} catch (JPAExecutorException ignored) {
}
}
}
);
updateJobSlaFailing(slaCalculatorMemory, jobId,
new Runnable() {
@Override
public void run() {
try {
Assert.assertNotNull("after third update, SLACalcStatus should still be present",
slaCalculatorMemory.get(slaRegistration.getId()));
Assert.assertEquals("updating SLA_REGISTRATION should have been failed",
slaRegistration,
slaCalculatorMemory.get(jobId).getSLARegistrationBean());
Assert.assertEquals("SLACalcStatus.retryCount should have been increased",
3, slaCalculatorMemory.get(jobId).getRetryCount());
} catch (JPAExecutorException ignored) {
}
}
}
);
updateJobSlaFailing(slaCalculatorMemory, jobId,
new Runnable() {
@Override
public void run() {
try {
Assert.assertNull("after fourth update, SLACalcStatus should no more be present",
slaCalculatorMemory.get(slaRegistration.getId()));
} catch (JPAExecutorException ignored) {
}
}
}
);
}
private void updateJobSlaFailing(final SLACalculatorMemory slaCalculatorMemory,
final String jobId,
final Runnable assertsWhenFailing)
throws Exception {
try {
FailingDBHelperForTest.setDbPredicate(new SLASummarySelectPredicate(1));
prepareFailingDB();
slaCalculatorMemory.updateJobSla(jobId);
}
catch (final JPAExecutorException e) {
assertsWhenFailing.run();
}
finally {
FailingDBHelperForTest.resetDbPredicate();
System.clearProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER);
}
}
private void addAndUpdateRegistration(final SLACalculatorMemory slaCalcMemory, final String jobId,
final SLARegistrationBean slaRegBean, final SLARegistrationBean slaRegBean2)
throws JPAExecutorException {
slaCalcMemory.addRegistration(jobId, slaRegBean);
slaCalcMemory.updateRegistration(jobId, slaRegBean2);
}
private void addAndUpdateRegistrationWithDBCrushSimulation(final SLACalculatorMemory slaCalcMemory, final String jobId,
final SLARegistrationBean slaRegBean,
final SLARegistrationBean slaRegBean2) throws Exception {
slaCalcMemory.addRegistration(jobId, slaRegBean);
FailingDBHelperForTest.setDbPredicate(new SLARegistrationInsertUpdatePredicate());
prepareFailingDB();
slaCalcMemory.updateRegistration(jobId, slaRegBean2);
}
private void prepareFailingDB() throws ServiceException {
System.setProperty(FailingHSQLDBDriverWrapper.USE_FAILING_DRIVER, Boolean.TRUE.toString());
Configuration conf = services.get(ConfigurationService.class).getConf();
conf.set(JPAService.CONF_DRIVER, AlwaysFailingHSQLDriverMapper.class.getCanonicalName());
conf.setInt(JPAService.MAX_RETRY_COUNT, 2);
jpaService.destroy();
jpaService.init(services);
}
static class SLARegistrationInsertUpdatePredicate extends DmlPredicate {
SLARegistrationInsertUpdatePredicate() {
super("SLA_REGISTRATION", Sets.newHashSet("INSERT INTO ", "UPDATE "));
}
}
static class SLASummarySelectPredicate extends DmlPredicate {
private int remainingSuccessfulAttempts;
SLASummarySelectPredicate(final int remainingSuccessfulAttempts) {
super("SLA_SUMMARY", Sets.newHashSet("SELECT "));
this.remainingSuccessfulAttempts = remainingSuccessfulAttempts;
}
@Override
public boolean test(@Nullable String input) {
if (super.test(input)) {
if (remainingSuccessfulAttempts <= 0) {
return true;
}
else {
remainingSuccessfulAttempts--;
return false;
}
}
else {
return false;
}
}
}
static class DmlPredicate implements java.util.function.Predicate<String> {
private final String tableName;
private final Set<String> operationPrefixes;
DmlPredicate(final String tableName, final Set<String> operationPrefixes) {
this.tableName = tableName;
this.operationPrefixes = operationPrefixes;
}
@Override
public boolean test(@Nullable String input) {
Preconditions.checkArgument(!Strings.isNullOrEmpty(input));
boolean operationMatch = false;
for (String s: operationPrefixes) {
if (input.startsWith(s)) {
operationMatch = true;
break;
}
}
return operationMatch && input.toUpperCase().contains(tableName);
}
}
}