| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.oozie.service; |
| |
| import java.util.Date; |
| |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.oozie.BundleActionBean; |
| import org.apache.oozie.BundleJobBean; |
| import org.apache.oozie.CoordinatorActionBean; |
| import org.apache.oozie.CoordinatorJobBean; |
| import org.apache.oozie.WorkflowJobBean; |
| import org.apache.oozie.client.CoordinatorAction; |
| import org.apache.oozie.client.CoordinatorJob; |
| import org.apache.oozie.client.Job; |
| import org.apache.oozie.client.OozieClient; |
| import org.apache.oozie.client.WorkflowJob; |
| import org.apache.oozie.command.bundle.BundleJobResumeXCommand; |
| import org.apache.oozie.command.bundle.BundleJobSuspendXCommand; |
| import org.apache.oozie.command.bundle.BundleKillXCommand; |
| import org.apache.oozie.command.coord.CoordKillXCommand; |
| import org.apache.oozie.command.coord.CoordResumeXCommand; |
| import org.apache.oozie.command.coord.CoordSuspendXCommand; |
| import org.apache.oozie.executor.jpa.BundleActionGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; |
| import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; |
| import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.BundleJobInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.BundleJobQueryExecutor; |
| import org.apache.oozie.executor.jpa.BundleJobQueryExecutor.BundleJobQuery; |
| import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; |
| import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; |
| import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; |
| import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; |
| import org.apache.oozie.executor.jpa.JPAExecutorException; |
| import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; |
| import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; |
| import org.apache.oozie.lock.LockToken; |
| import org.apache.oozie.service.StatusTransitService.StatusTransitRunnable; |
| import org.apache.oozie.test.XDataTestCase; |
| import org.apache.oozie.util.DateUtils; |
| import org.apache.oozie.util.LockerCoordinator; |
| import org.apache.oozie.util.XLog; |
| import org.apache.oozie.workflow.WorkflowApp; |
| import org.apache.oozie.workflow.WorkflowInstance; |
| import org.apache.oozie.workflow.lite.EndNodeDef; |
| import org.apache.oozie.workflow.lite.LiteWorkflowApp; |
| import org.apache.oozie.workflow.lite.StartNodeDef; |
| |
| public class TestStatusTransitService extends XDataTestCase { |
| /** Service container that setUp creates and tearDown destroys; some tests rebuild it themselves. */ |
| private Services services; |
| |
| /** Fully qualified class names passed to setClassesToBeExcluded before the services are initialized. */ |
| private String[] excludedServices = { |
| "org.apache.oozie.service.StatusTransitService", |
| "org.apache.oozie.service.PauseTransitService", |
| "org.apache.oozie.service.CoordMaterializeTriggerService", |
| "org.apache.oozie.service.RecoveryService" |
| }; |
| |
| |
| /** |
| * Creates a new Services instance for the test, registers the excluded classes on its configuration |
| * and then initializes it. |
| * |
| * @throws Exception if the parent set-up or the service initialization fails |
| */ |
| @Override |
| protected void setUp() throws Exception { |
| super.setUp(); |
| this.services = new Services(); |
| // The exclusions are registered on the configuration before init() is called. |
| setClassesToBeExcluded(this.services.getConf(), this.excludedServices); |
| this.services.init(); |
| } |
| |
| /** |
| * Destroys the Services instance held by this test and then runs the parent tear-down. |
| * |
| * @throws Exception if destroying the services or the parent tear-down fails |
| */ |
| @Override |
| protected void tearDown() throws Exception { |
| try { |
| services.destroy(); |
| } |
| finally { |
| // Run the parent clean-up even when destroy() throws, so one failing test does not leak its |
| // environment into the tests that follow. |
| super.tearDown(); |
| } |
| } |
| |
| /** |
| * Tests functionality of the StatusTransitService Runnable command. </p> Insert a coordinator job with RUNNING and |
| * pending true and coordinator actions with pending false. Then, runs the StatusTransitService runnable and ensures |
| * the job status changes to SUCCEEDED. |
| * |
| * @throws Exception |
| */ |
| public void testCoordStatusTransitServiceSucceeded() throws Exception { |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, true, 3); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd); |
| assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus()); |
| } |
| |
| /** |
| * Tests functionality of the StatusTransitService Runnable command. </p> Insert a coordinator job with RUNNING and |
| * pending true and coordinator actions with pending false, but one of action is KILLED. |
| * Then, runs the StatusTransitService runnable and ensures the job status changes to DONEWITHERROR. |
| * |
| * @throws Exception |
| */ |
| public void testCoordStatusTransitServiceDoneWithError() throws Exception { |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, true, 3); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd); |
| assertEquals(CoordinatorJob.Status.DONEWITHERROR, coordJob.getStatus()); |
| } |
| |
| /** |
| * Tests functionality of the StatusTransitService Runnable command with backward support for coordinator |
| * status switched on. |
| * <p> |
| * Inserts a coordinator job with RUNNING and pending true and coordinator actions with pending false, one of |
| * which is KILLED. Sets oozie.service.StatusTransitService.backward.support.for.coord.status=true and uses |
| * the uri:oozie:coordinator:0.1 namespace, then runs the StatusTransitService runnable and ensures the job |
| * status stays in RUNNING. |
| * |
| * @throws Exception if the services cannot be rebuilt or the fixture cannot be created |
| */ |
| public void testCoordStatusTransitServiceNoDoneWithErrorForBackwardSupport() throws Exception { |
| // The services are destroyed and rebuilt after the system property has been set. |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_COORD_STATUS, "true"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| |
| final CoordinatorJobBean insertedJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, |
| endTime, true, true, 3); |
| final String insertedJobId = insertedJob.getId(); |
| |
| final JPAService jpa = Services.get().get(JPAService.class); |
| assertNotNull(jpa); |
| |
| // Rewrite the stored application namespace of the job that was just inserted. |
| insertedJob.setAppNamespace(SchemaService.COORDINATOR_NAMESPACE_URI_1); |
| CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_APPNAMESPACE, insertedJob); |
| |
| addRecordToCoordActionTable(insertedJobId, 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(insertedJobId, 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(insertedJobId, 3, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| new StatusTransitRunnable().run(); |
| // The expected outcome is "no change", so there is no state to poll for; pause briefly and re-read. |
| sleep(1000); |
| |
| final CoordinatorJobBean reloadedJob = jpa.execute(new CoordJobGetJPAExecutor(insertedJobId)); |
| assertEquals(CoordinatorJob.Status.RUNNING, reloadedJob.getStatus()); |
| } |
| |
| /** |
| * Tests functionality of the StatusTransitService Runnable command. |
| * <p> |
| * Inserts a coordinator job with RUNNING and pending false, a RUNNING workflow job and a RUNNING coordinator |
| * action that refers to that workflow. Then runs the CoordKillXCommand, checks that job, action and workflow |
| * are all KILLED, runs the StatusTransitService runnable and ensures the job pending changes to false. |
| * |
| * @throws Exception if the fixture cannot be created, or the kill command or a lookup fails |
| */ |
| public void testCoordStatusTransitServiceKilledByUser1() throws Exception { |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| // Checked before first use, so a missing service fails here and not inside the polling predicate below. |
| assertNotNull(jpaService); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, |
| 1); |
| WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| final String wfJobId = wfJob.getId(); |
| CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJobId, "RUNNING", 0); |
| |
| new CoordKillXCommand(coordJob.getId()).call(); |
| |
| // Wait until the workflow job is reported as KILLED. |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(wfJobId); |
| WorkflowJobBean wfBean = jpaService.execute(wfGetCmd); |
| return wfBean.getStatusStr().equals("KILLED"); |
| } |
| }); |
| |
| final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); |
| CoordActionGetJPAExecutor coordActionGetCmd = new CoordActionGetJPAExecutor(coordAction.getId()); |
| WorkflowJobGetJPAExecutor wfGetCmd = new WorkflowJobGetJPAExecutor(wfJobId); |
| |
| coordJob = jpaService.execute(coordJobGetCmd); |
| coordAction = jpaService.execute(coordActionGetCmd); |
| wfJob = jpaService.execute(wfGetCmd); |
| assertEquals(CoordinatorJob.Status.KILLED, coordJob.getStatus()); |
| assertEquals(CoordinatorAction.Status.KILLED, coordAction.getStatus()); |
| assertEquals(WorkflowJob.Status.KILLED, wfJob.getStatus()); |
| assertFalse(coordAction.isPending()); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| // Status of coordJobBean is being updated asynchronously. |
| // Wait for at most 10s so that there is sufficient time for the |
| // pending flag to be reset before the final assertion runs. |
| waitFor(10 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJobBean = jpaService.execute(coordJobGetCmd); |
| return !coordJobBean.isPending(); |
| } |
| }); |
| |
| coordJob = jpaService.execute(coordJobGetCmd); |
| assertFalse(coordJob.isPending()); |
| } |
| |
| /** |
| * Test : coord job killed by user - pending update to false. |
| * <p> |
| * The job is inserted as KILLED with one SUCCEEDED and two KILLED actions; after the StatusTransitService |
| * runnable has run, the job is expected to be no longer pending. |
| * |
| * @throws Exception if the fixture cannot be created or the job cannot be read back |
| */ |
| public void testCoordStatusTransitServiceKilledByUser2() throws Exception { |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final CoordinatorJobBean killedJob = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, startTime, endTime, |
| true, false, 3); |
| final String jobId = killedJob.getId(); |
| addRecordToCoordActionTable(jobId, 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(jobId, 2, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(jobId, 3, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| new StatusTransitRunnable().run(); |
| // Poll for up to 5s until the stored job is no longer pending. |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean polledJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return !polledJob.isPending(); |
| } |
| }); |
| |
| final CoordinatorJobBean reloadedJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| assertFalse(reloadedJob.isPending()); |
| } |
| |
| /** |
| * Test : coord job suspended by user and all coord actions are succeeded - pending update to false. |
| * <p> |
| * Besides the pending flag, the test also expects the job status to end up as SUCCEEDED. |
| * |
| * @throws Exception if the fixture cannot be created or the job cannot be read back |
| */ |
| public void testCoordStatusTransitServiceSuspendedByUser() throws Exception { |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| |
| final CoordinatorJobBean suspendedJob = addRecordToCoordJobTable(CoordinatorJob.Status.SUSPENDED, startTime, |
| endTime, true, true, 3); |
| final String jobId = suspendedJob.getId(); |
| // Actions 1..3 are all inserted as SUCCEEDED. |
| for (int actionNum = 1; actionNum <= 3; actionNum++) { |
| addRecordToCoordActionTable(jobId, actionNum, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| } |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| new StatusTransitRunnable().run(); |
| // Poll for up to 10s until the stored job is no longer pending. |
| waitFor(10 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean polledJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return !polledJob.isPending(); |
| } |
| }); |
| |
| final CoordinatorJobBean reloadedJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| assertFalse(reloadedJob.isPending()); |
| assertEquals(Job.Status.SUCCEEDED, reloadedJob.getStatus()); |
| } |
| |
| /** |
| * Test : 3 coord actions suspended and 1 succeeded - check status change to SUSPENDED and pending update to |
| * false. |
| * |
| * @throws Exception if the fixture cannot be created or the job cannot be read back |
| */ |
| public void testCoordStatusTransitServiceSuspendedBottomUp() throws Exception { |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| |
| final CoordinatorJobBean runningJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, |
| endTime, true, true, 4); |
| final String jobId = runningJob.getId(); |
| // Actions 1..3 are SUSPENDED, action 4 is SUCCEEDED. |
| for (int actionNum = 1; actionNum <= 3; actionNum++) { |
| addRecordToCoordActionTable(jobId, actionNum, CoordinatorAction.Status.SUSPENDED, "coord-action-get.xml", 0); |
| } |
| addRecordToCoordActionTable(jobId, 4, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| new StatusTransitRunnable().run(); |
| // Keeping wait time to 20s to ensure status is updated |
| waitFor(20 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean polledJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return CoordinatorJob.Status.SUSPENDED == polledJob.getStatus(); |
| } |
| }); |
| |
| final CoordinatorJobBean reloadedJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| assertEquals(CoordinatorJob.Status.SUSPENDED, reloadedJob.getStatus()); |
| assertFalse(reloadedJob.isPending()); |
| } |
| |
| /** |
| * Test : all coord actions suspended except one which is killed - check status change to SUSPENDEDWITHERROR |
| * |
| * @throws Exception |
| */ |
| public void testCoordStatusTransitServiceSuspendedWithError() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, true, 4); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.SUSPENDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.SUSPENDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 4, CoordinatorAction.Status.SUSPENDED, "coord-action-get.xml", 0); |
| |
| |
| final String jobId = job.getId(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| // Keeping wait time to 20s to ensure status is updated |
| waitFor(20 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return coordJob.getStatus() == CoordinatorJob.Status.SUSPENDEDWITHERROR; |
| } |
| }); |
| |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(coordGetCmd); |
| assertEquals(CoordinatorJob.Status.SUSPENDEDWITHERROR, job.getStatus()); |
| assertFalse(job.isPending()); |
| } |
| |
| /** |
| * Test : Suspend and resume a coordinator job which has finished materialization and all actions are |
| * succeeded. |
| * <p> |
| * Coordinator job changes to succeeded after resume. |
| * |
| * @throws Exception if the fixture cannot be created, or a command or lookup fails |
| */ |
| public void testCoordStatusTransitServiceSuspendAndResume() throws Exception { |
| final JPAService jpa = Services.get().get(JPAService.class); |
| assertNotNull(jpa); |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| |
| final CoordinatorJobBean insertedJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, |
| endTime, false, true, 2); |
| final String coordJobId = insertedJob.getId(); |
| |
| // Two SUCCEEDED actions, each followed below by a SUCCEEDED workflow job stored under its external id. |
| final CoordinatorActionBean firstAction = addRecordToCoordActionTable(coordJobId, 1, |
| CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| final CoordinatorActionBean secondAction = addRecordToCoordActionTable(coordJobId, 2, |
| CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| addRecordToWfJobTable(firstAction.getExternalId(), WorkflowJob.Status.SUCCEEDED, |
| WorkflowInstance.Status.SUCCEEDED); |
| addRecordToWfJobTable(secondAction.getExternalId(), WorkflowJob.Status.SUCCEEDED, |
| WorkflowInstance.Status.SUCCEEDED); |
| |
| new CoordSuspendXCommand(coordJobId).call(); |
| |
| final CoordJobGetJPAExecutor jobLookup = new CoordJobGetJPAExecutor(coordJobId); |
| final CoordinatorJobBean afterSuspend = jpa.execute(jobLookup); |
| assertEquals(Job.Status.SUSPENDED, afterSuspend.getStatus()); |
| |
| sleep(3000); |
| |
| new CoordResumeXCommand(coordJobId).call(); |
| |
| // The job is re-read after the resume; the result is not inspected. |
| jpa.execute(jobLookup); |
| |
| new StatusTransitRunnable().run(); |
| |
| // Poll for up to 20s until the stored job reports SUCCEEDED. |
| waitFor(20 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean polledJob = jpa.execute(new CoordJobGetJPAExecutor(coordJobId)); |
| return polledJob.getStatus().equals(Job.Status.SUCCEEDED); |
| } |
| }); |
| |
| final CoordinatorJobBean finalJob = jpa.execute(new CoordJobGetJPAExecutor(coordJobId)); |
| assertFalse(finalJob.isPending()); |
| assertEquals(Job.Status.SUCCEEDED, finalJob.getStatus()); |
| } |
| |
| /** |
| * Suspending a PREP coordinator job (with no actions) puts it into PREPSUSPENDED. The StatusTransitService |
| * should not transition it to RUNNING automatically. This test verifies that the job remains in |
| * PREPSUSPENDED after a StatusTransitService run, goes back to PREP after a resume command, and is still in |
| * PREP after a second StatusTransitService run. |
| * |
| * @throws Exception if the fixture cannot be created, or a command or lookup fails |
| */ |
| public void testFoo() throws Exception { |
| final JPAService jpa = Services.get().get(JPAService.class); |
| assertNotNull(jpa); |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| |
| final CoordinatorJobBean prepJob = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, startTime, endTime, |
| true, false, 0); |
| final String coordJobId = prepJob.getId(); |
| final CoordJobGetJPAExecutor jobLookup = new CoordJobGetJPAExecutor(coordJobId); |
| |
| // Suspend: PREP -> PREPSUSPENDED. |
| new CoordSuspendXCommand(coordJobId).call(); |
| final CoordinatorJobBean afterSuspend = jpa.execute(jobLookup); |
| assertEquals(Job.Status.PREPSUSPENDED, afterSuspend.getStatus()); |
| |
| // A status-transit run must leave the suspended job where it is. |
| new StatusTransitRunnable().run(); |
| final CoordinatorJobBean afterFirstTransit = jpa.execute(new CoordJobGetJPAExecutor(coordJobId)); |
| assertEquals(Job.Status.PREPSUSPENDED, afterFirstTransit.getStatus()); |
| |
| // Resume: PREPSUSPENDED -> PREP. |
| new CoordResumeXCommand(coordJobId).call(); |
| final CoordinatorJobBean afterResume = jpa.execute(jobLookup); |
| assertEquals(Job.Status.PREP, afterResume.getStatus()); |
| |
| new StatusTransitRunnable().run(); |
| |
| waitFor(20 * 1000, new Predicate() { |
| @Override |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean polledJob = jpa.execute(new CoordJobGetJPAExecutor(coordJobId)); |
| return polledJob.getStatus().equals(Job.Status.PREP); |
| } |
| }); |
| |
| final CoordinatorJobBean afterSecondTransit = jpa.execute(new CoordJobGetJPAExecutor(coordJobId)); |
| assertEquals(Job.Status.PREP, afterSecondTransit.getStatus()); |
| } |
| |
| /** |
| * Test : all coord actions are running, job pending is reset and the job status is still RUNNING. |
| * |
| * @throws Exception if the fixture cannot be created or the job cannot be read back |
| */ |
| public void testCoordStatusTransitServiceRunning1() throws Exception { |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 3); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| final String jobId = job.getId(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| // Poll for up to 5s until the stored job is no longer pending. |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return !coordJob.isPending(); |
| } |
| }); |
| |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(coordGetCmd); |
| assertFalse(job.isPending()); |
| // Expected value goes first so that a failure message reports expected/actual the right way round. |
| assertEquals(Job.Status.RUNNING, job.getStatus()); |
| } |
| |
| |
| /** |
| * Test : job starts as RUNNINGWITHERROR; 2 coord actions are running, 1 suspended and 1 succeeded, check job |
| * pending is reset and status changed to RUNNING. |
| * |
| * @throws Exception if the fixture cannot be created or the job cannot be read back |
| */ |
| public void testCoordStatusTransitServiceRunning2() throws Exception { |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNINGWITHERROR, start, end, true, false, 4); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 4, CoordinatorAction.Status.SUSPENDED, "coord-action-get.xml", 0); |
| |
| final String jobId = job.getId(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| // Poll for up to 10s until the stored job reports RUNNING. |
| waitFor(10 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return coordJob.getStatus() == Job.Status.RUNNING; |
| } |
| }); |
| |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(coordGetCmd); |
| assertFalse(job.isPending()); |
| // Expected value goes first so that a failure message reports expected/actual the right way round. |
| assertEquals(Job.Status.RUNNING, job.getStatus()); |
| } |
| |
| /** |
| * Test : Keep the backward support for states on. 2 coord actions are running, 1 killed, check if job pending |
| * is reset and state changed to RUNNING. Make sure the status is not RUNNINGWITHERROR. |
| * |
| * @throws Exception if the services cannot be rebuilt or the fixture cannot be created |
| */ |
| public void testCoordStatusTransitServiceBackwardSupport() throws Exception { |
| // The services are destroyed and rebuilt after the system property has been set. |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "true"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 3); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| final String jobId = job.getId(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| // Poll for up to 5s until the stored job is no longer pending. |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return !coordJob.isPending(); |
| } |
| }); |
| |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(coordGetCmd); |
| assertFalse(job.isPending()); |
| // Expected value goes first so that a failure message reports expected/actual the right way round. |
| assertEquals(Job.Status.RUNNING, job.getStatus()); |
| } |
| |
| |
| /** |
| * Test : backward support for states switched off. 2 coord actions are running, 1 killed, check if job |
| * pending is reset and state changed to RUNNINGWITHERROR. |
| * |
| * @throws Exception if the services cannot be rebuilt or the fixture cannot be created |
| */ |
| public void testCoordStatusTransitServiceRunning3() throws Exception { |
| // The services are destroyed and rebuilt after the system property has been set. |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, 3); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| final String jobId = job.getId(); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| // Poll for up to 5s until the stored job is no longer pending. |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return !coordJob.isPending(); |
| } |
| }); |
| |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(coordGetCmd); |
| assertFalse(job.isPending()); |
| // Expected value goes first so that a failure message reports expected/actual the right way round. |
| assertEquals(Job.Status.RUNNINGWITHERROR, job.getStatus()); |
| } |
| |
| /** |
| * Test : a RUNNING coord job with a single SUCCEEDED action keeps both its RUNNING status and its last |
| * modified time across a StatusTransitService run. |
| * |
| * @throws Exception if the fixture cannot be created or the job cannot be read back |
| */ |
| public void testCoordStatusTransitServiceUpdateLastModifiedTime() throws Exception { |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final CoordinatorJobBean insertedJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, |
| endTime, true, false, 3); |
| // Captured before the runnable executes, to compare against the stored value afterwards. |
| final Date modifiedBefore = insertedJob.getLastModifiedTime(); |
| addRecordToCoordActionTable(insertedJob.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 1); |
| |
| final JPAService jpa = Services.get().get(JPAService.class); |
| new StatusTransitRunnable().run(); |
| sleep(1000); |
| |
| final CoordinatorJobBean reloadedJob = jpa.execute(new CoordJobGetJPAExecutor(insertedJob.getId())); |
| // As state of job has not changed, lastModified should not be updated |
| assertEquals(modifiedBefore.getTime(), reloadedJob.getLastModifiedTime().getTime()); |
| assertEquals(Job.Status.RUNNING, reloadedJob.getStatus()); |
| } |
| |
| /** |
| * Test : backward support for states switched off. A PAUSED coord job with an explicit pause time has 2 |
| * running actions and 1 killed action; check that job pending is reset and the status changes to |
| * PAUSEDWITHERROR. |
| * |
| * @throws Exception if the services cannot be rebuilt or the fixture cannot be created |
| */ |
| public void testCoordStatusTransitServicePaused() throws Exception { |
| // The services are destroyed and rebuilt after the system property has been set. |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| |
| final String monthAhead = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| final Date startTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final Date endTime = DateUtils.parseDateOozieTZ(monthAhead); |
| final CoordinatorJobBean pausedJob = createCoordJob(CoordinatorJob.Status.PAUSED, startTime, endTime, true, |
| false, 3); |
| // set some pause time explicitly to make sure the job is not unpaused |
| pausedJob.setPauseTime(DateUtils.parseDateOozieTZ("2009-02-01T01:00Z")); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| jpaService.execute(new CoordJobInsertJPAExecutor(pausedJob)); |
| |
| final String jobId = pausedJob.getId(); |
| addRecordToCoordActionTable(jobId, 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(jobId, 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(jobId, 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| new StatusTransitRunnable().run(); |
| // Poll for up to 5s until the stored job is no longer pending. |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean polledJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return !polledJob.isPending(); |
| } |
| }); |
| |
| final CoordinatorJobBean reloadedJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| assertFalse(reloadedJob.isPending()); |
| assertEquals(CoordinatorJob.Status.PAUSEDWITHERROR, reloadedJob.getStatus()); |
| } |
| |
| /** |
| * Test : all coord actions are running, job pending is reset |
| * |
| * @throws Exception |
| */ |
| public void testCoordStatusTransitServicePausedWithError() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean job = createCoordJob(CoordinatorJob.Status.PAUSEDWITHERROR, start, end, true, false, 3); |
| // set the pause time explicity to make sure the job is not unpaused |
| job.setPauseTime(DateUtils.parseDateOozieTZ("2009-02-01T01:00Z")); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| CoordJobInsertJPAExecutor coordInsertCmd = new CoordJobInsertJPAExecutor(job); |
| jpaService.execute(coordInsertCmd); |
| |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| final String jobId = job.getId(); |
| assertNotNull(jpaService); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor(jobId)); |
| return coordJob.isPending() == false; |
| } |
| }); |
| |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(coordGetCmd); |
| assertFalse(job.isPending()); |
| assertEquals(CoordinatorJob.Status.PAUSED, job.getStatus()); |
| } |
| /** |
| * Tests functionality of the StatusTransitService Runnable command. </p> Insert a coordinator job with RUNNING and |
| * pending true and coordinator actions with TIMEDOUT state. Then, runs the StatusTransitService runnable and |
| * ensures the job state changes to DONEWITHERROR. |
| * |
| * @throws Exception |
| */ |
| public void testCoordStatusTransitServiceForTimeout() throws Exception { |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, true, 3); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd); |
| assertEquals(CoordinatorJob.Status.DONEWITHERROR, coordJob.getStatus()); |
| } |
| |
| /** |
| * Inserts a coordinator job in KILLED state with pending materialization. |
| * Make sure the status doesn't change to DONEWITHERROR |
| * |
| * @throws Exception |
| */ |
| public void testCoordNotTransitionfromKilled() throws Exception { |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, true, false); |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd); |
| assertEquals(CoordinatorJob.Status.KILLED, coordJob.getStatus()); |
| |
| } |
| |
| /** |
| * Test : all bundle actions are succeeded - bundle job's status will be updated to succeeded. |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceSucceeded1() throws Exception { |
| BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String jobId = job.getId(); |
| BundleActionBean ba1 = addRecordToBundleActionTable(jobId, "action1", 0, Job.Status.SUCCEEDED); |
| addRecordToBundleActionTable(jobId, "action2", 0, Job.Status.SUCCEEDED); |
| addRecordToBundleActionTable(jobId, "action3", 0, Job.Status.SUCCEEDED); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); |
| return bundle.getStatus().equals(Job.Status.SUCCEEDED); |
| } |
| }); |
| |
| job = jpaService.execute(new BundleJobGetJPAExecutor(jobId)); |
| assertEquals(Job.Status.SUCCEEDED, job.getStatus()); |
| } |
| |
| /** |
| * Test : all coord jobs are succeeded - bundle job's status will be updated to succeeded. coordinator action -> |
| * coordinator job -> bundle action -> bundle job |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceSucceeded2() throws Exception { |
| BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = job.getId(); |
| addRecordToBundleActionTable(bundleId, "action1", 0, Job.Status.RUNNING); |
| addRecordToBundleActionTable(bundleId, "action2", 0, Job.Status.RUNNING); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1", CoordinatorJob.Status.RUNNING, start, end, true, true, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "action2", CoordinatorJob.Status.RUNNING, start, end, true, true, 2); |
| |
| addRecordToCoordActionTable("action1", 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable("action1", 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| addRecordToCoordActionTable("action2", 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable("action2", 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(15 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.getStatus().equals(Job.Status.SUCCEEDED); |
| } |
| }); |
| |
| job = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.SUCCEEDED, job.getStatus()); |
| } |
| |
| /** |
| * Insert a coordinator job in KILLED state with one coordinator action as |
| * SUCCEEDED. Make sure the status of the job changes to DONEWITHERROR. |
| * Also, the status of bundle action and bundle job should change to |
| * DONEWITHERROR |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceForTerminalStates() throws Exception { |
| BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = job.getId(); |
| CoordinatorJobBean coord1 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false); |
| addRecordToBundleActionTable(bundleId, coord1.getId(), "action1", 0, Job.Status.KILLED); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1", CoordinatorJob.Status.KILLED, start, end, true, true, 2); |
| |
| addRecordToCoordActionTable("action1", 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable("action1", 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(15 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor("action1")); |
| return coordJob.getStatus().equals(Job.Status.DONEWITHERROR); |
| } |
| }); |
| |
| CoordinatorJobBean coordJob = jpaService.execute(new CoordJobGetJPAExecutor("action1")); |
| assertEquals(Job.Status.KILLED, coordJob.getStatus()); |
| |
| BundleActionBean bab = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action1")); |
| assertEquals(Job.Status.KILLED, bab.getStatus()); |
| |
| job = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.KILLED, job.getStatus()); |
| } |
| |
| /** |
| * Test : all coord jobs are succeeded - bundle job's status will be updated to succeeded after suspend and resume. |
| * coordinator action -> coordinator job -> bundle action -> bundle job |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceSucceeded3() throws Exception { |
| BundleJobBean job = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = job.getId(); |
| addRecordToBundleActionTable(bundleId, "action1", 1, Job.Status.RUNNING); |
| addRecordToBundleActionTable(bundleId, "action2", 0, Job.Status.RUNNING); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1", CoordinatorJob.Status.RUNNING, start, end, true, true, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "action2", CoordinatorJob.Status.RUNNING, start, end, true, true, 2); |
| |
| addRecordToCoordActionTable("action1", 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable("action1", 2, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| addRecordToCoordActionTable("action2", 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable("action2", 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| new BundleJobSuspendXCommand(bundleId).call(); |
| |
| BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(job.getId()); |
| job = jpaService.execute(bundleJobGetCmd); |
| |
| assertEquals(Job.Status.SUSPENDED, job.getStatus()); |
| |
| sleep(3000); |
| |
| new BundleJobResumeXCommand(bundleId).call(); |
| |
| job = jpaService.execute(bundleJobGetCmd); |
| assertEquals(Job.Status.RUNNING, job.getStatus()); |
| } |
| |
| /** |
| * Test : kill a bundle job - bundle job's pending will be updated to false. |
| * <p/> |
| * The pending is updated bottom-up. workflow job -> coordinator action -> coordinator job -> bundle action -> |
| * bundle job |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceKilled() throws Exception { |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.KILLED, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1-C", 1, Job.Status.KILLED); |
| addRecordToBundleActionTable(bundleId, "action2-C", 1, Job.Status.KILLED); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1-C", CoordinatorJob.Status.RUNNING, start, end, false, true, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "action2-C", CoordinatorJob.Status.RUNNING, start, end, false, true, 2); |
| |
| WorkflowJobBean wfJob1_1 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_4 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| |
| final CoordinatorActionBean coordAction1_1 = addRecordToCoordActionTable("action1-C", 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_1.getId(), wfJob1_1.getStatusStr(), 0); |
| final CoordinatorActionBean coordAction1_2 = addRecordToCoordActionTable("action1-C", 2, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_2.getId(), wfJob1_2.getStatusStr(), 0); |
| |
| final CoordinatorActionBean coordAction1_3 = addRecordToCoordActionTable("action2-C", 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_3.getId(), wfJob1_3.getStatusStr(), 0); |
| final CoordinatorActionBean coordAction1_4 = addRecordToCoordActionTable("action2-C", 2, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_4.getId(), wfJob1_4.getStatusStr(), 0); |
| |
| new CoordKillXCommand("action1-C").call(); |
| new CoordKillXCommand("action2-C").call(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobBean wfJob = jpaService |
| .execute(new WorkflowJobGetJPAExecutor(coordAction1_4.getExternalId())); |
| return wfJob.getStatus().equals(Job.Status.KILLED); |
| } |
| }); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.isPending() == false; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertFalse(bundleJob.isPending()); |
| assertEquals(Job.Status.KILLED, bundleJob.getStatus()); |
| |
| BundleActionBean bundleAction1 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action1-C")); |
| assertFalse(bundleAction1.isPending()); |
| assertEquals(Job.Status.KILLED, bundleAction1.getStatus()); |
| |
| CoordinatorJobBean coordJob1 = jpaService.execute(new CoordJobGetJPAExecutor("action1-C")); |
| assertFalse(coordJob1.isPending()); |
| assertEquals(Job.Status.KILLED, coordJob1.getStatus()); |
| |
| BundleActionBean bundleAction2 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action2-C")); |
| assertFalse(bundleAction2.isPending()); |
| assertEquals(Job.Status.KILLED, bundleAction2.getStatus()); |
| |
| CoordinatorJobBean coordJob2 = jpaService.execute(new CoordJobGetJPAExecutor("action2-C")); |
| assertFalse(coordJob2.isPending()); |
| assertEquals(Job.Status.KILLED, coordJob2.getStatus()); |
| } |
| |
| /** |
| * Test : kill a bundle job with coord jobs as DONEWITHERROR |
| * <p/> |
| * Bundle status should be kill. |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusNotTransitionFromKilled() throws Exception { |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1", 0, Job.Status.DONEWITHERROR); |
| addRecordToBundleActionTable(bundleId, "action2", 0, Job.Status.DONEWITHERROR); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1", CoordinatorJob.Status.DONEWITHERROR, start, end, false, true, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "action2", CoordinatorJob.Status.DONEWITHERROR, start, end, false, true, 2); |
| |
| new BundleKillXCommand(bundleId).call(); |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.getStatus() == Job.Status.KILLED; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.KILLED, bundleJob.getStatus()); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(15 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.getStatus() == Job.Status.KILLED; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.KILLED, bundleJob.getStatus()); |
| |
| } |
| |
| |
| /** |
| * Test : Make the bundle kill itself by having one of the coord job fail preparation. |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceKilled2() throws Exception { |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = bundleJob.getId(); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean coord = addRecordToCoordJobTableWithBundle(bundleId, "action2", |
| CoordinatorJob.Status.RUNNING, start, end, true, true, 2); |
| addRecordToCoordActionTable("action2", 1, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0); |
| |
| // Add a bundle action with no coordinator to make it fail |
| addRecordToBundleActionTable(bundleId, null, 0, Job.Status.KILLED); |
| addRecordToBundleActionTable(bundleId, coord.getId(), "action2", 0, Job.Status.RUNNING); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| // first time, service will call bundle kill |
| runnable.run(); |
| sleep(10000); |
| runnable.run(); |
| |
| waitFor(25 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.getStatus() == Job.Status.KILLED; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.KILLED, bundleJob.getStatus()); |
| } |
| |
| |
| /** |
| * Test : kill one coord job and keep the other running. Check whether the bundle job's status |
| * is updated to RUNNINGWITHERROR |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceRunningWithError() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1-C", 1, Job.Status.RUNNING); |
| addRecordToBundleActionTable(bundleId, "action2-C", 1, Job.Status.RUNNING); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1-C", CoordinatorJob.Status.RUNNING, start, end, false, true, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "action2-C", CoordinatorJob.Status.RUNNING, start, end, true, false, 2); |
| |
| WorkflowJobBean wfJob1_1 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_4 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| |
| final CoordinatorActionBean coordAction1_1 = addRecordToCoordActionTable("action1-C", 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_1.getId(), wfJob1_1.getStatusStr(), 0); |
| final CoordinatorActionBean coordAction1_2 = addRecordToCoordActionTable("action1-C", 2, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_2.getId(), wfJob1_2.getStatusStr(), 0); |
| |
| final CoordinatorActionBean coordAction1_3 = addRecordToCoordActionTable("action2-C", 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_3.getId(), wfJob1_3.getStatusStr(), 1); |
| final CoordinatorActionBean coordAction1_4 = addRecordToCoordActionTable("action2-C", 2, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_4.getId(), wfJob1_4.getStatusStr(), 1); |
| |
| new CoordKillXCommand("action1-C").call(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobBean wfJob = jpaService |
| .execute(new WorkflowJobGetJPAExecutor(coordAction1_1.getExternalId())); |
| return wfJob.getStatus().equals(Job.Status.KILLED); |
| } |
| }); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.isPending() == false; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertTrue(bundleJob.isPending()); |
| assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus()); |
| |
| BundleActionBean bundleAction1 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action1-C")); |
| assertFalse(bundleAction1.isPending()); |
| assertEquals(Job.Status.KILLED, bundleAction1.getStatus()); |
| |
| CoordinatorJobBean coordJob1 = jpaService.execute(new CoordJobGetJPAExecutor("action1-C")); |
| assertFalse(coordJob1.isPending()); |
| assertEquals(Job.Status.KILLED, coordJob1.getStatus()); |
| |
| BundleActionBean bundleAction2 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action2-C")); |
| assertTrue(bundleAction2.isPending()); |
| assertEquals(Job.Status.RUNNING, bundleAction2.getStatus()); |
| |
| CoordinatorJobBean coordJob2 = jpaService.execute(new CoordJobGetJPAExecutor("action2-C")); |
| assertTrue(coordJob2.isPending()); |
| assertEquals(Job.Status.RUNNING, coordJob2.getStatus()); |
| } |
| |
| /** |
| * Test : Suspend a bundle job - bundle job's pending will be updated to false. |
| * <p/> |
| * The pending is updated bottom-up. workflow job -> coordinator action -> coordinator job -> bundle action -> |
| * bundle job |
| * |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceSuspended() throws Exception { |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.SUSPENDED, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1-C", 1, Job.Status.SUSPENDED); |
| addRecordToBundleActionTable(bundleId, "action2-C", 1, Job.Status.SUSPENDED); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| addRecordToCoordJobTableWithBundle(bundleId, "action1-C", CoordinatorJob.Status.RUNNING, start, end, false, false, 2); |
| addRecordToCoordJobTableWithBundle(bundleId, "action2-C", CoordinatorJob.Status.RUNNING, start, end, false, false, 2); |
| |
| WorkflowJobBean wfJob1_1 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| WorkflowJobBean wfJob1_4 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); |
| |
| final CoordinatorActionBean coordAction1_1 = addRecordToCoordActionTable("action1-C", 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_1.getId(), wfJob1_1.getStatusStr(), 0); |
| final CoordinatorActionBean coordAction1_2 = addRecordToCoordActionTable("action1-C", 2, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_2.getId(), wfJob1_2.getStatusStr(), 0); |
| |
| final CoordinatorActionBean coordAction1_3 = addRecordToCoordActionTable("action2-C", 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_3.getId(), wfJob1_3.getStatusStr(), 0); |
| final CoordinatorActionBean coordAction1_4 = addRecordToCoordActionTable("action2-C", 2, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", wfJob1_4.getId(), wfJob1_4.getStatusStr(), 0); |
| |
| new CoordSuspendXCommand("action1-C").call(); |
| new CoordSuspendXCommand("action2-C").call(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| WorkflowJobBean wfJob = jpaService |
| .execute(new WorkflowJobGetJPAExecutor(coordAction1_4.getExternalId())); |
| return wfJob.getStatus().equals(Job.Status.SUSPENDED); |
| } |
| }); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.isPending() == false; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertFalse(bundleJob.isPending()); |
| assertEquals(Job.Status.SUSPENDED, bundleJob.getStatus()); |
| |
| BundleActionBean bundleAction1 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action1-C")); |
| assertFalse(bundleAction1.isPending()); |
| assertEquals(Job.Status.SUSPENDED, bundleAction1.getStatus()); |
| |
| CoordinatorJobBean coordJob1 = jpaService.execute(new CoordJobGetJPAExecutor("action1-C")); |
| assertFalse(coordJob1.isPending()); |
| assertEquals(Job.Status.SUSPENDED, coordJob1.getStatus()); |
| |
| BundleActionBean bundleAction2 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action2-C")); |
| assertFalse(bundleAction2.isPending()); |
| assertEquals(Job.Status.SUSPENDED, bundleAction2.getStatus()); |
| |
| CoordinatorJobBean coordJob2 = jpaService.execute(new CoordJobGetJPAExecutor("action2-C")); |
| assertFalse(coordJob2.isPending()); |
| assertEquals(Job.Status.SUSPENDED, coordJob2.getStatus()); |
| } |
| |
| |
| /** |
| * Test : Check the transition of a bundle job from RUNNING TO SUSPENDEDWITHERROR |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServiceSuspendedWithError() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1", 0, Job.Status.SUSPENDED); |
| addRecordToBundleActionTable(bundleId, "action2", 0, Job.Status.SUSPENDEDWITHERROR); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.isPending() == false; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertFalse(bundleJob.isPending()); |
| assertEquals(Job.Status.SUSPENDEDWITHERROR, bundleJob.getStatus()); |
| |
| } |
| |
| /** |
| * Test : Check the transition of a PAUSED bundle job to PAUSEDWITHERROR |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServicePausedWithError() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| BundleJobBean bundleJob = createBundleJob(Job.Status.PAUSED, true); |
| bundleJob.setPauseTime(DateUtils.parseDateOozieTZ("2009-02-01T01:00Z")); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundleJob); |
| jpaService.execute(bundleInsertjpa); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1", 1, Job.Status.PAUSED); |
| addRecordToBundleActionTable(bundleId, "action2", 1, Job.Status.PAUSED); |
| BundleActionBean bundleAction = addRecordToBundleActionTable(bundleId, "action3", 0, Job.Status.FAILED); |
| bundleAction.setCoordId("test"); |
| BundleActionQueryExecutor.getInstance().executeUpdate( |
| BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleAction); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.getStatus() == Job.Status.PAUSEDWITHERROR; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.PAUSEDWITHERROR, bundleJob.getStatus()); |
| } |
| |
| |
| /** |
| * Test : Check the transition of a PAUSEDWITHERROR bundle job to PAUSED |
| * @throws Exception |
| */ |
| public void testBundleStatusTransitServicePaused() throws Exception { |
| Services.get().destroy(); |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| setClassesToBeExcluded(services.getConf(), excludedServices); |
| services.init(); |
| BundleJobBean bundleJob = createBundleJob(Job.Status.PAUSEDWITHERROR, true); |
| bundleJob.setPauseTime(DateUtils.parseDateOozieTZ("2009-02-01T01:00Z")); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| BundleJobInsertJPAExecutor bundleInsertjpa = new BundleJobInsertJPAExecutor(bundleJob); |
| jpaService.execute(bundleInsertjpa); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1", 1, Job.Status.PAUSED); |
| addRecordToBundleActionTable(bundleId, "action2", 1, Job.Status.PAUSED); |
| addRecordToBundleActionTable(bundleId, "action3", 0, Job.Status.SUCCEEDED); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| BundleJobBean bundle = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| return bundle.getStatus() == Job.Status.PAUSED; |
| } |
| }); |
| |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.PAUSED, bundleJob.getStatus()); |
| } |
| |
| |
| protected WorkflowJobBean addRecordToWfJobTable(String wfId, WorkflowJob.Status jobStatus, |
| WorkflowInstance.Status instanceStatus) throws Exception { |
| WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", |
| new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "end")). |
| addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); |
| Configuration conf = new Configuration(); |
| Path appUri = new Path(getAppPath(), "workflow.xml"); |
| conf.set(OozieClient.APP_PATH, appUri.toString()); |
| conf.set(OozieClient.LOG_TOKEN, "testToken"); |
| conf.set(OozieClient.USER_NAME, getTestUser()); |
| |
| WorkflowJobBean wfBean = createWorkflow(app, conf, jobStatus, instanceStatus); |
| wfBean.setId(wfId); |
| try { |
| JPAService jpaService = Services.get().get(JPAService.class); |
| assertNotNull(jpaService); |
| WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wfBean); |
| jpaService.execute(wfInsertCmd); |
| } |
| catch (JPAExecutorException je) { |
| je.printStackTrace(); |
| fail("Unable to insert the test wf job record to table"); |
| throw je; |
| } |
| return wfBean; |
| } |
| |
| /** |
| * Tests functionality of the StatusTransitService Runnable command. </p> Insert a coordinator job with RUNNING and |
| * pending true and coordinator actions for that job with pending false. Insert a coordinator action with a stale coord |
| * job id. Then, runs the StatusTransitService runnable and ensures |
| * the job status of the good job changes to SUCCEEDED. |
| * |
| * @throws Exception |
| */ |
| public void testCoordStatusTransitServiceStaleCoordActions() throws Exception { |
| |
| // this block will initialize the lastinstancetime for status transit service |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, true, 3); |
| // add a record with stale reference to coord job id |
| addRecordToCoordActionTable("ABCD", 3, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| // add records with reference to correct job ids |
| addRecordToCoordActionTable(job.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| addRecordToCoordActionTable(job.getId(), 3, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); |
| |
| runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| |
| JPAService jpaService = Services.get().get(JPAService.class); |
| CoordJobGetJPAExecutor coordGetCmd = new CoordJobGetJPAExecutor(job.getId()); |
| CoordinatorJobBean coordJob = jpaService.execute(coordGetCmd); |
| assertEquals(CoordinatorJob.Status.SUCCEEDED, coordJob.getStatus()); |
| } |
| |
| // Test coord transition from killed to running when one action is rerun. |
| public void testCoordStatusTransitRunningFromKilled() throws Exception { |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, false, |
| 1); |
| final CoordinatorActionBean coordAction = addRecordToCoordActionTable(coordJob.getId(), 1, |
| CoordinatorAction.Status.RUNNING, "coord-action-get.xml", null, "RUNNING", 0); |
| |
| new CoordKillXCommand(coordJob.getId()).call(); |
| final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); |
| waitFor(5 * 1000, new Predicate() { |
| public boolean evaluate() throws Exception { |
| CoordinatorJobBean coordJob = jpaService.execute(coordJobGetCmd); |
| return coordJob.getStatusStr().equals("KILLED"); |
| } |
| }); |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| coordJob = jpaService.execute(coordJobGetCmd); |
| assertEquals(CoordinatorJob.Status.KILLED, coordJob.getStatus()); |
| coordAction.setStatus(CoordinatorAction.Status.RUNNING); |
| coordJob.setPending(); |
| CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS_PENDING, coordJob); |
| CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, coordAction); |
| runnable.run(); |
| sleep(1000); |
| coordJob = jpaService.execute(coordJobGetCmd); |
| assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus()); |
| } |
| |
| // Test bundle transition from killed to running when one action is rerun. |
| public void testBundleStatusTransitRunningFromKilled() throws Exception { |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| final String bundleId = bundleJob.getId(); |
| CoordinatorJobBean coord1 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); |
| BundleActionBean bundleAction1 = addRecordToBundleActionTable(bundleId, coord1.getId(), "action1-C", 1, |
| Job.Status.RUNNING); |
| bundleJob.setPending(); |
| bundleAction1.setStatus(Job.Status.KILLED); |
| bundleAction1.setPending(0); |
| BundleActionQueryExecutor.getInstance().executeUpdate( |
| BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleAction1); |
| BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| sleep(1000); |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(Job.Status.KILLED, bundleJob.getStatus()); |
| bundleAction1 = jpaService.execute(new BundleActionGetJPAExecutor(bundleId, "action1-C")); |
| assertEquals(Job.Status.KILLED, bundleAction1.getStatus()); |
| |
| bundleAction1.setPending(1); |
| bundleAction1.setStatus(Job.Status.RUNNING); |
| bundleAction1.setLastModifiedTime(new Date()); |
| bundleJob.setPending(); |
| bundleJob.setLastModifiedTime(new Date()); |
| BundleActionQueryExecutor.getInstance().executeUpdate( |
| BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, bundleAction1); |
| BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); |
| runnable.run(); |
| sleep(1000); |
| bundleJob = jpaService.execute(new BundleJobGetJPAExecutor(bundleId)); |
| assertEquals(CoordinatorJob.Status.RUNNING, bundleJob.getStatus()); |
| } |
| |
| // Test bundle transition from running to runningwitherror when one action is killed. |
| public void testBundleStatusTransitRunningWithError() throws Exception { |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| services.init(); |
| |
| final CoordinatorJobBean coord1 = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false); |
| final CoordinatorJobBean coord2 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); |
| final CoordinatorJobBean coord3 = addRecordToCoordJobTable(CoordinatorJob.Status.DONEWITHERROR, false, false); |
| |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, coord1.getId(), "action1-C", 0, Job.Status.PREP); |
| addRecordToBundleActionTable(bundleId, coord2.getId(), "action2-C", 0, Job.Status.RUNNING); |
| BundleActionBean action3 = addRecordToBundleActionTable(bundleId, coord3.getId(), "action3-C", 0, |
| Job.Status.DONEWITHERROR); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus()); |
| |
| action3.setStatus(Job.Status.SUSPENDED); |
| action3.setPending(1); |
| action3.setLastModifiedTime(new Date()); |
| BundleActionQueryExecutor.getInstance().executeUpdate( |
| BundleActionQuery.UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID, action3); |
| runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.RUNNING, bundleJob.getStatus()); |
| |
| } |
| |
| public void testBundleStatusTransitWithLock() throws Exception { |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| services.init(); |
| |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| |
| final String jobId = bundleJob.getId(); |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, "action1-C", 0, Job.Status.PREP); |
| addRecordToBundleActionTable(bundleId, "action2-C", 0, Job.Status.RUNNING); |
| addRecordToBundleActionTable(bundleId, "action3-C", 0, Job.Status.DONEWITHERROR); |
| |
| LockerCoordinator coordinator = new LockerCoordinator(); |
| JobLock lockThread = new JobLock(jobId, coordinator); |
| new Thread(lockThread).start(); |
| coordinator.awaitLockAcquire(); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.RUNNING, bundleJob.getStatus()); |
| |
| coordinator.signalLockerContinue(); |
| coordinator.awaitTermination(); |
| |
| runnable.run(); |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus()); |
| } |
| |
| public void testCoordStatusTransitWithLock() throws Exception { |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| services.init(); |
| |
| final JPAService jpaService = Services.get().get(JPAService.class); |
| String currentDatePlusMonth = XDataTestCase.getCurrentDateafterIncrementingInMonths(1); |
| Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth); |
| |
| CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, true, false, |
| 1); |
| addRecordToCoordActionTable(coordJob.getId(), 1, CoordinatorAction.Status.KILLED, "coord-action-get.xml", null, |
| "KILLED", 0); |
| |
| final CoordJobGetJPAExecutor coordJobGetCmd = new CoordJobGetJPAExecutor(coordJob.getId()); |
| LockerCoordinator coordinator = new LockerCoordinator(); |
| JobLock lockThread = new JobLock(coordJob.getId(), coordinator); |
| new Thread(lockThread).start(); |
| coordinator.awaitLockAcquire(); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| coordJob = jpaService.execute(coordJobGetCmd); |
| assertEquals(CoordinatorJob.Status.RUNNING, coordJob.getStatus()); |
| |
| coordinator.signalLockerContinue(); |
| coordinator.awaitTermination(); |
| |
| runnable.run(); |
| coordJob = jpaService.execute(coordJobGetCmd); |
| assertEquals(CoordinatorJob.Status.RUNNINGWITHERROR, coordJob.getStatus()); |
| } |
| |
| public void testBundleStatusCoordSubmitFails() throws Exception { |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); |
| |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, null, 0, Job.Status.FAILED); |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| // First try will kill the job. |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.FAILED, bundleJob.getStatus()); |
| |
| bundleJob.setStatus(Job.Status.RUNNING); |
| BundleJobQueryExecutor.getInstance().executeUpdate(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PENDING, bundleJob); |
| runnable.run(); |
| // second try will change the status. |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.FAILED, bundleJob.getStatus()); |
| } |
| |
| public void testBundleRunningAfterCoordResume() throws Exception { |
| |
| setSystemProperty(StatusTransitService.CONF_BACKWARD_SUPPORT_FOR_STATES_WITHOUT_ERROR, "false"); |
| services = new Services(); |
| services.init(); |
| CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); |
| |
| BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, true); |
| final String bundleId = bundleJob.getId(); |
| addRecordToBundleActionTable(bundleId, coord.getId(), "COORD-TEST", 0, Job.Status.RUNNING); |
| new CoordSuspendXCommand(coord.getId()).call(); |
| |
| coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId()); |
| assertEquals(Job.Status.SUSPENDED, coord.getStatus()); |
| |
| Runnable runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.SUSPENDED, bundleJob.getStatus()); |
| |
| new CoordResumeXCommand(coord.getId()).call(); |
| coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId()); |
| assertEquals(Job.Status.RUNNING, coord.getStatus()); |
| |
| runnable = new StatusTransitRunnable(); |
| runnable.run(); |
| bundleJob = BundleJobQueryExecutor.getInstance().get(BundleJobQuery.GET_BUNDLE_JOB_STATUS, bundleId); |
| assertEquals(Job.Status.RUNNING, bundleJob.getStatus()); |
| |
| } |
| |
| static class JobLock implements Runnable { |
| private static XLog log = new XLog(LogFactory.getLog(JobLock.class)); |
| private final String jobId; |
| private final LockerCoordinator coordinator; |
| |
| public JobLock(String jobId, LockerCoordinator coordinator) { |
| this.jobId = jobId; |
| this.coordinator = coordinator; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| LockToken lock = Services.get().get(MemoryLocksService.class).getWriteLock(jobId, 0); |
| coordinator.lockAcquireDone(); |
| coordinator.awaitContinueSignal(); |
| lock.release(); |
| } |
| catch (InterruptedException e) { |
| log.error("InterruptedException caught", e); |
| } |
| finally { |
| coordinator.terminated(); |
| } |
| } |
| } |
| } |