blob: 49a95c8880728bb277556bdd8e18fc6ddcc158be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.jms.cf;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
import org.apache.nifi.controller.VerifiableControllerService;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSSecurityException;
import javax.jms.Session;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
* Provides a factory service that creates and initializes
* {@link ConnectionFactory} specific to the third party JMS system.
* <p>
* It accomplishes it by adjusting current classpath by adding to it the
* additional resources (i.e., JMS client libraries) provided by the user via
* {@link JMSConnectionFactoryProperties#JMS_CLIENT_LIBRARIES}, allowing it then to create an instance of the
* target {@link ConnectionFactory} based on the provided
* {@link JMSConnectionFactoryProperties#JMS_CONNECTION_FACTORY_IMPL} which can be than access via
* {@link #getConnectionFactory()} method.
*/
@Tags({"jms", "messaging", "integration", "queue", "topic", "publish", "subscribe"})
@CapabilityDescription("Provides a generic service to create vendor specific javax.jms.ConnectionFactory implementations. "
+ "The Connection Factory can be served once this service is configured successfully.")
@DynamicProperty(name = "The name of a Connection Factory configuration property.", value = "The value of a given Connection Factory configuration property.",
description = "The properties that are set following Java Beans convention where a property name is derived from the 'set*' method of the vendor "
+ "specific ConnectionFactory's implementation. For example, 'com.ibm.mq.jms.MQConnectionFactory.setChannel(String)' would imply 'channel' "
+ "property and 'com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)' would imply 'transportType' property.",
expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS", "org.apache.nifi.jms.processors.PublishJMS"})
public class JMSConnectionFactoryProvider extends AbstractControllerService implements JMSConnectionFactoryProviderDefinition, VerifiableControllerService {
private static final String ESTABLISH_CONNECTION = "Establish Connection";
private static final String VERIFY_JMS_INTERACTION = "Verify JMS Interaction";
protected volatile JMSConnectionFactoryHandler delegate;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return JMSConnectionFactoryProperties.getPropertyDescriptors();
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return JMSConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName);
}
@OnEnabled
public void onEnabled(ConfigurationContext context) {
delegate = new JMSConnectionFactoryHandler(context, getLogger());
}
@OnDisabled
public void onDisabled() {
delegate = null;
}
@Override
public ConnectionFactory getConnectionFactory() {
return delegate.getConnectionFactory();
}
@Override
public void resetConnectionFactory(ConnectionFactory cachedFactory) {
delegate.resetConnectionFactory(cachedFactory);
}
@Override
public List<ConfigVerificationResult> verify(final ConfigurationContext context, final ComponentLog verificationLogger, final Map<String, String> variables) {
final List<ConfigVerificationResult> results = new ArrayList<>();
final JMSConnectionFactoryHandler handler = new JMSConnectionFactoryHandler(context, verificationLogger);
final AtomicReference<Exception> failureReason = new AtomicReference<>();
final ExceptionListener listener = failureReason::set;
final Connection connection = createConnection(handler.getConnectionFactory(), results, listener, verificationLogger);
if (connection != null) {
try {
createSession(connection, results, failureReason.get(), verificationLogger);
} finally {
try {
connection.close();
} catch (final Exception ignored) {
}
}
}
return results;
}
private Connection createConnection(final ConnectionFactory connectionFactory, final List<ConfigVerificationResult> results, final ExceptionListener exceptionListener, final ComponentLog logger) {
try {
final Connection connection = connectionFactory.createConnection();
connection.setExceptionListener(exceptionListener);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(ESTABLISH_CONNECTION)
.outcome(Outcome.SUCCESSFUL)
.explanation("Successfully established a JMS Connection")
.build());
return connection;
} catch (final JMSSecurityException se) {
// If we encounter a JMS Security Exception, the documentation states that it is because of an invalid username or password.
// There is no username or password configured for the Controller Service itself, however. Those are configured in processors, etc.
// As a result, if this is encountered, we will skip verification.
logger.debug("Failed to establish a connection to the JMS Server in order to verify configuration because encountered JMS Security Exception", se);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(ESTABLISH_CONNECTION)
.outcome(Outcome.SKIPPED)
.explanation("Could not establish a Connection because doing so requires that a username and password be provided")
.build());
} catch (final Exception e) {
logger.warn("Failed to establish a connection to the JMS Server in order to verify configuration", e);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(ESTABLISH_CONNECTION)
.outcome(Outcome.FAILED)
.explanation("Was not able to establish a connection to the JMS Server: " + e.toString())
.build());
}
return null;
}
private void createSession(final Connection connection, final List<ConfigVerificationResult> results, final Exception capturedException, final ComponentLog logger) {
try {
final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
session.close();
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(VERIFY_JMS_INTERACTION)
.outcome(Outcome.SUCCESSFUL)
.explanation("Established a JMS Session with server and successfully terminated it")
.build());
} catch (final Exception e) {
final Exception failure;
if (capturedException == null) {
failure = e;
} else {
failure = capturedException;
failure.addSuppressed(e);
}
logger.warn("Failed to create a JMS Session in order to verify configuration", failure);
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(VERIFY_JMS_INTERACTION)
.outcome(Outcome.FAILED)
.explanation("Was not able to create a JMS Session: " + failure.toString())
.build());
}
}
}