blob: 98c74f9e591c58298d10db3f8504633b4e1b187e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.event;
import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.DagEngine;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordinatorXCommand;
import org.apache.oozie.command.wf.ActionCheckXCommand;
import org.apache.oozie.command.wf.ActionKillXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.ActionXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.command.wf.WorkflowXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeDef;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestControlNodeHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Testcase to test that events are correctly generated from corresponding
* Commands and inserted into events queue
*/
public class TestEventGeneration extends XDataTestCase {
EventQueue queue;
Services services;
EventHandlerService ehs;
JPAService jpaService;
@Override
@Before
protected void setUp() throws Exception {
super.setUp();
services = new Services();
Configuration conf = services.getConf();
// The EventHandlerService manipulates the queues in the background, so the actual test results depend on the
// circumstances (like the speed of the machine, debugging etc).
conf.setInt("oozie.service.EventHandlerService.worker.threads", 0);
conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
services.init();
ehs = services.get(EventHandlerService.class);
queue = ehs.getEventQueue();
jpaService = services.get(JPAService.class);
}
@Override
@After
protected void tearDown() throws Exception {
Services.get().destroy();
super.tearDown();
}
@Test
public void testWorkflowJobEvent() throws Exception {
assertEquals(0, queue.size());
WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef(
LiteWorkflowStoreService.LiteControlNodeHandler.class, "fs-node")).addNode(
new ActionNodeDef("fs-node", "", TestLiteWorkflowLib.TestActionNodeHandler.class, "end", "end"))
.addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
WorkflowJobBean job = addRecordToWfJobTable(app, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);
// Starting job
new StartXCommand(job.getId()).call();
WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
assertEquals(1, queue.size());
JobEvent event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.STARTED, event.getEventStatus());
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(job.getAppName(), event.getAppName());
assertEquals(job.getStartTime(), event.getStartTime());
assertEquals(0, queue.size());
// Suspending job
new SuspendXCommand(job.getId()).call();
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.SUSPENDED, job.getStatus());
assertEquals(1, queue.size());
event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.SUSPEND, event.getEventStatus());
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(job.getAppName(), event.getAppName());
assertEquals(0, queue.size());
// Resuming job
new ResumeXCommand(job.getId()).call();
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
assertEquals(1, queue.size());
event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(job.getAppName(), event.getAppName());
assertEquals(job.getStartTime(), event.getStartTime());
assertEquals(0, queue.size());
// Killing job
new KillXCommand(job.getId()).call();
job = jpaService.execute(wfJobGetCmd);
assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
assertEquals(1, queue.size());
event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.FAILURE, event.getEventStatus());
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(job.getAppName(), event.getAppName());
assertEquals(job.getStartTime(), event.getStartTime());
assertEquals(job.getEndTime(), event.getEndTime());
assertEquals(0, queue.size());
// Successful job (testing SignalX)
job = _createWorkflowJob();
LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
wfInstance.start();
job.setWorkflowInstance(wfInstance);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, job);
WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
new SignalXCommand(job.getId(), wfAction.getId()).call();
job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
assertEquals(1, queue.size());
event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
assertEquals(job.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(job.getAppName(), event.getAppName());
assertEquals(job.getStartTime(), event.getStartTime());
assertEquals(job.getEndTime(), event.getEndTime());
}
@Test
public void testCoordinatorActionEvent() throws Exception {
// avoid noise from other apptype events by setting it to only
// coord action
ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
assertEquals(queue.size(), 0);
Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:01Z");
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
modifyCoordForRunning(coord);
// Action WAITING on materialization
new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
final CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(coord.getId() + "@1");
CoordinatorActionBean action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.WAITING, action.getStatus());
assertEquals(1, queue.size());
JobEvent event = (JobEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.WAITING, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertNull(event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
assertEquals(0, queue.size());
// Make Action ready
// In this case it will proceed to Running since n(ready_actions) < concurrency
new CoordActionInputCheckXCommand(action.getId(), coord.getId()).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.RUNNING, action.getStatus());
event = (JobEvent) queue.poll();
assertEquals(EventStatus.STARTED, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
assertEquals(wfJob.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
sleep(2000);
// Action Success
wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
action.setStatus(CoordinatorAction.Status.RUNNING);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
List<Event> list = queue.pollBatch();
event = (JobEvent)list.get(list.size()-1);
assertEquals(EventStatus.SUCCESS, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(wfJob.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
// Action Failure
wfJob.setStatus(WorkflowJob.Status.FAILED);
action.setStatus(CoordinatorAction.Status.RUNNING);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
event = (JobEvent) queue.poll();
assertEquals(EventStatus.FAILURE, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(wfJob.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
// Action Suspended
wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
action.setStatus(CoordinatorAction.Status.RUNNING);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
action);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
new CoordActionCheckXCommand(action.getId(), 0).call();
action = jpaService.execute(coordGetCmd);
assertEquals(CoordinatorAction.Status.SUSPENDED, action.getStatus());
event = (JobEvent) queue.poll();
assertEquals(EventStatus.SUSPEND, event.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(action.getJobId(), event.getParentId());
assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
assertEquals(wfJob.getStartTime(), event.getStartTime());
assertEquals(coord.getUser(), event.getUser());
assertEquals(coord.getAppName(), event.getAppName());
// Action start on Coord Resume
coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coord);
action.setStatus(CoordinatorAction.Status.SUSPENDED);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
wfJob.setWorkflowInstance(wfInstance);
WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob);
queue.clear();
new CoordResumeXCommand(coord.getId()).call();
waitForEventGeneration(1000);
CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
assertEquals(EventStatus.STARTED, cevent.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
assertEquals(action.getId(), cevent.getId());
assertEquals(action.getJobId(), cevent.getParentId());
assertEquals(action.getNominalTime(), cevent.getNominalTime());
coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId());
assertEquals(coord.getLastModifiedTime(), cevent.getStartTime());
// Action going to WAITING on Coord Rerun
action.setStatus(CoordinatorAction.Status.KILLED);
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
queue.clear();
new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, "1", false, true, false,
null).call();
waitFor(3 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jpaService.execute(coordGetCmd).getStatus() == CoordinatorAction.Status.WAITING;
}
});
cevent = (CoordinatorActionEvent) queue.poll();
assertEquals(EventStatus.WAITING, cevent.getEventStatus());
assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
assertEquals(action.getId(), cevent.getId());
assertEquals(action.getJobId(), cevent.getParentId());
assertEquals(action.getNominalTime(), cevent.getNominalTime());
assertEquals(wfJob.getStartTime(), event.getStartTime());
assertNotNull(cevent.getMissingDeps());
}
@Test
public void testWorkflowActionEvent() throws Exception {
assertEquals(queue.size(), 0);
// avoid noise from other apptype events by setting it to only
// workflow action
ehs.setAppTypes(new HashSet<String>(Arrays.asList("workflow_action")));
WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, true);
// adding record sets externalChildID to dummy workflow-id so resetting it
action.setExternalChildIDs(null);
WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_START, action);
// Starting job
new ActionStartXCommand(action.getId(), "map-reduce").call();
WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
action = jpaService.execute(wfActionGetCmd);
assertEquals(WorkflowAction.Status.RUNNING, action.getStatus());
assertEquals(1, queue.size());
WorkflowActionEvent event = (WorkflowActionEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.STARTED, event.getEventStatus());
assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(action.getName(), event.getAppName());
assertEquals(action.getStartTime(), event.getStartTime());
assertEquals(0, queue.size());
// Suspending job
ActionExecutor.Context context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
ActionCheckXCommandForTest dac = new ActionCheckXCommandForTest(context, executor, action.getId());
dac.execute();
action = dac.getAction();
assertEquals(WorkflowAction.Status.START_MANUAL, action.getStatus());
assertEquals(1, queue.size());
event = (WorkflowActionEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.SUSPEND, event.getEventStatus());
assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(action.getName(), event.getAppName());
assertEquals(0, queue.size());
// Killing job
action.setStatus(WorkflowAction.Status.KILLED);
action.setPendingOnly();
action.setEndTime(null); //its already set by XTestCase add action record method above
WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_END, action);
new ActionKillXCommand(action.getId()).call();
action = jpaService.execute(wfActionGetCmd);
assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
assertEquals(1, queue.size());
event = (WorkflowActionEvent) queue.poll();
assertNotNull(event);
assertEquals(EventStatus.FAILURE, event.getEventStatus());
assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
assertEquals(action.getId(), event.getId());
assertEquals(job.getUser(), event.getUser());
assertEquals(action.getName(), event.getAppName());
assertEquals(action.getStartTime(), event.getStartTime());
assertNotNull(action.getEndTime());
assertNotNull(event.getEndTime());
assertEquals(action.getEndTime(), event.getEndTime());
assertEquals(0, queue.size());
}
@Test
public void testWorkflowJobEventError() throws Exception {
final WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
// create event with error code and message
WorkflowXCommand<Void> myCmd = new KillXCommand(job.getId()) {
@Override
protected Void execute() {
WorkflowXCommand.generateEvent(job, "errorCode", "errorMsg");
return null;
}
};
myCmd.call();
WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
assertNotNull(event);
assertEquals("errorCode", event.getErrorCode());
assertEquals("errorMsg", event.getErrorMessage());
assertEquals(EventStatus.FAILURE, event.getEventStatus());
}
@Test
public void testCoordinatorActionEventDependencies() throws Exception {
final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
WorkflowJobBean wjb = new WorkflowJobBean();
wjb.setId(action.getExternalId());
wjb.setLastModifiedTime(new Date());
WorkflowJobQueryExecutor.getInstance().insert(wjb);
CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
@Override
protected Void execute() {
CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName(), null);
return null;
}
};
// CASE 1: Only pull missing deps
action.setMissingDependencies("pull");
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
myCmd.call();
CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
assertEquals("pull", event.getMissingDeps());
// CASE 2: Only hcat (push) missing deps
action.setMissingDependencies(null);
action.setPushMissingDependencies("push");
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
myCmd.call();
event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
assertEquals("push", event.getMissingDeps());
// CASE 3: Both types
action.setMissingDependencies("pull");
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
myCmd.call();
event = (CoordinatorActionEvent) queue.poll();
assertNotNull(event);
assertEquals("pull" + CoordELFunctions.INSTANCE_SEPARATOR + "push", event.getMissingDeps());
}
@Test
public void testForNoDuplicatesWorkflowEvents() throws Exception {
// test workflow job events
Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1);
Writer writer = new OutputStreamWriter(new FileOutputStream(getTestCaseDir() + "/workflow.xml"),
StandardCharsets.UTF_8);
IOUtils.copyCharStream(reader, writer);
final DagEngine engine = new DagEngine(getTestUser());
Configuration conf = new XConfiguration();
conf.set(OozieClient.APP_PATH, getTestCaseFileUri("workflow.xml"));
conf.set(OozieClient.USER_NAME, getTestUser());
final String jobId1 = engine.submitJob(conf, true);
final WorkflowJobGetJPAExecutor readCmd = new WorkflowJobGetJPAExecutor(jobId1);
waitFor(1 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jpaService.execute(readCmd).getStatus() == WorkflowJob.Status.SUCCEEDED;
}
});
assertEquals(2, queue.size());
assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus());
assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus());
queue.clear();
}
@Test
public void testForNoDuplicatesCoordinatorActionEvents() throws Exception {
// test coordinator action events (failure case)
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
_modifyCoordForFailureAction(coord, "wf-invalid-fork.xml");
new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
final CoordJobGetJPAExecutor readCmd1 = new CoordJobGetJPAExecutor(coord.getId());
waitFor(1 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
CoordinatorJobBean bean = jpaService.execute(readCmd1);
return bean.getStatus() == CoordinatorJob.Status.SUCCEEDED
|| bean.getStatus() == CoordinatorJob.Status.KILLED;
}
});
assertEquals(2, queue.size());
assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
queue.clear();
}
@Test
public void testInvalidXMLCoordinatorFailsForNoDuplicates() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
// test coordinator action events (failure from ActionStartX)
ehs.getAppTypes().add("workflow_action");
CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1, CoordinatorAction.Status.RUNNING,
"coord-action-sla1.xml", 0);
WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
action.getId());
action.setExternalId(wf.getId());
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);
String waId = _createWorkflowAction(wf.getId(), "wf-action");
new ActionStartXCommand(waId, action.getType()).call();
final CoordJobGetJPAExecutor readCmd2 = new CoordJobGetJPAExecutor(coord.getId());
waitFor(1 * 100, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jpaService.execute(readCmd2).getStatus() == CoordinatorJob.Status.KILLED;
}
});
assertEquals(3, queue.size());
HashMap<AppType,JobEvent> eventsMap = new HashMap<AppType,JobEvent>();
while (queue.size() > 0){
JobEvent event = (JobEvent) queue.poll();
eventsMap.put(event.getAppType(), event);
}
assertEquals(3, eventsMap.size());
//Check the WF action
{
JobEvent wfActionEvent = eventsMap.get(AppType.WORKFLOW_ACTION);
assertNotNull("There should be a WF action", wfActionEvent);
assertEquals(EventStatus.FAILURE, wfActionEvent.getEventStatus());
assertEquals(waId, wfActionEvent.getId());
assertEquals(AppType.WORKFLOW_ACTION, wfActionEvent.getAppType());
}
//Check the WF job
{
JobEvent wfJobEvent = eventsMap.get(AppType.WORKFLOW_JOB);
assertNotNull("There should be a WF job", wfJobEvent);
assertEquals(EventStatus.FAILURE, wfJobEvent.getEventStatus());
assertEquals(wf.getId(), wfJobEvent.getId());
assertEquals(AppType.WORKFLOW_JOB, wfJobEvent.getAppType());
}
//Check the Coordinator action
{
JobEvent coordActionEvent = eventsMap.get(AppType.COORDINATOR_ACTION);
assertNotNull("There should be a Coordinator action", coordActionEvent);
assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
assertEquals(action.getId(), coordActionEvent.getId());
assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
}
queue.clear();
}
private class ActionCheckXCommandForTest extends ActionCheckXCommand {
ActionExecutor.Context context;
ActionExecutor executor;
WorkflowActionBean action;
JPAService jpa;
public ActionCheckXCommandForTest(ActionExecutor.Context context, ActionExecutor executor, String actionId)
throws JPAExecutorException {
super(actionId);
this.context = context;
this.executor = executor;
jpa = Services.get().get(JPAService.class);
this.action = jpa.execute(new WorkflowActionGetJPAExecutor(actionId));
}
@Override
public Void execute() throws CommandException {
handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
action = (WorkflowActionBean) ((ActionExecutorContext) context).getAction();
if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
generateEvent(action, getTestUser());
}
return null;
}
public WorkflowActionBean getAction() {
return action;
}
}
private WorkflowJobBean _createWorkflowJob() throws Exception {
LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
new StartNodeDef(TestControlNodeHandler.class, "one"))
.addNode(new ActionNodeDef("one", "<java></java>", TestActionNodeHandler.class, "end", "end"))
.addNode(new EndNodeDef("end", TestControlNodeHandler.class));
Configuration conf = new Configuration();
Path appUri = new Path(getAppPath(), "workflow.xml");
conf.set(OozieClient.APP_PATH, appUri.toString());
conf.set(OozieClient.LOG_TOKEN, "testToken");
conf.set(OozieClient.USER_NAME, getTestUser());
WorkflowJobBean workflow = createWorkflow(app, conf, WorkflowJob.Status.PREP,
WorkflowInstance.Status.PREP);
String executionPath = "/";
assertNotNull(jpaService);
WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
jpaService.execute(wfInsertCmd);
WorkflowActionBean wfAction = addRecordToWfActionTable(workflow.getId(), "one", WorkflowAction.Status.OK,
executionPath, true);
wfAction.setPending();
wfAction.setSignalValue(WorkflowAction.Status.OK.name());
WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION, wfAction);
return workflow;
}
private void _modifyCoordForFailureAction(CoordinatorJobBean coord, String resourceXml) throws Exception {
String wfXml = IOUtils.getResourceAsString(resourceXml, -1);
writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
String coordXml = coord.getJobXml();
coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
}
private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {
WorkflowActionBean action = new WorkflowActionBean();
action.setName(actionName);
action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionName));
action.setJobId(wfId);
action.setType("java");
action.setTransition("transition");
action.setStatus(WorkflowAction.Status.PREP);
action.setStartTime(new Date());
action.setEndTime(new Date());
action.setLastCheckTime(new Date());
action.setPendingOnly();
String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + "<main-class>" + "${dummy}" + "</java>";
action.setConf(actionXml);
jpaService.execute(new WorkflowActionInsertJPAExecutor(action));
return action.getId();
}
private void waitForEventGeneration(int wait) {
waitFor(wait, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return ehs.getEventQueue().peek() != null;
}
});
}
}