NIFI-9238: Mapped JMSConnectionFactoryProvider properties to Qpid JMS client setters
NIFI-9238: Added documentation
Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>
This closes #5408.
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
index e628510..6e36921 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
@@ -138,13 +138,15 @@
* @see #setProperty(String propertyName, Object propertyValue)
*/
void setConnectionFactoryProperties() {
+ String connectionFactoryValue = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
if (context.getProperty(JMS_BROKER_URI).isSet()) {
String brokerValue = context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue();
- String connectionFactoryValue = context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
if (connectionFactoryValue.startsWith("org.apache.activemq")) {
setProperty("brokerURL", brokerValue);
} else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) {
setProperty("serverUrl", brokerValue);
+ } else if (connectionFactoryValue.startsWith("org.apache.qpid.jms")) {
+ setProperty("remoteURI", brokerValue);
} else {
String[] brokerList = brokerValue.split(",");
if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) {
@@ -170,10 +172,15 @@
}
}
- SSLContextService sc = context.getProperty(JMS_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (sc != null) {
- SSLContext ssl = sc.createContext();
- setProperty("sSLSocketFactory", ssl.getSocketFactory());
+ SSLContextService sslContextService = context.getProperty(JMS_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ SSLContext sslContext = sslContextService.createContext();
+ if (connectionFactoryValue.startsWith("org.apache.qpid.jms")) {
+ setProperty("sslContext", sslContext);
+ } else {
+ // IBM MQ (and others)
+ setProperty("sSLSocketFactory", sslContext.getSocketFactory());
+ }
}
propertyDescriptors.stream()
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html
index ef1ab82..bbf5e49 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/resources/docs/org.apache.nifi.jms.cf.JMSConnectionFactoryProvider/additionalDetails.html
@@ -38,6 +38,7 @@
<li>Apache ActiveMQ</li>
<li>IBM MQ</li>
<li>TIBCO EMS</li>
+ <li>Qpid JMS (AMQP 1.0)</li>
</ul>
<p>
This controller service exposes only a single mandatory static configuration property that are required across all
@@ -53,6 +54,7 @@
<li>Apache ActiveMQ - <a href="http://activemq.apache.org/maven/5.15.9/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html" target="_blank">org.apache.activemq.ActiveMQConnectionFactory</a></li>
<li>IBM MQ - <a href="https://www-01.ibm.com/support/knowledgecenter/SSFKSJ_8.0.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQQueueConnectionFactory.html" target="_blank">com.ibm.mq.jms.MQQueueConnectionFactory</a></li>
<li>TIBCO EMS - <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsQueueConnectionFactory.html" target="_blank">com.tibco.tibjms.TibjmsQueueConnectionFactory</a></li>
+ <li>Qpid JMS (AMQP 1.0) - <a href="https://github.com/apache/qpid-jms/blob/1.1.0/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java" target="_blank">org.apache.qpid.jms.JmsConnectionFactory</a></li>
</ul>
</li>
</ul>
@@ -75,6 +77,9 @@
<li>TIBCO EMS - <i>tcp://myhost:1234</i> for single broker and
<i>tcp://myhost01:7222,tcp://myhost02:7222</i> for multiple brokers.
</li>
+ <li>Qpid JMS (AMQP 1.0) - <i>amqp[s]://myhost:1234</i> for single broker and
+ <i>failover:(amqp[s]://myhost01:1234,amqp[s]://myhost02:1234)</i> for multiple brokers.
+ </li>
</ul>
</li>
</ul>
@@ -96,8 +101,19 @@
<li>
<a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html" target="_blank">TIBCO
EMS</a></li>
+ <li>
+ <a href="https://docs.tibco.com/pub/enterprise_message_service/8.1.0/doc/html/tib_ems_api_reference/api/javadoc/com/tibco/tibjms/TibjmsConnectionFactory.html" target="_blank">Qpid
+ JMS (AMQP 1.0)</a> </li>
</ul>
-
+<p>
+ Besides the dynamic properties and <i>set</i> methods described in the previous section, some providers also support additional
+ configuration via the Broker URI (as query parameters added to the URI):
+</p>
+<ul>
+ <li>
+ <a href="https://qpid.apache.org/releases/qpid-jms-1.1.0/docs/index.html#connection-uri" target="_blank">Qpid
+ JMS (AMQP 1.0)</a> </li>
+</ul>
<h2>Sample controller service configuration for IBM MQ</h2>
<table>
<tr>
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
index c97c402..1dcdbb3 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderTest.java
@@ -20,6 +20,7 @@
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.MockConfigurationContext;
import org.apache.nifi.util.TestRunner;
@@ -30,11 +31,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.net.ssl.SSLContext;
import java.net.URISyntaxException;
import java.util.Collections;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests for {@link JMSConnectionFactoryProvider}
@@ -58,13 +61,16 @@
private static final String MULTIPLE_IBM_MQ_BROKERS = "myhost01(1414),myhost02(1414)";
private static final String MULTIPLE_IBM_MQ_MIXED_BROKERS = "myhost01:1414,myhost02(1414)";
private static final String MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS = "myhost01:1414,myhost02:1414";
+ private static final String SINGLE_QPID_JMS_BROKER = "amqp://myhost:5672";
private static final String TEST_CONNECTION_FACTORY_IMPL = "org.apache.nifi.jms.testcflib.TestConnectionFactory";
private static final String ACTIVEMQ_CONNECTION_FACTORY_IMPL = "org.apache.activemq.ActiveMQConnectionFactory";
private static final String TIBCO_CONNECTION_FACTORY_IMPL = "com.tibco.tibjms.TibjmsConnectionFactory";
private static final String IBM_MQ_CONNECTION_FACTORY_IMPL = "com.ibm.mq.jms.MQConnectionFactory";
+ private static final String QPID_JMS_CONNECTION_FACTORY_IMPL = "org.apache.qpid.jms.JmsConnectionFactory";
- private static final String controllerServiceId = "cfProvider";
+ private static final String CF_PROVIDER_SERVICE_ID = "cfProvider";
+ private static final String SSL_CONTEXT_SERVICE_ID = "sslContextService";
private static final String DUMMY_JAR_1 = "dummy-lib.jar";
private static final String DUMMY_JAR_2 = "dummy-lib-2.jar";
@@ -86,7 +92,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, "foo");
@@ -101,7 +107,7 @@
runner.setValidateExpressionUsage(true);
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setVariable("broker.uri", SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP);
runner.setVariable("client.lib", dummyResource);
@@ -119,7 +125,7 @@
runner.setValidateExpressionUsage(true);
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setVariable("broker.uri", SINGLE_TEST_BROKER_WITH_SCHEME_AND_IP);
runner.setVariable("client.lib", allDummyResources);
@@ -153,7 +159,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -167,7 +173,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -181,7 +187,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TEST_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -195,7 +201,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -209,7 +215,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -223,7 +229,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TIBCO_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -237,7 +243,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TIBCO_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -251,7 +257,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_IBM_MQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -265,7 +271,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -279,7 +285,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -293,7 +299,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -303,11 +309,25 @@
}
@Test
+ public void validWithSingleQpidJmsBroker() throws InitializationException {
+ TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+ JMSConnectionFactoryProvider cfProvider = new JMSConnectionFactoryProvider();
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
+
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_QPID_JMS_BROKER);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, QPID_JMS_CONNECTION_FACTORY_IMPL);
+
+ runner.assertValid(cfProvider);
+ }
+
+ @Test
public void propertiesSetOnSingleTestBrokerConnectionFactory() throws InitializationException {
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -315,7 +335,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("hostName", HOSTNAME, "port", PORT));
+ assertEquals(ImmutableMap.of("hostName", HOSTNAME, "port", PORT), cfProvider.getConfiguredProperties());
}
@Test
@@ -323,7 +343,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER_WITH_SCHEME);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -331,7 +351,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of());
+ assertEquals(ImmutableMap.of(), cfProvider.getConfiguredProperties());
}
@Test
@@ -339,7 +359,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TEST_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -347,7 +367,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("hostName", "myhost01", "port", "1234"));
+ assertEquals(ImmutableMap.of("hostName", "myhost01", "port", "1234"), cfProvider.getConfiguredProperties());
}
@Test
@@ -355,7 +375,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_ACTIVEMQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -363,7 +383,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("brokerURL", SINGLE_ACTIVEMQ_BROKER));
+ assertEquals(ImmutableMap.of("brokerURL", SINGLE_ACTIVEMQ_BROKER), cfProvider.getConfiguredProperties());
}
@Test
@@ -371,7 +391,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_ACTIVEMQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -379,7 +399,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("brokerURL", MULTIPLE_ACTIVEMQ_BROKERS));
+ assertEquals(ImmutableMap.of("brokerURL", MULTIPLE_ACTIVEMQ_BROKERS), cfProvider.getConfiguredProperties());
}
@Test
@@ -387,7 +407,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TIBCO_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -395,7 +415,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("serverUrl", SINGLE_TIBCO_BROKER));
+ assertEquals(ImmutableMap.of("serverUrl", SINGLE_TIBCO_BROKER), cfProvider.getConfiguredProperties());
}
@Test
@@ -403,7 +423,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_TIBCO_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -411,7 +431,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("serverUrl", MULTIPLE_TIBCO_BROKERS));
+ assertEquals(ImmutableMap.of("serverUrl", MULTIPLE_TIBCO_BROKERS), cfProvider.getConfiguredProperties());
}
@Test
@@ -419,7 +439,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_IBM_MQ_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -427,7 +447,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", SINGLE_IBM_MQ_BROKER));
+ assertEquals(ImmutableMap.of("connectionNameList", SINGLE_IBM_MQ_BROKER), cfProvider.getConfiguredProperties());
}
@Test
@@ -435,7 +455,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -443,7 +463,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
+ assertEquals(ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS), cfProvider.getConfiguredProperties());
}
@Test
@@ -451,7 +471,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_MIXED_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -459,7 +479,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
+ assertEquals(ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS), cfProvider.getConfiguredProperties());
}
@Test
@@ -467,7 +487,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, MULTIPLE_IBM_MQ_COLON_PAIR_BROKERS);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -475,7 +495,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS));
+ assertEquals(ImmutableMap.of("connectionNameList", MULTIPLE_IBM_MQ_BROKERS), cfProvider.getConfiguredProperties());
}
@Test
@@ -483,7 +503,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_TEST_BROKER);
runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
@@ -491,7 +511,7 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("connectionNameList", HOSTNAME + "(" + PORT + ")"));
+ assertEquals(ImmutableMap.of("connectionNameList", HOSTNAME + "(" + PORT + ")"), cfProvider.getConfiguredProperties());
}
@Test
@@ -499,7 +519,7 @@
TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
- runner.addControllerService(controllerServiceId, cfProvider);
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
runner.setVariable("test", "dynamicValue");
@@ -510,6 +530,46 @@
runner.enableControllerService(cfProvider);
- assertEquals(cfProvider.getConfiguredProperties(), ImmutableMap.of("dynamicProperty", "dynamicValue", "hostName", HOSTNAME, "port", PORT));
+ assertEquals(ImmutableMap.of("dynamicProperty", "dynamicValue", "hostName", HOSTNAME, "port", PORT), cfProvider.getConfiguredProperties());
+ }
+
+ @Test
+ public void propertiesSetOnSingleQpidJmsConnectionFactory() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+ JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
+
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_QPID_JMS_BROKER);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, QPID_JMS_CONNECTION_FACTORY_IMPL);
+
+ runner.enableControllerService(cfProvider);
+
+ assertEquals(ImmutableMap.of("remoteURI", SINGLE_QPID_JMS_BROKER), cfProvider.getConfiguredProperties());
+ }
+
+ @Test
+ public void propertiesSetOnSingleQpidJmsWithSslConnectionFactory() throws Exception {
+ TestRunner runner = TestRunners.newTestRunner(mock(Processor.class));
+
+ JMSConnectionFactoryProviderForTest cfProvider = new JMSConnectionFactoryProviderForTest();
+ runner.addControllerService(CF_PROVIDER_SERVICE_ID, cfProvider);
+
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_BROKER_URI, SINGLE_QPID_JMS_BROKER);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CLIENT_LIBRARIES, dummyResource);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_CONNECTION_FACTORY_IMPL, QPID_JMS_CONNECTION_FACTORY_IMPL);
+
+ SSLContext sslContext = SSLContext.getDefault();
+ SSLContextService sslContextService = mock(SSLContextService.class);
+ when(sslContextService.getIdentifier()).thenReturn(SSL_CONTEXT_SERVICE_ID);
+ when(sslContextService.createContext()).thenReturn(sslContext);
+
+ runner.addControllerService(SSL_CONTEXT_SERVICE_ID, sslContextService);
+ runner.setProperty(cfProvider, JMSConnectionFactoryProperties.JMS_SSL_CONTEXT_SERVICE, SSL_CONTEXT_SERVICE_ID);
+
+ runner.enableControllerService(cfProvider);
+
+ assertEquals(ImmutableMap.of("remoteURI", SINGLE_QPID_JMS_BROKER, "sslContext", sslContext), cfProvider.getConfiguredProperties());
}
}