blob: 77be04f1e3c297e4822ff08eb2b9a8968b3d3fe0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.jms;
import java.util.Date;
import java.util.Random;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.AppType;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.Event.MessageType;
import org.apache.oozie.client.event.jms.JMSMessagingUtils;
import org.apache.oozie.client.event.jms.JMSHeaderConstants;
import org.apache.oozie.client.event.message.CoordinatorActionMessage;
import org.apache.oozie.client.event.message.WorkflowJobMessage;
import org.apache.oozie.event.*;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.JMSJobEventListener;
import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.JMSTopicService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.DateUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestJMSJobEventListener extends XTestCase {
private Services services;
private Configuration conf;
@Before
@Override
protected void setUp() throws Exception {
super.setUp();
services = new Services();
conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES,
JMSAccessorService.class.getName() + "," + JMSTopicService.class.getName());
conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#" + ActiveMQConnFactory
+ ";" + "java.naming.provider.url#" + localActiveMQBroker + ";connectionFactoryNames#"
+ "ConnectionFactory");
services.init();
}
@After
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
@Test
public void testOnWorkflowJobStartedEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.RUNNING, "user1",
"wf-app-name1", startDate, null);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("endTime"));
WorkflowJobMessage wfStartMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.RUNNING, wfStartMessage.getStatus());
assertEquals(startDate, wfStartMessage.getStartTime());
assertEquals("wfId1", wfStartMessage.getId());
assertEquals("caId1", wfStartMessage.getParentId());
assertEquals(MessageType.JOB, wfStartMessage.getMessageType());
assertEquals(AppType.WORKFLOW_JOB, wfStartMessage.getAppType());
assertEquals(EventStatus.STARTED, wfStartMessage.getEventStatus());
assertEquals("user1", wfStartMessage.getUser());
assertEquals("wf-app-name1", wfStartMessage.getAppName());
wfEventListener.destroy();
}
@Test
public void testOnWorkflowJobSuccessEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date endDate = new Date();
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUCCEEDED, "user1",
"wf-app-name1", startDate, endDate);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
WorkflowJobMessage wfSuccMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.SUCCEEDED, wfSuccMessage.getStatus());
assertEquals(startDate, wfSuccMessage.getStartTime());
assertEquals(endDate, wfSuccMessage.getEndTime());
assertEquals("wfId1", wfSuccMessage.getId());
assertEquals("caId1", wfSuccMessage.getParentId());
assertEquals(MessageType.JOB, wfSuccMessage.getMessageType());
assertEquals(AppType.WORKFLOW_JOB, wfSuccMessage.getAppType());
assertEquals(EventStatus.SUCCESS, wfSuccMessage.getEventStatus());
assertEquals("user1", wfSuccMessage.getUser());
assertEquals("wf-app-name1", wfSuccMessage.getAppName());
wfEventListener.destroy();
}
@Test
public void testOnWorkflowJobFailureEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date endDate = new Date();
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", startDate, endDate);
wfe.setErrorCode("dummyErrorCode");
wfe.setErrorMessage("dummyErrorMessage");
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals(startDate, wfFailMessage.getStartTime());
assertEquals(endDate, wfFailMessage.getEndTime());
assertEquals("wfId1", wfFailMessage.getId());
assertEquals("caId1", wfFailMessage.getParentId());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
assertEquals(EventStatus.FAILURE, wfFailMessage.getEventStatus());
assertEquals("user1", wfFailMessage.getUser());
assertEquals("wf-app-name1", wfFailMessage.getAppName());
assertEquals("dummyErrorCode", wfFailMessage.getErrorCode());
assertEquals("dummyErrorMessage", wfFailMessage.getErrorMessage());
wfEventListener.destroy();
}
@Test
public void testOnWorkflowJobSuspendEvent() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.SUSPENDED, "user1",
"wf-app-name1", startDate, null);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("endTime"));
WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
assertEquals(WorkflowJob.Status.SUSPENDED, wfFailMessage.getStatus());
assertEquals(startDate, wfFailMessage.getStartTime());
assertEquals("wfId1", wfFailMessage.getId());
assertEquals("caId1", wfFailMessage.getParentId());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
assertEquals(AppType.WORKFLOW_JOB, wfFailMessage.getAppType());
assertEquals(EventStatus.SUSPEND, wfFailMessage.getEventStatus());
assertEquals("user1", wfFailMessage.getUser());
assertEquals("wf-app-name1", wfFailMessage.getAppName());
assertNull(wfFailMessage.getErrorCode());
assertNull(wfFailMessage.getErrorMessage());
wfEventListener.destroy();
}
@Test
public void testWorkflowJobSelectors() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user_1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
String selector = JMSHeaderConstants.USER + "='user_1'";
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals("user_1", wfFailMessage.getUser());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
wfEventListener.destroy();
}
@Test
public void testWorkflowJobSelectorsNegative() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
// Pass a selector which wont match and assert for null message
String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
assertNull(message);
wfEventListener.destroy();
}
@Test
public void testWorkflowJobSelectorsOr() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
// Pass a selector using OR condition
String selector = JMSHeaderConstants.USER + "='Non_matching_user' OR " + JMSHeaderConstants.APP_NAME
+ "='wf-app-name1'";
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals("user1", wfFailMessage.getUser());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
wfEventListener.destroy();
}
@Test
public void testWorkflowJobSelectorsAnd() throws Exception {
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
// Pass a selector using AND condition
String selector = JMSHeaderConstants.EVENT_STATUS + "='FAILURE' AND " + JMSHeaderConstants.APP_TYPE
+ "='WORKFLOW_JOB' AND " + JMSHeaderConstants.MESSAGE_TYPE + "='JOB'";
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe), selector);
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
WorkflowJobMessage wfFailMessage = JMSMessagingUtils.getEventMessage(message);
Assert.assertEquals(WorkflowJob.Status.FAILED, wfFailMessage.getStatus());
assertEquals("user1", wfFailMessage.getUser());
assertEquals(MessageType.JOB, wfFailMessage.getMessageType());
wfEventListener.destroy();
}
@Test
public void testConnectionDrop() throws Exception {
Random random = new Random();
BrokerService broker = null;
try {
services.destroy();
services = new Services();
Configuration conf = services.getConf();
conf.set(Services.CONF_SERVICE_EXT_CLASSES, JMSAccessorService.class.getName() + ","
+ JMSTopicService.class.getName());
int randomPort = 30000 + random.nextInt(10000);
String brokerURl = "tcp://localhost:" + randomPort;
conf.set(JMSJobEventListener.JMS_CONNECTION_PROPERTIES, "java.naming.factory.initial#"
+ ActiveMQConnFactory + ";" + "java.naming.provider.url#" + brokerURl + ";connectionFactoryNames#"
+ "ConnectionFactory");
services.init();
JMSJobEventListener wfEventListener = new JMSJobEventListener();
wfEventListener.init(conf);
broker = new BrokerService();
broker.setDataDirectory(getTestCaseDir());
broker.addConnector(brokerURl);
broker.setUseJmx(false);
broker.start();
ConnectionContext jmsContext = getConnectionContext();
assertNotNull(jmsContext);
broker.stop();
jmsContext = getConnectionContext();
// Exception Listener should have removed the old conn context
assertNull(jmsContext);
broker = new BrokerService();
broker.setDataDirectory(getTestCaseDir());
broker.addConnector(brokerURl);
broker.setUseJmx(false);
broker.start();
WorkflowJobEvent wfe = new WorkflowJobEvent("wfId1", "caId1", WorkflowJob.Status.FAILED, "user1",
"wf-app-name1", new Date(), new Date());
jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListener.getTopic(wfe));
wfEventListener.onWorkflowJobEvent(wfe);
TextMessage message = (TextMessage) consumer.receive(5000);
assertNotNull(message);
broker.stop();
wfEventListener.destroy();
}
finally {
if (broker != null) {
broker.stop();
}
}
}
@Test
public void testOnCoordinatorActionWaitingEvent() throws Exception {
JMSJobEventListener wfEventListner = new JMSJobEventListener();
wfEventListner.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.WAITING,
"user1", "wf-app-name1", nominalTime, startDate, "missingDep1");
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, wfEventListner.getTopic(cae));
wfEventListner.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("endTime"));
assertFalse(message.getText().contains("errorCode"));
assertFalse(message.getText().contains("errorMessage"));
CoordinatorActionMessage coordActionWaitingMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.WAITING, coordActionWaitingMessage.getStatus());
assertEquals(startDate, coordActionWaitingMessage.getStartTime());
assertEquals(nominalTime, coordActionWaitingMessage.getNominalTime());
assertEquals("caJobId1", coordActionWaitingMessage.getParentId());
assertEquals("caId1", coordActionWaitingMessage.getId());
assertEquals(MessageType.JOB, coordActionWaitingMessage.getMessageType());
assertEquals(AppType.COORDINATOR_ACTION, coordActionWaitingMessage.getAppType());
assertEquals(EventStatus.WAITING, coordActionWaitingMessage.getEventStatus());
assertEquals("user1", coordActionWaitingMessage.getUser());
assertEquals("wf-app-name1", coordActionWaitingMessage.getAppName());
assertEquals("missingDep1", coordActionWaitingMessage.getMissingDependency());
}
@Test
public void testOnCoordinatorActionStartEvent() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.RUNNING,
"user1", "wf-app-name1", nominalTime, startDate, null);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("endTime"));
assertFalse(message.getText().contains("errorCode"));
assertFalse(message.getText().contains("errorMessage"));
assertFalse(message.getText().contains("missingDependency"));
CoordinatorActionMessage coordActionStartMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.RUNNING, coordActionStartMessage.getStatus());
assertEquals(startDate, coordActionStartMessage.getStartTime());
assertEquals("caJobId1", coordActionStartMessage.getParentId());
assertEquals("caId1", coordActionStartMessage.getId());
assertEquals(MessageType.JOB, coordActionStartMessage.getMessageType());
assertEquals(AppType.COORDINATOR_ACTION, coordActionStartMessage.getAppType());
assertEquals(EventStatus.STARTED, coordActionStartMessage.getEventStatus());
assertEquals("user1", coordActionStartMessage.getUser());
assertEquals("wf-app-name1", coordActionStartMessage.getAppName());
}
@Test
public void testOnCoordinatorJobSuccessEvent() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
Date endDate = new Date();
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1",
CoordinatorAction.Status.SUCCEEDED, "user1", "wf-app-name1", nominalTime, startDate, null);
cae.setEndTime(endDate);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("errorCode"));
assertFalse(message.getText().contains("errorMessage"));
assertFalse(message.getText().contains("missingDependency"));
CoordinatorActionMessage coordActionSuccessMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.SUCCEEDED, coordActionSuccessMessage.getStatus());
assertEquals(startDate, coordActionSuccessMessage.getStartTime());
assertEquals(endDate, coordActionSuccessMessage.getEndTime());
assertEquals("caJobId1", coordActionSuccessMessage.getParentId());
assertEquals("caId1", coordActionSuccessMessage.getId());
assertEquals(MessageType.JOB, coordActionSuccessMessage.getMessageType());
assertEquals(AppType.COORDINATOR_ACTION, coordActionSuccessMessage.getAppType());
assertEquals(EventStatus.SUCCESS, coordActionSuccessMessage.getEventStatus());
assertEquals("user1", coordActionSuccessMessage.getUser());
assertEquals("wf-app-name1", coordActionSuccessMessage.getAppName());
}
@Test
public void testOnCoordinatorJobFailureEvent() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
Date endDate = new Date();
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
"user1", "wf-app-name1", nominalTime, startDate, null);
cae.setEndTime(endDate);
cae.setErrorCode("E0101");
cae.setErrorMessage("dummyError");
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae));
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
assertFalse(message.getText().contains("missingDependency"));
CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
.getEventMessage(message);
assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
assertEquals(startDate, coordActionFailMessage.getStartTime());
assertEquals(endDate, coordActionFailMessage.getEndTime());
assertEquals("caJobId1", coordActionFailMessage.getParentId());
assertEquals("caId1", coordActionFailMessage.getId());
assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
assertEquals(AppType.COORDINATOR_ACTION, coordActionFailMessage.getAppType());
assertEquals(EventStatus.FAILURE, coordActionFailMessage.getEventStatus());
assertEquals("user1", coordActionFailMessage.getUser());
assertEquals("wf-app-name1", coordActionFailMessage.getAppName());
assertEquals("E0101", coordActionFailMessage.getErrorCode());
assertEquals("dummyError", coordActionFailMessage.getErrorMessage());
}
@Test
public void testCoordinatorActionSelectors() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
"user1", "wf-app-name1", nominalTime, startDate, null);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
String selector = JMSHeaderConstants.USER + "='user1'";
MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
CoordinatorActionMessage coordActionFailMessage = JMSMessagingUtils
.getEventMessage(message);
Assert.assertEquals(CoordinatorAction.Status.FAILED, coordActionFailMessage.getStatus());
assertEquals("user1", coordActionFailMessage.getUser());
assertEquals(MessageType.JOB, coordActionFailMessage.getMessageType());
}
@Test
public void testCoordinatorActionSelectorsNegative() throws Exception {
JMSJobEventListener coordEventListener = new JMSJobEventListener();
coordEventListener.init(conf);
Date startDate = DateUtils.parseDateUTC("2012-07-22T00:00Z");
Date nominalTime = DateUtils.parseDateUTC("2011-07-11T00:00Z");
CoordinatorActionEvent cae = new CoordinatorActionEvent("caId1", "caJobId1", CoordinatorAction.Status.FAILED,
"user1", "wf-app-name1", nominalTime, startDate, null);
ConnectionContext jmsContext = getConnectionContext();
Session session = jmsContext.createSession(Session.AUTO_ACKNOWLEDGE);
// Pass a selector which wont match and assert for null message
String selector = JMSHeaderConstants.USER + "='Non_matching_user'";
MessageConsumer consumer = jmsContext.createConsumer(session, coordEventListener.getTopic(cae), selector);
coordEventListener.onCoordinatorActionEvent(cae);
TextMessage message = (TextMessage) consumer.receive(5000);
assertNull(message);
}
private ConnectionContext getConnectionContext() {
Configuration conf = services.getConf();
String jmsProps = conf.get(JMSJobEventListener.JMS_CONNECTION_PROPERTIES);
JMSConnectionInfo connInfo = new JMSConnectionInfo(jmsProps);
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
ConnectionContext jmsContext = jmsService.createProducerConnectionContext(connInfo);
return jmsContext;
}
}