| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.seda; |
| |
| import java.util.ArrayList; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.ExecutorService; |
| |
| import org.apache.camel.AsyncEndpoint; |
| import org.apache.camel.AsyncProcessor; |
| import org.apache.camel.Category; |
| import org.apache.camel.Component; |
| import org.apache.camel.Consumer; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.ExtendedCamelContext; |
| import org.apache.camel.MultipleConsumersSupport; |
| import org.apache.camel.PollingConsumer; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Producer; |
| import org.apache.camel.WaitForTaskToComplete; |
| import org.apache.camel.api.management.ManagedAttribute; |
| import org.apache.camel.api.management.ManagedOperation; |
| import org.apache.camel.api.management.ManagedResource; |
| import org.apache.camel.spi.BrowsableEndpoint; |
| import org.apache.camel.spi.Metadata; |
| import org.apache.camel.spi.UriEndpoint; |
| import org.apache.camel.spi.UriParam; |
| import org.apache.camel.spi.UriPath; |
| import org.apache.camel.support.DefaultEndpoint; |
| import org.apache.camel.support.service.ServiceHelper; |
| import org.apache.camel.util.URISupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Asynchronously call another endpoint from any Camel Context in the same JVM. |
| */ |
| @ManagedResource(description = "Managed SedaEndpoint") |
| @UriEndpoint(firstVersion = "1.1.0", scheme = "seda", title = "SEDA", syntax = "seda:name", |
| category = { Category.CORE, Category.ENDPOINT }) |
| public class SedaEndpoint extends DefaultEndpoint implements AsyncEndpoint, BrowsableEndpoint, MultipleConsumersSupport { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(SedaEndpoint.class); |
| |
| private final Set<SedaProducer> producers = new CopyOnWriteArraySet<>(); |
| private final Set<SedaConsumer> consumers = new CopyOnWriteArraySet<>(); |
| private volatile AsyncProcessor consumerMulticastProcessor; |
| private volatile boolean multicastStarted; |
| private volatile ExecutorService multicastExecutor; |
| |
| @UriPath(description = "Name of queue") |
| @Metadata(required = true) |
| private String name; |
| @UriParam(label = "advanced", description = "Define the queue instance which will be used by the endpoint") |
| private BlockingQueue queue; |
| @UriParam(defaultValue = "" + SedaConstants.QUEUE_SIZE) |
| private int size = SedaConstants.QUEUE_SIZE; |
| |
| @UriParam(label = "consumer", defaultValue = "1") |
| private int concurrentConsumers = 1; |
| @UriParam(label = "consumer,advanced", defaultValue = "true") |
| private boolean limitConcurrentConsumers = true; |
| @UriParam(label = "consumer,advanced") |
| private boolean multipleConsumers; |
| @UriParam(label = "consumer,advanced") |
| private boolean purgeWhenStopping; |
| @UriParam(label = "consumer,advanced", defaultValue = "1000") |
| private int pollTimeout = 1000; |
| |
| @UriParam(label = "producer", defaultValue = "IfReplyExpected") |
| private WaitForTaskToComplete waitForTaskToComplete = WaitForTaskToComplete.IfReplyExpected; |
| @UriParam(label = "producer", defaultValue = "30000", javaType = "java.time.Duration") |
| private long timeout = 30000; |
| @UriParam(label = "producer", javaType = "java.time.Duration") |
| private long offerTimeout; |
| @UriParam(label = "producer") |
| private boolean blockWhenFull; |
| @UriParam(label = "producer") |
| private boolean discardWhenFull; |
| @UriParam(label = "producer") |
| private boolean failIfNoConsumers; |
| @UriParam(label = "producer") |
| private boolean discardIfNoConsumers; |
| |
| private BlockingQueueFactory<Exchange> queueFactory; |
| |
| public SedaEndpoint() { |
| queueFactory = new LinkedBlockingQueueFactory<>(); |
| } |
| |
| public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue) { |
| this(endpointUri, component, queue, 1); |
| } |
| |
| public SedaEndpoint(String endpointUri, Component component, BlockingQueue<Exchange> queue, int concurrentConsumers) { |
| this(endpointUri, component, concurrentConsumers); |
| this.queue = queue; |
| if (queue != null) { |
| this.size = queue.remainingCapacity(); |
| } |
| queueFactory = new LinkedBlockingQueueFactory<>(); |
| getComponent().registerQueue(this, queue); |
| } |
| |
| public SedaEndpoint(String endpointUri, Component component, BlockingQueueFactory<Exchange> queueFactory, |
| int concurrentConsumers) { |
| this(endpointUri, component, concurrentConsumers); |
| this.queueFactory = queueFactory; |
| } |
| |
| private SedaEndpoint(String endpointUri, Component component, int concurrentConsumers) { |
| super(endpointUri, component); |
| this.concurrentConsumers = concurrentConsumers; |
| } |
| |
| @Override |
| public SedaComponent getComponent() { |
| return (SedaComponent) super.getComponent(); |
| } |
| |
| @Override |
| public Producer createProducer() throws Exception { |
| return new SedaProducer( |
| this, getWaitForTaskToComplete(), getTimeout(), |
| isBlockWhenFull(), isDiscardWhenFull(), getOfferTimeout()); |
| } |
| |
| @Override |
| public Consumer createConsumer(Processor processor) throws Exception { |
| if (getComponent() != null) { |
| // all consumers must match having the same multipleConsumers options |
| String key = getComponent().getQueueKey(getEndpointUri()); |
| QueueReference ref = getComponent().getQueueReference(key); |
| if (ref != null && ref.getMultipleConsumers() != isMultipleConsumers()) { |
| // there is already a multiple consumers, so make sure they matches |
| throw new IllegalArgumentException( |
| "Cannot use existing queue " + key + " as the existing queue multiple consumers " |
| + ref.getMultipleConsumers() + " does not match given multiple consumers " |
| + multipleConsumers); |
| } |
| } |
| |
| Consumer answer = createNewConsumer(processor); |
| configureConsumer(answer); |
| return answer; |
| } |
| |
| protected SedaConsumer createNewConsumer(Processor processor) { |
| return new SedaConsumer(this, processor); |
| } |
| |
| @Override |
| public PollingConsumer createPollingConsumer() throws Exception { |
| SedaPollingConsumer answer = new SedaPollingConsumer(this); |
| configureConsumer(answer); |
| return answer; |
| } |
| |
| public synchronized BlockingQueue<Exchange> getQueue() { |
| if (queue == null) { |
| // prefer to lookup queue from component, so if this endpoint is re-created or re-started |
| // then the existing queue from the component can be used, so new producers and consumers |
| // can use the already existing queue referenced from the component |
| if (getComponent() != null) { |
| // use null to indicate default size (= use what the existing queue has been configured with) |
| Integer size = (getSize() == Integer.MAX_VALUE || getSize() == SedaConstants.QUEUE_SIZE) ? null : getSize(); |
| QueueReference ref = getComponent().getOrCreateQueue(this, size, isMultipleConsumers(), queueFactory); |
| queue = ref.getQueue(); |
| String key = getComponent().getQueueKey(getEndpointUri()); |
| LOG.info("Endpoint {} is using shared queue: {} with size: {}", this, key, |
| ref.getSize() != null ? ref.getSize() : Integer.MAX_VALUE); |
| // and set the size we are using |
| if (ref.getSize() != null) { |
| setSize(ref.getSize()); |
| } |
| } else { |
| // fallback and create queue (as this endpoint has no component) |
| queue = createQueue(); |
| LOG.info("Endpoint {} is using queue: {} with size: {}", this, getEndpointUri(), getSize()); |
| } |
| } |
| return queue; |
| } |
| |
| protected BlockingQueue<Exchange> createQueue() { |
| if (size > 0) { |
| return queueFactory.create(size); |
| } else { |
| return queueFactory.create(); |
| } |
| } |
| |
| /** |
| * Get's the {@link QueueReference} for the this endpoint. |
| * |
| * @return the reference, or <tt>null</tt> if no queue reference exists. |
| */ |
| public synchronized QueueReference getQueueReference() { |
| String key = getComponent().getQueueKey(getEndpointUri()); |
| QueueReference ref = getComponent().getQueueReference(key); |
| return ref; |
| } |
| |
| protected synchronized AsyncProcessor getConsumerMulticastProcessor() throws Exception { |
| if (!multicastStarted && consumerMulticastProcessor != null) { |
| // only start it on-demand to avoid starting it during stopping |
| ServiceHelper.startService(consumerMulticastProcessor); |
| multicastStarted = true; |
| } |
| return consumerMulticastProcessor; |
| } |
| |
| protected synchronized void updateMulticastProcessor() throws Exception { |
| // only needed if we support multiple consumers |
| if (!isMultipleConsumersSupported()) { |
| return; |
| } |
| |
| // stop old before we create a new |
| if (consumerMulticastProcessor != null) { |
| ServiceHelper.stopService(consumerMulticastProcessor); |
| consumerMulticastProcessor = null; |
| } |
| |
| int size = getConsumers().size(); |
| if (size >= 1) { |
| if (multicastExecutor == null) { |
| // create multicast executor as we need it when we have more than 1 processor |
| multicastExecutor = getCamelContext().getExecutorServiceManager().newDefaultThreadPool(this, |
| URISupport.sanitizeUri(getEndpointUri()) + "(multicast)"); |
| } |
| // create list of consumers to multicast to |
| List<Processor> processors = new ArrayList<>(size); |
| for (SedaConsumer consumer : getConsumers()) { |
| processors.add(consumer.getProcessor()); |
| } |
| // create multicast processor |
| multicastStarted = false; |
| |
| consumerMulticastProcessor = (AsyncProcessor) getCamelContext().adapt(ExtendedCamelContext.class) |
| .getProcessorFactory().createProcessor(getCamelContext(), "MulticastProcessor", |
| new Object[] { processors, multicastExecutor, false }); |
| } |
| } |
| |
| /** |
| * Define the queue instance which will be used by the endpoint. |
| * <p/> |
| * This option is only for rare use-cases where you want to use a custom queue instance. |
| */ |
| public void setQueue(BlockingQueue<Exchange> queue) { |
| this.queue = queue; |
| this.size = queue.remainingCapacity(); |
| } |
| |
| @ManagedAttribute(description = "Queue max capacity") |
| public int getSize() { |
| return size; |
| } |
| |
| /** |
| * The maximum capacity of the SEDA queue (i.e., the number of messages it can hold). Will by default use the |
| * defaultSize set on the SEDA component. |
| */ |
| public void setSize(int size) { |
| this.size = size; |
| } |
| |
| @ManagedAttribute(description = "Current queue size") |
| public int getCurrentQueueSize() { |
| return queue.size(); |
| } |
| |
| /** |
| * Whether a thread that sends messages to a full SEDA queue will block until the queue's capacity is no longer |
| * exhausted. By default, an exception will be thrown stating that the queue is full. By enabling this option, the |
| * calling thread will instead block and wait until the message can be accepted. |
| */ |
| public void setBlockWhenFull(boolean blockWhenFull) { |
| this.blockWhenFull = blockWhenFull; |
| } |
| |
| @ManagedAttribute(description = "Whether the caller will block sending to a full queue") |
| public boolean isBlockWhenFull() { |
| return blockWhenFull; |
| } |
| |
| /** |
| * Whether a thread that sends messages to a full SEDA queue will be discarded. By default, an exception will be |
| * thrown stating that the queue is full. By enabling this option, the calling thread will give up sending and |
| * continue, meaning that the message was not sent to the SEDA queue. |
| */ |
| public void setDiscardWhenFull(boolean discardWhenFull) { |
| this.discardWhenFull = discardWhenFull; |
| } |
| |
| @ManagedAttribute(description = "Whether the caller will discard sending to a full queue") |
| public boolean isDiscardWhenFull() { |
| return discardWhenFull; |
| } |
| |
| /** |
| * Number of concurrent threads processing exchanges. |
| */ |
| public void setConcurrentConsumers(int concurrentConsumers) { |
| this.concurrentConsumers = concurrentConsumers; |
| } |
| |
| @ManagedAttribute(description = "Number of concurrent consumers") |
| public int getConcurrentConsumers() { |
| return concurrentConsumers; |
| } |
| |
| @ManagedAttribute |
| public boolean isLimitConcurrentConsumers() { |
| return limitConcurrentConsumers; |
| } |
| |
| /** |
| * Whether to limit the number of concurrentConsumers to the maximum of 500. By default, an exception will be thrown |
| * if an endpoint is configured with a greater number. You can disable that check by turning this option off. |
| */ |
| public void setLimitConcurrentConsumers(boolean limitConcurrentConsumers) { |
| this.limitConcurrentConsumers = limitConcurrentConsumers; |
| } |
| |
| public WaitForTaskToComplete getWaitForTaskToComplete() { |
| return waitForTaskToComplete; |
| } |
| |
| /** |
| * Option to specify whether the caller should wait for the async task to complete or not before continuing. The |
| * following three options are supported: Always, Never or IfReplyExpected. The first two values are |
| * self-explanatory. The last value, IfReplyExpected, will only wait if the message is Request Reply based. The |
| * default option is IfReplyExpected. |
| */ |
| public void setWaitForTaskToComplete(WaitForTaskToComplete waitForTaskToComplete) { |
| this.waitForTaskToComplete = waitForTaskToComplete; |
| } |
| |
| @ManagedAttribute |
| public long getTimeout() { |
| return timeout; |
| } |
| |
| /** |
| * Timeout (in milliseconds) before a SEDA producer will stop waiting for an asynchronous task to complete. You can |
| * disable timeout by using 0 or a negative value. |
| */ |
| public void setTimeout(long timeout) { |
| this.timeout = timeout; |
| } |
| |
| @ManagedAttribute |
| public long getOfferTimeout() { |
| return offerTimeout; |
| } |
| |
| /** |
| * offerTimeout (in milliseconds) can be added to the block case when queue is full. You can disable timeout by |
| * using 0 or a negative value. |
| */ |
| public void setOfferTimeout(long offerTimeout) { |
| this.offerTimeout = offerTimeout; |
| } |
| |
| @ManagedAttribute |
| public boolean isFailIfNoConsumers() { |
| return failIfNoConsumers; |
| } |
| |
| /** |
| * Whether the producer should fail by throwing an exception, when sending to a queue with no active consumers. |
| * <p/> |
| * Only one of the options <tt>discardIfNoConsumers</tt> and <tt>failIfNoConsumers</tt> can be enabled at the same |
| * time. |
| */ |
| public void setFailIfNoConsumers(boolean failIfNoConsumers) { |
| this.failIfNoConsumers = failIfNoConsumers; |
| } |
| |
| @ManagedAttribute |
| public boolean isDiscardIfNoConsumers() { |
| return discardIfNoConsumers; |
| } |
| |
| /** |
| * Whether the producer should discard the message (do not add the message to the queue), when sending to a queue |
| * with no active consumers. |
| * <p/> |
| * Only one of the options <tt>discardIfNoConsumers</tt> and <tt>failIfNoConsumers</tt> can be enabled at the same |
| * time. |
| */ |
| public void setDiscardIfNoConsumers(boolean discardIfNoConsumers) { |
| this.discardIfNoConsumers = discardIfNoConsumers; |
| } |
| |
| @ManagedAttribute |
| public boolean isMultipleConsumers() { |
| return multipleConsumers; |
| } |
| |
| /** |
| * Specifies whether multiple consumers are allowed. If enabled, you can use SEDA for Publish-Subscribe messaging. |
| * That is, you can send a message to the SEDA queue and have each consumer receive a copy of the message. When |
| * enabled, this option should be specified on every consumer endpoint. |
| */ |
| public void setMultipleConsumers(boolean multipleConsumers) { |
| this.multipleConsumers = multipleConsumers; |
| } |
| |
| @ManagedAttribute |
| public int getPollTimeout() { |
| return pollTimeout; |
| } |
| |
| /** |
| * The timeout used when polling. When a timeout occurs, the consumer can check whether it is allowed to continue |
| * running. Setting a lower value allows the consumer to react more quickly upon shutdown. |
| */ |
| public void setPollTimeout(int pollTimeout) { |
| this.pollTimeout = pollTimeout; |
| } |
| |
| @ManagedAttribute |
| public boolean isPurgeWhenStopping() { |
| return purgeWhenStopping; |
| } |
| |
| /** |
| * Whether to purge the task queue when stopping the consumer/route. This allows to stop faster, as any pending |
| * messages on the queue is discarded. |
| */ |
| public void setPurgeWhenStopping(boolean purgeWhenStopping) { |
| this.purgeWhenStopping = purgeWhenStopping; |
| } |
| |
| /** |
| * Returns the current pending exchanges |
| */ |
| @Override |
| public List<Exchange> getExchanges() { |
| return new ArrayList<>(getQueue()); |
| } |
| |
| @Override |
| @ManagedAttribute |
| public boolean isMultipleConsumersSupported() { |
| return isMultipleConsumers(); |
| } |
| |
| /** |
| * Purges the queue |
| */ |
| @ManagedOperation(description = "Purges the seda queue") |
| public void purgeQueue() { |
| LOG.debug("Purging queue with {} exchanges", queue.size()); |
| queue.clear(); |
| } |
| |
| /** |
| * Returns the current active consumers on this endpoint |
| */ |
| public Set<SedaConsumer> getConsumers() { |
| return consumers; |
| } |
| |
| /** |
| * Returns the current active producers on this endpoint |
| */ |
| public Set<SedaProducer> getProducers() { |
| return new HashSet<>(producers); |
| } |
| |
| void onStarted(SedaProducer producer) { |
| producers.add(producer); |
| } |
| |
| void onStopped(SedaProducer producer) { |
| producers.remove(producer); |
| } |
| |
| void onStarted(SedaConsumer consumer) throws Exception { |
| consumers.add(consumer); |
| if (isMultipleConsumers()) { |
| updateMulticastProcessor(); |
| } |
| } |
| |
| void onStopped(SedaConsumer consumer) throws Exception { |
| consumers.remove(consumer); |
| if (isMultipleConsumers()) { |
| updateMulticastProcessor(); |
| } |
| } |
| |
| public boolean hasConsumers() { |
| return !this.consumers.isEmpty(); |
| } |
| |
| @Override |
| protected void doInit() throws Exception { |
| super.doInit(); |
| |
| if (discardWhenFull && blockWhenFull) { |
| throw new IllegalArgumentException( |
| "Cannot enable both discardWhenFull=true and blockWhenFull=true." |
| + " You can only either discard or block when full."); |
| } |
| |
| // force creating queue when starting |
| if (queue == null) { |
| queue = getQueue(); |
| } |
| |
| // special for unit testing where we can set a system property to make seda poll faster |
| // and therefore also react faster upon shutdown, which makes overall testing faster of the Camel project |
| String override = System.getProperty("CamelSedaPollTimeout", "" + getPollTimeout()); |
| setPollTimeout(Integer.valueOf(override)); |
| } |
| |
| @Override |
| public void stop() { |
| if (getConsumers().isEmpty()) { |
| super.stop(); |
| } else { |
| LOG.debug("There is still active consumers."); |
| } |
| } |
| |
| @Override |
| public void shutdown() { |
| if (isShutdown()) { |
| LOG.trace("Service already shut down"); |
| return; |
| } |
| |
| // notify component we are shutting down this endpoint |
| if (getComponent() != null) { |
| getComponent().onShutdownEndpoint(this); |
| } |
| |
| if (getConsumers().isEmpty()) { |
| super.shutdown(); |
| } else { |
| LOG.debug("There is still active consumers."); |
| } |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| // shutdown thread pool if it was in use |
| if (multicastExecutor != null) { |
| getCamelContext().getExecutorServiceManager().shutdownNow(multicastExecutor); |
| multicastExecutor = null; |
| } |
| |
| // clear queue, as we are shutdown, so if re-created then the queue must be updated |
| queue = null; |
| } |
| |
| } |