blob: 3ba3f3819702f10faefbdcdaab3f8785bb6ea9df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.model;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.Expression;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.processor.aggregate.AggregateProcessor;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.GroupedExchangeAggregationStrategy;
import org.apache.camel.spi.AggregationRepository;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.util.concurrent.ExecutorServiceHelper;
/**
* Represents an XML <aggregate/> element
*
* @version
*/
@XmlRootElement(name = "aggregate")
@XmlAccessorType(XmlAccessType.FIELD)
public class AggregateDefinition extends ProcessorDefinition<AggregateDefinition> implements ExecutorServiceAwareDefinition<AggregateDefinition> {
@XmlElement(name = "correlationExpression", required = true)
private ExpressionSubElementDefinition correlationExpression;
@XmlElement(name = "completionPredicate")
private ExpressionSubElementDefinition completionPredicate;
@XmlElement(name = "completionTimeout")
private ExpressionSubElementDefinition completionTimeoutExpression;
@XmlElement(name = "completionSize")
private ExpressionSubElementDefinition completionSizeExpression;
@XmlTransient
private ExpressionDefinition expression;
@XmlElementRef
private List<ProcessorDefinition> outputs = new ArrayList<ProcessorDefinition>();
@XmlTransient
private AggregationStrategy aggregationStrategy;
@XmlTransient
private ExecutorService executorService;
@XmlTransient
private AggregationRepository aggregationRepository;
@XmlAttribute
private Boolean parallelProcessing;
@XmlAttribute
private String executorServiceRef;
@XmlAttribute
private String aggregationRepositoryRef;
@XmlAttribute
private String strategyRef;
@XmlAttribute
private Integer completionSize;
@XmlAttribute
private Long completionInterval;
@XmlAttribute
private Long completionTimeout;
@XmlAttribute
private Boolean completionFromBatchConsumer;
@XmlAttribute
private Boolean groupExchanges;
@XmlAttribute
private Boolean eagerCheckCompletion;
@XmlAttribute
private Boolean ignoreInvalidCorrelationKeys;
@XmlAttribute
private Integer closeCorrelationKeyOnCompletion;
@XmlAttribute
private Boolean discardOnCompletionTimeout;
public AggregateDefinition() {
}
public AggregateDefinition(Predicate predicate) {
if (predicate != null) {
setExpression(new ExpressionDefinition(predicate));
}
}
public AggregateDefinition(Expression correlationExpression) {
if (correlationExpression != null) {
setExpression(new ExpressionDefinition(correlationExpression));
}
}
public AggregateDefinition(ExpressionDefinition correlationExpression) {
this.expression = correlationExpression;
}
public AggregateDefinition(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
this(correlationExpression);
this.aggregationStrategy = aggregationStrategy;
}
@Override
public String toString() {
String expressionString = (getExpression() != null) ? getExpression().getLabel() : "";
return "Aggregate[" + expressionString + " -> " + getOutputs() + "]";
}
@Override
public String getShortName() {
return "aggregate";
}
@Override
public String getLabel() {
return "aggregate";
}
@Override
public Processor createProcessor(RouteContext routeContext) throws Exception {
return createAggregator(routeContext);
}
protected AggregateProcessor createAggregator(RouteContext routeContext) throws Exception {
Processor processor = this.createChildProcessor(routeContext, true);
// wrap the aggregated route in a unit of work processor
processor = new UnitOfWorkProcessor(routeContext, processor);
Expression correlation = getExpression().createExpression(routeContext);
AggregationStrategy strategy = createAggregationStrategy(routeContext);
// executor service is mandatory for the Aggregator
executorService = ExecutorServiceHelper.getConfiguredExecutorService(routeContext, "Aggregator", this);
if (executorService == null) {
if (isParallelProcessing()) {
// we are running in parallel so create a thread pool
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newDefaultThreadPool(this, "Aggregator");
} else {
// use a synchronous thread pool if we are not running in parallel (will always use caller thread)
executorService = routeContext.getCamelContext().getExecutorServiceStrategy().newSynchronousThreadPool(this, "Aggregator");
}
}
AggregateProcessor answer = new AggregateProcessor(routeContext.getCamelContext(), processor, correlation, strategy, executorService);
AggregationRepository repository = createAggregationRepository(routeContext);
if (repository != null) {
answer.setAggregationRepository(repository);
}
// set other options
answer.setParallelProcessing(isParallelProcessing());
if (getCompletionPredicate() != null) {
Predicate predicate = getCompletionPredicate().createPredicate(routeContext);
answer.setCompletionPredicate(predicate);
}
if (getCompletionTimeoutExpression() != null) {
Expression expression = getCompletionTimeoutExpression().createExpression(routeContext);
answer.setCompletionTimeoutExpression(expression);
}
if (getCompletionTimeout() != null) {
answer.setCompletionTimeout(getCompletionTimeout());
}
if (getCompletionInterval() != null) {
answer.setCompletionInterval(getCompletionInterval());
}
if (getCompletionSizeExpression() != null) {
Expression expression = getCompletionSizeExpression().createExpression(routeContext);
answer.setCompletionSizeExpression(expression);
}
if (getCompletionSize() != null) {
answer.setCompletionSize(getCompletionSize());
}
if (getCompletionFromBatchConsumer() != null) {
answer.setCompletionFromBatchConsumer(isCompletionFromBatchConsumer());
}
if (getEagerCheckCompletion() != null) {
answer.setEagerCheckCompletion(isEagerCheckCompletion());
}
if (getIgnoreInvalidCorrelationKeys() != null) {
answer.setIgnoreInvalidCorrelationKeys(isIgnoreInvalidCorrelationKeys());
}
if (getCloseCorrelationKeyOnCompletion() != null) {
answer.setCloseCorrelationKeyOnCompletion(getCloseCorrelationKeyOnCompletion());
}
if (getDiscardOnCompletionTimeout() != null) {
answer.setDiscardOnCompletionTimeout(isDiscardOnCompletionTimeout());
}
return answer;
}
@Override
protected void configureChild(ProcessorDefinition output) {
if (expression != null && expression instanceof ExpressionClause) {
ExpressionClause clause = (ExpressionClause) expression;
if (clause.getExpressionType() != null) {
// if using the Java DSL then the expression may have been set using the
// ExpressionClause which is a fancy builder to define expressions and predicates
// using fluent builders in the DSL. However we need afterwards a callback to
// reset the expression to the expression type the ExpressionClause did build for us
expression = clause.getExpressionType();
// set the correlation expression from the expression type, as the model definition
// would then be accurate
correlationExpression = new ExpressionSubElementDefinition();
correlationExpression.setExpressionType(clause.getExpressionType());
}
}
}
private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
AggregationStrategy strategy = getAggregationStrategy();
if (strategy == null && strategyRef != null) {
strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
}
if (groupExchanges != null && groupExchanges) {
if (strategy != null || strategyRef != null) {
throw new IllegalArgumentException("Options groupExchanges and AggregationStrategy cannot be enabled at the same time");
}
if (eagerCheckCompletion != null && !eagerCheckCompletion) {
throw new IllegalArgumentException("Option eagerCheckCompletion cannot be false when groupExchanges has been enabled");
}
// set eager check to enabled by default when using grouped exchanges
setEagerCheckCompletion(true);
// if grouped exchange is enabled then use special strategy for that
strategy = new GroupedExchangeAggregationStrategy();
}
if (strategy == null) {
throw new IllegalArgumentException("AggregationStrategy or AggregationStrategyRef must be set on " + this);
}
return strategy;
}
private AggregationRepository createAggregationRepository(RouteContext routeContext) {
AggregationRepository repository = getAggregationRepository();
if (repository == null && aggregationRepositoryRef != null) {
repository = routeContext.lookup(aggregationRepositoryRef, AggregationRepository.class);
if (repository == null) {
throw new IllegalArgumentException("AggregationRepositoryRef " + aggregationRepositoryRef + " not found in registry.");
}
}
return repository;
}
public AggregationStrategy getAggregationStrategy() {
return aggregationStrategy;
}
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
public String getAggregationStrategyRef() {
return strategyRef;
}
public void setAggregationStrategyRef(String aggregationStrategyRef) {
this.strategyRef = aggregationStrategyRef;
}
public Integer getCompletionSize() {
return completionSize;
}
public void setCompletionSize(Integer completionSize) {
this.completionSize = completionSize;
}
public Long getCompletionInterval() {
return completionInterval;
}
public void setCompletionInterval(Long completionInterval) {
this.completionInterval = completionInterval;
}
public Long getCompletionTimeout() {
return completionTimeout;
}
public void setCompletionTimeout(Long completionTimeout) {
this.completionTimeout = completionTimeout;
}
public ExpressionSubElementDefinition getCompletionPredicate() {
return completionPredicate;
}
public void setCompletionPredicate(ExpressionSubElementDefinition completionPredicate) {
this.completionPredicate = completionPredicate;
}
public ExpressionSubElementDefinition getCompletionTimeoutExpression() {
return completionTimeoutExpression;
}
public void setCompletionTimeoutExpression(ExpressionSubElementDefinition completionTimeoutExpression) {
this.completionTimeoutExpression = completionTimeoutExpression;
}
public ExpressionSubElementDefinition getCompletionSizeExpression() {
return completionSizeExpression;
}
public void setCompletionSizeExpression(ExpressionSubElementDefinition completionSizeExpression) {
this.completionSizeExpression = completionSizeExpression;
}
public Boolean getGroupExchanges() {
return groupExchanges;
}
public boolean isGroupExchanges() {
return groupExchanges != null && groupExchanges;
}
public void setGroupExchanges(Boolean groupExchanges) {
this.groupExchanges = groupExchanges;
}
public Boolean getCompletionFromBatchConsumer() {
return completionFromBatchConsumer;
}
public boolean isCompletionFromBatchConsumer() {
return completionFromBatchConsumer != null && completionFromBatchConsumer;
}
public void setCompletionFromBatchConsumer(Boolean completionFromBatchConsumer) {
this.completionFromBatchConsumer = completionFromBatchConsumer;
}
public ExecutorService getExecutorService() {
return executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
public Boolean getParallelProcessing() {
return parallelProcessing;
}
public boolean isParallelProcessing() {
return parallelProcessing != null && parallelProcessing;
}
public void setParallelProcessing(boolean parallelProcessing) {
this.parallelProcessing = parallelProcessing;
}
public String getExecutorServiceRef() {
return executorServiceRef;
}
public void setExecutorServiceRef(String executorServiceRef) {
this.executorServiceRef = executorServiceRef;
}
public String getStrategyRef() {
return strategyRef;
}
public void setStrategyRef(String strategyRef) {
this.strategyRef = strategyRef;
}
public Boolean getEagerCheckCompletion() {
return eagerCheckCompletion;
}
public boolean isEagerCheckCompletion() {
return eagerCheckCompletion != null && eagerCheckCompletion;
}
public void setEagerCheckCompletion(Boolean eagerCheckCompletion) {
this.eagerCheckCompletion = eagerCheckCompletion;
}
public Boolean getIgnoreInvalidCorrelationKeys() {
return ignoreInvalidCorrelationKeys;
}
public boolean isIgnoreInvalidCorrelationKeys() {
return ignoreInvalidCorrelationKeys != null && ignoreInvalidCorrelationKeys;
}
public void setIgnoreInvalidCorrelationKeys(Boolean ignoreInvalidCorrelationKeys) {
this.ignoreInvalidCorrelationKeys = ignoreInvalidCorrelationKeys;
}
public Integer getCloseCorrelationKeyOnCompletion() {
return closeCorrelationKeyOnCompletion;
}
public void setCloseCorrelationKeyOnCompletion(Integer closeCorrelationKeyOnCompletion) {
this.closeCorrelationKeyOnCompletion = closeCorrelationKeyOnCompletion;
}
public AggregationRepository getAggregationRepository() {
return aggregationRepository;
}
public void setAggregationRepository(AggregationRepository aggregationRepository) {
this.aggregationRepository = aggregationRepository;
}
public String getAggregationRepositoryRef() {
return aggregationRepositoryRef;
}
public void setAggregationRepositoryRef(String aggregationRepositoryRef) {
this.aggregationRepositoryRef = aggregationRepositoryRef;
}
public Boolean getDiscardOnCompletionTimeout() {
return discardOnCompletionTimeout;
}
public boolean isDiscardOnCompletionTimeout() {
return discardOnCompletionTimeout != null && discardOnCompletionTimeout;
}
public void setDiscardOnCompletionTimeout(Boolean discardOnCompletionTimeout) {
this.discardOnCompletionTimeout = discardOnCompletionTimeout;
}
// Fluent API
//-------------------------------------------------------------------------
/**
* Use eager completion checking which means that the {{completionPredicate}} will use the incoming Exchange.
* At opposed to without eager completion checking the {{completionPredicate}} will use the aggregated Exchange.
*
* @return builder
*/
public AggregateDefinition eagerCheckCompletion() {
setEagerCheckCompletion(true);
return this;
}
/**
* If a correlation key cannot be successfully evaluated it will be ignored by logging a {{DEBUG}} and then just
* ignore the incoming Exchange.
*
* @return builder
*/
public AggregateDefinition ignoreInvalidCorrelationKeys() {
setIgnoreInvalidCorrelationKeys(true);
return this;
}
/**
* Closes a correlation key when its complete. Any <i>late</i> received exchanges which has a correlation key
* that has been closed, it will be defined and a {@link org.apache.camel.processor.aggregate.ClosedCorrelationKeyException}
* is thrown.
*
* @param capacity the maximum capacity of the closed correlation key cache.
* Use <tt>0</tt> or negative value for unbounded capacity.
* @return builder
*/
public AggregateDefinition closeCorrelationKeyOnCompletion(int capacity) {
setCloseCorrelationKeyOnCompletion(capacity);
return this;
}
/**
* Discards the aggregated message on completion timeout.
* <p/>
* This means on timeout the aggregated message is dropped and not sent out of the aggregator.
*
* @return builder
*/
public AggregateDefinition discardOnCompletionTimeout() {
setDiscardOnCompletionTimeout(true);
return this;
}
/**
* Enables the batch completion mode where we aggregate from a {@link org.apache.camel.BatchConsumer}
* and aggregate the total number of exchanges the {@link org.apache.camel.BatchConsumer} has reported
* as total by checking the exchange property {@link org.apache.camel.Exchange#BATCH_COMPLETE} when its complete.
*
* @return builder
*/
public AggregateDefinition completionFromBatchConsumer() {
setCompletionFromBatchConsumer(true);
return this;
}
/**
* Sets the completion size, which is the number of aggregated exchanges which would
* cause the aggregate to consider the group as complete and send out the aggregated exchange.
*
* @param completionSize the completion size
* @return builder
*/
public AggregateDefinition completionSize(int completionSize) {
setCompletionSize(completionSize);
return this;
}
/**
* Sets the completion size, which is the number of aggregated exchanges which would
* cause the aggregate to consider the group as complete and send out the aggregated exchange.
*
* @param completionSize the completion size as an {@link org.apache.camel.Expression} which is evaluated as a {@link Integer} type
* @return builder
*/
public AggregateDefinition completionSize(Expression completionSize) {
setCompletionSizeExpression(new ExpressionSubElementDefinition(completionSize));
return this;
}
/**
* Sets the completion interval, which would cause the aggregate to consider the group as complete
* and send out the aggregated exchange.
*
* @param completionInterval the interval in millis
* @return the builder
*/
public AggregateDefinition completionInterval(long completionInterval) {
setCompletionInterval(completionInterval);
return this;
}
/**
* Sets the completion timeout, which would cause the aggregate to consider the group as complete
* and send out the aggregated exchange.
*
* @param completionTimeout the timeout in millis
* @return the builder
*/
public AggregateDefinition completionTimeout(long completionTimeout) {
setCompletionTimeout(completionTimeout);
return this;
}
/**
* Sets the completion timeout, which would cause the aggregate to consider the group as complete
* and send out the aggregated exchange.
*
* @param completionTimeout the timeout as an {@link Expression} which is evaluated as a {@link Long} type
* @return the builder
*/
public AggregateDefinition completionTimeout(Expression completionTimeout) {
setCompletionTimeoutExpression(new ExpressionSubElementDefinition(completionTimeout));
return this;
}
/**
* Sets the aggregate strategy to use
*
* @param aggregationStrategy the aggregate strategy to use
* @return the builder
*/
public AggregateDefinition aggregationStrategy(AggregationStrategy aggregationStrategy) {
setAggregationStrategy(aggregationStrategy);
return this;
}
/**
* Sets the aggregate strategy to use
*
* @param aggregationStrategyRef reference to the strategy to lookup in the registry
* @return the builder
*/
public AggregateDefinition aggregationStrategyRef(String aggregationStrategyRef) {
setAggregationStrategyRef(aggregationStrategyRef);
return this;
}
/**
* Sets the custom aggregate repository to use.
* <p/>
* Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
*
* @param aggregationRepository the aggregate repository to use
* @return the builder
*/
public AggregateDefinition aggregationRepository(AggregationRepository aggregationRepository) {
setAggregationRepository(aggregationRepository);
return this;
}
/**
* Sets the custom aggregate repository to use
* <p/>
* Will by default use {@link org.apache.camel.processor.aggregate.MemoryAggregationRepository}
*
* @param aggregationRepositoryRef reference to the repository to lookup in the registry
* @return the builder
*/
public AggregateDefinition aggregationRepositoryRef(String aggregationRepositoryRef) {
setAggregationRepositoryRef(aggregationRepositoryRef);
return this;
}
/**
* Enables grouped exchanges, so the aggregator will group all aggregated exchanges into a single
* combined Exchange holding all the aggregated exchanges in a {@link java.util.List} as a exchange
* property with the key {@link org.apache.camel.Exchange#GROUPED_EXCHANGE}.
*
* @return the builder
*/
public AggregateDefinition groupExchanges() {
setGroupExchanges(true);
// must use eager check when using grouped exchanges
setEagerCheckCompletion(true);
return this;
}
/**
* Sets the predicate used to determine if the aggregation is completed
*
* @param predicate the predicate
* @return the builder
*/
public AggregateDefinition completionPredicate(Predicate predicate) {
checkNoCompletedPredicate();
setCompletionPredicate(new ExpressionSubElementDefinition(predicate));
return this;
}
/**
* Sending the aggregated output in parallel
*
* @return the builder
*/
public AggregateDefinition parallelProcessing() {
setParallelProcessing(true);
return this;
}
public AggregateDefinition executorService(ExecutorService executorService) {
setExecutorService(executorService);
return this;
}
public AggregateDefinition executorServiceRef(String executorServiceRef) {
setExecutorServiceRef(executorServiceRef);
return this;
}
protected void checkNoCompletedPredicate() {
if (getCompletionPredicate() != null) {
throw new IllegalArgumentException("There is already a completionPredicate defined for this aggregator: " + this);
}
}
public void setCorrelationExpression(ExpressionSubElementDefinition correlationExpression) {
this.correlationExpression = correlationExpression;
}
public ExpressionSubElementDefinition getCorrelationExpression() {
return correlationExpression;
}
// Section - Methods from ExpressionNode
// Needed to copy methods from ExpressionNode here so that I could specify the
// correlation expression as optional in JAXB
public ExpressionDefinition getExpression() {
if (expression == null && correlationExpression != null) {
expression = correlationExpression.getExpressionType();
}
return expression;
}
public void setExpression(ExpressionDefinition expression) {
this.expression = expression;
}
public List<ProcessorDefinition> getOutputs() {
return outputs;
}
public boolean isOutputSupported() {
return true;
}
public void setOutputs(List<ProcessorDefinition> outputs) {
this.outputs = outputs;
}
}