blob: a10ffc906f0d6dfa0f15611e85768e222b741795 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oozie.service;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Random;
import javax.jms.Session;
import org.apache.activemq.broker.BrokerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.dependency.hcat.HCatMessageHandler;
import org.apache.oozie.jms.ConnectionContext;
import org.apache.oozie.jms.DefaultConnectionContext;
import org.apache.oozie.jms.JMSConnectionInfo;
import org.apache.oozie.jms.MessageReceiver;
import org.apache.oozie.test.XTestCase;
import org.junit.Test;
public class TestJMSAccessorService extends XTestCase {
private Services services;
private static Random random = new Random();
private static final int JMS_TIMEOUT_MS = 5000;
@Override
protected void setUp() throws Exception {
super.setUp();
services = super.setupServicesForHCatalog();
services.init();
}
@Override
protected void tearDown() throws Exception {
services.destroy();
super.tearDown();
}
@Test
public void testConnection() throws Exception {
HCatAccessorService hcatService = services.get(HCatAccessorService.class);
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
// both servers should connect to default JMS server
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
ConnectionContext ctxt1 = jmsService.createConnectionContext(connInfo);
assertTrue(ctxt1.isConnectionInitialized());
JMSConnectionInfo connInfo1 = hcatService.getJMSConnectionInfo(new URI("http://unknown:80"));
ConnectionContext ctxt2 = jmsService.createConnectionContext(connInfo1);
assertTrue(ctxt2.isConnectionInitialized());
assertEquals(ctxt1, ctxt2);
ctxt1.close();
}
@Test
public void testRegisterSingleConsumerPerTopic() throws URISyntaxException {
HCatAccessorService hcatService = services.get(HCatAccessorService.class);
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
String server = "hcat.server.com:5080";
String topic = "hcat.mydb.mytable";
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
MessageReceiver receiver2 = jmsService.getMessageReceiver(connInfo, topic);
assertEquals(receiver1, receiver2);
}
@Test
public void testUnRegisterTopic() throws URISyntaxException {
HCatAccessorService hcatService = services.get(HCatAccessorService.class);
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
String server = "hcat.server.com:5080";
String topic = "hcatalog.mydb.mytable";
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(server));
MessageReceiver receiver1 = jmsService.getMessageReceiver(connInfo, topic);
assertNotNull(receiver1);
jmsService.unregisterFromNotification(connInfo, topic);
receiver1 = jmsService.getMessageReceiver(connInfo, topic);
assertEquals(null, receiver1);
}
@Test
public void testConnectionContext() throws Exception {
services.destroy();
services = super.setupServicesForHCatalog();
Configuration conf = services.getConf();
// set the connection factory name
String jmsURL = "hcat://${1}.${2}.server.com:8020=java.naming.factory.initial#" +
"org.apache.activemq.jndi.ActiveMQInitialContextFactory" +
";java.naming.provider.url#vm://localhost?broker.persistent=false;" +
"connectionFactoryNames#dynamicFactories/hcat.prod.${1}";
conf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, jmsURL);
services.init();
HCatAccessorService hcatService = services.get(HCatAccessorService.class);
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcatserver.blue.server.com:8020"));
assertEquals(
"java.naming.factory.initial#org.apache.activemq.jndi.ActiveMQInitialContextFactory;java.naming.provider.url#" +
"vm://localhost?broker.persistent=false;connectionFactoryNames#dynamicFactories/hcat.prod.hcatserver",
connInfo.getJNDIPropertiesString());
ConnectionContext ctx = new DefaultConnectionContext();
ctx.createConnection(connInfo.getJNDIProperties());
ctx.close();
}
@Test
public void testConnectionRetry() throws Exception {
services.destroy();
services = super.setupServicesForHCatalog();
int randomPort = 30000 + random.nextInt(10000);
String brokerURl = "tcp://localhost:" + randomPort;
Configuration servicesConf = services.getConf();
servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=java.naming.factory.initial#"
+ ActiveMQConnFactory + ";" + "java.naming.provider.url#" + brokerURl + ";" + "connectionFactoryNames#"
+ "ConnectionFactory");
services.init();
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
String publisherAuthority = "hcat.server.com:5080";
String topic = "topic.topic1";
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
// Start the broker and check if listening to topic now
BrokerService broker = new BrokerService();
try {
broker.addConnector(brokerURl);
broker.setDataDirectory(getTestCaseDir());
broker.setUseJmx(false);
broker.start();
waitFor(JMS_TIMEOUT_MS, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jmsService.isListeningToTopic(connInfo, topic)
&& !jmsService.isConnectionInRetryList(connInfo)
&& !jmsService.isTopicInRetryList(connInfo, topic);
}
});
assertTrue(jmsService.isListeningToTopic(connInfo, topic));
assertFalse(jmsService.isConnectionInRetryList(connInfo));
assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
} finally {
broker.stop();
}
}
@Test
public void testConnectionRetryExceptionListener() throws Exception {
BrokerService broker = null;
try {
services.destroy();
services = super.setupServicesForHCatalog();
int randomPort = 30000 + random.nextInt(10000);
String brokerURL = "tcp://localhost:" + randomPort;
String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
+ "java.naming.provider.url#" + brokerURL + ";" + "connectionFactoryNames#" + "ConnectionFactory";
Configuration servicesConf = services.getConf();
servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "3");
servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
services.init();
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
String publisherAuthority = "hcat.server.com:5080";
String topic = "topic.topic1";
// Start the broker
broker = new BrokerService();
broker.addConnector(brokerURL);
broker.setDataDirectory(getTestCaseDir());
broker.setUseJmx(false);
broker.start();
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
assertTrue(jmsService.isListeningToTopic(connInfo, topic));
assertFalse(jmsService.isConnectionInRetryList(connInfo));
assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
ConnectionContext connCtxt = jmsService.createConnectionContext(connInfo);
broker.stop();
try {
connCtxt.createSession(Session.AUTO_ACKNOWLEDGE);
fail("Exception expected");
}
catch (Exception e) {
waitFor(JMS_TIMEOUT_MS, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return !jmsService.isListeningToTopic(connInfo, topic)
&& jmsService.isConnectionInRetryList(connInfo)
&& jmsService.isTopicInRetryList(connInfo, topic);
}
});
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
}
broker = new BrokerService();
broker.addConnector(brokerURL);
broker.setDataDirectory(getTestCaseDir());
broker.setUseJmx(false);
broker.start();
waitFor(JMS_TIMEOUT_MS, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jmsService.isListeningToTopic(connInfo, topic)
&& !jmsService.isConnectionInRetryList(connInfo)
&& !jmsService.isTopicInRetryList(connInfo, topic);
}
});
assertTrue(jmsService.isListeningToTopic(connInfo, topic));
assertFalse(jmsService.isConnectionInRetryList(connInfo));
assertFalse(jmsService.isTopicInRetryList(connInfo, topic));
broker.stop();
} finally {
if (broker != null) {
broker.stop();
}
}
}
@Test
public void testConnectionRetryMaxAttempt() throws Exception {
services.destroy();
services = super.setupServicesForHCatalog();
String jndiPropertiesString = "java.naming.factory.initial#" + ActiveMQConnFactory + ";"
+ "java.naming.provider.url#" + "tcp://localhost:12345;connectionFactoryNames#ConnectionFactory";
Configuration servicesConf = services.getConf();
servicesConf.set(JMSAccessorService.CONF_RETRY_INITIAL_DELAY, "1");
servicesConf.set(JMSAccessorService.CONF_RETRY_MAX_ATTEMPTS, "1");
servicesConf.set(HCatAccessorService.JMS_CONNECTIONS_PROPERTIES, "default=" + jndiPropertiesString);
services.init();
HCatAccessorService hcatService = Services.get().get(HCatAccessorService.class);
JMSAccessorService jmsService = Services.get().get(JMSAccessorService.class);
String publisherAuthority = "hcat.server.com:5080";
String topic = "topic.topic1";
JMSConnectionInfo connInfo = hcatService.getJMSConnectionInfo(new URI("hcat://hcat.server.com:8020"));
jmsService.registerForNotification(connInfo, topic, new HCatMessageHandler(publisherAuthority));
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
waitFor(JMS_TIMEOUT_MS, new Predicate() {
@Override
public boolean evaluate() throws Exception {
return jmsService.getNumConnectionAttempts(connInfo) == 1
&& jmsService.isConnectionInRetryList(connInfo)
&& jmsService.isTopicInRetryList(connInfo, topic)
&& !jmsService.isListeningToTopic(connInfo, topic)
&& !jmsService.retryConnection(connInfo);
}
});
// Should not retry again as max attempt is 1
assertTrue(jmsService.isConnectionInRetryList(connInfo));
assertTrue(jmsService.isTopicInRetryList(connInfo, topic));
assertFalse(jmsService.isListeningToTopic(connInfo, topic));
assertFalse(jmsService.retryConnection(connInfo));
}
}