/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.event;

import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.AppType;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.DagEngine;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.control.ControlNodeActionExecutor;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.Event;
import org.apache.oozie.client.event.JobEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.rest.RestConstants;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionCheckXCommand;
import org.apache.oozie.command.coord.CoordActionInputCheckXCommand;
import org.apache.oozie.command.coord.CoordMaterializeTransitionXCommand;
import org.apache.oozie.command.coord.CoordRerunXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordinatorXCommand;
import org.apache.oozie.command.wf.ActionCheckXCommand;
import org.apache.oozie.command.wf.ActionKillXCommand;
import org.apache.oozie.command.wf.ActionStartXCommand;
import org.apache.oozie.command.wf.ActionXCommand;
import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.StartXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.command.wf.WorkflowXCommand;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.workflow.WorkflowApp;
import org.apache.oozie.workflow.WorkflowInstance;
import org.apache.oozie.workflow.lite.ActionNodeDef;
import org.apache.oozie.workflow.lite.EndNodeDef;
import org.apache.oozie.workflow.lite.LiteWorkflowApp;
import org.apache.oozie.workflow.lite.LiteWorkflowInstance;
import org.apache.oozie.workflow.lite.StartNodeDef;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestControlNodeHandler;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Testcase to test that events are correctly generated from corresponding
 * Commands and inserted into events queue
 */
public class TestEventGeneration extends XDataTestCase {

    EventQueue queue;
    Services services;
    EventHandlerService ehs;
    JPAService jpaService;

    @Override
    @Before
    protected void setUp() throws Exception {
        super.setUp();
        services = new Services();
        Configuration conf = services.getConf();
        // The EventHandlerService manipulates the queues in the background, so the actual test results depend on the
        // circumstances (like the speed of the machine, debugging etc).
        conf.setInt("oozie.service.EventHandlerService.worker.threads", 0);
        conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService");
        services.init();
        ehs = services.get(EventHandlerService.class);
        queue = ehs.getEventQueue();
        jpaService = services.get(JPAService.class);
    }

    @Override
    @After
    protected void tearDown() throws Exception {
        Services.get().destroy();
        super.tearDown();
    }

    @Test
    public void testWorkflowJobEvent() throws Exception {
        assertEquals(0, queue.size());
        WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", new StartNodeDef(
                LiteWorkflowStoreService.LiteControlNodeHandler.class, "fs-node")).addNode(
                new ActionNodeDef("fs-node", "", TestLiteWorkflowLib.TestActionNodeHandler.class, "end", "end"))
                .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
        WorkflowJobBean job = addRecordToWfJobTable(app, WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP);

        // Starting job
        new StartXCommand(job.getId()).call();
        WorkflowJobGetJPAExecutor wfJobGetCmd = new WorkflowJobGetJPAExecutor(job.getId());
        job = jpaService.execute(wfJobGetCmd);
        assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
        assertEquals(1, queue.size());
        JobEvent event = (JobEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.STARTED, event.getEventStatus());
        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
        assertEquals(job.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(job.getAppName(), event.getAppName());
        assertEquals(job.getStartTime(), event.getStartTime());
        assertEquals(0, queue.size());

        // Suspending job
        new SuspendXCommand(job.getId()).call();
        job = jpaService.execute(wfJobGetCmd);
        assertEquals(WorkflowJob.Status.SUSPENDED, job.getStatus());
        assertEquals(1, queue.size());
        event = (JobEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.SUSPEND, event.getEventStatus());
        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
        assertEquals(job.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(job.getAppName(), event.getAppName());
        assertEquals(0, queue.size());

        // Resuming job
        new ResumeXCommand(job.getId()).call();
        job = jpaService.execute(wfJobGetCmd);
        assertEquals(WorkflowJob.Status.RUNNING, job.getStatus());
        assertEquals(1, queue.size());
        event = (JobEvent) queue.poll();
        assertNotNull(event);
        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
        assertEquals(job.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(job.getAppName(), event.getAppName());
        assertEquals(job.getStartTime(), event.getStartTime());
        assertEquals(0, queue.size());

        // Killing job
        new KillXCommand(job.getId()).call();
        job = jpaService.execute(wfJobGetCmd);
        assertEquals(WorkflowJob.Status.KILLED, job.getStatus());
        assertEquals(1, queue.size());
        event = (JobEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.FAILURE, event.getEventStatus());
        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
        assertEquals(job.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(job.getAppName(), event.getAppName());
        assertEquals(job.getStartTime(), event.getStartTime());
        assertEquals(job.getEndTime(), event.getEndTime());
        assertEquals(0, queue.size());

        // Successful job (testing SignalX)
        job = _createWorkflowJob();
        LiteWorkflowInstance wfInstance = (LiteWorkflowInstance) job.getWorkflowInstance();
        wfInstance.start();
        job.setWorkflowInstance(wfInstance);
        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, job);
        WorkflowActionBean wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(job.getId() + "@one"));
        new SignalXCommand(job.getId(), wfAction.getId()).call();
        job = jpaService.execute(new WorkflowJobGetJPAExecutor(job.getId()));
        assertEquals(WorkflowJob.Status.SUCCEEDED, job.getStatus());
        assertEquals(1, queue.size());
        event = (JobEvent) queue.poll();
        assertNotNull(event);
        assertEquals(AppType.WORKFLOW_JOB, event.getAppType());
        assertEquals(job.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(job.getAppName(), event.getAppName());
        assertEquals(job.getStartTime(), event.getStartTime());
        assertEquals(job.getEndTime(), event.getEndTime());

    }

    @Test
    public void testCoordinatorActionEvent() throws Exception {
        // avoid noise from other apptype events by setting it to only
        // coord action
        ehs.setAppTypes(new HashSet<String>(Arrays.asList("coordinator_action")));
        assertEquals(queue.size(), 0);
        Date startTime = DateUtils.parseDateOozieTZ("2013-01-01T10:00Z");
        Date endTime = DateUtils.parseDateOozieTZ("2013-01-01T10:01Z");
        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
                false, 0);
        modifyCoordForRunning(coord);

        // Action WAITING on materialization
        new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
        final CoordActionGetJPAExecutor coordGetCmd = new CoordActionGetJPAExecutor(coord.getId() + "@1");
        CoordinatorActionBean action = jpaService.execute(coordGetCmd);
        assertEquals(CoordinatorAction.Status.WAITING, action.getStatus());
        assertEquals(1, queue.size());
        JobEvent event = (JobEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.WAITING, event.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(action.getJobId(), event.getParentId());
        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
        assertNull(event.getStartTime());
        assertEquals(coord.getUser(), event.getUser());
        assertEquals(coord.getAppName(), event.getAppName());
        assertEquals(0, queue.size());

        // Make Action ready
        // In this case it will proceed to Running since n(ready_actions) < concurrency
        new CoordActionInputCheckXCommand(action.getId(), coord.getId()).call();
        action = jpaService.execute(coordGetCmd);
        assertEquals(CoordinatorAction.Status.RUNNING, action.getStatus());

        event = (JobEvent) queue.poll();
        assertEquals(EventStatus.STARTED, event.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(action.getJobId(), event.getParentId());
        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
        WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(action.getExternalId()));
        assertEquals(wfJob.getStartTime(), event.getStartTime());
        assertEquals(coord.getUser(), event.getUser());
        assertEquals(coord.getAppName(), event.getAppName());

        sleep(2000);

        // Action Success
        wfJob.setStatus(WorkflowJob.Status.SUCCEEDED);
        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
        action.setStatus(CoordinatorAction.Status.RUNNING);
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
        new CoordActionCheckXCommand(action.getId(), 0).call();
        action = jpaService.execute(coordGetCmd);
        assertEquals(CoordinatorAction.Status.SUCCEEDED, action.getStatus());
        List<Event> list =  queue.pollBatch();
        event = (JobEvent)list.get(list.size()-1);
        assertEquals(EventStatus.SUCCESS, event.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(action.getJobId(), event.getParentId());
        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
        assertEquals(wfJob.getStartTime(), event.getStartTime());
        assertEquals(coord.getUser(), event.getUser());
        assertEquals(coord.getAppName(), event.getAppName());

        // Action Failure
        wfJob.setStatus(WorkflowJob.Status.FAILED);
        action.setStatus(CoordinatorAction.Status.RUNNING);
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
        new CoordActionCheckXCommand(action.getId(), 0).call();
        action = jpaService.execute(coordGetCmd);
        assertEquals(CoordinatorAction.Status.FAILED, action.getStatus());
        event = (JobEvent) queue.poll();
        assertEquals(EventStatus.FAILURE, event.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(action.getJobId(), event.getParentId());
        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
        assertEquals(wfJob.getStartTime(), event.getStartTime());
        assertEquals(coord.getUser(), event.getUser());
        assertEquals(coord.getAppName(), event.getAppName());

        // Action Suspended
        wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
        action.setStatus(CoordinatorAction.Status.RUNNING);
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                action);
        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, wfJob);
        new CoordActionCheckXCommand(action.getId(), 0).call();
        action = jpaService.execute(coordGetCmd);
        assertEquals(CoordinatorAction.Status.SUSPENDED, action.getStatus());
        event = (JobEvent) queue.poll();
        assertEquals(EventStatus.SUSPEND, event.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(action.getJobId(), event.getParentId());
        assertEquals(action.getNominalTime(), ((CoordinatorActionEvent) event).getNominalTime());
        assertEquals(wfJob.getStartTime(), event.getStartTime());
        assertEquals(coord.getUser(), event.getUser());
        assertEquals(coord.getAppName(), event.getAppName());

        // Action start on Coord Resume
        coord.setStatus(CoordinatorJobBean.Status.SUSPENDED);
        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_STATUS, coord);
        action.setStatus(CoordinatorAction.Status.SUSPENDED);
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
        wfJob.setStatus(WorkflowJob.Status.SUSPENDED);
        WorkflowInstance wfInstance = wfJob.getWorkflowInstance();
        ((LiteWorkflowInstance) wfInstance).setStatus(WorkflowInstance.Status.SUSPENDED);
        wfJob.setWorkflowInstance(wfInstance);
        WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob);
        queue.clear();
        new CoordResumeXCommand(coord.getId()).call();
        waitForEventGeneration(1000);
        CoordinatorActionEvent cevent = (CoordinatorActionEvent) queue.poll();
        assertEquals(EventStatus.STARTED, cevent.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
        assertEquals(action.getId(), cevent.getId());
        assertEquals(action.getJobId(), cevent.getParentId());
        assertEquals(action.getNominalTime(), cevent.getNominalTime());
        coord = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, coord.getId());
        assertEquals(coord.getLastModifiedTime(), cevent.getStartTime());

        // Action going to WAITING on Coord Rerun
        action.setStatus(CoordinatorAction.Status.KILLED);
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action);
        queue.clear();
        new CoordRerunXCommand(coord.getId(), RestConstants.JOB_COORD_SCOPE_ACTION, "1", false, true, false,
                null).call();
        waitFor(3 * 100, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return jpaService.execute(coordGetCmd).getStatus() == CoordinatorAction.Status.WAITING;
            }
        });
        cevent = (CoordinatorActionEvent) queue.poll();
        assertEquals(EventStatus.WAITING, cevent.getEventStatus());
        assertEquals(AppType.COORDINATOR_ACTION, cevent.getAppType());
        assertEquals(action.getId(), cevent.getId());
        assertEquals(action.getJobId(), cevent.getParentId());
        assertEquals(action.getNominalTime(), cevent.getNominalTime());
        assertEquals(wfJob.getStartTime(), event.getStartTime());
        assertNotNull(cevent.getMissingDeps());

    }

    @Test
    public void testWorkflowActionEvent() throws Exception {
        assertEquals(queue.size(), 0);
        // avoid noise from other apptype events by setting it to only
        // workflow action
        ehs.setAppTypes(new HashSet<String>(Arrays.asList("workflow_action")));
        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING);
        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP, true);
        // adding record sets externalChildID to dummy workflow-id so resetting it
        action.setExternalChildIDs(null);
        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_START, action);

        // Starting job
        new ActionStartXCommand(action.getId(), "map-reduce").call();
        WorkflowActionGetJPAExecutor wfActionGetCmd = new WorkflowActionGetJPAExecutor(action.getId());
        action = jpaService.execute(wfActionGetCmd);
        assertEquals(WorkflowAction.Status.RUNNING, action.getStatus());
        assertEquals(1, queue.size());
        WorkflowActionEvent event = (WorkflowActionEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.STARTED, event.getEventStatus());
        assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(action.getName(), event.getAppName());
        assertEquals(action.getStartTime(), event.getStartTime());
        assertEquals(0, queue.size());

        // Suspending job
        ActionExecutor.Context context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
        ActionExecutor executor = Services.get().get(ActionService.class).getExecutor(action.getType());
        ActionCheckXCommandForTest dac = new ActionCheckXCommandForTest(context, executor, action.getId());
        dac.execute();
        action = dac.getAction();
        assertEquals(WorkflowAction.Status.START_MANUAL, action.getStatus());
        assertEquals(1, queue.size());
        event = (WorkflowActionEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.SUSPEND, event.getEventStatus());
        assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(action.getName(), event.getAppName());
        assertEquals(0, queue.size());

        // Killing job
        action.setStatus(WorkflowAction.Status.KILLED);
        action.setPendingOnly();
        action.setEndTime(null); //its already set by XTestCase add action record method above
        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION_END, action);
        new ActionKillXCommand(action.getId()).call();
        action = jpaService.execute(wfActionGetCmd);
        assertEquals(WorkflowAction.Status.KILLED, action.getStatus());
        assertEquals(1, queue.size());
        event = (WorkflowActionEvent) queue.poll();
        assertNotNull(event);
        assertEquals(EventStatus.FAILURE, event.getEventStatus());
        assertEquals(AppType.WORKFLOW_ACTION, event.getAppType());
        assertEquals(action.getId(), event.getId());
        assertEquals(job.getUser(), event.getUser());
        assertEquals(action.getName(), event.getAppName());
        assertEquals(action.getStartTime(), event.getStartTime());
        assertNotNull(action.getEndTime());
        assertNotNull(event.getEndTime());
        assertEquals(action.getEndTime(), event.getEndTime());
        assertEquals(0, queue.size());

    }

    @Test
    public void testWorkflowJobEventError() throws Exception {
        final WorkflowJobBean job = addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED);
        // create event with error code and message
        WorkflowXCommand<Void> myCmd = new KillXCommand(job.getId()) {
            @Override
            protected Void execute() {
                WorkflowXCommand.generateEvent(job, "errorCode", "errorMsg");
                return null;
            }
        };
        myCmd.call();
        WorkflowJobEvent event = (WorkflowJobEvent) queue.poll();
        assertNotNull(event);
        assertEquals("errorCode", event.getErrorCode());
        assertEquals("errorMsg", event.getErrorMessage());
        assertEquals(EventStatus.FAILURE, event.getEventStatus());

    }

    @Test
    public void testCoordinatorActionEventDependencies() throws Exception {
        final CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
        final CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1,
                CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
        WorkflowJobBean wjb = new WorkflowJobBean();
        wjb.setId(action.getExternalId());
        wjb.setLastModifiedTime(new Date());
        WorkflowJobQueryExecutor.getInstance().insert(wjb);

        CoordinatorXCommand<Void> myCmd = new CoordActionCheckXCommand(action.getId(), 0) {
            @Override
            protected Void execute() {
                CoordinatorXCommand.generateEvent(action, coord.getUser(), coord.getAppName(), null);
                return null;
            }
        };

        // CASE 1: Only pull missing deps
        action.setMissingDependencies("pull");
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
        myCmd.call();
        CoordinatorActionEvent event = (CoordinatorActionEvent) queue.poll();
        assertNotNull(event);
        assertEquals("pull", event.getMissingDeps());

        // CASE 2: Only hcat (push) missing deps
        action.setMissingDependencies(null);
        action.setPushMissingDependencies("push");
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
        myCmd.call();
        event = (CoordinatorActionEvent) queue.poll();
        assertNotNull(event);
        assertEquals("push", event.getMissingDeps());

        // CASE 3: Both types
        action.setMissingDependencies("pull");
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_DEPENDENCIES, action);
        myCmd.call();
        event = (CoordinatorActionEvent) queue.poll();
        assertNotNull(event);
        assertEquals("pull" + CoordELFunctions.INSTANCE_SEPARATOR + "push", event.getMissingDeps());

    }

    @Test
    public void testForNoDuplicatesWorkflowEvents() throws Exception {
        // test workflow job events
        Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1);
        Writer writer = new OutputStreamWriter(new FileOutputStream(getTestCaseDir() + "/workflow.xml"),
                StandardCharsets.UTF_8);
        IOUtils.copyCharStream(reader, writer);

        final DagEngine engine = new DagEngine(getTestUser());
        Configuration conf = new XConfiguration();
        conf.set(OozieClient.APP_PATH, getTestCaseFileUri("workflow.xml"));
        conf.set(OozieClient.USER_NAME, getTestUser());

        final String jobId1 = engine.submitJob(conf, true);
        final WorkflowJobGetJPAExecutor readCmd = new WorkflowJobGetJPAExecutor(jobId1);

        waitFor(1 * 100, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return jpaService.execute(readCmd).getStatus() == WorkflowJob.Status.SUCCEEDED;
            }
        });
        assertEquals(2, queue.size());
        assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus());
        assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus());
        queue.clear();
    }

    @Test
    public void testForNoDuplicatesCoordinatorActionEvents() throws Exception {
        // test coordinator action events (failure case)
        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");
        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false,
                false, 0);
        _modifyCoordForFailureAction(coord, "wf-invalid-fork.xml");
        new CoordMaterializeTransitionXCommand(coord.getId(), 3600).call();
        final CoordJobGetJPAExecutor readCmd1 = new CoordJobGetJPAExecutor(coord.getId());
        waitFor(1 * 100, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                CoordinatorJobBean bean = jpaService.execute(readCmd1);
                return bean.getStatus() == CoordinatorJob.Status.SUCCEEDED
                        || bean.getStatus() == CoordinatorJob.Status.KILLED;
            }
        });
        assertEquals(2, queue.size());
        assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus());
        assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus());
        queue.clear();
    }

    @Test
    public void testInvalidXMLCoordinatorFailsForNoDuplicates() throws Exception {
        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z");
        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z");

        // test coordinator action events (failure from ActionStartX)
        ehs.getAppTypes().add("workflow_action");
        CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0);
        CoordinatorActionBean action = addRecordToCoordActionTable(coord.getId(), 1, CoordinatorAction.Status.RUNNING,
                "coord-action-sla1.xml", 0);
        WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING,
                action.getId());
        action.setExternalId(wf.getId());
        CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION, action);

        String waId = _createWorkflowAction(wf.getId(), "wf-action");
        new ActionStartXCommand(waId, action.getType()).call();

        final CoordJobGetJPAExecutor readCmd2 = new CoordJobGetJPAExecutor(coord.getId());
        waitFor(1 * 100, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return jpaService.execute(readCmd2).getStatus() == CoordinatorJob.Status.KILLED;
            }
        });

        assertEquals(3, queue.size());

        HashMap<AppType,JobEvent> eventsMap = new HashMap<AppType,JobEvent>();
        while (queue.size() > 0){
            JobEvent event = (JobEvent) queue.poll();
            eventsMap.put(event.getAppType(), event);
        }

        assertEquals(3, eventsMap.size());

        //Check the WF action
        {
            JobEvent wfActionEvent = eventsMap.get(AppType.WORKFLOW_ACTION);
            assertNotNull("There should be a WF action", wfActionEvent);
            assertEquals(EventStatus.FAILURE, wfActionEvent.getEventStatus());
            assertEquals(waId, wfActionEvent.getId());
            assertEquals(AppType.WORKFLOW_ACTION, wfActionEvent.getAppType());
        }

        //Check the WF job
        {
            JobEvent wfJobEvent = eventsMap.get(AppType.WORKFLOW_JOB);
            assertNotNull("There should be a WF job", wfJobEvent);
            assertEquals(EventStatus.FAILURE, wfJobEvent.getEventStatus());
            assertEquals(wf.getId(), wfJobEvent.getId());
            assertEquals(AppType.WORKFLOW_JOB, wfJobEvent.getAppType());
        }

        //Check the Coordinator action
        {
            JobEvent coordActionEvent = eventsMap.get(AppType.COORDINATOR_ACTION);
            assertNotNull("There should be a Coordinator action", coordActionEvent);
            assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus());
            assertEquals(action.getId(), coordActionEvent.getId());
            assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType());
        }
        queue.clear();
    }

    private class ActionCheckXCommandForTest extends ActionCheckXCommand {

        ActionExecutor.Context context;
        ActionExecutor executor;
        WorkflowActionBean action;
        JPAService jpa;

        public ActionCheckXCommandForTest(ActionExecutor.Context context, ActionExecutor executor, String actionId)
                throws JPAExecutorException {
            super(actionId);
            this.context = context;
            this.executor = executor;
            jpa = Services.get().get(JPAService.class);
            this.action = jpa.execute(new WorkflowActionGetJPAExecutor(actionId));
        }

        @Override
        public Void execute() throws CommandException {
            handleNonTransient(context, executor, WorkflowAction.Status.START_MANUAL);
            action = (WorkflowActionBean) ((ActionExecutorContext) context).getAction();
            if (!(executor instanceof ControlNodeActionExecutor) && EventHandlerService.isEnabled()) {
                generateEvent(action, getTestUser());
            }
            return null;
        }

        public WorkflowActionBean getAction() {
            return action;
        }

    }

    private WorkflowJobBean _createWorkflowJob() throws Exception {
        LiteWorkflowApp app = new LiteWorkflowApp("my-app", "<workflow-app/>",
                new StartNodeDef(TestControlNodeHandler.class, "one"))
                .addNode(new ActionNodeDef("one", "<java></java>", TestActionNodeHandler.class, "end", "end"))
                .addNode(new EndNodeDef("end", TestControlNodeHandler.class));
        Configuration conf = new Configuration();
        Path appUri = new Path(getAppPath(), "workflow.xml");
        conf.set(OozieClient.APP_PATH, appUri.toString());
        conf.set(OozieClient.LOG_TOKEN, "testToken");
        conf.set(OozieClient.USER_NAME, getTestUser());
        WorkflowJobBean workflow = createWorkflow(app, conf, WorkflowJob.Status.PREP,
                WorkflowInstance.Status.PREP);
        String executionPath = "/";

        assertNotNull(jpaService);
        WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(workflow);
        jpaService.execute(wfInsertCmd);
        WorkflowActionBean wfAction = addRecordToWfActionTable(workflow.getId(), "one", WorkflowAction.Status.OK,
                executionPath, true);
        wfAction.setPending();
        wfAction.setSignalValue(WorkflowAction.Status.OK.name());
        WorkflowActionQueryExecutor.getInstance().executeUpdate(WorkflowActionQuery.UPDATE_ACTION, wfAction);

        return workflow;
    }

    private void _modifyCoordForFailureAction(CoordinatorJobBean coord, String resourceXml) throws Exception {
        String wfXml = IOUtils.getResourceAsString(resourceXml, -1);
        writeToFile(wfXml, getFsTestCaseDir(), "workflow.xml");
        String coordXml = coord.getJobXml();
        coord.setJobXml(coordXml.replace("hdfs:///tmp/workflows/", getFsTestCaseDir() + "/workflow.xml"));
        CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord);
    }

    private String _createWorkflowAction(String wfId, String actionName) throws JPAExecutorException {
        WorkflowActionBean action = new WorkflowActionBean();
        action.setName(actionName);
        action.setId(Services.get().get(UUIDService.class).generateChildId(wfId, actionName));
        action.setJobId(wfId);
        action.setType("java");
        action.setTransition("transition");
        action.setStatus(WorkflowAction.Status.PREP);
        action.setStartTime(new Date());
        action.setEndTime(new Date());
        action.setLastCheckTime(new Date());
        action.setPendingOnly();

        String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                + getNameNodeUri() + "</name-node>" + "<main-class>" + "${dummy}" + "</java>";
        action.setConf(actionXml);
        jpaService.execute(new WorkflowActionInsertJPAExecutor(action));
        return action.getId();
    }

    private void waitForEventGeneration(int wait) {
        waitFor(wait, new Predicate() {
            @Override
            public boolean evaluate() throws Exception {
                return ehs.getEventQueue().peek() != null;
            }
        });
    }

}
