| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.processor; |
| |
| import java.io.Closeable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.CompletionService; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorCompletionService; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.camel.AsyncCallback; |
| import org.apache.camel.AsyncProcessor; |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.CamelContextAware; |
| import org.apache.camel.CamelExchangeException; |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.ErrorHandlerFactory; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Navigate; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Producer; |
| import org.apache.camel.StreamCache; |
| import org.apache.camel.Traceable; |
| import org.apache.camel.processor.aggregate.AggregationStrategy; |
| import org.apache.camel.processor.aggregate.CompletionAwareAggregationStrategy; |
| import org.apache.camel.processor.aggregate.DelegateAggregationStrategy; |
| import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy; |
| import org.apache.camel.spi.IdAware; |
| import org.apache.camel.spi.RouteContext; |
| import org.apache.camel.spi.TracedRouteNodes; |
| import org.apache.camel.spi.UnitOfWork; |
| import org.apache.camel.support.ServiceSupport; |
| import org.apache.camel.util.AsyncProcessorConverterHelper; |
| import org.apache.camel.util.AsyncProcessorHelper; |
| import org.apache.camel.util.CastUtils; |
| import org.apache.camel.util.EventHelper; |
| import org.apache.camel.util.ExchangeHelper; |
| import org.apache.camel.util.IOHelper; |
| import org.apache.camel.util.KeyValueHolder; |
| import org.apache.camel.util.ObjectHelper; |
| import org.apache.camel.util.ServiceHelper; |
| import org.apache.camel.util.StopWatch; |
| import org.apache.camel.util.concurrent.AtomicException; |
| import org.apache.camel.util.concurrent.AtomicExchange; |
| import org.apache.camel.util.concurrent.SubmitOrderedCompletionService; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.camel.util.ObjectHelper.notNull; |
| |
| |
| /** |
| * Implements the Multicast pattern to send a message exchange to a number of |
| * endpoints, each endpoint receiving a copy of the message exchange. |
| * |
| * @version |
| * @see Pipeline |
| */ |
| public class MulticastProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable, IdAware { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(MulticastProcessor.class); |
| |
| /** |
| * Class that represent each step in the multicast route to do |
| */ |
| static final class DefaultProcessorExchangePair implements ProcessorExchangePair { |
| private final int index; |
| private final Processor processor; |
| private final Processor prepared; |
| private final Exchange exchange; |
| |
| private DefaultProcessorExchangePair(int index, Processor processor, Processor prepared, Exchange exchange) { |
| this.index = index; |
| this.processor = processor; |
| this.prepared = prepared; |
| this.exchange = exchange; |
| } |
| |
| public int getIndex() { |
| return index; |
| } |
| |
| public Exchange getExchange() { |
| return exchange; |
| } |
| |
| public Producer getProducer() { |
| if (processor instanceof Producer) { |
| return (Producer) processor; |
| } |
| return null; |
| } |
| |
| public Processor getProcessor() { |
| return prepared; |
| } |
| |
| public void begin() { |
| // noop |
| } |
| |
| public void done() { |
| // noop |
| } |
| |
| } |
| |
| /** |
| * Class that represents prepared fine grained error handlers when processing multicasted/splitted exchanges |
| * <p/> |
| * See the <tt>createProcessorExchangePair</tt> and <tt>createErrorHandler</tt> methods. |
| */ |
| static final class PreparedErrorHandler extends KeyValueHolder<RouteContext, Processor> { |
| |
| PreparedErrorHandler(RouteContext key, Processor value) { |
| super(key, value); |
| } |
| |
| } |
| |
| protected final Processor onPrepare; |
| private final CamelContext camelContext; |
| private String id; |
| private Collection<Processor> processors; |
| private final AggregationStrategy aggregationStrategy; |
| private final boolean parallelProcessing; |
| private final boolean streaming; |
| private final boolean parallelAggregate; |
| private final boolean stopOnAggregateException; |
| private final boolean stopOnException; |
| private final ExecutorService executorService; |
| private final boolean shutdownExecutorService; |
| private ExecutorService aggregateExecutorService; |
| private final long timeout; |
| private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>(); |
| private final boolean shareUnitOfWork; |
| |
| public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors) { |
| this(camelContext, processors, null); |
| } |
| |
| public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy) { |
| this(camelContext, processors, aggregationStrategy, false, null, false, false, false, 0, null, false, false); |
| } |
| |
| @Deprecated |
| public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, |
| boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, |
| boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork) { |
| this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, |
| streaming, stopOnException, timeout, onPrepare, shareUnitOfWork, false); |
| } |
| |
| public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, boolean parallelProcessing, |
| ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, boolean stopOnException, long timeout, Processor onPrepare, |
| boolean shareUnitOfWork, boolean parallelAggregate) { |
| this(camelContext, processors, aggregationStrategy, parallelProcessing, executorService, shutdownExecutorService, streaming, stopOnException, timeout, onPrepare, |
| shareUnitOfWork, false, false); |
| } |
| |
| public MulticastProcessor(CamelContext camelContext, Collection<Processor> processors, AggregationStrategy aggregationStrategy, |
| boolean parallelProcessing, ExecutorService executorService, boolean shutdownExecutorService, boolean streaming, |
| boolean stopOnException, long timeout, Processor onPrepare, boolean shareUnitOfWork, |
| boolean parallelAggregate, boolean stopOnAggregateException) { |
| notNull(camelContext, "camelContext"); |
| this.camelContext = camelContext; |
| this.processors = processors; |
| this.aggregationStrategy = aggregationStrategy; |
| this.executorService = executorService; |
| this.shutdownExecutorService = shutdownExecutorService; |
| this.streaming = streaming; |
| this.stopOnException = stopOnException; |
| // must enable parallel if executor service is provided |
| this.parallelProcessing = parallelProcessing || executorService != null; |
| this.timeout = timeout; |
| this.onPrepare = onPrepare; |
| this.shareUnitOfWork = shareUnitOfWork; |
| this.parallelAggregate = parallelAggregate; |
| this.stopOnAggregateException = stopOnAggregateException; |
| } |
| |
| @Override |
| public String toString() { |
| return "Multicast[" + getProcessors() + "]"; |
| } |
| |
| public String getId() { |
| return id; |
| } |
| |
| public void setId(String id) { |
| this.id = id; |
| } |
| |
| public String getTraceLabel() { |
| return "multicast"; |
| } |
| |
| public CamelContext getCamelContext() { |
| return camelContext; |
| } |
| |
| public void process(Exchange exchange) throws Exception { |
| AsyncProcessorHelper.process(this, exchange); |
| } |
| |
| public boolean process(Exchange exchange, AsyncCallback callback) { |
| final AtomicExchange result = new AtomicExchange(); |
| Iterable<ProcessorExchangePair> pairs = null; |
| |
| try { |
| boolean sync = true; |
| |
| pairs = createProcessorExchangePairs(exchange); |
| |
| if (isParallelProcessing()) { |
| // ensure an executor is set when running in parallel |
| ObjectHelper.notNull(executorService, "executorService", this); |
| doProcessParallel(exchange, result, pairs, isStreaming(), callback); |
| } else { |
| sync = doProcessSequential(exchange, result, pairs, callback); |
| } |
| |
| if (!sync) { |
| // the remainder of the multicast will be completed async |
| // so we break out now, then the callback will be invoked which then continue routing from where we left here |
| return false; |
| } |
| } catch (Throwable e) { |
| exchange.setException(e); |
| // unexpected exception was thrown, maybe from iterator etc. so do not regard as exhausted |
| // and do the done work |
| doDone(exchange, null, pairs, callback, true, false); |
| return true; |
| } |
| |
| // multicasting was processed successfully |
| // and do the done work |
| Exchange subExchange = result.get() != null ? result.get() : null; |
| doDone(exchange, subExchange, pairs, callback, true, true); |
| return true; |
| } |
| |
| protected void doProcessParallel(final Exchange original, final AtomicExchange result, final Iterable<ProcessorExchangePair> pairs, |
| final boolean streaming, final AsyncCallback callback) throws Exception { |
| |
| ObjectHelper.notNull(executorService, "ExecutorService", this); |
| ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this); |
| |
| final CompletionService<Exchange> completion; |
| if (streaming) { |
| // execute tasks in parallel+streaming and aggregate in the order they are finished (out of order sequence) |
| completion = new ExecutorCompletionService<Exchange>(executorService); |
| } else { |
| // execute tasks in parallel and aggregate in the order the tasks are submitted (in order sequence) |
| completion = new SubmitOrderedCompletionService<Exchange>(executorService); |
| } |
| |
| final AtomicInteger total = new AtomicInteger(0); |
| final Iterator<ProcessorExchangePair> it = pairs.iterator(); |
| |
| if (it.hasNext()) { |
| // when parallel then aggregate on the fly |
| final AtomicBoolean running = new AtomicBoolean(true); |
| final AtomicBoolean allTasksSubmitted = new AtomicBoolean(); |
| final CountDownLatch aggregationOnTheFlyDone = new CountDownLatch(1); |
| final AtomicException executionException = new AtomicException(); |
| |
| // issue task to execute in separate thread so it can aggregate on-the-fly |
| // while we submit new tasks, and those tasks complete concurrently |
| // this allows us to optimize work and reduce memory consumption |
| final AggregateOnTheFlyTask aggregateOnTheFlyTask = new AggregateOnTheFlyTask(result, original, total, completion, running, |
| aggregationOnTheFlyDone, allTasksSubmitted, executionException); |
| final AtomicBoolean aggregationTaskSubmitted = new AtomicBoolean(); |
| |
| LOG.trace("Starting to submit parallel tasks"); |
| |
| try { |
| while (it.hasNext()) { |
| final ProcessorExchangePair pair = it.next(); |
| // in case the iterator returns null then continue to next |
| if (pair == null) { |
| continue; |
| } |
| |
| final Exchange subExchange = pair.getExchange(); |
| updateNewExchange(subExchange, total.intValue(), pairs, it); |
| |
| completion.submit(new Callable<Exchange>() { |
| public Exchange call() throws Exception { |
| // start the aggregation task at this stage only in order not to pile up too many threads |
| if (aggregationTaskSubmitted.compareAndSet(false, true)) { |
| // but only submit the aggregation task once |
| aggregateExecutorService.submit(aggregateOnTheFlyTask); |
| } |
| |
| if (!running.get()) { |
| // do not start processing the task if we are not running |
| return subExchange; |
| } |
| |
| try { |
| doProcessParallel(pair); |
| } catch (Throwable e) { |
| subExchange.setException(e); |
| } |
| |
| // Decide whether to continue with the multicast or not; similar logic to the Pipeline |
| Integer number = getExchangeIndex(subExchange); |
| boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); |
| if (stopOnException && !continueProcessing) { |
| // signal to stop running |
| running.set(false); |
| // throw caused exception |
| if (subExchange.getException() != null) { |
| // wrap in exception to explain where it failed |
| CamelExchangeException cause = new CamelExchangeException("Parallel processing failed for number " + number, subExchange, subExchange.getException()); |
| subExchange.setException(cause); |
| } |
| } |
| |
| LOG.trace("Parallel processing complete for exchange: {}", subExchange); |
| return subExchange; |
| } |
| }); |
| |
| total.incrementAndGet(); |
| } |
| } catch (Throwable e) { |
| // The methods it.hasNext and it.next can throw RuntimeExceptions when custom iterators are implemented. |
| // We have to catch the exception here otherwise the aggregator threads would pile up. |
| if (e instanceof Exception) { |
| executionException.set((Exception) e); |
| } else { |
| executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); |
| } |
| // and because of the exception we must signal we are done so the latch can open and let the other thread continue processing |
| LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); |
| LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); |
| aggregationOnTheFlyDone.countDown(); |
| } |
| |
| // signal all tasks has been submitted |
| LOG.trace("Signaling that all {} tasks has been submitted.", total.get()); |
| allTasksSubmitted.set(true); |
| |
| // its to hard to do parallel async routing so we let the caller thread be synchronously |
| // and have it pickup the replies and do the aggregation (eg we use a latch to wait) |
| // wait for aggregation to be done |
| LOG.debug("Waiting for on-the-fly aggregation to complete aggregating {} responses for exchangeId: {}", total.get(), original.getExchangeId()); |
| aggregationOnTheFlyDone.await(); |
| |
| // did we fail for whatever reason, if so throw that caused exception |
| if (executionException.get() != null) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Parallel processing failed due {}", executionException.get().getMessage()); |
| } |
| throw executionException.get(); |
| } |
| } |
| |
| // no everything is okay so we are done |
| LOG.debug("Done parallel processing {} exchanges", total); |
| } |
| |
| /** |
| * Boss worker to control aggregate on-the-fly for completed tasks when using parallel processing. |
| * <p/> |
| * This ensures lower memory consumption as we do not need to keep all completed tasks in memory |
| * before we perform aggregation. Instead this separate thread will run and aggregate when new |
| * completed tasks is done. |
| * <p/> |
| * The logic is fairly complex as this implementation has to keep track how far it got, and also |
| * signal back to the <i>main</t> thread when its done, so the <i>main</t> thread can continue |
| * processing when the entire splitting is done. |
| */ |
| private final class AggregateOnTheFlyTask implements Runnable { |
| |
| private final AtomicExchange result; |
| private final Exchange original; |
| private final AtomicInteger total; |
| private final CompletionService<Exchange> completion; |
| private final AtomicBoolean running; |
| private final CountDownLatch aggregationOnTheFlyDone; |
| private final AtomicBoolean allTasksSubmitted; |
| private final AtomicException executionException; |
| |
| private AggregateOnTheFlyTask(AtomicExchange result, Exchange original, AtomicInteger total, |
| CompletionService<Exchange> completion, AtomicBoolean running, |
| CountDownLatch aggregationOnTheFlyDone, AtomicBoolean allTasksSubmitted, |
| AtomicException executionException) { |
| this.result = result; |
| this.original = original; |
| this.total = total; |
| this.completion = completion; |
| this.running = running; |
| this.aggregationOnTheFlyDone = aggregationOnTheFlyDone; |
| this.allTasksSubmitted = allTasksSubmitted; |
| this.executionException = executionException; |
| } |
| |
| public void run() { |
| LOG.trace("Aggregate on the fly task started for exchangeId: {}", original.getExchangeId()); |
| |
| try { |
| aggregateOnTheFly(); |
| } catch (Throwable e) { |
| if (e instanceof Exception) { |
| executionException.set((Exception) e); |
| } else { |
| executionException.set(ObjectHelper.wrapRuntimeCamelException(e)); |
| } |
| } finally { |
| // must signal we are done so the latch can open and let the other thread continue processing |
| LOG.debug("Signaling we are done aggregating on the fly for exchangeId: {}", original.getExchangeId()); |
| LOG.trace("Aggregate on the fly task done for exchangeId: {}", original.getExchangeId()); |
| aggregationOnTheFlyDone.countDown(); |
| } |
| } |
| |
| private void aggregateOnTheFly() throws InterruptedException, ExecutionException { |
| final AtomicBoolean timedOut = new AtomicBoolean(); |
| boolean stoppedOnException = false; |
| final StopWatch watch = new StopWatch(); |
| final AtomicInteger aggregated = new AtomicInteger(); |
| boolean done = false; |
| // not a for loop as on the fly may still run |
| while (!done) { |
| // check if we have already aggregate everything |
| if (allTasksSubmitted.get() && aggregated.intValue() >= total.get()) { |
| LOG.debug("Done aggregating {} exchanges on the fly.", aggregated); |
| break; |
| } |
| |
| Future<Exchange> future; |
| if (timedOut.get()) { |
| // we are timed out but try to grab if some tasks has been completed |
| // poll will return null if no tasks is present |
| future = completion.poll(); |
| LOG.trace("Polled completion task #{} after timeout to grab already completed tasks: {}", aggregated, future); |
| } else if (timeout > 0) { |
| long left = timeout - watch.taken(); |
| if (left < 0) { |
| left = 0; |
| } |
| LOG.trace("Polling completion task #{} using timeout {} millis.", aggregated, left); |
| future = completion.poll(left, TimeUnit.MILLISECONDS); |
| } else { |
| LOG.trace("Polling completion task #{}", aggregated); |
| // we must not block so poll every second |
| future = completion.poll(1, TimeUnit.SECONDS); |
| if (future == null) { |
| // and continue loop which will recheck if we are done |
| continue; |
| } |
| } |
| |
| if (future == null) { |
| ParallelAggregateTimeoutTask task = new ParallelAggregateTimeoutTask(original, result, completion, aggregated, total, timedOut); |
| if (parallelAggregate) { |
| aggregateExecutorService.submit(task); |
| } else { |
| // in non parallel mode then just run the task |
| task.run(); |
| } |
| } else { |
| // there is a result to aggregate |
| Exchange subExchange = future.get(); |
| |
| // Decide whether to continue with the multicast or not; similar logic to the Pipeline |
| Integer number = getExchangeIndex(subExchange); |
| boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Parallel processing failed for number " + number, LOG); |
| if (stopOnException && !continueProcessing) { |
| // we want to stop on exception and an exception or failure occurred |
| // this is similar to what the pipeline does, so we should do the same to not surprise end users |
| // so we should set the failed exchange as the result and break out |
| result.set(subExchange); |
| stoppedOnException = true; |
| break; |
| } |
| |
| // we got a result so aggregate it |
| ParallelAggregateTask task = new ParallelAggregateTask(result, subExchange, aggregated); |
| if (parallelAggregate) { |
| aggregateExecutorService.submit(task); |
| } else { |
| // in non parallel mode then just run the task |
| task.run(); |
| } |
| } |
| } |
| |
| if (timedOut.get() || stoppedOnException) { |
| if (timedOut.get()) { |
| LOG.debug("Cancelling tasks due timeout after {} millis.", timeout); |
| } |
| if (stoppedOnException) { |
| LOG.debug("Cancelling tasks due stopOnException."); |
| } |
| // cancel tasks as we timed out (its safe to cancel done tasks) |
| running.set(false); |
| } |
| } |
| } |
| |
| /** |
| * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. |
| */ |
| private final class ParallelAggregateTask implements Runnable { |
| |
| private final AtomicExchange result; |
| private final Exchange subExchange; |
| private final AtomicInteger aggregated; |
| |
| private ParallelAggregateTask(AtomicExchange result, Exchange subExchange, AtomicInteger aggregated) { |
| this.result = result; |
| this.subExchange = subExchange; |
| this.aggregated = aggregated; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| if (parallelAggregate) { |
| doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); |
| } else { |
| doAggregate(getAggregationStrategy(subExchange), result, subExchange); |
| } |
| } catch (Throwable e) { |
| if (isStopOnAggregateException()) { |
| throw e; |
| } else { |
| // wrap in exception to explain where it failed |
| CamelExchangeException cex = new CamelExchangeException("Parallel processing failed for number " + aggregated.get(), subExchange, e); |
| subExchange.setException(cex); |
| LOG.debug(cex.getMessage(), cex); |
| } |
| } finally { |
| aggregated.incrementAndGet(); |
| } |
| } |
| } |
| |
| /** |
| * Worker task to aggregate the old and new exchange on-the-fly for completed tasks when using parallel processing. |
| */ |
| private final class ParallelAggregateTimeoutTask implements Runnable { |
| |
| private final Exchange original; |
| private final AtomicExchange result; |
| private final CompletionService<Exchange> completion; |
| private final AtomicInteger aggregated; |
| private final AtomicInteger total; |
| private final AtomicBoolean timedOut; |
| |
| private ParallelAggregateTimeoutTask(Exchange original, AtomicExchange result, CompletionService<Exchange> completion, |
| AtomicInteger aggregated, AtomicInteger total, AtomicBoolean timedOut) { |
| this.original = original; |
| this.result = result; |
| this.completion = completion; |
| this.aggregated = aggregated; |
| this.total = total; |
| this.timedOut = timedOut; |
| } |
| |
| @Override |
| public void run() { |
| AggregationStrategy strategy = getAggregationStrategy(null); |
| if (strategy instanceof DelegateAggregationStrategy) { |
| strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); |
| } |
| if (strategy instanceof TimeoutAwareAggregationStrategy) { |
| // notify the strategy we timed out |
| Exchange oldExchange = result.get(); |
| if (oldExchange == null) { |
| // if they all timed out the result may not have been set yet, so use the original exchange |
| oldExchange = original; |
| } |
| ((TimeoutAwareAggregationStrategy) strategy).timeout(oldExchange, aggregated.intValue(), total.intValue(), timeout); |
| } else { |
| // log a WARN we timed out since it will not be aggregated and the Exchange will be lost |
| LOG.warn("Parallel processing timed out after {} millis for number {}. This task will be cancelled and will not be aggregated.", timeout, aggregated.intValue()); |
| } |
| LOG.debug("Timeout occurred after {} millis for number {} task.", timeout, aggregated.intValue()); |
| timedOut.set(true); |
| |
| // mark that index as timed out, which allows us to try to retrieve |
| // any already completed tasks in the next loop |
| if (completion instanceof SubmitOrderedCompletionService) { |
| ((SubmitOrderedCompletionService<?>) completion).timeoutTask(); |
| } |
| |
| // we timed out so increment the counter |
| aggregated.incrementAndGet(); |
| } |
| } |
| |
| protected boolean doProcessSequential(Exchange original, AtomicExchange result, Iterable<ProcessorExchangePair> pairs, AsyncCallback callback) throws Exception { |
| AtomicInteger total = new AtomicInteger(); |
| Iterator<ProcessorExchangePair> it = pairs.iterator(); |
| |
| while (it.hasNext()) { |
| ProcessorExchangePair pair = it.next(); |
| // in case the iterator returns null then continue to next |
| if (pair == null) { |
| continue; |
| } |
| Exchange subExchange = pair.getExchange(); |
| updateNewExchange(subExchange, total.get(), pairs, it); |
| |
| boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); |
| if (!sync) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", pair.getExchange().getExchangeId()); |
| } |
| // the remainder of the multicast will be completed async |
| // so we break out now, then the callback will be invoked which then continue routing from where we left here |
| return false; |
| } |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Processing exchangeId: {} is continued being processed synchronously", pair.getExchange().getExchangeId()); |
| } |
| |
| // Decide whether to continue with the multicast or not; similar logic to the Pipeline |
| // remember to test for stop on exception and aggregate before copying back results |
| boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); |
| if (stopOnException && !continueProcessing) { |
| if (subExchange.getException() != null) { |
| // wrap in exception to explain where it failed |
| CamelExchangeException cause = new CamelExchangeException("Sequential processing failed for number " + total.get(), subExchange, subExchange.getException()); |
| subExchange.setException(cause); |
| } |
| // we want to stop on exception, and the exception was handled by the error handler |
| // this is similar to what the pipeline does, so we should do the same to not surprise end users |
| // so we should set the failed exchange as the result and be done |
| result.set(subExchange); |
| return true; |
| } |
| |
| LOG.trace("Sequential processing complete for number {} exchange: {}", total, subExchange); |
| |
| if (parallelAggregate) { |
| doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); |
| } else { |
| doAggregate(getAggregationStrategy(subExchange), result, subExchange); |
| } |
| |
| total.incrementAndGet(); |
| } |
| |
| LOG.debug("Done sequential processing {} exchanges", total); |
| |
| return true; |
| } |
| |
| private boolean doProcessSequential(final Exchange original, final AtomicExchange result, |
| final Iterable<ProcessorExchangePair> pairs, final Iterator<ProcessorExchangePair> it, |
| final ProcessorExchangePair pair, final AsyncCallback callback, final AtomicInteger total) { |
| boolean sync = true; |
| |
| final Exchange exchange = pair.getExchange(); |
| Processor processor = pair.getProcessor(); |
| final Producer producer = pair.getProducer(); |
| |
| TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; |
| |
| try { |
| // prepare tracing starting from a new block |
| if (traced != null) { |
| traced.pushBlock(); |
| } |
| |
| StopWatch sw = null; |
| if (producer != null) { |
| boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); |
| if (sending) { |
| sw = new StopWatch(); |
| } |
| } |
| |
| // compute time taken if sending to another endpoint |
| final StopWatch watch = sw; |
| |
| // let the prepared process it, remember to begin the exchange pair |
| AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); |
| pair.begin(); |
| sync = async.process(exchange, new AsyncCallback() { |
| public void done(boolean doneSync) { |
| // we are done with the exchange pair |
| pair.done(); |
| |
| // okay we are done, so notify the exchange was sent |
| if (producer != null && watch != null) { |
| long timeTaken = watch.taken(); |
| Endpoint endpoint = producer.getEndpoint(); |
| // emit event that the exchange was sent to the endpoint |
| EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); |
| } |
| |
| // we only have to handle async completion of the routing slip |
| if (doneSync) { |
| return; |
| } |
| |
| // continue processing the multicast asynchronously |
| Exchange subExchange = exchange; |
| |
| // Decide whether to continue with the multicast or not; similar logic to the Pipeline |
| // remember to test for stop on exception and aggregate before copying back results |
| boolean continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); |
| if (stopOnException && !continueProcessing) { |
| if (subExchange.getException() != null) { |
| // wrap in exception to explain where it failed |
| subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); |
| } else { |
| // we want to stop on exception, and the exception was handled by the error handler |
| // this is similar to what the pipeline does, so we should do the same to not surprise end users |
| // so we should set the failed exchange as the result and be done |
| result.set(subExchange); |
| } |
| // and do the done work |
| doDone(original, subExchange, pairs, callback, false, true); |
| return; |
| } |
| |
| try { |
| if (parallelAggregate) { |
| doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); |
| } else { |
| doAggregate(getAggregationStrategy(subExchange), result, subExchange); |
| } |
| } catch (Throwable e) { |
| // wrap in exception to explain where it failed |
| subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); |
| // and do the done work |
| doDone(original, subExchange, pairs, callback, false, true); |
| return; |
| } |
| |
| total.incrementAndGet(); |
| |
| // maybe there are more processors to multicast |
| while (it.hasNext()) { |
| |
| // prepare and run the next |
| ProcessorExchangePair pair = it.next(); |
| subExchange = pair.getExchange(); |
| updateNewExchange(subExchange, total.get(), pairs, it); |
| boolean sync = doProcessSequential(original, result, pairs, it, pair, callback, total); |
| |
| if (!sync) { |
| LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", original.getExchangeId()); |
| return; |
| } |
| |
| // Decide whether to continue with the multicast or not; similar logic to the Pipeline |
| // remember to test for stop on exception and aggregate before copying back results |
| continueProcessing = PipelineHelper.continueProcessing(subExchange, "Sequential processing failed for number " + total.get(), LOG); |
| if (stopOnException && !continueProcessing) { |
| if (subExchange.getException() != null) { |
| // wrap in exception to explain where it failed |
| subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, subExchange.getException())); |
| } else { |
| // we want to stop on exception, and the exception was handled by the error handler |
| // this is similar to what the pipeline does, so we should do the same to not surprise end users |
| // so we should set the failed exchange as the result and be done |
| result.set(subExchange); |
| } |
| // and do the done work |
| doDone(original, subExchange, pairs, callback, false, true); |
| return; |
| } |
| |
| // must catch any exceptions from aggregation |
| try { |
| if (parallelAggregate) { |
| doAggregateInternal(getAggregationStrategy(subExchange), result, subExchange); |
| } else { |
| doAggregate(getAggregationStrategy(subExchange), result, subExchange); |
| } |
| } catch (Throwable e) { |
| // wrap in exception to explain where it failed |
| subExchange.setException(new CamelExchangeException("Sequential processing failed for number " + total, subExchange, e)); |
| // and do the done work |
| doDone(original, subExchange, pairs, callback, false, true); |
| return; |
| } |
| |
| total.incrementAndGet(); |
| } |
| |
| // do the done work |
| subExchange = result.get() != null ? result.get() : null; |
| doDone(original, subExchange, pairs, callback, false, true); |
| } |
| }); |
| } finally { |
| // pop the block so by next round we have the same staring point and thus the tracing looks accurate |
| if (traced != null) { |
| traced.popBlock(); |
| } |
| } |
| |
| return sync; |
| } |
| |
| private void doProcessParallel(final ProcessorExchangePair pair) throws Exception { |
| final Exchange exchange = pair.getExchange(); |
| Processor processor = pair.getProcessor(); |
| Producer producer = pair.getProducer(); |
| |
| TracedRouteNodes traced = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getTracedRouteNodes() : null; |
| |
| // compute time taken if sending to another endpoint |
| StopWatch watch = null; |
| try { |
| // prepare tracing starting from a new block |
| if (traced != null) { |
| traced.pushBlock(); |
| } |
| |
| if (producer != null) { |
| boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), exchange, producer.getEndpoint()); |
| if (sending) { |
| watch = new StopWatch(); |
| } |
| } |
| // let the prepared process it, remember to begin the exchange pair |
| AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); |
| pair.begin(); |
| // we invoke it synchronously as parallel async routing is too hard |
| AsyncProcessorHelper.process(async, exchange); |
| } finally { |
| pair.done(); |
| // pop the block so by next round we have the same staring point and thus the tracing looks accurate |
| if (traced != null) { |
| traced.popBlock(); |
| } |
| if (producer != null && watch != null) { |
| Endpoint endpoint = producer.getEndpoint(); |
| long timeTaken = watch.taken(); |
| // emit event that the exchange was sent to the endpoint |
| // this is okay to do here in the finally block, as the processing is not using the async routing engine |
| //( we invoke it synchronously as parallel async routing is too hard) |
| EventHelper.notifyExchangeSent(exchange.getContext(), exchange, endpoint, timeTaken); |
| } |
| } |
| } |
| |
| /** |
| * Common work which must be done when we are done multicasting. |
| * <p/> |
| * This logic applies for both running synchronous and asynchronous as there are multiple exist points |
| * when using the asynchronous routing engine. And therefore we want the logic in one method instead |
| * of being scattered. |
| * |
| * @param original the original exchange |
| * @param subExchange the current sub exchange, can be <tt>null</tt> for the synchronous part |
| * @param pairs the pairs with the exchanges to process |
| * @param callback the callback |
| * @param doneSync the <tt>doneSync</tt> parameter to call on callback |
| * @param forceExhaust whether or not error handling is exhausted |
| */ |
| protected void doDone(Exchange original, Exchange subExchange, final Iterable<ProcessorExchangePair> pairs, |
| AsyncCallback callback, boolean doneSync, boolean forceExhaust) { |
| |
| // we are done so close the pairs iterator |
| if (pairs instanceof Closeable) { |
| IOHelper.close((Closeable) pairs, "pairs", LOG); |
| } |
| |
| AggregationStrategy strategy = getAggregationStrategy(subExchange); |
| if (strategy instanceof DelegateAggregationStrategy) { |
| strategy = ((DelegateAggregationStrategy) strategy).getDelegate(); |
| } |
| // invoke the on completion callback |
| if (strategy instanceof CompletionAwareAggregationStrategy) { |
| ((CompletionAwareAggregationStrategy) strategy).onCompletion(subExchange); |
| } |
| |
| // cleanup any per exchange aggregation strategy |
| removeAggregationStrategyFromExchange(original); |
| |
| // we need to know if there was an exception, and if the stopOnException option was enabled |
| // also we would need to know if any error handler has attempted redelivery and exhausted |
| boolean stoppedOnException = false; |
| boolean exception = false; |
| boolean exhaust = forceExhaust || subExchange != null && (subExchange.getException() != null || ExchangeHelper.isRedeliveryExhausted(subExchange)); |
| if (original.getException() != null || subExchange != null && subExchange.getException() != null) { |
| // there was an exception and we stopped |
| stoppedOnException = isStopOnException(); |
| exception = true; |
| } |
| |
| // must copy results at this point |
| if (subExchange != null) { |
| if (stoppedOnException) { |
| // if we stopped due an exception then only propagate the exception |
| original.setException(subExchange.getException()); |
| } else { |
| // copy the current result to original so it will contain this result of this eip |
| ExchangeHelper.copyResults(original, subExchange); |
| } |
| } |
| |
| // .. and then if there was an exception we need to configure the redelivery exhaust |
| // for example the noErrorHandler will not cause redelivery exhaust so if this error |
| // handled has been in use, then the exhaust would be false (if not forced) |
| if (exception) { |
| // multicast uses error handling on its output processors and they have tried to redeliver |
| // so we shall signal back to the other error handlers that we are exhausted and they should not |
| // also try to redeliver as we will then do that twice |
| original.setProperty(Exchange.REDELIVERY_EXHAUSTED, exhaust); |
| } |
| |
| callback.done(doneSync); |
| } |
| |
| /** |
| * Aggregate the {@link Exchange} with the current result. |
| * This method is synchronized and is called directly when parallelAggregate is disabled (by default). |
| * |
| * @param strategy the aggregation strategy to use |
| * @param result the current result |
| * @param exchange the exchange to be added to the result |
| * @see #doAggregateInternal(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange) |
| */ |
| protected synchronized void doAggregate(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { |
| doAggregateInternal(strategy, result, exchange); |
| } |
| |
| /** |
| * Aggregate the {@link Exchange} with the current result. |
| * This method is unsynchronized and is called directly when parallelAggregate is enabled. |
| * In all other cases, this method is called from the doAggregate which is a synchronized method |
| * |
| * @param strategy the aggregation strategy to use |
| * @param result the current result |
| * @param exchange the exchange to be added to the result |
| * @see #doAggregate(org.apache.camel.processor.aggregate.AggregationStrategy, org.apache.camel.util.concurrent.AtomicExchange, org.apache.camel.Exchange) |
| */ |
| protected void doAggregateInternal(AggregationStrategy strategy, AtomicExchange result, Exchange exchange) { |
| if (strategy != null) { |
| // prepare the exchanges for aggregation |
| Exchange oldExchange = result.get(); |
| ExchangeHelper.prepareAggregation(oldExchange, exchange); |
| result.set(strategy.aggregate(oldExchange, exchange)); |
| } |
| } |
| |
| protected void updateNewExchange(Exchange exchange, int index, Iterable<ProcessorExchangePair> allPairs, |
| Iterator<ProcessorExchangePair> it) { |
| exchange.setProperty(Exchange.MULTICAST_INDEX, index); |
| if (it.hasNext()) { |
| exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.FALSE); |
| } else { |
| exchange.setProperty(Exchange.MULTICAST_COMPLETE, Boolean.TRUE); |
| } |
| } |
| |
| protected Integer getExchangeIndex(Exchange exchange) { |
| return exchange.getProperty(Exchange.MULTICAST_INDEX, Integer.class); |
| } |
| |
| protected Iterable<ProcessorExchangePair> createProcessorExchangePairs(Exchange exchange) throws Exception { |
| List<ProcessorExchangePair> result = new ArrayList<ProcessorExchangePair>(processors.size()); |
| |
| StreamCache streamCache = null; |
| if (isParallelProcessing() && exchange.getIn().getBody() instanceof StreamCache) { |
| // in parallel processing case, the stream must be copied, therefore get the stream |
| streamCache = (StreamCache) exchange.getIn().getBody(); |
| } |
| |
| int index = 0; |
| for (Processor processor : processors) { |
| // copy exchange, and do not share the unit of work |
| Exchange copy = ExchangeHelper.createCorrelatedCopy(exchange, false); |
| |
| if (streamCache != null) { |
| if (index > 0) { |
| // copy it otherwise parallel processing is not possible, |
| // because streams can only be read once |
| StreamCache copiedStreamCache = streamCache.copy(copy); |
| if (copiedStreamCache != null) { |
| copy.getIn().setBody(copiedStreamCache); |
| } |
| } |
| } |
| |
| // If the multi-cast processor has an aggregation strategy |
| // then the StreamCache created by the child routes must not be |
| // closed by the unit of work of the child route, but by the unit of |
| // work of the parent route or grand parent route or grand grand parent route ...(in case of nesting). |
| // Set therefore the unit of work of the parent route as stream cache unit of work, |
| // if it is not already set. |
| if (copy.getProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK) == null) { |
| copy.setProperty(Exchange.STREAM_CACHE_UNIT_OF_WORK, exchange.getUnitOfWork()); |
| } |
| // if we share unit of work, we need to prepare the child exchange |
| if (isShareUnitOfWork()) { |
| prepareSharedUnitOfWork(copy, exchange); |
| } |
| |
| // and add the pair |
| RouteContext routeContext = exchange.getUnitOfWork() != null ? exchange.getUnitOfWork().getRouteContext() : null; |
| result.add(createProcessorExchangePair(index++, processor, copy, routeContext)); |
| } |
| |
| if (exchange.getException() != null) { |
| // force any exceptions occurred during creation of exchange paris to be thrown |
| // before returning the answer; |
| throw exchange.getException(); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Creates the {@link ProcessorExchangePair} which holds the processor and exchange to be send out. |
| * <p/> |
| * You <b>must</b> use this method to create the instances of {@link ProcessorExchangePair} as they |
| * need to be specially prepared before use. |
| * |
| * @param index the index |
| * @param processor the processor |
| * @param exchange the exchange |
| * @param routeContext the route context |
| * @return prepared for use |
| */ |
| protected ProcessorExchangePair createProcessorExchangePair(int index, Processor processor, Exchange exchange, |
| RouteContext routeContext) { |
| Processor prepared = processor; |
| |
| // set property which endpoint we send to |
| setToEndpoint(exchange, prepared); |
| |
| // rework error handling to support fine grained error handling |
| prepared = createErrorHandler(routeContext, exchange, prepared); |
| |
| // invoke on prepare on the exchange if specified |
| if (onPrepare != null) { |
| try { |
| onPrepare.process(exchange); |
| } catch (Exception e) { |
| exchange.setException(e); |
| } |
| } |
| return new DefaultProcessorExchangePair(index, processor, prepared, exchange); |
| } |
| |
| protected Processor createErrorHandler(RouteContext routeContext, Exchange exchange, Processor processor) { |
| Processor answer; |
| |
| boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class); |
| |
| // do not wrap in error handler if we are inside a try block |
| if (!tryBlock && routeContext != null) { |
| // wrap the producer in error handler so we have fine grained error handling on |
| // the output side instead of the input side |
| // this is needed to support redelivery on that output alone and not doing redelivery |
| // for the entire multicast block again which will start from scratch again |
| |
| // create key for cache |
| final PreparedErrorHandler key = new PreparedErrorHandler(routeContext, processor); |
| |
| // lookup cached first to reuse and preserve memory |
| answer = errorHandlers.get(key); |
| if (answer != null) { |
| LOG.trace("Using existing error handler for: {}", processor); |
| return answer; |
| } |
| |
| LOG.trace("Creating error handler for: {}", processor); |
| ErrorHandlerFactory builder = routeContext.getRoute().getErrorHandlerBuilder(); |
| // create error handler (create error handler directly to keep it light weight, |
| // instead of using ProcessorDefinition.wrapInErrorHandler) |
| try { |
| processor = builder.createErrorHandler(routeContext, processor); |
| |
| // and wrap in unit of work processor so the copy exchange also can run under UoW |
| answer = createUnitOfWorkProcessor(routeContext, processor, exchange); |
| |
| boolean child = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class) != null; |
| |
| // must start the error handler |
| ServiceHelper.startServices(answer); |
| |
| // here we don't cache the child unit of work |
| if (!child) { |
| // add to cache |
| errorHandlers.putIfAbsent(key, answer); |
| } |
| |
| } catch (Exception e) { |
| throw ObjectHelper.wrapRuntimeCamelException(e); |
| } |
| } else { |
| // and wrap in unit of work processor so the copy exchange also can run under UoW |
| answer = createUnitOfWorkProcessor(routeContext, processor, exchange); |
| } |
| |
| return answer; |
| } |
| |
| /** |
| * Strategy to create the unit of work to be used for the sub route |
| * |
| * @param routeContext the route context |
| * @param processor the processor |
| * @param exchange the exchange |
| * @return the unit of work processor |
| */ |
| protected Processor createUnitOfWorkProcessor(RouteContext routeContext, Processor processor, Exchange exchange) { |
| CamelInternalProcessor internal = new CamelInternalProcessor(processor); |
| |
| // and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW |
| UnitOfWork parent = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class); |
| if (parent != null) { |
| internal.addAdvice(new CamelInternalProcessor.ChildUnitOfWorkProcessorAdvice(routeContext, parent)); |
| } else { |
| internal.addAdvice(new CamelInternalProcessor.UnitOfWorkProcessorAdvice(routeContext)); |
| } |
| |
| return internal; |
| } |
| |
| /** |
| * Prepares the exchange for participating in a shared unit of work |
| * <p/> |
| * This ensures a child exchange can access its parent {@link UnitOfWork} when it participate |
| * in a shared unit of work. |
| * |
| * @param childExchange the child exchange |
| * @param parentExchange the parent exchange |
| */ |
| protected void prepareSharedUnitOfWork(Exchange childExchange, Exchange parentExchange) { |
| childExchange.setProperty(Exchange.PARENT_UNIT_OF_WORK, parentExchange.getUnitOfWork()); |
| } |
| |
| protected void doStart() throws Exception { |
| if (isParallelProcessing() && executorService == null) { |
| throw new IllegalArgumentException("ParallelProcessing is enabled but ExecutorService has not been set"); |
| } |
| if (timeout > 0 && !isParallelProcessing()) { |
| throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled"); |
| } |
| if (isParallelProcessing() && aggregateExecutorService == null) { |
| // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread |
| // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run |
| // and signal completion during processing, which would lead to what would appear as a dead-lock or a slow processing |
| String name = getClass().getSimpleName() + "-AggregateTask"; |
| aggregateExecutorService = createAggregateExecutorService(name); |
| } |
| if (aggregationStrategy instanceof CamelContextAware) { |
| ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext); |
| } |
| |
| ServiceHelper.startServices(aggregationStrategy, processors); |
| } |
| |
| /** |
| * Strategy to create the thread pool for the aggregator background task which waits for and aggregates |
| * completed tasks when running in parallel mode. |
| * |
| * @param name the suggested name for the background thread |
| * @return the thread pool |
| */ |
| protected synchronized ExecutorService createAggregateExecutorService(String name) { |
| // use a cached thread pool so we each on-the-fly task has a dedicated thread to process completions as they come in |
| return camelContext.getExecutorServiceManager().newCachedThreadPool(this, name); |
| } |
| |
| @Override |
| protected void doStop() throws Exception { |
| ServiceHelper.stopServices(processors, errorHandlers, aggregationStrategy); |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| ServiceHelper.stopAndShutdownServices(processors, errorHandlers, aggregationStrategy); |
| // only clear error handlers when shutting down |
| errorHandlers.clear(); |
| |
| if (shutdownExecutorService && executorService != null) { |
| getCamelContext().getExecutorServiceManager().shutdownNow(executorService); |
| } |
| if (aggregateExecutorService != null) { |
| getCamelContext().getExecutorServiceManager().shutdownNow(aggregateExecutorService); |
| } |
| } |
| |
| protected static void setToEndpoint(Exchange exchange, Processor processor) { |
| if (processor instanceof Producer) { |
| Producer producer = (Producer) processor; |
| exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri()); |
| } |
| } |
| |
| protected AggregationStrategy getAggregationStrategy(Exchange exchange) { |
| AggregationStrategy answer = null; |
| |
| // prefer to use per Exchange aggregation strategy over a global strategy |
| if (exchange != null) { |
| Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); |
| Map<Object, AggregationStrategy> map = CastUtils.cast(property); |
| if (map != null) { |
| answer = map.get(this); |
| } |
| } |
| if (answer == null) { |
| // fallback to global strategy |
| answer = getAggregationStrategy(); |
| } |
| return answer; |
| } |
| |
| /** |
| * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}. |
| * |
| * @param exchange the exchange |
| * @param aggregationStrategy the strategy |
| */ |
| protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) { |
| Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); |
| Map<Object, AggregationStrategy> map = CastUtils.cast(property); |
| if (map == null) { |
| map = new ConcurrentHashMap<Object, AggregationStrategy>(); |
| } else { |
| // it is not safe to use the map directly as the exchange doesn't have the deep copy of it's properties |
| // we just create a new copy if we need to change the map |
| map = new ConcurrentHashMap<Object, AggregationStrategy>(map); |
| } |
| // store the strategy using this processor as the key |
| // (so we can store multiple strategies on the same exchange) |
| map.put(this, aggregationStrategy); |
| exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map); |
| } |
| |
| /** |
| * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange} |
| * which must be done after use. |
| * |
| * @param exchange the current exchange |
| */ |
| protected void removeAggregationStrategyFromExchange(Exchange exchange) { |
| Map<?, ?> property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class); |
| Map<Object, AggregationStrategy> map = CastUtils.cast(property); |
| if (map == null) { |
| return; |
| } |
| // remove the strategy using this processor as the key |
| map.remove(this); |
| } |
| |
| /** |
| * Is the multicast processor working in streaming mode? |
| * <p/> |
| * In streaming mode: |
| * <ul> |
| * <li>we use {@link Iterable} to ensure we can send messages as soon as the data becomes available</li> |
| * <li>for parallel processing, we start aggregating responses as they get send back to the processor; |
| * this means the {@link org.apache.camel.processor.aggregate.AggregationStrategy} has to take care of handling out-of-order arrival of exchanges</li> |
| * </ul> |
| */ |
| public boolean isStreaming() { |
| return streaming; |
| } |
| |
| /** |
| * Should the multicast processor stop processing further exchanges in case of an exception occurred? |
| */ |
| public boolean isStopOnException() { |
| return stopOnException; |
| } |
| |
| /** |
| * Returns the producers to multicast to |
| */ |
| public Collection<Processor> getProcessors() { |
| return processors; |
| } |
| |
| /** |
| * An optional timeout in millis when using parallel processing |
| */ |
| public long getTimeout() { |
| return timeout; |
| } |
| |
| /** |
| * Use {@link #getAggregationStrategy(org.apache.camel.Exchange)} instead. |
| */ |
| public AggregationStrategy getAggregationStrategy() { |
| return aggregationStrategy; |
| } |
| |
| public boolean isParallelProcessing() { |
| return parallelProcessing; |
| } |
| |
| public boolean isParallelAggregate() { |
| return parallelAggregate; |
| } |
| |
| public boolean isStopOnAggregateException() { |
| return stopOnAggregateException; |
| } |
| |
| public boolean isShareUnitOfWork() { |
| return shareUnitOfWork; |
| } |
| |
| public List<Processor> next() { |
| if (!hasNext()) { |
| return null; |
| } |
| return new ArrayList<Processor>(processors); |
| } |
| |
| public boolean hasNext() { |
| return processors != null && !processors.isEmpty(); |
| } |
| } |