| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| package org.apache.oozie.service; |
| |
| import java.util.ArrayList; |
| import java.util.Date; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.oozie.AppType; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.client.event.JobEvent.EventStatus; |
| import org.apache.oozie.client.event.SLAEvent; |
| import org.apache.oozie.client.event.SLAEvent.SLAStatus; |
| import org.apache.oozie.client.rest.JsonBean; |
| import org.apache.oozie.client.rest.RestConstants; |
| import org.apache.oozie.event.EventQueue; |
| import org.apache.oozie.executor.jpa.BatchQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; |
| import org.apache.oozie.executor.jpa.JPAExecutorException; |
| import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; |
| import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; |
| import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; |
| import org.apache.oozie.sla.SLACalcStatus; |
| import org.apache.oozie.sla.SLACalculatorMemory; |
| import org.apache.oozie.sla.SLARegistrationBean; |
| import org.apache.oozie.sla.SLASummaryBean; |
| import org.apache.oozie.sla.TestSLAService; |
| import org.apache.oozie.sla.listener.SLAEventListener; |
| import org.apache.oozie.sla.service.SLAService; |
| import org.apache.oozie.test.ZKXTestCase; |
| import org.apache.oozie.util.JobUtils; |
| import org.apache.oozie.util.Pair; |
| import org.apache.oozie.workflow.WorkflowInstance; |
| |
| public class TestHASLAService extends ZKXTestCase { |
| |
| private static StringBuilder output = new StringBuilder(); |
| |
| protected void setUp() throws Exception { |
| Configuration conf = new Configuration(false); |
| conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService," |
| + "org.apache.oozie.sla.service.SLAService"); |
| conf.setClass(EventHandlerService.CONF_LISTENERS, DummySLAEventListener.class, SLAEventListener.class); |
| conf.setLong(SLAService.CONF_JOB_EVENT_LATENCY, 0); |
| // manually do check in this test |
| conf.setInt(SLAService.CONF_SLA_CHECK_INITIAL_DELAY, 100000); |
| conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 100000); |
| conf.setInt(EventHandlerService.CONF_WORKER_THREADS, 0); |
| super.setUp(conf); |
| Services.get().setService(ZKJobsConcurrencyService.class); |
| } |
| |
| @Override |
| protected void tearDown() throws Exception { |
| super.tearDown(); |
| } |
| |
| public void testSLAFailOverWithHA() throws Exception { |
| |
| SLAService slas = Services.get().get(SLAService.class); |
| SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); |
| EventHandlerService ehs = Services.get().get(EventHandlerService.class); |
| |
| // start another dummy oozie instance (dummy sla and eventhandler |
| // services) |
| DummyZKOozie dummyOozie_1 = null; |
| try { |
| dummyOozie_1 = new DummyZKOozie("a", "http://blah"); |
| DummySLACalculatorMemory dummyCalc = new DummySLACalculatorMemory(); |
| EventHandlerService dummyEhs = new EventHandlerService(); |
| dummyCalc.setEventHandlerService(dummyEhs); |
| dummyEhs.init(Services.get()); |
| dummyCalc.init(Services.get().getConf()); |
| |
| // Case 1 workflow job submitted to dummy server, |
| // but before start running, the dummy server is down |
| WorkflowJobBean wfJob1 = createWorkflow("job-1-W"); |
| SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); |
| sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2 hr before |
| sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr before |
| sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins |
| dummyCalc.addRegistration(sla1.getId(), sla1); |
| dummyCalc.updateAllSlaStatus(); |
| |
| // Case 2. workflow job submitted to dummy server, start running, |
| // then the dummy server is down |
| WorkflowJobBean wfJob2 = createWorkflow("job-2-W"); |
| SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2-W", AppType.WORKFLOW_JOB); |
| sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2hr before |
| sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); // 1hr ahead |
| sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins |
| dummyCalc.addRegistration(sla2.getId(), sla2); |
| dummyCalc.addJobStatus(sla2.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(), |
| new Date()); |
| |
| dummyCalc.updateAllSlaStatus(); |
| dummyEhs.new EventWorker().run(); |
| assertTrue(output.toString().contains(sla2.getId() + " Sla START - MISS!!!")); |
| |
| // suppose dummy Server is down |
| dummyCalc.clear(); |
| dummyCalc = null; |
| dummyOozie_1.teardown(); |
| |
| slaCalcMem.updateAllSlaStatus(); |
| |
| // Job 1 started running on the living server --> start miss |
| slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(), |
| new Date()); |
| // job 1 is added to slamap of living oozie server |
| assertNotNull(slaCalcMem.get(sla1.getId())); |
| ehs.new EventWorker().run(); |
| assertTrue(output.toString().contains(sla1.getId() + " Sla START - MISS!!!")); |
| |
| wfJob1.setStatus(WorkflowJob.Status.SUCCEEDED); |
| wfJob1.setEndTime(new Date()); |
| wfJob1.setStartTime(new Date()); |
| WorkflowJobQueryExecutor.getInstance().executeUpdate( |
| WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob1); |
| |
| // Job 1 succeeded on the living server --> duration met and end miss |
| slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(), |
| new Date()); |
| ehs.new EventWorker().run(); |
| assertTrue(output.toString().contains(sla1.getId() + " Sla DURATION - MET!!!")); |
| assertTrue(output.toString().contains(sla1.getId() + " Sla END - MISS!!!")); |
| |
| wfJob2.setStatus(WorkflowJob.Status.SUCCEEDED); |
| wfJob2.setEndTime(new Date()); |
| wfJob2.setStartTime(new Date()); |
| WorkflowJobQueryExecutor.getInstance().executeUpdate( |
| WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2); |
| |
| // Job 2 succeeded on the living server --> duration met and end met |
| slaCalcMem.addJobStatus(sla2.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(), |
| new Date()); |
| // eventProc >= 7(already processed duration/end met), should be removed from slaMap |
| assertNull(slaCalcMem.get(sla2.getId())); |
| ehs.new EventWorker().run(); |
| assertTrue(output.toString().contains(sla2.getId() + " Sla DURATION - MET!!!")); |
| assertTrue(output.toString().contains(sla2.getId() + " Sla END - MET!!!")); |
| } |
| finally { |
| if (dummyOozie_1 != null) { |
| dummyOozie_1.teardown(); |
| } |
| } |
| } |
| |
| public CoordinatorActionBean updateCoordAction(String id, String status) throws JPAExecutorException { |
| CoordinatorActionBean action = new CoordinatorActionBean(); |
| action.setId(id); |
| action.setStatusStr(status); |
| action.setLastModifiedTime(new Date()); |
| CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, |
| action); |
| return action; |
| } |
| |
| public void testSLAUpdateWithHA() throws Exception { |
| |
| String id1 = "0000001-130521183438837-oozie-test-C@1"; |
| String id2 = "0000002-130521183438837-oozie-test-C@1"; |
| String id3 = "0000003-130521183438837-oozie-test-C@1"; |
| String id4 = "0000004-130521183438837-oozie-test-C@1"; |
| String id5 = "0000005-130521183438837-oozie-test-C@1"; |
| String id6 = "0000006-130521183438837-oozie-test-C@1"; |
| Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // 2 hrs passed |
| Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead |
| Date expectedEndTS2 = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // 1 hour passed |
| // Coord Action of jobs 1-4 not started yet |
| createDBEntry(id1, expectedStartTS, expectedEndTS1); |
| createDBEntry(id2, expectedStartTS, expectedEndTS1); |
| createDBEntry(id3, expectedStartTS, expectedEndTS1); |
| createDBEntry(id4, expectedStartTS, expectedEndTS1); |
| // Coord Action of jobs 5-6 already started and currently running (to test history set) |
| createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2, 1); |
| createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2, 1); |
| |
| SLAService slas = Services.get().get(SLAService.class); |
| SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); |
| slaCalcMem.init(Services.get().getConf()); |
| slaCalcMem.updateAllSlaStatus(); |
| List<String> slaMapKeys = new ArrayList<String>(); |
| Iterator<String> itr = slaCalcMem.iterator(); |
| while (itr.hasNext()) { |
| slaMapKeys.add(itr.next()); |
| } |
| //4 jobs expected end is not yet reached |
| //2 jobs has end miss, waiting for job to complete |
| assertEquals(4, slaMapKeys.size()); |
| assertEquals(2, slaCalcMem.getHistorySet().size()); |
| |
| DummyZKOozie dummyOozie_1 = null; |
| try { |
| // start another dummy oozie instance (dummy sla and event handler services) |
| dummyOozie_1 = new DummyZKOozie("a", "http://blah"); |
| DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory(); |
| EventHandlerService dummyEhs = new EventHandlerService(); |
| dummySlaCalcMem.setEventHandlerService(dummyEhs); |
| dummyEhs.init(Services.get()); |
| dummySlaCalcMem.init(Services.get().getConf()); |
| dummySlaCalcMem.updateAllSlaStatus(); |
| slaMapKeys = new ArrayList<String>(); |
| itr = dummySlaCalcMem.iterator(); |
| while (itr.hasNext()) { |
| slaMapKeys.add(itr.next()); |
| } |
| assertEquals(4, slaMapKeys.size()); |
| assertEquals(2, dummySlaCalcMem.getHistorySet().size()); |
| // Coord Action 1,3 run and update status on *non-dummy* server |
| updateCoordAction(id1, "RUNNING"); |
| slaCalcMem |
| .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null); |
| updateCoordAction(id3, "FAILED"); |
| slaCalcMem.addJobStatus(id3, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, new Date()); |
| |
| // Coord Action 2,4 run and update status on *dummy* server |
| updateCoordAction(id2, "RUNNING"); |
| dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), |
| null); |
| updateCoordAction(id4, "FAILED"); |
| dummySlaCalcMem.addJobStatus(id4, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, |
| new Date()); |
| |
| // Both servers iterate SlaMap (updateAllSlaStatus) |
| slaCalcMem.updateAllSlaStatus(); |
| dummySlaCalcMem.updateAllSlaStatus(); |
| |
| // SlaMap on both Servers synced |
| SLACalcStatus sla1_nodummy = slaCalcMem.get(id1); |
| SLACalcStatus sla1_dummy = dummySlaCalcMem.get(id1); |
| SLACalcStatus sla2_nodummy = slaCalcMem.get(id2); |
| SLACalcStatus sla2_dummy = dummySlaCalcMem.get(id2); |
| assertEquals(1, sla1_nodummy.getEventProcessed()); |
| assertEquals(1, sla1_dummy.getEventProcessed()); |
| assertEquals(1, sla2_dummy.getEventProcessed()); |
| assertEquals(1, sla2_nodummy.getEventProcessed()); |
| assertFalse(slaCalcMem.isJobIdInSLAMap(id3)); |
| assertFalse(dummySlaCalcMem.isJobIdInSLAMap(id3)); |
| assertFalse(slaCalcMem.isJobIdInSLAMap(id4)); |
| assertFalse(dummySlaCalcMem.isJobIdInSLAMap(id4)); |
| |
| Byte eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( |
| SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id3); |
| assertEquals(8, eventProc.byteValue()); |
| eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( |
| SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id4); |
| assertEquals(8, eventProc.byteValue()); |
| |
| // Action 5 was processed as END_MISS in updateAllSlaStatus, put into history set |
| assertTrue(slaCalcMem.isJobIdInHistorySet(id5)); |
| assertTrue(dummySlaCalcMem.isJobIdInHistorySet(id6)); |
| // Action 6 was processed as END_MISS in updateAllSlaStatus, put into history set |
| assertTrue(slaCalcMem.isJobIdInHistorySet(id5)); |
| assertTrue(dummySlaCalcMem.isJobIdInHistorySet(id6)); |
| |
| eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( |
| SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id5); |
| assertEquals(7, eventProc.byteValue()); |
| eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( |
| SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id6); |
| assertEquals(7, eventProc.byteValue()); |
| |
| // Action 1 Succeeded on non-dummy server |
| updateCoordAction(id1, "SUCCEEDED"); |
| slaCalcMem.addJobStatus(id1, CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date( |
| System.currentTimeMillis() - 1800 * 1000), new Date()); |
| |
| // Action 2 Succeeded on dummy server |
| updateCoordAction(id2, "SUCCEEDED"); |
| dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date( |
| System.currentTimeMillis() - 1800 * 1000), new Date()); |
| |
| // Both servers iterate SlaMap (updateAllSlaStatus) |
| slaCalcMem.updateAllSlaStatus(); |
| dummySlaCalcMem.updateAllSlaStatus(); |
| |
| // Action 1, 2 are removed from both servers |
| assertNull(slaCalcMem.get(id1)); |
| assertNull(dummySlaCalcMem.get(id1)); |
| assertNull(slaCalcMem.get(id2)); |
| assertNull(dummySlaCalcMem.get(id2)); |
| eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( |
| SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id1); |
| assertEquals(8, eventProc.byteValue()); |
| eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( |
| SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id2); |
| assertEquals(8, eventProc.byteValue()); |
| |
| // Test HistoryPurgeWorker purges Action 5,6 from history set |
| updateCoordAction(id5, "SUCCEEDED"); |
| slaCalcMem.new HistoryPurgeWorker().run(); |
| assertFalse(slaCalcMem.isJobIdInHistorySet(id5)); |
| updateCoordAction(id6, "SUCCEEDED"); |
| dummySlaCalcMem.new HistoryPurgeWorker().run(); |
| assertFalse(dummySlaCalcMem.isJobIdInHistorySet(id6)); |
| |
| } |
| finally { |
| if (dummyOozie_1 != null) { |
| dummyOozie_1.teardown(); |
| } |
| } |
| } |
| |
| public void testNoDuplicateEventsInHA() throws Exception { |
| String id1 = "0000001-130521183438837-oozie-test-C@1"; |
| |
| SLAService slas = Services.get().get(SLAService.class); |
| SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); |
| slaCalcMem.init(Services.get().getConf()); // loads the job in sla map |
| |
| |
| EventHandlerService ehs = Services.get().get(EventHandlerService.class); |
| EventQueue ehs_q = ehs.getEventQueue(); |
| |
| DummyZKOozie dummyOozie_1 = null; |
| try { |
| // start another dummy oozie instance (dummy sla and event handler services) |
| dummyOozie_1 = new DummyZKOozie("a", "http://blah"); |
| DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory(); |
| dummySlaCalcMem.init(Services.get().getConf()); |
| EventHandlerService dummyEhs = new EventHandlerService(); |
| dummySlaCalcMem.setEventHandlerService(dummyEhs); |
| dummyEhs.init(Services.get()); |
| EventQueue dummyEhs_q = dummyEhs.getEventQueue(); |
| |
| Date expectedStartTS = new Date(System.currentTimeMillis() + 2 * 3600 * 1000); // get MISS |
| Date expectedEndTS = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // get MISS |
| SLASummaryBean sla = createDBEntryForStarted(id1, expectedStartTS, expectedEndTS, 0); |
| sla.setExpectedDuration(-1); |
| sla.setLastModifiedTime(new Date()); |
| SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, |
| sla); |
| |
| // Action started on Server 1 |
| updateCoordAction(id1, "RUNNING"); |
| |
| slaCalcMem |
| .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null); |
| assertEquals(1, ehs_q.size()); |
| SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll(); |
| |
| assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus()); |
| // Action ended on Server 2 |
| updateCoordAction(id1, "FAILED"); |
| dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date( |
| System.currentTimeMillis() - 1800 * 1000), |
| new Date()); |
| SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll(); |
| |
| assertEquals(SLAStatus.MISS, s2.getSLAStatus()); |
| |
| slaCalcMem.updateAllSlaStatus(); |
| dummySlaCalcMem.updateAllSlaStatus(); |
| assertEquals(0, ehs_q.size()); // no dupe event should be created again by Server 1 |
| } |
| finally { |
| if (dummyOozie_1 != null) { |
| dummyOozie_1.teardown(); |
| } |
| } |
| } |
| |
| public void testSLAAlertCommandWithHA() throws Exception { |
| |
| //Test SLA ALERT commands in HA mode. |
| //slaCalcMem1 is for server 1 and slaCalcMem2 is for server2 |
| |
| String id = "0000001-130521183438837-oozie-test-C@1"; |
| Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // 2 hrs passed |
| Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead |
| // Coord Action of jobs 1-4 not started yet |
| createDBEntry(id, expectedStartTS, expectedEndTS1); |
| |
| SLAService slas = Services.get().get(SLAService.class); |
| SLACalculatorMemory slaCalcMem1 = (SLACalculatorMemory) slas.getSLACalculator(); |
| slaCalcMem1.init(Services.get().get(ConfigurationService.class).getConf()); |
| slaCalcMem1.updateAllSlaStatus(); |
| List<String> idList = new ArrayList<String>(); |
| idList.add(id); |
| slaCalcMem1.disableAlert(idList); |
| assertTrue(slaCalcMem1.get(id).getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)); |
| |
| DummyZKOozie dummyOozie_1 = null; |
| try { |
| // start another dummy oozie instance (dummy sla and event handler services) |
| dummyOozie_1 = new DummyZKOozie("a", "http://blah"); |
| DummySLACalculatorMemory slaCalcMem2 = new DummySLACalculatorMemory(); |
| EventHandlerService dummyEhs = new EventHandlerService(); |
| slaCalcMem2.setEventHandlerService(dummyEhs); |
| |
| // So that job sla updated doesn't run automatically |
| Services.get().get(ConfigurationService.class).getConf().setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 100000); |
| Services.get().get(ConfigurationService.class).getConf().setInt(SLAService.CONF_SLA_CHECK_INITIAL_DELAY, 100000); |
| dummyEhs.init(Services.get()); |
| slaCalcMem2.init(Services.get().get(ConfigurationService.class).getConf()); |
| |
| slaCalcMem2.updateAllSlaStatus(); |
| assertTrue(slaCalcMem2.get(id).getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)); |
| |
| String newParams = RestConstants.SLA_MAX_DURATION + "=5"; |
| List<Pair<String, Map<String, String>>> jobIdsSLAPair = new ArrayList<Pair<String, Map<String, String>>>(); |
| jobIdsSLAPair.add(new Pair<String, Map<String, String>>(id, JobUtils.parseChangeValue(newParams))); |
| slaCalcMem1.changeDefinition(jobIdsSLAPair); |
| assertEquals(slaCalcMem1.get(id).getExpectedDuration(), 5 * 60 * 1000); |
| |
| //Before update, default is 10. |
| assertEquals(slaCalcMem2.get(id).getExpectedDuration(), 10 * 60 * 1000); |
| |
| slaCalcMem2.updateAllSlaStatus(); |
| assertEquals(slaCalcMem2.get(id).getExpectedDuration(), 5 * 60 * 1000); |
| |
| newParams = RestConstants.SLA_MAX_DURATION + "=15"; |
| jobIdsSLAPair.clear(); |
| jobIdsSLAPair.add(new Pair<String, Map<String, String>>(id, JobUtils.parseChangeValue(newParams))); |
| slaCalcMem1.changeDefinition(jobIdsSLAPair); |
| |
| // Before update |
| assertEquals(slaCalcMem2.get(id).getExpectedDuration(), 5 * 60 * 1000); |
| slaCalcMem2.updateAllSlaStatus(); |
| assertEquals(slaCalcMem2.get(id).getExpectedDuration(), 15 * 60 * 1000); |
| |
| } |
| finally { |
| if (dummyOozie_1 != null) { |
| dummyOozie_1.teardown(); |
| } |
| } |
| } |
| |
| private void createDBEntry(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception { |
| ArrayList<JsonBean> insertList = new ArrayList<JsonBean>(); |
| CoordinatorActionBean coordAction = new CoordinatorActionBean(); |
| Date modTime = new Date(System.currentTimeMillis() - 1000 * 3600 * 2); |
| coordAction.setId(actionId); |
| coordAction.setJobId(actionId.split("@", -1)[0]); |
| coordAction.setStatusStr("WAITING"); |
| coordAction.setLastModifiedTime(modTime); |
| |
| CoordinatorJobBean coordJob = new CoordinatorJobBean(); |
| coordJob.setId(actionId.split("@", -1)[0]); |
| coordJob.setUser("dummy"); |
| coordJob.setAppName("dummy"); |
| coordJob.setStatusStr("RUNNING"); |
| coordJob.setAppNamespace("dummy"); |
| |
| SLASummaryBean sla = new SLASummaryBean(); |
| sla.setId(actionId); |
| sla.setAppType(AppType.COORDINATOR_ACTION); |
| sla.setJobStatus("WAITING"); |
| sla.setSLAStatus(SLAStatus.NOT_STARTED); |
| sla.setEventProcessed(0); |
| sla.setLastModifiedTime(modTime); |
| sla.setExpectedStart(expectedStartTS); |
| sla.setExpectedEnd(expectedEndTS); |
| sla.setExpectedDuration(10 * 60 * 1000); |
| SLARegistrationBean reg = new SLARegistrationBean(); |
| reg.setId(actionId); |
| insertList.add(coordAction); |
| insertList.add(coordJob); |
| insertList.add(sla); |
| insertList.add(reg); |
| BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); |
| } |
| |
| private SLASummaryBean createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS, |
| int eventProcessed) throws Exception { |
| ArrayList<JsonBean> insertList = new ArrayList<JsonBean>(); |
| Date modTime = new Date(); |
| WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, |
| actionId); |
| wf.setStatus(wf.getStatus()); |
| wf.setStartTime(expectedStartTS); |
| wf.setLastModifiedTime(modTime); |
| WorkflowJobQueryExecutor.getInstance().executeUpdate( |
| WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf); |
| |
| CoordinatorActionBean coordAction = new CoordinatorActionBean(); |
| coordAction.setId(actionId); |
| coordAction.setJobId(actionId.split("@", -1)[0]); |
| coordAction.setStatusStr("RUNNING"); |
| coordAction.setLastModifiedTime(modTime); |
| coordAction.setExternalId(wf.getId()); |
| |
| CoordinatorJobBean coordJob = new CoordinatorJobBean(); |
| coordJob.setId(actionId.split("@", -1)[0]); |
| coordJob.setUser("dummy"); |
| coordJob.setAppName("dummy"); |
| coordJob.setStatusStr("RUNNING"); |
| coordJob.setAppNamespace("dummy"); |
| |
| SLASummaryBean sla = new SLASummaryBean(); |
| sla.setId(actionId); |
| sla.setAppType(AppType.COORDINATOR_ACTION); |
| sla.setJobStatus("RUNNING"); |
| sla.setSLAStatus(SLAStatus.IN_PROCESS); |
| sla.setEventProcessed(eventProcessed); |
| sla.setLastModifiedTime(modTime); |
| sla.setExpectedStart(expectedStartTS); |
| sla.setActualStart(expectedStartTS); |
| sla.setExpectedEnd(expectedEndTS); |
| sla.setExpectedDuration(10 * 60 * 1000); |
| SLARegistrationBean reg = new SLARegistrationBean(); |
| reg.setId(actionId); |
| insertList.add(coordAction); |
| |
| insertList.add(coordJob); |
| insertList.add(sla); |
| insertList.add(reg); |
| BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); |
| return sla; |
| } |
| |
| private WorkflowJobBean createWorkflow(String id) throws Exception { |
| List<JsonBean> insertList = new ArrayList<JsonBean>(); |
| WorkflowJobBean workflow = new WorkflowJobBean(); |
| workflow.setId(id); |
| workflow.setStatusStr("PREP"); |
| workflow.setStartTime(new Date()); |
| workflow.setSlaXml("<sla></sla>"); |
| insertList.add(workflow); |
| BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); |
| return workflow; |
| } |
| |
| public static class DummySLAEventListener extends SLAEventListener { |
| |
| @Override |
| public void onStartMet(SLAEvent sla) { |
| output.append(sla.getId() + " Sla START - MET!!!"); |
| } |
| |
| @Override |
| public void onStartMiss(SLAEvent sla) { |
| output.append(sla.getId() + " Sla START - MISS!!!"); |
| } |
| |
| @Override |
| public void onEndMet(SLAEvent sla) { |
| output.append(sla.getId() + " Sla END - MET!!!"); |
| } |
| |
| @Override |
| public void onEndMiss(SLAEvent sla) { |
| output.append(sla.getId() + " Sla END - MISS!!!"); |
| } |
| |
| @Override |
| public void onDurationMet(SLAEvent sla) { |
| output.append(sla.getId() + " Sla DURATION - MET!!!"); |
| } |
| |
| @Override |
| public void onDurationMiss(SLAEvent sla) { |
| output.append(sla.getId() + " Sla DURATION - MISS!!!"); |
| } |
| |
| @Override |
| public void init(Configuration conf) { |
| } |
| |
| @Override |
| public void destroy() { |
| } |
| } |
| } |
| |
| class DummySLACalculatorMemory extends SLACalculatorMemory { |
| public void setEventHandlerService(EventHandlerService ehs) { |
| this.eventHandler = ehs; |
| } |
| } |