| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.jms.processors; |
| |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.expression.ExpressionLanguageScope; |
| import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider; |
| import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Processor; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.springframework.jms.connection.CachingConnectionFactory; |
| import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter; |
| import org.springframework.jms.core.JmsTemplate; |
| |
| import javax.jms.ConnectionFactory; |
| import javax.jms.Message; |
| import java.nio.charset.Charset; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| /** |
| * Base JMS processor to support implementation of JMS producers and consumers. |
| * |
| * @param <T> the type of {@link JMSWorker} which could be {@link JMSPublisher} or {@link JMSConsumer} |
| * @see PublishJMS |
| * @see ConsumeJMS |
| * @see JMSConnectionFactoryProviderDefinition |
| */ |
| abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcessor { |
| |
| static final String QUEUE = "QUEUE"; |
| static final String TOPIC = "TOPIC"; |
| static final String TEXT_MESSAGE = "text"; |
| static final String BYTES_MESSAGE = "bytes"; |
| |
| static final PropertyDescriptor USER = new PropertyDescriptor.Builder() |
| .name("User Name") |
| .description("User Name used for authentication and authorization.") |
| .required(false) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) |
| .build(); |
| static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder() |
| .name("Password") |
| .description("Password used for authentication and authorization.") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .sensitive(true) |
| .build(); |
| static final PropertyDescriptor DESTINATION = new PropertyDescriptor.Builder() |
| .name("Destination Name") |
| .description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic' or 'myTopic').") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder() |
| .name("Destination Type") |
| .description("The type of the JMS Destination. Could be one of 'QUEUE' or 'TOPIC'. Usually provided by the administrator. Defaults to 'QUEUE'") |
| .required(true) |
| .allowableValues(QUEUE, TOPIC) |
| .defaultValue(QUEUE) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| static final PropertyDescriptor CLIENT_ID = new PropertyDescriptor.Builder() |
| .name("Connection Client ID") |
| .description("The client id to be set on the connection, if set. For durable non shared consumer this is mandatory, " + |
| "for all others it is optional, typically with shared consumers it is undesirable to be set. " + |
| "Please see JMS spec for further details") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) |
| .build(); |
| static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder() |
| .name("Session Cache size") |
| .description("This property is deprecated and no longer has any effect on the Processor. It will be removed in a later version.") |
| .required(false) |
| .defaultValue("1") |
| .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) |
| .build(); |
| static final PropertyDescriptor MESSAGE_BODY = new PropertyDescriptor.Builder() |
| .name("message-body-type") |
| .displayName("Message Body Type") |
| .description("The type of JMS message body to construct.") |
| .required(true) |
| .defaultValue(BYTES_MESSAGE) |
| .allowableValues(BYTES_MESSAGE, TEXT_MESSAGE) |
| .build(); |
| public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() |
| .name("character-set") |
| .displayName("Character Set") |
| .description("The name of the character set to use to construct or interpret TextMessages") |
| .required(true) |
| .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) |
| .defaultValue(Charset.defaultCharset().name()) |
| .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) |
| .build(); |
| static final PropertyDescriptor ALLOW_ILLEGAL_HEADER_CHARS = new PropertyDescriptor.Builder() |
| .name("allow-illegal-chars-in-jms-header-names") |
| .displayName("Allow Illegal Characters in Header Names") |
| .description("Specifies whether illegal characters in header names should be sent to the JMS broker. " + |
| "Usually hyphens and full-stops.") |
| .required(true) |
| .defaultValue("false") |
| .allowableValues("true", "false") |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .build(); |
| public static final PropertyDescriptor ATTRIBUTES_AS_HEADERS_REGEX = new PropertyDescriptor.Builder() |
| .name("attributes-to-send-as-jms-headers-regex") |
| .displayName("Attributes to Send as JMS Headers (Regex)") |
| .description("Specifies the Regular Expression that determines the names of FlowFile attributes that" + |
| " should be sent as JMS Headers") |
| .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR) |
| .defaultValue(".*") |
| .required(true) |
| .build(); |
| |
| |
| static final PropertyDescriptor CF_SERVICE = new PropertyDescriptor.Builder() |
| .name("Connection Factory Service") |
| .description("The Controller Service that is used to obtain ConnectionFactory") |
| .required(true) |
| .identifiesControllerService(JMSConnectionFactoryProviderDefinition.class) |
| .build(); |
| |
| static final List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(); |
| private volatile BlockingQueue<T> workerPool; |
| private final AtomicInteger clientIdCounter = new AtomicInteger(1); |
| |
| static { |
| propertyDescriptors.add(CF_SERVICE); |
| propertyDescriptors.add(DESTINATION); |
| propertyDescriptors.add(DESTINATION_TYPE); |
| propertyDescriptors.add(USER); |
| propertyDescriptors.add(PASSWORD); |
| propertyDescriptors.add(CLIENT_ID); |
| propertyDescriptors.add(SESSION_CACHE_SIZE); |
| propertyDescriptors.add(MESSAGE_BODY); |
| propertyDescriptors.add(CHARSET); |
| propertyDescriptors.add(ALLOW_ILLEGAL_HEADER_CHARS); |
| propertyDescriptors.add(ATTRIBUTES_AS_HEADERS_REGEX); |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| return propertyDescriptors; |
| } |
| |
| |
| @Override |
| public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { |
| T worker = workerPool.poll(); |
| if (worker == null) { |
| worker = buildTargetResource(context); |
| } |
| |
| try { |
| rendezvousWithJms(context, session, worker); |
| } catch (Exception e) { |
| getLogger().error("Error while trying to process JMS message", e); |
| } finally { |
| //in case of exception during worker's connection (consumer or publisher), |
| //an appropriate service is responsible to invalidate the worker. |
| //if worker is not valid anymore, don't put it back into a pool, try to rebuild it first, or discard. |
| //this will be helpful in a situation, when JNDI has changed, or JMS server is not available |
| //and reconnection is required. |
| if (worker == null || !worker.isValid()){ |
| getLogger().debug("Worker is invalid. Will try re-create... "); |
| final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class); |
| try { |
| // Safe to cast. Method #buildTargetResource(ProcessContext context) sets only CachingConnectionFactory |
| CachingConnectionFactory currentCF = (CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory(); |
| cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory()); |
| worker = buildTargetResource(context); |
| }catch(Exception e) { |
| getLogger().error("Failed to rebuild: " + cfProvider); |
| worker = null; |
| } |
| } |
| if (worker != null) { |
| worker.jmsTemplate.setExplicitQosEnabled(false); |
| worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE); |
| worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE); |
| worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY); |
| workerPool.offer(worker); |
| } |
| } |
| } |
| |
| @OnScheduled |
| public void setupWorkerPool(final ProcessContext context) { |
| workerPool = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks()); |
| } |
| |
| |
| @OnStopped |
| public void close() { |
| T worker; |
| while ((worker = workerPool.poll()) != null) { |
| worker.shutdown(); |
| } |
| } |
| |
| /** |
| * Delegate method to supplement |
| * {@link #onTrigger(ProcessContext, ProcessSession)} operation. It is |
| * implemented by sub-classes to perform {@link Processor} specific |
| * functionality. |
| */ |
| protected abstract void rendezvousWithJms(ProcessContext context, ProcessSession session, T jmsWorker) throws ProcessException; |
| |
| /** |
| * Finishes building one of the {@link JMSWorker} subclasses T. |
| * |
| * @see JMSPublisher |
| * @see JMSConsumer |
| */ |
| protected abstract T finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext); |
| |
| /** |
| * This method essentially performs initialization of this Processor by |
| * obtaining an instance of the {@link ConnectionFactory} from the |
| * {@link JMSConnectionFactoryProvider} (ControllerService) and performing a |
| * series of {@link ConnectionFactory} adaptations which eventually results |
| * in an instance of the {@link CachingConnectionFactory} used to construct |
| * {@link JmsTemplate} used by this Processor. |
| */ |
| private T buildTargetResource(ProcessContext context) { |
| final JMSConnectionFactoryProviderDefinition cfProvider = context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class); |
| final ConnectionFactory connectionFactory = cfProvider.getConnectionFactory(); |
| |
| final UserCredentialsConnectionFactoryAdapter cfCredentialsAdapter = new UserCredentialsConnectionFactoryAdapter(); |
| cfCredentialsAdapter.setTargetConnectionFactory(connectionFactory); |
| cfCredentialsAdapter.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue()); |
| cfCredentialsAdapter.setPassword(context.getProperty(PASSWORD).getValue()); |
| |
| final CachingConnectionFactory cachingFactory = new CachingConnectionFactory(cfCredentialsAdapter); |
| |
| String clientId = context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue(); |
| if (clientId != null) { |
| clientId = clientId + "-" + clientIdCounter.getAndIncrement(); |
| cachingFactory.setClientId(clientId); |
| } |
| |
| JmsTemplate jmsTemplate = new JmsTemplate(); |
| jmsTemplate.setConnectionFactory(cachingFactory); |
| jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue())); |
| |
| return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context); |
| } |
| } |