/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.jms;

import java.util.Date;
import java.util.Random;

import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.broker.BrokerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.AppType;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.jms.JMSMessagingUtils;
import org.apache.oozie.client.event.jms.JMSHeaderConstants;
import org.apache.oozie.client.event.message.CoordinatorActionMessage;
import org.apache.oozie.client.event.message.WorkflowJobMessage;
import org.apache.oozie.event.*;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.JMSJobEventListener;
import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.JMSTopicService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.DateUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestJMSJobEventListener extends XTestCase {
    private Services services;
    private Configuration conf;

    /**
     * Creates and initializes the Oozie {@link Services} used by every test, registering the JMS accessor
     * and JMS topic services as extension services and pointing the listener's JMS connection properties
     * at the broker URL held in {@code localActiveMQBroker}.
     */
    @Before
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        services = new Services();
        conf = services.getConf();

        // Extension services required by the JMS event listener
        String extServiceClasses = JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName();
        conf.set(Services.CONF_SERVICE_EXT_CLASSES, extServiceClasses);

        // JNDI style connection properties, ';' separated with '#' between key and value
        String jmsConnectionProps = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
                + "java.naming.provider.url#" + localActiveMQBroker
                + ";connectionFactoryNames#" + "ConnectionFactory";
        conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, jmsConnectionProps);

        services.init();
    }

    /**
     * Destroys the services created for the test and then runs the base-class tear down.
     * The base-class tear down is executed even if destroying the services throws, so that a failing
     * shutdown does not leave the base test fixture behind.
     */
    @After
    @Override
    protected void tearDown() throws Exception {
        try {
            // services is still null if setUp() did not get as far as creating it
            if (services != null) {
                services.destroy();
            }
        }
        finally {
            super.tearDown();
        }
    }

    /**
     * Publishes a RUNNING workflow job event through the listener and checks the JMS message delivered on
     * the event's topic: the payload contains no end time and the decoded message reports the STARTED
     * event status together with the job attributes given to the event.
     */
    @Test
    public void testOnWorkflowJobStartedEvent() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.RUNNING, "user1",
                    "wf-app-name1", startDate, null);

            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the workflow job event", message);
            assertFalse(message.getText().contains("endTime"));
            WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
            assertEquals(startDate, wfStartMessage.getStartTime());
            assertEquals("wfId1", wfStartMessage.getId());
            assertEquals("caId1", wfStartMessage.getParentId());
            assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
            assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
            assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
            assertEquals("user1", wfStartMessage.getUser());
            assertEquals("wf-app-name1", wfStartMessage.getAppName());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Publishes a SUCCEEDED workflow job event through the listener and checks that the decoded JMS
     * message reports the SUCCESS event status with the start time, end time and job attributes given to
     * the event.
     */
    @Test
    public void testOnWorkflowJobSuccessEvent() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date endDate = new Date();
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUCCEEDED, "user1",
                    "wf-app-name1", startDate, endDate);

            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the workflow job event", message);
            WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
            assertEquals(startDate, wfSuccMessage.getStartTime());
            assertEquals(endDate, wfSuccMessage.getEndTime());
            assertEquals("wfId1", wfSuccMessage.getId());
            assertEquals("caId1", wfSuccMessage.getParentId());
            assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
            assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
            assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
            assertEquals("user1", wfSuccMessage.getUser());
            assertEquals("wf-app-name1", wfSuccMessage.getAppName());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Publishes a FAILED workflow job event carrying an error code and error message, and checks that the
     * decoded JMS message reports the FAILURE event status with the same times, job attributes and error
     * details.
     */
    @Test
    public void testOnWorkflowJobFailureEvent() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date endDate = new Date();
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                    "wf-app-name1", startDate, endDate);
            wfe.setErrorCode("dummyErrorCode");
            wfe.setErrorMessage("dummyErrorMessage");
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the workflow job event", message);
            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
            assertEquals(startDate, wfFailMessage.getStartTime());
            assertEquals(endDate, wfFailMessage.getEndTime());
            assertEquals("wfId1", wfFailMessage.getId());
            assertEquals("caId1", wfFailMessage.getParentId());
            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
            assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
            assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
            assertEquals("user1", wfFailMessage.getUser());
            assertEquals("wf-app-name1", wfFailMessage.getAppName());
            assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
            assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Publishes a SUSPENDED workflow job event through the listener and checks the JMS message delivered
     * on the event's topic: the payload contains no end time, the decoded message reports the SUSPEND
     * event status with the job attributes given to the event, and no error code or message is set.
     */
    @Test
    public void testOnWorkflowJobSuspendEvent() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1",
                    "wf-app-name1", startDate, null);
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the workflow job event", message);
            assertFalse(message.getText().contains("endTime"));
            WorkflowJobMessage wfSuspendMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.SUSPENDED, wfSuspendMessage.getStatus());
            assertEquals(startDate, wfSuspendMessage.getStartTime());
            assertEquals("wfId1", wfSuspendMessage.getId());
            assertEquals("caId1", wfSuspendMessage.getParentId());
            assertEquals(MessageType.JOB, wfSuspendMessage.getMessageType());
            assertEquals(AppType.WORKFLOW_JOB, wfSuspendMessage.getAppType());
            assertEquals(EventStatus.SUSPEND, wfSuspendMessage.getEventStatus());
            assertEquals("user1", wfSuspendMessage.getUser());
            assertEquals("wf-app-name1", wfSuspendMessage.getAppName());
            assertNull(wfSuspendMessage.getErrorCode());
            assertNull(wfSuspendMessage.getErrorMessage());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Subscribes to the workflow event's topic with a JMS selector on the user header that matches the
     * event's user, publishes a FAILED workflow job event, and checks that the message is delivered and
     * decodes to the expected status, user and message type.
     */
    @Test
    public void testWorkflowJobSelectors() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1",
                    "wf-app-name1", new Date(), new Date());
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            String selector = JMSHeaderConstants.USER + "='user_1'";
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message matched selector: " + selector, message);
            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
            assertEquals("user_1", wfFailMessage.getUser());
            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Subscribes to the workflow event's topic with a JMS selector on the user header that does not match
     * the event's user, publishes a FAILED workflow job event, and checks that no message is received
     * within the timeout.
     */
    @Test
    public void testWorkflowJobSelectorsNegative() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                    "wf-app-name1", new Date(), new Date());
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            // Pass a selector which won't match and assert for null message
            String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            assertNull(message);
        }
        finally {
            // destroy the listener even when the assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Subscribes with a JMS selector that ORs a non-matching user condition with a matching application
     * name condition, publishes a FAILED workflow job event, and checks that the message is delivered and
     * decodes to the expected status, user and message type.
     */
    @Test
    public void testWorkflowJobSelectorsOr() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                    "wf-app-name1", new Date(), new Date());
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            // Pass a selector using OR condition
            String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
                    + "='wf-app-name1'";
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message matched selector: " + selector, message);
            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
            assertEquals("user1", wfFailMessage.getUser());
            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Subscribes with a JMS selector that ANDs conditions on the event status, application type and
     * message type headers, publishes a FAILED workflow job event, and checks that the message is
     * delivered and decodes to the expected status, user and message type.
     */
    @Test
    public void testWorkflowJobSelectorsAnd() throws Exception {
        JMSJobEventListener wfEventListener = new JMSJobEventListener();
        wfEventListener.init(conf);
        try {
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                    "wf-app-name1", new Date(), new Date());
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            // Pass a selector using AND condition
            String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
                    + "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message matched selector: " + selector, message);
            WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
            assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
            assertEquals("user1", wfFailMessage.getUser());
            assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
        }
        finally {
            // destroy the listener even when an assertion above fails
            wfEventListener.destroy();
        }
    }

    /**
     * Exercises the listener across a broker restart. The services are re-created against a TCP broker on
     * a randomly chosen port in [30000, 40000); the broker is started, a connection context is obtained,
     * the broker is stopped and the context is expected to be gone; then a new broker is started on the
     * same URL and a workflow job event is expected to be delivered again.
     */
    @Test
    public void testConnectionDrop() throws Exception {
        Random random = new Random();
        BrokerService broker = null;
        JMSJobEventListener wfEventListener = null;
        try {
            services.destroy();
            services = new Services();
            // Deliberately a local: the conf field keeps referring to the configuration created in setUp()
            Configuration brokerConf = services.getConf();
            brokerConf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + ","
                    + JMSTopicService.class.getName());
            int randomPort = 30000 + random.nextInt(10000);
            String brokerUrl = "tcp://localhost:" + randomPort;
            brokerConf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#"
                    + ActiveMQConnFactory + ";" + "java.naming.provider.url#" + brokerUrl + ";connectionFactoryNames#"
                    + "ConnectionFactory");
            services.init();
            wfEventListener = new JMSJobEventListener();
            wfEventListener.init(brokerConf);
            broker = new BrokerService();
            broker.setDataDirectory(getTestCaseDir());
            broker.addConnector(brokerUrl);
            broker.setUseJmx(false);
            broker.start();
            ConnectionContext jmsContext = getConnectionContext();
            assertNotNull(jmsContext);
            broker.stop();
            jmsContext = getConnectionContext();
            // Exception Listener should have removed the old conn context
            assertNull(jmsContext);
            broker = new BrokerService();
            broker.setDataDirectory(getTestCaseDir());
            broker.addConnector(brokerUrl);
            broker.setUseJmx(false);
            broker.start();
            WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
                    "wf-app-name1", new Date(), new Date());

            jmsContext = getConnectionContext();
            // Fail with an assertion, not a NullPointerException, if no connection could be re-established
            assertNotNull("No connection context after the broker was restarted", jmsContext);
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
            wfEventListener.onWorkflowJobEvent(wfe);
            TextMessage message = (TextMessage) consumer.receive(5000);
            assertNotNull(message);
        }
        finally {
            // Single cleanup path: stop the current broker, then destroy the listener, on success and failure
            try {
                if (broker != null) {
                    broker.stop();
                }
            }
            finally {
                if (wfEventListener != null) {
                    wfEventListener.destroy();
                }
            }
        }

    }

    /**
     * Publishes a WAITING coordinator action event with a missing dependency and checks the JMS message
     * delivered on the event's topic: the payload contains no end time or error fields, and the decoded
     * message reports the WAITING event status with the action attributes and missing dependency given to
     * the event.
     */
    @Test
    public void testOnCoordinatorActionWaitingEvent() throws Exception {
        JMSJobEventListener coordEventListener = new JMSJobEventListener();
        coordEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
            CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
                    CoordinatorAction.Status.WAITING, "user1", "wf-app-name1", nominalTime, startDate,
                    "missingDep1");
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
            coordEventListener.onCoordinatorActionEvent(cae);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the coordinator action event", message);
            assertFalse(message.getText().contains("endTime"));
            assertFalse(message.getText().contains("errorCode"));
            assertFalse(message.getText().contains("errorMessage"));
            CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
                    .getEventMessage(message);
            assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
            assertEquals(startDate, coordActionWaitingMessage.getStartTime());
            assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
            assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
            assertEquals("caId1", coordActionWaitingMessage.getId());
            assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
            assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
            assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
            assertEquals("user1", coordActionWaitingMessage.getUser());
            assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
            assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
        }
        finally {
            // destroy the listener, as the workflow tests in this class do, even when an assertion fails
            coordEventListener.destroy();
        }
    }

    /**
     * Publishes a RUNNING coordinator action event and checks the JMS message delivered on the event's
     * topic: the payload contains no end time, error fields or missing dependency, and the decoded message
     * reports the STARTED event status with the action attributes given to the event.
     */
    @Test
    public void testOnCoordinatorActionStartEvent() throws Exception {
        JMSJobEventListener coordEventListener = new JMSJobEventListener();
        coordEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
            CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
                    CoordinatorAction.Status.RUNNING, "user1", "wf-app-name1", nominalTime, startDate, null);
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
            coordEventListener.onCoordinatorActionEvent(cae);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the coordinator action event", message);
            assertFalse(message.getText().contains("endTime"));
            assertFalse(message.getText().contains("errorCode"));
            assertFalse(message.getText().contains("errorMessage"));
            assertFalse(message.getText().contains("missingDependency"));
            CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
                    .getEventMessage(message);
            assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
            assertEquals(startDate, coordActionStartMessage.getStartTime());
            assertEquals("caJobId1", coordActionStartMessage.getParentId());
            assertEquals("caId1", coordActionStartMessage.getId());
            assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
            assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
            assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
            assertEquals("user1", coordActionStartMessage.getUser());
            assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
        }
        finally {
            // destroy the listener, as the workflow tests in this class do, even when an assertion fails
            coordEventListener.destroy();
        }
    }

    /**
     * Publishes a SUCCEEDED coordinator action event with an end time and checks the JMS message delivered
     * on the event's topic: the payload contains no error fields or missing dependency, and the decoded
     * message reports the SUCCESS event status with the times and action attributes given to the event.
     */
    @Test
    public void testOnCoordinatorJobSuccessEvent() throws Exception {
        JMSJobEventListener coordEventListener = new JMSJobEventListener();
        coordEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
            Date endDate = new Date();
            CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
                    CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null);
            cae.setEndTime(endDate);
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
            coordEventListener.onCoordinatorActionEvent(cae);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the coordinator action event", message);
            assertFalse(message.getText().contains("errorCode"));
            assertFalse(message.getText().contains("errorMessage"));
            assertFalse(message.getText().contains("missingDependency"));
            CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
                    .getEventMessage(message);
            assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
            assertEquals(startDate, coordActionSuccessMessage.getStartTime());
            assertEquals(endDate, coordActionSuccessMessage.getEndTime());
            assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
            assertEquals("caId1", coordActionSuccessMessage.getId());
            assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
            assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
            assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
            assertEquals("user1", coordActionSuccessMessage.getUser());
            assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
        }
        finally {
            // destroy the listener, as the workflow tests in this class do, even when an assertion fails
            coordEventListener.destroy();
        }
    }

    /**
     * Publishes a FAILED coordinator action event with an end time, error code and error message, and
     * checks the JMS message delivered on the event's topic: the payload contains no missing dependency,
     * and the decoded message reports the FAILURE event status with the times, action attributes and error
     * details given to the event.
     */
    @Test
    public void testOnCoordinatorJobFailureEvent() throws Exception {
        JMSJobEventListener coordEventListener = new JMSJobEventListener();
        coordEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
            Date endDate = new Date();
            CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
                    CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", nominalTime, startDate, null);
            cae.setEndTime(endDate);
            cae.setErrorCode("E0101");
            cae.setErrorMessage("dummyError");
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
            coordEventListener.onCoordinatorActionEvent(cae);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message received for the coordinator action event", message);
            assertFalse(message.getText().contains("missingDependency"));
            CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
                    .getEventMessage(message);
            assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
            assertEquals(startDate, coordActionFailMessage.getStartTime());
            assertEquals(endDate, coordActionFailMessage.getEndTime());
            assertEquals("caJobId1", coordActionFailMessage.getParentId());
            assertEquals("caId1", coordActionFailMessage.getId());
            assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
            assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
            assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
            assertEquals("user1", coordActionFailMessage.getUser());
            assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
            assertEquals("E0101", coordActionFailMessage.getErrorCode());
            assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
        }
        finally {
            // destroy the listener, as the workflow tests in this class do, even when an assertion fails
            coordEventListener.destroy();
        }
    }

    /**
     * Subscribes to the coordinator action event's topic with a JMS selector on the user header that
     * matches the event's user, publishes a FAILED coordinator action event, and checks that the message
     * is delivered and decodes to the expected status, user and message type.
     */
    @Test
    public void testCoordinatorActionSelectors() throws Exception {
        JMSJobEventListener coordEventListener = new JMSJobEventListener();
        coordEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
            CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
                    CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", nominalTime, startDate, null);
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            String selector = JMSHeaderConstants.USER + "='user1'";
            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae),
                    selector);
            coordEventListener.onCoordinatorActionEvent(cae);
            TextMessage message = (TextMessage) consumer.receive(5000);
            // receive() returns null on timeout; fail with a clear message rather than a NullPointerException
            assertNotNull("No JMS message matched selector: " + selector, message);
            CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
                    .getEventMessage(message);
            assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
            assertEquals("user1", coordActionFailMessage.getUser());
            assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
        }
        finally {
            // destroy the listener, as the workflow tests in this class do, even when an assertion fails
            coordEventListener.destroy();
        }
    }

    /**
     * Subscribes to the coordinator action event's topic with a JMS selector on the user header that does
     * not match the event's user, publishes a FAILED coordinator action event, and checks that no message
     * is received within the timeout.
     */
    @Test
    public void testCoordinatorActionSelectorsNegative() throws Exception {
        JMSJobEventListener coordEventListener = new JMSJobEventListener();
        coordEventListener.init(conf);
        try {
            Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
            Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
            CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
                    CoordinatorAction.Status.FAILED, "user1", "wf-app-name1", nominalTime, startDate, null);
            ConnectionContext jmsContext = getConnectionContext();
            Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
            // Pass a selector which won't match and assert for null message
            String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
            MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae),
                    selector);
            coordEventListener.onCoordinatorActionEvent(cae);
            TextMessage message = (TextMessage) consumer.receive(5000);
            assertNull(message);
        }
        finally {
            // destroy the listener, as the workflow tests in this class do, even when the assertion fails
            coordEventListener.destroy();
        }
    }

    /**
     * Builds a {@link JMSConnectionInfo} from the JMS connection properties in the current services
     * configuration and asks the registered {@link JMSAccessorService} for a producer connection context.
     *
     * @return whatever the accessor service returns for that connection info; tests in this class expect
     *         {@code null} when the broker is not reachable
     */
    private ConnectionContext getConnectionContext() {
        JMSConnectionInfo connectionInfo = new JMSConnectionInfo(
                services.getConf().get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES));
        return Services.get().get(JMSAccessorService.class).createProducerConnectionContext(connectionInfo);
    }

}
