| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.processor.aggregate; |
| |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.CamelExchangeException; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Expression; |
| import org.apache.camel.Navigate; |
| import org.apache.camel.Predicate; |
| import org.apache.camel.Processor; |
| import org.apache.camel.impl.LoggingExceptionHandler; |
| import org.apache.camel.impl.ServiceSupport; |
| import org.apache.camel.processor.Traceable; |
| import org.apache.camel.spi.AggregationRepository; |
| import org.apache.camel.spi.ExceptionHandler; |
| import org.apache.camel.spi.RecoverableAggregationRepository; |
| import org.apache.camel.spi.Synchronization; |
| import org.apache.camel.util.DefaultTimeoutMap; |
| import org.apache.camel.util.ExchangeHelper; |
| import org.apache.camel.util.LRUCache; |
| import org.apache.camel.util.ObjectHelper; |
| import org.apache.camel.util.ServiceHelper; |
| import org.apache.camel.util.TimeoutMap; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| /** |
| * An implementation of the <a |
| * href="http://camel.apache.org/aggregator2.html">Aggregator</a> |
| * pattern where a batch of messages are processed (up to a maximum amount or |
| * until some timeout is reached) and messages for the same correlation key are |
| * combined together using some kind of {@link AggregationStrategy} |
| * (by default the latest message is used) to compress many message exchanges |
| * into a smaller number of exchanges. |
| * <p/> |
| * A good example of this is stock market data; you may be receiving 30,000 |
| * messages/second and you may want to throttle it right down so that multiple |
| * messages for the same stock are combined (or just the latest message is used |
| * and older prices are discarded). Another idea is to combine line item messages |
| * together into a single invoice message. |
| * |
| * @version $Revision$ |
| */ |
| public class AggregateProcessor extends ServiceSupport implements Processor, Navigate<Processor>, Traceable { |
| |
| private static final Log LOG = LogFactory.getLog(AggregateProcessor.class); |
| |
| // use a fair lock so timeout checker will have a chance to acquire the lock if |
| // a lot of new messages keep arriving |
| private final Lock lock = new ReentrantLock(true); |
| private final CamelContext camelContext; |
| private final Processor processor; |
| private final AggregationStrategy aggregationStrategy; |
| private final Expression correlationExpression; |
| private final ExecutorService executorService; |
| private ScheduledExecutorService recoverService; |
| // store correlation key -> exchange id in timeout map |
| private TimeoutMap<String, String> timeoutMap; |
| private ExceptionHandler exceptionHandler = new LoggingExceptionHandler(getClass()); |
| private AggregationRepository aggregationRepository = new MemoryAggregationRepository(); |
| private Map<Object, Object> closedCorrelationKeys; |
| private Set<String> batchConsumerCorrelationKeys = new LinkedHashSet<String>(); |
| private final Set<String> inProgressCompleteExchanges = new HashSet<String>(); |
| private final Map<String, RedeliveryData> redeliveryState = new ConcurrentHashMap<String, RedeliveryData>(); |
| // optional dead letter channel for exhausted recovered exchanges |
| private Processor deadLetterProcessor; |
| |
| // keep booking about redelivery |
| private class RedeliveryData { |
| int redeliveryCounter; |
| long redeliveryDelay; |
| } |
| |
| // options |
| private boolean ignoreInvalidCorrelationKeys; |
| private Integer closeCorrelationKeyOnCompletion; |
| private boolean parallelProcessing; |
| |
| // different ways to have completion triggered |
| private boolean eagerCheckCompletion; |
| private Predicate completionPredicate; |
| private long completionTimeout; |
| private Expression completionTimeoutExpression; |
| private long completionInterval; |
| private int completionSize; |
| private Expression completionSizeExpression; |
| private boolean completionFromBatchConsumer; |
| private AtomicInteger batchConsumerCounter = new AtomicInteger(); |
| |
| public AggregateProcessor(CamelContext camelContext, Processor processor, |
| Expression correlationExpression, AggregationStrategy aggregationStrategy, |
| ExecutorService executorService) { |
| ObjectHelper.notNull(camelContext, "camelContext"); |
| ObjectHelper.notNull(processor, "processor"); |
| ObjectHelper.notNull(correlationExpression, "correlationExpression"); |
| ObjectHelper.notNull(aggregationStrategy, "aggregationStrategy"); |
| ObjectHelper.notNull(executorService, "executorService"); |
| this.camelContext = camelContext; |
| this.processor = processor; |
| this.correlationExpression = correlationExpression; |
| this.aggregationStrategy = aggregationStrategy; |
| this.executorService = executorService; |
| } |
| |
| @Override |
| public String toString() { |
| return "AggregateProcessor[to: " + processor + "]"; |
| } |
| |
| public String getTraceLabel() { |
| return "aggregate[" + correlationExpression + "]"; |
| } |
| |
| public List<Processor> next() { |
| if (!hasNext()) { |
| return null; |
| } |
| List<Processor> answer = new ArrayList<Processor>(1); |
| answer.add(processor); |
| return answer; |
| } |
| |
| public boolean hasNext() { |
| return processor != null; |
| } |
| |
| public void process(Exchange exchange) throws Exception { |
| // compute correlation expression |
| String key = correlationExpression.evaluate(exchange, String.class); |
| if (ObjectHelper.isEmpty(key)) { |
| // we have a bad correlation key |
| if (isIgnoreInvalidCorrelationKeys()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Invalid correlation key. This Exchange will be ignored: " + exchange); |
| } |
| return; |
| } else { |
| throw new CamelExchangeException("Invalid correlation key", exchange); |
| } |
| } |
| |
| // is the correlation key closed? |
| if (closedCorrelationKeys != null && closedCorrelationKeys.containsKey(key)) { |
| throw new ClosedCorrelationKeyException(key, exchange); |
| } |
| |
| // when memory based then its fast using synchronized, but if the aggregation repository is IO |
| // bound such as JPA etc then concurrent aggregation per correlation key could |
| // improve performance as we can run aggregation repository get/add in parallel |
| lock.lock(); |
| try { |
| doAggregation(key, exchange); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Aggregates the exchange with the given correlation key |
| * <p/> |
| * This method <b>must</b> be run synchronized as we cannot aggregate the same correlation key |
| * in parallel. |
| * |
| * @param key the correlation key |
| * @param exchange the exchange |
| * @return the aggregated exchange |
| */ |
| private Exchange doAggregation(String key, Exchange exchange) throws CamelExchangeException { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("onAggregation +++ start +++ with correlation key: " + key); |
| } |
| |
| Exchange answer; |
| Exchange oldExchange = aggregationRepository.get(exchange.getContext(), key); |
| Exchange newExchange = exchange; |
| |
| Integer size = 1; |
| if (oldExchange != null) { |
| size = oldExchange.getProperty(Exchange.AGGREGATED_SIZE, 0, Integer.class); |
| size++; |
| } |
| |
| // check if we are complete |
| String complete = null; |
| if (isEagerCheckCompletion()) { |
| // put the current aggregated size on the exchange so its avail during completion check |
| newExchange.setProperty(Exchange.AGGREGATED_SIZE, size); |
| complete = isCompleted(key, newExchange); |
| // remove it afterwards |
| newExchange.removeProperty(Exchange.AGGREGATED_SIZE); |
| } |
| |
| // prepare the exchanges for aggregation and aggregate it |
| ExchangeHelper.prepareAggregation(oldExchange, newExchange); |
| answer = onAggregation(oldExchange, exchange); |
| if (answer == null) { |
| throw new CamelExchangeException("AggregationStrategy " + aggregationStrategy + " returned null which is not allowed", exchange); |
| } |
| |
| // update the aggregated size |
| answer.setProperty(Exchange.AGGREGATED_SIZE, size); |
| |
| // maybe we should check completion after the aggregation |
| if (!isEagerCheckCompletion()) { |
| complete = isCompleted(key, answer); |
| } |
| |
| // only need to update aggregation repository if we are not complete |
| if (complete == null) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("In progress aggregated exchange: " + answer + " with correlation key:" + key); |
| } |
| aggregationRepository.add(exchange.getContext(), key, answer); |
| } else { |
| // if batch consumer completion is enabled then we need to complete the group |
| if ("consumer".equals(complete)) { |
| for (String batchKey : batchConsumerCorrelationKeys) { |
| Exchange batchAnswer = aggregationRepository.get(camelContext, batchKey); |
| batchAnswer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); |
| onCompletion(batchKey, batchAnswer, false); |
| } |
| batchConsumerCorrelationKeys.clear(); |
| } else { |
| // we are complete for this exchange |
| answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, complete); |
| onCompletion(key, answer, false); |
| } |
| } |
| |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("onAggregation +++ end +++ with correlation key: " + key); |
| } |
| |
| return answer; |
| } |
| |
| /** |
| * Tests whether the given exchange is complete or not |
| * |
| * @param key the correlation key |
| * @param exchange the incoming exchange |
| * @return <tt>null</tt> if not completed, otherwise a String with the type that triggered the completion |
| */ |
| protected String isCompleted(String key, Exchange exchange) { |
| if (getCompletionPredicate() != null) { |
| boolean answer = getCompletionPredicate().matches(exchange); |
| if (answer) { |
| return "predicate"; |
| } |
| } |
| |
| if (getCompletionSizeExpression() != null) { |
| Integer value = getCompletionSizeExpression().evaluate(exchange, Integer.class); |
| if (value != null && value > 0) { |
| int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); |
| if (size >= value) { |
| return "size"; |
| } |
| } |
| } |
| if (getCompletionSize() > 0) { |
| int size = exchange.getProperty(Exchange.AGGREGATED_SIZE, 1, Integer.class); |
| if (size >= getCompletionSize()) { |
| return "size"; |
| } |
| } |
| |
| // timeout can be either evaluated based on an expression or from a fixed value |
| // expression takes precedence |
| boolean timeoutSet = false; |
| if (getCompletionTimeoutExpression() != null) { |
| Long value = getCompletionTimeoutExpression().evaluate(exchange, Long.class); |
| if (value != null && value > 0) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Updating correlation key " + key + " to timeout after " |
| + value + " ms. as exchange received: " + exchange); |
| } |
| timeoutMap.put(key, exchange.getExchangeId(), value); |
| timeoutSet = true; |
| } |
| } |
| if (!timeoutSet && getCompletionTimeout() > 0) { |
| // timeout is used so use the timeout map to keep an eye on this |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Updating correlation key " + key + " to timeout after " |
| + getCompletionTimeout() + " ms. as exchange received: " + exchange); |
| } |
| timeoutMap.put(key, exchange.getExchangeId(), getCompletionTimeout()); |
| } |
| |
| if (isCompletionFromBatchConsumer()) { |
| batchConsumerCorrelationKeys.add(key); |
| batchConsumerCounter.incrementAndGet(); |
| int size = exchange.getProperty(Exchange.BATCH_SIZE, 0, Integer.class); |
| if (size > 0 && batchConsumerCounter.intValue() >= size) { |
| // batch consumer is complete then reset the counter |
| batchConsumerCounter.set(0); |
| return "consumer"; |
| } |
| } |
| |
| // not complete |
| return null; |
| } |
| |
| protected Exchange onAggregation(Exchange oldExchange, Exchange newExchange) { |
| return aggregationStrategy.aggregate(oldExchange, newExchange); |
| } |
| |
| protected void onCompletion(final String key, final Exchange exchange, boolean fromTimeout) { |
| // store the correlation key as property |
| exchange.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); |
| // remove from repository as its completed |
| aggregationRepository.remove(exchange.getContext(), key, exchange); |
| if (!fromTimeout && timeoutMap != null) { |
| // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) |
| timeoutMap.remove(key); |
| } |
| |
| // this key has been closed so add it to the closed map |
| if (closedCorrelationKeys != null) { |
| closedCorrelationKeys.put(key, key); |
| } |
| |
| onSubmitCompletion(key, exchange); |
| } |
| |
| private void onSubmitCompletion(final Object key, final Exchange exchange) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Aggregation complete for correlation key " + key + " sending aggregated exchange: " + exchange); |
| } |
| |
| // add this as in progress before we submit the task |
| inProgressCompleteExchanges.add(exchange.getExchangeId()); |
| |
| // send this exchange |
| executorService.submit(new Runnable() { |
| public void run() { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Processing aggregated exchange: " + exchange); |
| } |
| |
| // add on completion task so we remember to update the inProgressCompleteExchanges |
| exchange.addOnCompletion(new AggregateOnCompletion(exchange.getExchangeId())); |
| |
| try { |
| processor.process(exchange); |
| } catch (Exception e) { |
| exchange.setException(e); |
| } catch (Throwable t) { |
| // must catch throwable so we will handle all exceptions as the executor service will by default ignore them |
| exchange.setException(new CamelExchangeException("Error processing aggregated exchange", exchange, t)); |
| } |
| |
| // log exception if there was a problem |
| if (exchange.getException() != null) { |
| // if there was an exception then let the exception handler handle it |
| getExceptionHandler().handleException("Error processing aggregated exchange", exchange, exchange.getException()); |
| } else { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Processing aggregated exchange: " + exchange + " complete."); |
| } |
| } |
| } |
| }); |
| } |
| |
| public Predicate getCompletionPredicate() { |
| return completionPredicate; |
| } |
| |
| public void setCompletionPredicate(Predicate completionPredicate) { |
| this.completionPredicate = completionPredicate; |
| } |
| |
| public boolean isEagerCheckCompletion() { |
| return eagerCheckCompletion; |
| } |
| |
| public void setEagerCheckCompletion(boolean eagerCheckCompletion) { |
| this.eagerCheckCompletion = eagerCheckCompletion; |
| } |
| |
| public long getCompletionTimeout() { |
| return completionTimeout; |
| } |
| |
| public void setCompletionTimeout(long completionTimeout) { |
| this.completionTimeout = completionTimeout; |
| } |
| |
| public Expression getCompletionTimeoutExpression() { |
| return completionTimeoutExpression; |
| } |
| |
| public void setCompletionTimeoutExpression(Expression completionTimeoutExpression) { |
| this.completionTimeoutExpression = completionTimeoutExpression; |
| } |
| |
| public long getCompletionInterval() { |
| return completionInterval; |
| } |
| |
| public void setCompletionInterval(long completionInterval) { |
| this.completionInterval = completionInterval; |
| } |
| |
| public int getCompletionSize() { |
| return completionSize; |
| } |
| |
| public void setCompletionSize(int completionSize) { |
| this.completionSize = completionSize; |
| } |
| |
| public Expression getCompletionSizeExpression() { |
| return completionSizeExpression; |
| } |
| |
| public void setCompletionSizeExpression(Expression completionSizeExpression) { |
| this.completionSizeExpression = completionSizeExpression; |
| } |
| |
| public boolean isIgnoreInvalidCorrelationKeys() { |
| return ignoreInvalidCorrelationKeys; |
| } |
| |
| public void setIgnoreInvalidCorrelationKeys(boolean ignoreInvalidCorrelationKeys) { |
| this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys; |
| } |
| |
| public Integer getCloseCorrelationKeyOnCompletion() { |
| return closeCorrelationKeyOnCompletion; |
| } |
| |
| public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) { |
| this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion; |
| } |
| |
| public boolean isCompletionFromBatchConsumer() { |
| return completionFromBatchConsumer; |
| } |
| |
| public void setCompletionFromBatchConsumer(boolean completionFromBatchConsumer) { |
| this.completionFromBatchConsumer = completionFromBatchConsumer; |
| } |
| |
| public ExceptionHandler getExceptionHandler() { |
| return exceptionHandler; |
| } |
| |
| public void setExceptionHandler(ExceptionHandler exceptionHandler) { |
| this.exceptionHandler = exceptionHandler; |
| } |
| |
| public boolean isParallelProcessing() { |
| return parallelProcessing; |
| } |
| |
| public void setParallelProcessing(boolean parallelProcessing) { |
| this.parallelProcessing = parallelProcessing; |
| } |
| |
| public AggregationRepository getAggregationRepository() { |
| return aggregationRepository; |
| } |
| |
| public void setAggregationRepository(AggregationRepository aggregationRepository) { |
| this.aggregationRepository = aggregationRepository; |
| } |
| |
| /** |
| * On completion task which keeps the booking of the in progress up to date |
| */ |
| private final class AggregateOnCompletion implements Synchronization { |
| private final String exchangeId; |
| |
| private AggregateOnCompletion(String exchangeId) { |
| // must use the original exchange id as it could potentially change if send over SEDA etc. |
| this.exchangeId = exchangeId; |
| } |
| |
| public void onFailure(Exchange exchange) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Aggregated exchange onFailure: " + exchange); |
| } |
| |
| // must remember to remove in progress when we failed |
| inProgressCompleteExchanges.remove(exchangeId); |
| // do not remove redelivery state as we need it when we redeliver again later |
| } |
| |
| public void onComplete(Exchange exchange) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Aggregated exchange onComplete: " + exchange); |
| } |
| |
| // only confirm if we processed without a problem |
| try { |
| aggregationRepository.confirm(exchange.getContext(), exchangeId); |
| // and remove redelivery state as well |
| redeliveryState.remove(exchangeId); |
| } finally { |
| // must remember to remove in progress when we are complete |
| inProgressCompleteExchanges.remove(exchangeId); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "AggregateOnCompletion"; |
| } |
| } |
| |
| /** |
| * Background task that looks for aggregated exchanges which is triggered by completion timeouts. |
| */ |
| private final class AggregationTimeoutMap extends DefaultTimeoutMap<String, String> { |
| |
| private AggregationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { |
| super(executor, requestMapPollTimeMillis); |
| } |
| |
| @Override |
| public void onEviction(String key, String exchangeId) { |
| if (log.isDebugEnabled()) { |
| log.debug("Completion timeout triggered for correlation key: " + key); |
| } |
| |
| // double check that its not already in progress |
| boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); |
| if (inProgress) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress."); |
| } |
| return; |
| } |
| |
| // get the aggregated exchange |
| Exchange answer = aggregationRepository.get(camelContext, key); |
| |
| // indicate it was completed by timeout |
| answer.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "timeout"); |
| |
| lock.lock(); |
| try { |
| onCompletion(key, answer, true); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| |
| /** |
| * Background task that triggers completion based on interval. |
| */ |
| private final class AggregationIntervalTask implements Runnable { |
| |
| public void run() { |
| // only run if CamelContext has been fully started |
| if (!camelContext.getStatus().isStarted()) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Completion interval task cannot start due CamelContext(" + camelContext.getName() + ") has not been started yet"); |
| } |
| return; |
| } |
| |
| LOG.trace("Starting completion interval task"); |
| |
| // trigger completion for all in the repository |
| Set<String> keys = aggregationRepository.getKeys(); |
| |
| if (keys != null && !keys.isEmpty()) { |
| lock.lock(); |
| try { |
| for (String key : keys) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Completion interval triggered for correlation key: " + key); |
| } |
| Exchange exchange = aggregationRepository.get(camelContext, key); |
| |
| // indicate it was completed by interval |
| exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval"); |
| |
| onCompletion(key, exchange, false); |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| LOG.trace("Completion interval task complete"); |
| } |
| } |
| |
| /** |
| * Background task that looks for aggregated exchanges to recover. |
| */ |
| private final class RecoverTask implements Runnable { |
| private final RecoverableAggregationRepository recoverable; |
| |
| private RecoverTask(RecoverableAggregationRepository recoverable) { |
| this.recoverable = recoverable; |
| } |
| |
| public void run() { |
| // only run if CamelContext has been fully started |
| if (!camelContext.getStatus().isStarted()) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Recover check cannot start due CamelContext(" + camelContext.getName() + ") has not been started yet"); |
| } |
| return; |
| } |
| |
| LOG.trace("Starting recover check"); |
| |
| Set<String> exchangeIds = recoverable.scan(camelContext); |
| for (String exchangeId : exchangeIds) { |
| |
| // we may shutdown while doing recovery |
| if (!isRunAllowed()) { |
| LOG.info("We are shutting down so stop recovering"); |
| return; |
| } |
| |
| boolean inProgress = inProgressCompleteExchanges.contains(exchangeId); |
| if (inProgress) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Aggregated exchange with id: " + exchangeId + " is already in progress."); |
| } |
| } else { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Loading aggregated exchange with id: " + exchangeId + " to be recovered."); |
| } |
| Exchange exchange = recoverable.recover(camelContext, exchangeId); |
| if (exchange != null) { |
| // get the correlation key |
| String key = exchange.getProperty(Exchange.AGGREGATED_CORRELATION_KEY, String.class); |
| // and mark it as redelivered |
| exchange.getIn().setHeader(Exchange.REDELIVERED, Boolean.TRUE); |
| |
| // get the current redelivery data |
| RedeliveryData data = redeliveryState.get(exchange.getExchangeId()); |
| |
| // if we are exhausted, then move to dead letter channel |
| if (data != null && recoverable.getMaximumRedeliveries() > 0 && data.redeliveryCounter >= recoverable.getMaximumRedeliveries()) { |
| LOG.warn("The recovered exchange is exhausted after " + recoverable.getMaximumRedeliveries() |
| + " attempts, will now be moved to dead letter channel: " + recoverable.getDeadLetterUri()); |
| |
| // send to DLC |
| try { |
| // set redelivery counter |
| exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); |
| exchange.getIn().setHeader(Exchange.REDELIVERY_EXHAUSTED, Boolean.TRUE); |
| deadLetterProcessor.process(exchange); |
| } catch (Exception e) { |
| exchange.setException(e); |
| } |
| |
| // handle if failed |
| if (exchange.getException() != null) { |
| getExceptionHandler().handleException("Failed to move recovered Exchange to dead letter channel: " + recoverable.getDeadLetterUri(), exchange.getException()); |
| } else { |
| // it was ok, so confirm after it has been moved to dead letter channel, so we wont recover it again |
| recoverable.confirm(camelContext, exchangeId); |
| } |
| } else { |
| // update current redelivery state |
| if (data == null) { |
| // create new data |
| data = new RedeliveryData(); |
| redeliveryState.put(exchange.getExchangeId(), data); |
| } |
| data.redeliveryCounter++; |
| |
| // set redelivery counter |
| exchange.getIn().setHeader(Exchange.REDELIVERY_COUNTER, data.redeliveryCounter); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Delivery attempt: " + data.redeliveryCounter + " to recover aggregated exchange with id: " + exchangeId + ""); |
| } |
| // not exhaust so resubmit the recovered exchange |
| lock.lock(); |
| try { |
| onSubmitCompletion(key, exchange); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| } |
| } |
| |
| LOG.trace("Recover check complete"); |
| } |
| } |
| |
| @Override |
| protected void doStart() throws Exception { |
| if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null |
| && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null |
| && getCompletionSizeExpression() == null) { |
| throw new IllegalStateException("At least one of the completions options" |
| + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set"); |
| } |
| |
| if (getCloseCorrelationKeyOnCompletion() != null) { |
| if (getCloseCorrelationKeyOnCompletion() > 0) { |
| LOG.info("Using ClosedCorrelationKeys with a LRUCache with a capacity of " + getCloseCorrelationKeyOnCompletion()); |
| closedCorrelationKeys = new LRUCache<Object, Object>(getCloseCorrelationKeyOnCompletion()); |
| } else { |
| LOG.info("Using ClosedCorrelationKeys with unbounded capacity"); |
| closedCorrelationKeys = new HashMap<Object, Object>(); |
| } |
| } |
| |
| ServiceHelper.startServices(processor, aggregationRepository); |
| |
| // should we use recover checker |
| if (aggregationRepository instanceof RecoverableAggregationRepository) { |
| RecoverableAggregationRepository recoverable = (RecoverableAggregationRepository) aggregationRepository; |
| if (recoverable.isUseRecovery()) { |
| long interval = recoverable.getRecoveryIntervalInMillis(); |
| if (interval <= 0) { |
| throw new IllegalArgumentException("AggregationRepository has recovery enabled and the RecoveryInterval option must be a positive number, was: " + interval); |
| } |
| |
| // create a background recover thread to check every interval |
| recoverService = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateRecoverChecker", 1); |
| Runnable recoverTask = new RecoverTask(recoverable); |
| LOG.info("Using RecoverableAggregationRepository by scheduling recover checker to run every " + interval + " millis."); |
| // use fixed delay so there is X interval between each run |
| recoverService.scheduleWithFixedDelay(recoverTask, 1000L, interval, TimeUnit.MILLISECONDS); |
| |
| if (recoverable.getDeadLetterUri() != null) { |
| int max = recoverable.getMaximumRedeliveries(); |
| if (max <= 0) { |
| throw new IllegalArgumentException("Option maximumRedeliveries must be a positive number, was: " + max); |
| } |
| LOG.info("After " + max + " failed redelivery attempts Exchanges will be moved to deadLetterUri: " + recoverable.getDeadLetterUri()); |
| deadLetterProcessor = camelContext.getEndpoint(recoverable.getDeadLetterUri()).createProducer(); |
| ServiceHelper.startService(deadLetterProcessor); |
| } |
| } |
| } |
| |
| if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) { |
| throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both."); |
| } |
| if (getCompletionInterval() > 0) { |
| LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis."); |
| ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); |
| // trigger completion based on interval |
| scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS); |
| } |
| |
| // start timeout service if its in use |
| if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) { |
| LOG.info("Using CompletionTimeout to trigger after " + getCompletionTimeout() + " millis of inactivity."); |
| ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1); |
| // check for timed out aggregated messages once every second |
| timeoutMap = new AggregationTimeoutMap(scheduler, 1000L); |
| ServiceHelper.startService(timeoutMap); |
| } |
| } |
| |
| @Override |
| protected void doStop() throws Exception { |
| if (recoverService != null) { |
| camelContext.getExecutorServiceStrategy().shutdownNow(recoverService); |
| } |
| ServiceHelper.stopServices(timeoutMap, processor, deadLetterProcessor); |
| |
| if (closedCorrelationKeys != null) { |
| // it may be a service so stop it as well |
| ServiceHelper.stopService(closedCorrelationKeys); |
| closedCorrelationKeys.clear(); |
| } |
| batchConsumerCorrelationKeys.clear(); |
| redeliveryState.clear(); |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| // shutdown aggregation repository |
| ServiceHelper.stopService(aggregationRepository); |
| |
| // cleanup when shutting down |
| inProgressCompleteExchanges.clear(); |
| |
| super.doShutdown(); |
| } |
| |
| } |