/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.camel.model;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlAttribute;
import javax.xml.bind.annotation.XmlTransient;

import org.apache.camel.Channel;
import org.apache.camel.Endpoint;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.LoggingLevel;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.builder.DataFormatClause;
import org.apache.camel.builder.ErrorHandlerBuilder;
import org.apache.camel.builder.ErrorHandlerBuilderRef;
import org.apache.camel.builder.ExpressionBuilder;
import org.apache.camel.builder.ExpressionClause;
import org.apache.camel.builder.ProcessorBuilder;
import org.apache.camel.model.language.ConstantExpression;
import org.apache.camel.model.language.ExpressionDefinition;
import org.apache.camel.model.language.LanguageExpression;
import org.apache.camel.processor.DefaultChannel;
import org.apache.camel.processor.InterceptEndpointProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.interceptor.Delayer;
import org.apache.camel.processor.interceptor.HandleFault;
import org.apache.camel.processor.interceptor.StreamCaching;
import org.apache.camel.processor.loadbalancer.LoadBalancer;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.LifecycleStrategy;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TransactedPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import static org.apache.camel.builder.Builder.body;

/**
 * Base class for processor types that most XML types extend.
 *
 * @version $Revision$
 */
@XmlAccessorType(XmlAccessType.PROPERTY)
public abstract class ProcessorDefinition<Type extends ProcessorDefinition> extends OptionalIdentifiedDefinition implements Block {
    protected final transient Log log = LogFactory.getLog(getClass());
    protected ErrorHandlerBuilder errorHandlerBuilder;
    protected String errorHandlerRef;
    protected Boolean inheritErrorHandler = Boolean.TRUE;
    private NodeFactory nodeFactory;
    private final LinkedList<Block> blocks = new LinkedList<Block>();
    private ProcessorDefinition<?> parent;
    private final List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();

    // else to use an optional attribute in JAXB2
    public abstract List<ProcessorDefinition> getOutputs();

    /**
     * Whether this model is abstract or not.
     * <p/>
     * An abstract model is something that is used for configuring cross cutting concerns such as
     * error handling, transaction policies, interceptors etc.
     * <p/>
     * Regular definitions is what is part of the route, such as ToDefinition, WireTapDefinition and the likes.
     * <p/>
     * Will by default return <tt>false</tt> to indicate regular definition, so all the abstract definitions
     * must override this method and return <tt>true</tt> instead.
     * <p/>
     * This information is used in camel-spring to let Camel work a bit on the model provided by JAXB from the
     * Spring XML file. This is needed to handle those cross cutting concerns properly. The Java DSL does not
     * have this issue as it can work this out directly using the fluent builder methods.
     *
     * @return <tt>true</tt> for abstract, otherwise <tt>false</tt> for regular.
     */
    public boolean isAbstract() {
        return false;
    }

    /**
     * Override this in definition class and implement logic to create the processor
     * based on the definition model.
     */
    public Processor createProcessor(RouteContext routeContext) throws Exception {
        throw new UnsupportedOperationException("Not implemented yet for class: " + getClass().getName());
    }

    /**
     * Prefer to use {#link #createChildProcessor}.
     */
    public Processor createOutputsProcessor(RouteContext routeContext) throws Exception {
        Collection<ProcessorDefinition> outputs = getOutputs();
        return createOutputsProcessor(routeContext, outputs);
    }

    /**
     * Creates the child processor (outputs) from the current definition
     *
     * @param routeContext   the route context
     * @param mandatory      whether or not children is mandatory (ie the definition should have outputs)
     * @return the created children, or <tt>null</tt> if definition had no output
     * @throws Exception is thrown if error creating the child or if it was mandatory and there was no output defined on definition
     */
    public Processor createChildProcessor(RouteContext routeContext, boolean mandatory) throws Exception {
        Processor children = routeContext.createProcessor(this);
        if (children == null && mandatory) {
            throw new IllegalArgumentException("Definition has no children on " + this);
        }
        return children;
    }

    public void addOutput(ProcessorDefinition processorType) {
        processorType.setParent(this);
        configureChild(processorType);
        if (blocks.isEmpty()) {
            getOutputs().add(processorType);
        } else {
            Block block = blocks.getLast();
            block.addOutput(processorType);
        }
    }

    public void clearOutput() {
        getOutputs().clear();
        blocks.clear();
    }

    public void addRoutes(RouteContext routeContext, Collection<Route> routes) throws Exception {
        Processor processor = makeProcessor(routeContext);
        if (processor == null) {
            // no processor to add
            return;
        }

        if (!routeContext.isRouteAdded()) {
            boolean endpointInterceptor = false;

            // are we routing to an endpoint interceptor, if so we should not add it as an event driven
            // processor as we use the producer to trigger the interceptor
            if (processor instanceof Channel) {
                Channel channel = (Channel) processor;
                Processor next = channel.getNextProcessor();
                if (next instanceof InterceptEndpointProcessor) {
                    endpointInterceptor = true;
                }
            }

            // only add regular processors as event driven
            if (endpointInterceptor) {
                if (log.isDebugEnabled()) {
                    log.debug("Endpoint interceptor should not be added as an event driven consumer route: " + processor);
                }
            } else {
                if (log.isTraceEnabled()) {
                    log.trace("Adding event driven processor: " + processor);
                }
                routeContext.addEventDrivenProcessor(processor);
            }

        }
    }

    /**
     * Wraps the child processor in whatever necessary interceptors and error handlers
     */
    public Processor wrapProcessor(RouteContext routeContext, Processor processor) throws Exception {
        // dont double wrap
        if (processor instanceof Channel) {
            return processor;
        }
        return wrapChannel(routeContext, processor, null);
    }

    protected Processor wrapChannel(RouteContext routeContext, Processor processor, ProcessorDefinition child) throws Exception {
        // put a channel in between this and each output to control the route flow logic
        Channel channel = createChannel(routeContext);
        channel.setNextProcessor(processor);

        // add interceptor strategies to the channel must be in this order: camel context, route context, local
        addInterceptStrategies(routeContext, channel, routeContext.getCamelContext().getInterceptStrategies());
        addInterceptStrategies(routeContext, channel, routeContext.getInterceptStrategies());
        if (routeContext.getManagedInterceptStrategy() != null) {
            channel.addInterceptStrategy(routeContext.getManagedInterceptStrategy());
        }
        addInterceptStrategies(routeContext, channel, this.getInterceptStrategies());

        // must do this ugly cast to avoid compiler error on HP-UX
        ProcessorDefinition defn = (ProcessorDefinition) this;

        // set the child before init the channel
        channel.setChildDefinition(child);
        channel.initChannel(defn, routeContext);

        // set the error handler, must be done after init as we can set the error handler as first in the chain
        if (defn instanceof TryDefinition || defn instanceof CatchDefinition || defn instanceof FinallyDefinition) {
            // do not use error handler for try .. catch .. finally blocks as it will handle errors itself
        } else if (ProcessorDefinitionHelper.isParentOfType(TryDefinition.class, defn, true)
                || ProcessorDefinitionHelper.isParentOfType(CatchDefinition.class, defn, true)
                || ProcessorDefinitionHelper.isParentOfType(FinallyDefinition.class, defn, true)) {
            // do not use error handler for try .. catch .. finally blocks as it will handle errors itself
            // by checking that any of our parent(s) is not a try .. catch or finally type
        } else if (defn instanceof MulticastDefinition || defn instanceof RecipientListDefinition) {
            // do not use error handler for multicast or recipient list based as it offers fine grained error handlers for its outputs
        } else {
            if (inheritErrorHandler) {
                if (log.isTraceEnabled()) {
                    log.trace(defn + " is configured to inheritErrorHandler");
                }
                // only add error handler if we are configured to do so
                // regular definition so add the error handler
                Processor output = channel.getOutput();
                Processor errorHandler = wrapInErrorHandler(routeContext, getErrorHandlerBuilder(), output);
                // set error handler on channel
                channel.setErrorHandler(errorHandler);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug(defn + " is configured to not inheritErrorHandler.");
                }
            }
        }

        if (log.isTraceEnabled()) {
            log.trace(defn + " wrapped in Channel: " + channel);
        }
        return channel;
    }

    /**
     * Wraps the given output in an error handler
     *
     * @param routeContext the route context
     * @param output the output
     * @return the output wrapped with the error handler
     * @throws Exception can be thrown if failed to create error handler builder
     */
    protected Processor wrapInErrorHandler(RouteContext routeContext, ErrorHandlerBuilder builder, Processor output) throws Exception {
        // create error handler
        Processor errorHandler = builder.createErrorHandler(routeContext, output);

        // invoke lifecycles so we can manage this error handler builder
        for (LifecycleStrategy strategy : routeContext.getCamelContext().getLifecycleStrategies()) {
            strategy.onErrorHandlerAdd(routeContext, errorHandler, builder);
        }

        return errorHandler;
    }

    /**
     * Adds the given list of interceptors to the channel.
     *
     * @param routeContext  the route context
     * @param channel       the channel to add strategies
     * @param strategies    list of strategies to add.
     */
    protected void addInterceptStrategies(RouteContext routeContext, Channel channel, List<InterceptStrategy> strategies) {
        for (InterceptStrategy strategy : strategies) {
            if (!routeContext.isStreamCaching() && strategy instanceof StreamCaching) {
                // stream cache is disabled so we should not add it
                continue;
            }
            if (!routeContext.isHandleFault() && strategy instanceof HandleFault) {
                // handle fault is disabled so we should not add it
                continue;
            }
            if (strategy instanceof Delayer) {
                if (routeContext.getDelayer() == null || routeContext.getDelayer() <= 0) {
                    // delayer is disabled so we should not add it
                    continue;
                } else {
                    // replace existing delayer as delayer have individual configuration
                    Iterator<InterceptStrategy> it = channel.getInterceptStrategies().iterator();
                    while (it.hasNext()) {
                        InterceptStrategy existing = it.next();
                        if (existing instanceof Delayer) {
                            it.remove();
                        }
                    }
                    // add the new correct delayer
                    channel.addInterceptStrategy(strategy);
                    continue;
                }
            }

            // add strategy
            channel.addInterceptStrategy(strategy);
        }
    }

    /**
     * Creates a new instance of some kind of composite processor which defaults
     * to using a {@link Pipeline} but derived classes could change the behaviour
     */
    protected Processor createCompositeProcessor(RouteContext routeContext, List<Processor> list) throws Exception {
        return new Pipeline(routeContext.getCamelContext(), list);
    }

    /**
     * Creates a new instance of the {@link Channel}.
     */
    protected Channel createChannel(RouteContext routeContext) throws Exception {
        return new DefaultChannel();
    }

    protected Processor createOutputsProcessor(RouteContext routeContext, Collection<ProcessorDefinition> outputs) throws Exception {
        List<Processor> list = new ArrayList<Processor>();
        for (ProcessorDefinition<?> output : outputs) {
            Processor processor = output.createProcessor(routeContext);
            if (output instanceof Channel && processor == null) {
                continue;
            }

            Processor channel = wrapChannel(routeContext, processor, output);
            list.add(channel);
        }

        // if more than one output wrap than in a composite processor else just keep it as is
        Processor processor = null;
        if (!list.isEmpty()) {
            if (list.size() == 1) {
                processor = list.get(0);
            } else {
                processor = createCompositeProcessor(routeContext, list);
            }
        }

        return processor;
    }

    /**
     * Creates the processor and wraps it in any necessary interceptors and error handlers
     */
    protected Processor makeProcessor(RouteContext routeContext) throws Exception {
        Processor processor = createProcessor(routeContext);
        if (processor == null) {
            // no processor to make
            return null;
        }
        return wrapProcessor(routeContext, processor);
    }

    protected ErrorHandlerBuilder createErrorHandlerBuilder() {
        if (errorHandlerRef != null) {
            return new ErrorHandlerBuilderRef(errorHandlerRef);
        }

        // return a reference to the default error handler
        return new ErrorHandlerBuilderRef(ErrorHandlerBuilderRef.DEFAULT_ERROR_HANDLER_BUILDER);
    }

    protected void configureChild(ProcessorDefinition output) {
        output.setNodeFactory(getNodeFactory());
        output.setErrorHandlerBuilder(getErrorHandlerBuilder());
    }

    // Fluent API
    // -------------------------------------------------------------------------

    /**
     * Sends the exchange to the given endpoint
     *
     * @param uri  the endpoint to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(String uri) {
        addOutput(new ToDefinition(uri));
        return (Type) this;
    }   
    
    /**
     * Sends the exchange to the given endpoint
     *
     * @param uri  the String formatted endpoint uri to send to
     * @param args arguments for the string formatting of the uri
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type toF(String uri, Object... args) {
        addOutput(new ToDefinition(String.format(uri, args)));
        return (Type) this;
    }


    /**
     * Sends the exchange to the given endpoint
     *
     * @param endpoint  the endpoint to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(Endpoint endpoint) {
        addOutput(new ToDefinition(endpoint));
        return (Type) this;
    }
    
    /**
     * Sends the exchange with certain exchange pattern to the given endpoint
     *
     * @param pattern the pattern to use for the message exchange
     * @param uri  the endpoint to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(ExchangePattern pattern, String uri) {
        addOutput(new ToDefinition(uri, pattern));
        return (Type) this;
    }   
    

    /**
     * Sends the exchange with certain exchange pattern to the given endpoint
     *
     * @param pattern the pattern to use for the message exchange
     * @param endpoint  the endpoint to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(ExchangePattern pattern, Endpoint endpoint) {
        addOutput(new ToDefinition(endpoint, pattern));
        return (Type) this;
    }

    /**
     * Sends the exchange to a list of endpoints
     *
     * @param uris  list of endpoints to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(String... uris) {
        for (String uri : uris) {
            addOutput(new ToDefinition(uri));
        }
        return (Type) this;
    }


    /**
     * Sends the exchange to a list of endpoints
     *
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(Endpoint... endpoints) {
        for (Endpoint endpoint : endpoints) {
            addOutput(new ToDefinition(endpoint));
        }
        return (Type) this;
    }

    /**
     * Sends the exchange to a list of endpoints
     *
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(Iterable<Endpoint> endpoints) {
        for (Endpoint endpoint : endpoints) {
            addOutput(new ToDefinition(endpoint));
        }
        return (Type) this;
    }
    
    
    /**
     * Sends the exchange to a list of endpoints
     *
     * @param pattern the pattern to use for the message exchanges
     * @param uris  list of endpoints to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(ExchangePattern pattern, String... uris) {
        for (String uri : uris) {
            addOutput(new ToDefinition(uri, pattern));
        }
        return (Type) this;
    }

    /**
     * Sends the exchange to a list of endpoints
     *
     * @param pattern the pattern to use for the message exchanges
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(ExchangePattern pattern, Endpoint... endpoints) {
        for (Endpoint endpoint : endpoints) {
            addOutput(new ToDefinition(endpoint, pattern));
        }
        return (Type) this;
    }

    /**
     * Sends the exchange to a list of endpoints
     *
     * @param pattern the pattern to use for the message exchanges
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type to(ExchangePattern pattern, Iterable<Endpoint> endpoints) {
        for (Endpoint endpoint : endpoints) {
            addOutput(new ToDefinition(endpoint, pattern));
        }
        return (Type) this;
    }

    /**
     * Sends the exchange to the given endpoint using asynchronous mode.
     *
     * @param uri  the endpoint to send to
     * @return the builder
     * @see org.apache.camel.processor.SendAsyncProcessor
     * @deprecated will be replaced with a new async routing engine in Camel 2.4
     */
    @Deprecated
    public ToDefinition toAsync(String uri) {
        ToDefinition answer = new ToDefinition(uri);
        answer.setAsync(true);
        addOutput(answer);
        // must push a block so we have a child route for the async reply
        // routing which is separated from the caller route
        pushBlock(answer);
        return answer;
    }

    /**
     * Sends the exchange to the given endpoint using asynchronous mode.
     *
     * @param uri  the endpoint to send to
     * @param poolSize the core pool size
     * @return the builder
     * @see org.apache.camel.processor.SendAsyncProcessor
     * @deprecated will be replaced with a new async routing engine in Camel 2.4
     */
    @Deprecated
    public ToDefinition toAsync(String uri, int poolSize) {
        ToDefinition answer = new ToDefinition(uri);
        answer.setAsync(true);
        answer.setPoolSize(poolSize);
        addOutput(answer);
        // must push a block so we have a child route for the async reply
        // routing which is separated from the caller route
        pushBlock(answer);
        return answer;
    }

    /**
     * Sends the exchange to the given endpoint using asynchronous mode.
     *
     * @param endpoint  the endpoint to send to
     * @return the builder
     * @see org.apache.camel.processor.SendAsyncProcessor
     * @deprecated will be replaced with a new async routing engine in Camel 2.4
     */
    @Deprecated
    public ToDefinition toAsync(Endpoint endpoint) {
        ToDefinition answer = new ToDefinition(endpoint);
        answer.setAsync(true);
        addOutput(answer);
        // must push a block so we have a child route for the async reply
        // routing which is separated from the caller route
        pushBlock(answer);
        return answer;
    }

    /**
     * Sends the exchange to the given endpoint using asynchronous mode.
     *
     * @param endpoint  the endpoint to send to
     * @param poolSize the core pool size
     * @return the builder
     * @see org.apache.camel.processor.SendAsyncProcessor
     * @deprecated will be replaced with a new async routing engine in Camel 2.4
     */
    @Deprecated
    public ToDefinition toAsync(Endpoint endpoint, int poolSize) {
        ToDefinition answer = new ToDefinition(endpoint);
        answer.setAsync(true);
        answer.setPoolSize(poolSize);
        addOutput(answer);
        // must push a block so we have a child route for the async reply
        // routing which is separated from the caller route
        pushBlock(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
     * set the ExchangePattern {@link ExchangePattern} into the exchange
     *
     * @param exchangePattern  instance of {@link ExchangePattern}
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type setExchangePattern(ExchangePattern exchangePattern) {
        addOutput(new SetExchangePatternDefinition(exchangePattern));
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
     * set the exchange's ExchangePattern {@link ExchangePattern} to be InOnly
     *
     *
     * @return the builder
     */
    public Type inOnly() {
        return setExchangePattern(ExchangePattern.InOnly);
    }

    /**
     * Sends the message to the given endpoint using an
     * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
     *
     * @param uri The endpoint uri which is used for sending the exchange
     * @return the builder
     */
    public Type inOnly(String uri) {
        return to(ExchangePattern.InOnly, uri);
    }

    /**
     * Sends the message to the given endpoint using an
     * <a href="http://camel.apache.org/event-message.html">Event Message</a> or 
     * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
     *
     * @param endpoint The endpoint which is used for sending the exchange
     * @return the builder
     */
    public Type inOnly(Endpoint endpoint) {
        return to(ExchangePattern.InOnly, endpoint);
    }


    /**
     * Sends the message to the given endpoints using an
     * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
     *
     * @param uris  list of endpoints to send to
     * @return the builder
     */
    public Type inOnly(String... uris) {
        return to(ExchangePattern.InOnly, uris);
    }


    /**
     * Sends the message to the given endpoints using an
     * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
     *
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    public Type inOnly(Endpoint... endpoints) {
        return to(ExchangePattern.InOnly, endpoints);
    }

    /**
     * Sends the message to the given endpoints using an
     * <a href="http://camel.apache.org/event-message.html">Event Message</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOnly exchange pattern</a>
     *
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    public Type inOnly(Iterable<Endpoint> endpoints) {
        return to(ExchangePattern.InOnly, endpoints);
    }


    /**
     * <a href="http://camel.apache.org/exchange-pattern.html">ExchangePattern:</a>
     * set the exchange's ExchangePattern {@link ExchangePattern} to be InOut
     *
     *
     * @return the builder
     */
    public Type inOut() {
        return setExchangePattern(ExchangePattern.InOut);
    }

    /**
     * Sends the message to the given endpoint using an
     * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
     *
     * @param uri The endpoint uri which is used for sending the exchange
     * @return the builder
     */
    public Type inOut(String uri) {
        return to(ExchangePattern.InOut, uri);
    }


    /**
     * Sends the message to the given endpoint using an
     * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
     *
     * @param endpoint The endpoint which is used for sending the exchange
     * @return the builder
     */
    public Type inOut(Endpoint endpoint) {
        return to(ExchangePattern.InOut, endpoint);
    }

    /**
     * Sends the message to the given endpoints using an
     * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
     *
     * @param uris  list of endpoints to send to
     * @return the builder
     */
    public Type inOut(String... uris) {
        return to(ExchangePattern.InOut, uris);
    }


    /**
     * Sends the message to the given endpoints using an
     * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
     *
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    public Type inOut(Endpoint... endpoints) {
        return to(ExchangePattern.InOut, endpoints);
    }

    /**
     * Sends the message to the given endpoints using an
     * <a href="http://camel.apache.org/request-reply.html">Request Reply</a> or
     * <a href="http://camel.apache.org/exchange-pattern.html">InOut exchange pattern</a>
     *
     * @param endpoints  list of endpoints to send to
     * @return the builder
     */
    public Type inOut(Iterable<Endpoint> endpoints) {
        return to(ExchangePattern.InOut, endpoints);
    }

    /**
     * Sets the id of this node
     *
     * @param id  the id
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type id(String id) {
        if (getOutputs().isEmpty()) {
            // set id on this
            setId(id);
        } else {
            // set it on last output as this is what the user means to do
            getOutputs().get(getOutputs().size() - 1).setId(id);
        }

        return (Type) this;
    }

    /**
     * Set the route id for this route
     *
     * @param id  the route id
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type routeId(String id) {
        ProcessorDefinition def = this;

        RouteDefinition route = ProcessorDefinitionHelper.getRoute(def);
        if (route != null) {
            route.setId(id);
        }

        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
     * Multicasts messages to all its child outputs; so that each processor and
     * destination gets a copy of the original message to avoid the processors
     * interfering with each other.
     *
     * @return the builder
     */
    public MulticastDefinition multicast() {
        MulticastDefinition answer = new MulticastDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
     * Multicasts messages to all its child outputs; so that each processor and
     * destination gets a copy of the original message to avoid the processors
     * interfering with each other.
     *
     * @param aggregationStrategy the strategy used to aggregate responses for
     *          every part
     * @param parallelProcessing if is <tt>true</tt> camel will fork thread to call the endpoint producer
     * @return the builder
     */
    public MulticastDefinition multicast(AggregationStrategy aggregationStrategy, boolean parallelProcessing) {
        MulticastDefinition answer = new MulticastDefinition();
        addOutput(answer);
        answer.setAggregationStrategy(aggregationStrategy);
        answer.setParallelProcessing(parallelProcessing);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/multicast.html">Multicast EIP:</a>
     * Multicasts messages to all its child outputs; so that each processor and
     * destination gets a copy of the original message to avoid the processors
     * interfering with each other.
     *
     * @param aggregationStrategy the strategy used to aggregate responses for every part
     * @return the builder
     */
    public MulticastDefinition multicast(AggregationStrategy aggregationStrategy) {
        MulticastDefinition answer = new MulticastDefinition();
        addOutput(answer);
        answer.setAggregationStrategy(aggregationStrategy);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
     * Creates a {@link Pipeline} so that the message
     * will get processed by each endpoint in turn and for request/response the
     * output of one endpoint will be the input of the next endpoint
     *
     * @return the builder
     */
    public PipelineDefinition pipeline() {
        PipelineDefinition answer = new PipelineDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
     * Creates a {@link Pipeline} of the list of endpoints so that the message
     * will get processed by each endpoint in turn and for request/response the
     * output of one endpoint will be the input of the next endpoint
     *
     * @param uris  list of endpoints
     * @return the builder
     */
    public Type pipeline(String... uris) {
        return to(uris);
    }

    /**
     * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
     * Creates a {@link Pipeline} of the list of endpoints so that the message
     * will get processed by each endpoint in turn and for request/response the
     * output of one endpoint will be the input of the next endpoint
     *
     * @param endpoints  list of endpoints
     * @return the builder
     */
    public Type pipeline(Endpoint... endpoints) {
        return to(endpoints);
    }

    /**
     * <a href="http://camel.apache.org/pipes-nd-filters.html">Pipes and Filters EIP:</a>
     * Creates a {@link Pipeline} of the list of endpoints so that the message
     * will get processed by each endpoint in turn and for request/response the
     * output of one endpoint will be the input of the next endpoint
     *
     * @param endpoints  list of endpoints
     * @return the builder
     */
    public Type pipeline(Collection<Endpoint> endpoints) {
        return to(endpoints);
    }

    /**
     * Leverages a thread pool for multi threading processing exchanges.
     * <p/>
     * The caller thread will either wait for the async route
     * to complete or immediately continue. If continue the OUT message will
     * contain a {@link java.util.concurrent.Future} handle so you can get the real response
     * later using this handle.
     * <p/>
     * Will default <tt>Always</tt> wait for the async route to complete, but this behavior can be overriden by:
     * <ul>
     *   <li>Configuring the <tt>waitForTaskToComplete</tt> option</li>
     *   <li>Provide an IN header with the key {@link org.apache.camel.Exchange#ASYNC_WAIT} with the
     * value containing a type {@link org.apache.camel.WaitForTaskToComplete}. The header will take precedence, if provided.</li>
     * </ul>
     * <p/>
     * If no <tt>corePoolSize</tt> is set then a default CachedExecutorService is used which automatic grown and shrinks.
     * If no <tt>maxPoolSize</tt> is set, then the <tt>corePoolSize</tt> is used as max.
     *
     * @return the builder
     */
    public ThreadsDefinition threads() {
        ThreadsDefinition answer = new ThreadsDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * Leverages a thread pool for multi threading processing exchanges.
     * <p/>
     * See {@link #threads()} for more details.
     * If no <tt>maxPoolSize</tt> is set, then the <tt>corePoolSize</tt> is used as max.
     *
     * @param poolSize the core pool size
     * @return the builder
     */
    public ThreadsDefinition threads(int poolSize) {
        ThreadsDefinition answer = threads();
        answer.setPoolSize(poolSize);
        return answer;
    }

    /**
     * Leverages a thread pool for multi threading processing exchanges.
     * <p/>
     * See {@link #threads()} for more details.
     *
     * @param poolSize    the core pool size
     * @param maxPoolSize the maximum pool size
     * @return the builder
     */
    public ThreadsDefinition threads(int poolSize, int maxPoolSize) {
        ThreadsDefinition answer = threads();
        answer.setPoolSize(poolSize);
        answer.setMaxPoolSize(maxPoolSize);
        return answer;
    }

    /**
     * Wraps the sub route using AOP allowing you to do before and after work (AOP around).
     *
     * @return the builder
     */
    public AOPDefinition aop() {
        AOPDefinition answer = new AOPDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * Ends the current block
     *
     * @return the builder
     */
    public ProcessorDefinition end() {
        // when using doTry .. doCatch .. doFinally we should always
        // end the try definition to avoid having to use 2 x end() in the route
        // this is counter intuitive for end users
        ProcessorDefinition defn = (ProcessorDefinition) this;
        if (defn instanceof TryDefinition) {
            popBlock();
        }

        if (blocks.isEmpty()) {
            if (parent == null) {
                return this; 
            }
            return parent;
        }
        popBlock();
        return this;
    }

    /**
     * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
     * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
     * to avoid duplicate messages
     *      
     * @return the builder
     */
    public IdempotentConsumerDefinition idempotentConsumer() {
        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
     * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
     * to avoid duplicate messages
     *
     * @param messageIdExpression  expression to test of duplicate messages
     * @param idempotentRepository  the repository to use for duplicate chedck
     * @return the builder
     */
    public IdempotentConsumerDefinition idempotentConsumer(Expression messageIdExpression, IdempotentRepository<?> idempotentRepository) {
        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition(messageIdExpression, idempotentRepository);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/idempotent-consumer.html">Idempotent consumer EIP:</a>
     * Creates an {@link org.apache.camel.processor.idempotent.IdempotentConsumer IdempotentConsumer}
     * to avoid duplicate messages
     *
     * @param idempotentRepository the repository to use for duplicate chedck
     * @return the builder used to create the expression
     */
    public ExpressionClause<IdempotentConsumerDefinition> idempotentConsumer(IdempotentRepository<?> idempotentRepository) {
        IdempotentConsumerDefinition answer = new IdempotentConsumerDefinition();
        answer.setMessageIdRepository(idempotentRepository);
        addOutput(answer);
        return ExpressionClause.createAndSetExpression(answer);
    }

    /**
     * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
     * Creates a predicate expression which only if it is <tt>true</tt> then the
     * exchange is forwarded to the destination
     *
     * @return the clause used to create the filter expression
     */
    public ExpressionClause<FilterDefinition> filter() {
        FilterDefinition filter = new FilterDefinition();
        addOutput(filter);
        return ExpressionClause.createAndSetExpression(filter);
    }

    /**
     * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
     * Creates a predicate which is applied and only if it is <tt>true</tt> then the
     * exchange is forwarded to the destination
     *
     * @param predicate  predicate to use
     * @return the builder 
     */
    public FilterDefinition filter(Predicate predicate) {
        FilterDefinition filter = new FilterDefinition(predicate);
        addOutput(filter);
        return filter;
    }

    /**
     * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
     * Creates a predicate expression which only if it is <tt>true</tt> then the
     * exchange is forwarded to the destination
     *
     * @param expression  the predicate expression to use
     * @return the builder
     */
    public FilterDefinition filter(ExpressionDefinition expression) {
        FilterDefinition filter = getNodeFactory().createFilter();
        filter.setExpression(expression);
        addOutput(filter);
        return filter;
    }

    /**
     * <a href="http://camel.apache.org/message-filter.html">Message Filter EIP:</a>
     * Creates a predicate language expression which only if it is <tt>true</tt> then the
     * exchange is forwarded to the destination
     *
     * @param language     language for expression
     * @param expression   the expression
     * @return the builder
     */
    public FilterDefinition filter(String language, String expression) {
        return filter(new LanguageExpression(language, expression));
    }

    /**
     * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a>
     * Creates a loadbalance
     *
     * @return  the builder
     */
    public LoadBalanceDefinition loadBalance() {
        LoadBalanceDefinition answer = new LoadBalanceDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/load-balancer.html">Load Balancer EIP:</a>
     * Creates a loadbalance
     *
     * @param loadBalancer a custom load balancer to use
     * @return  the builder
     */
    public LoadBalanceDefinition loadBalance(LoadBalancer loadBalancer) {
        LoadBalanceDefinition answer = new LoadBalanceDefinition();
        addOutput(answer);
        return answer.loadBalance(loadBalancer);
    }

    /**
     * Creates a log message to be logged at INFO level.
     *
     * @param message the log message, (you can use {@link org.apache.camel.language.simple.SimpleLanguage} syntax)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type log(String message) {
        LogDefinition answer = new LogDefinition(message);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Creates a log message to be logged at the given level.
     *
     * @param loggingLevel the logging level to use
     * @param message the log message, (you can use {@link org.apache.camel.language.simple.SimpleLanguage} syntax)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type log(LoggingLevel loggingLevel, String message) {
        LogDefinition answer = new LogDefinition(message);
        answer.setLoggingLevel(loggingLevel);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Creates a log message to be logged at the given level and name.
     *
     * @param loggingLevel the logging level to use
     * @param logName the log name to use
     * @param message the log message, (you can use {@link org.apache.camel.language.simple.SimpleLanguage} syntax)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type log(LoggingLevel loggingLevel, String logName, String message) {
        LogDefinition answer = new LogDefinition(message);
        answer.setLoggingLevel(loggingLevel);
        answer.setLogName(logName);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/content-based-router.html">Content Based Router EIP:</a>
     * Creates a choice of one or more predicates with an otherwise clause
     *
     * @return the builder for a choice expression
     */
    public ChoiceDefinition choice() {
        ChoiceDefinition answer = new ChoiceDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * Creates a try/catch block
     *
     * @return the builder for a tryBlock expression
     */
    public TryDefinition doTry() {
        TryDefinition answer = new TryDefinition();
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
     * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients.
     * <p/>
     * Will use comma as default delimiter.
     *
     * @param recipients expression to decide the destinations
     * @return the builder
     */
    public RecipientListDefinition<Type> recipientList(Expression recipients) {
        RecipientListDefinition<Type> answer = new RecipientListDefinition<Type>(recipients);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
     * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients
     *
     * @param recipients expression to decide the destinations
     * @param delimiter  a custom delimiter to use
     * @return the builder
     */
    public RecipientListDefinition<Type> recipientList(Expression recipients, String delimiter) {
        RecipientListDefinition<Type> answer = new RecipientListDefinition<Type>(recipients);
        answer.setDelimiter(delimiter);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/recipient-list.html">Recipient List EIP:</a>
     * Creates a dynamic recipient list allowing you to route messages to a number of dynamically specified recipients
     *
     * @return the expression clause to configure the expression to decide the destinations
     */
    public ExpressionClause<RecipientListDefinition<Type>> recipientList() {
        RecipientListDefinition<Type> answer = new RecipientListDefinition<Type>();
        addOutput(answer);
        return ExpressionClause.createAndSetExpression(answer);
    }

    /**
     * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
     * Creates a routing slip allowing you to route a message consecutively through a series of processing
     * steps where the sequence of steps is not known at design time and can vary for each message.
     *
     * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
     *                class will look in for the list of URIs to route the message to.
     * @param uriDelimiter  is the delimiter that will be used to split up
     *                      the list of URIs in the routing slip.
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type routingSlip(String header, String uriDelimiter) {
        RoutingSlipDefinition answer = new RoutingSlipDefinition(header, uriDelimiter);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
     * Creates a routing slip allowing you to route a message consecutively through a series of processing
     * steps where the sequence of steps is not known at design time and can vary for each message.
     * <p>
     * The list of URIs will be split based on the default delimiter {@link RoutingSlipDefinition#DEFAULT_DELIMITER}
     *
     * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
     *                class will look in for the list of URIs to route the message to.
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type routingSlip(String header) {
        RoutingSlipDefinition answer = new RoutingSlipDefinition(header);
        addOutput(answer);
        return (Type) this;
    }
    
    /**
     * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
     * Creates a routing slip allowing you to route a message consecutively through a series of processing
     * steps where the sequence of steps is not known at design time and can vary for each message.
     *
     * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
     *                class will look in for the list of URIs to route the message to.
     * @param uriDelimiter  is the delimiter that will be used to split up
     *                      the list of URIs in the routing slip.
     * @param ignoreInvalidEndpoints if this parameter is true, routingSlip will ignore the endpoints which
     *                               cannot be resolved or a producer cannot be created or started 
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type routingSlip(String header, String uriDelimiter, boolean ignoreInvalidEndpoints) {
        RoutingSlipDefinition answer = new RoutingSlipDefinition(header, uriDelimiter);
        answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/routing-slip.html">Routing Slip EIP:</a>
     * Creates a routing slip allowing you to route a message consecutively through a series of processing
     * steps where the sequence of steps is not known at design time and can vary for each message.
     * <p>
     * The list of URIs will be split based on the default delimiter {@link RoutingSlipDefinition#DEFAULT_DELIMITER}
     *
     * @param header  is the header that the {@link org.apache.camel.processor.RoutingSlip RoutingSlip}
     *                class will look in for the list of URIs to route the message to.
     * @param ignoreInvalidEndpoints if this parameter is true, routingSlip will ignore the endpoints which
     *                               cannot be resolved or a producer cannot be created or started 
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type routingSlip(String header, boolean ignoreInvalidEndpoints) {
        RoutingSlipDefinition answer = new RoutingSlipDefinition(header);
        answer.setIgnoreInvalidEndpoints(ignoreInvalidEndpoints);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/sampling.html">Sampling Throttler</a>
     * Creates a sampling throttler allowing you to extract a sample of
     * exchanges from the traffic on a route. It is configured with a sampling
     * period, during which only a single exchange is allowed to pass through.
     * All other exchanges will be stopped.
     * <p/>
     * Default period is one second.
     *
     * @return the builder
     */
    public SamplingDefinition sample() {
        return sample(1, TimeUnit.SECONDS);
    }

    /**
     * <a href="http://camel.apache.org/sampling.html">Sampling Throttler</a>
     * Creates a sampling throttler allowing you to extract a sample of exchanges
     * from the traffic through a route. It is configured with a sampling period
     * during which only a single exchange is allowed to pass through.
     * All other exchanges will be stopped.
     *
     * @param samplePeriod this is the sample interval, only one exchange is
     *            allowed through in this interval
     * @param unit this is the units for the samplePeriod e.g. Seconds
     * @return the builder
     */
    public SamplingDefinition sample(long samplePeriod, TimeUnit unit) {
        SamplingDefinition answer = new SamplingDefinition(samplePeriod, unit);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
     * Creates a splitter allowing you split a message into a number of pieces and process them individually.
     * <p>
     * This splitter responds with the latest message returned from destination
     * endpoint.
     *
     * @return the expression clause builder for the expression on which to split
     */
    public ExpressionClause<SplitDefinition> split() {
        SplitDefinition answer = new SplitDefinition();
        addOutput(answer);
        return ExpressionClause.createAndSetExpression(answer);
    }

    /**
     * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
     * Creates a splitter allowing you split a message into a number of pieces and process them individually.
     * <p>
     * This splitter responds with the latest message returned from destination
     * endpoint.
     *
     * @param expression  the expression on which to split the message
     * @return the builder
     */
    public SplitDefinition split(Expression expression) {
        SplitDefinition answer = new SplitDefinition(expression);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
     * Creates a splitter allowing you split a message into a number of pieces and process them individually.
     * <p>
     * The splitter responds with the answer produced by the given {@link AggregationStrategy}.
     *
     * @param expression  the expression on which to split
     * @param aggregationStrategy  the strategy used to aggregate responses for every part
     * @return the builder
     */
    public SplitDefinition split(Expression expression, AggregationStrategy aggregationStrategy) {
        SplitDefinition answer = new SplitDefinition(expression);
        addOutput(answer);
        answer.setAggregationStrategy(aggregationStrategy);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
     * Creates a resequencer allowing you to reorganize messages based on some comparator.
     *
     * @return the expression clause for the expressions on which to compare messages in order
     */
    public ExpressionClause<ResequenceDefinition> resequence() {
        ResequenceDefinition answer = new ResequenceDefinition();
        addOutput(answer);
        ExpressionClause<ResequenceDefinition> clause = new ExpressionClause<ResequenceDefinition>(answer);
        answer.expression(clause);
        return clause;
    }

    /**
     * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
     * Creates a resequencer allowing you to reorganize messages based on some comparator.
     *
     * @param expression the expression on which to compare messages in order
     * @return the builder
     */
    public ResequenceDefinition resequence(Expression expression) {
        return resequence(Collections.<Expression>singletonList(expression));
    }

    /**
     * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
     * Creates a resequencer allowing you to reorganize messages based on some comparator.
     *
     * @param expressions the list of expressions on which to compare messages in order
     * @return the builder
     */
    public ResequenceDefinition resequence(List<Expression> expressions) {
        ResequenceDefinition answer = new ResequenceDefinition(expressions);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/resequencer.html">Resequencer EIP:</a>
     * Creates a splitter allowing you to reorganise messages based on some comparator.
     *
     * @param expressions the list of expressions on which to compare messages in order
     * @return the builder
     */
    public ResequenceDefinition resequence(Expression... expressions) {
        List<Expression> list = new ArrayList<Expression>();
        list.addAll(Arrays.asList(expressions));
        return resequence(list);
    }

    /**
     * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
     * Creates an aggregator allowing you to combine a number of messages together into a single message.
     *
     * @return the expression clause to be used as builder to configure the correlation expression
     */
    public ExpressionClause<AggregateDefinition> aggregate() {
        AggregateDefinition answer = new AggregateDefinition();
        addOutput(answer);
        return answer.createAndSetExpression();
    }

    /**
     * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
     * Creates an aggregator allowing you to combine a number of messages together into a single message.
     *
     * @param aggregationStrategy the strategy used for the aggregation
     * @return the expression clause to be used as builder to configure the correlation expression
     */
    public ExpressionClause<AggregateDefinition> aggregate(AggregationStrategy aggregationStrategy) {
        AggregateDefinition answer = new AggregateDefinition();
        answer.setAggregationStrategy(aggregationStrategy);
        addOutput(answer);
        return answer.createAndSetExpression();
    }

    /**
     * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
     * Creates an aggregator allowing you to combine a number of messages together into a single message.
     *
     * @param correlationExpression the expression used to calculate the
     *                              correlation key. For a JMS message this could be the
     *                              expression <code>header("JMSDestination")</code> or
     *                              <code>header("JMSCorrelationID")</code>
     * @return the builder
     */
    public AggregateDefinition aggregate(Expression correlationExpression) {
        AggregateDefinition answer = new AggregateDefinition(correlationExpression);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/aggregator.html">Aggregator EIP:</a>
     * Creates an aggregator allowing you to combine a number of messages together into a single message.
     *
     * @param correlationExpression the expression used to calculate the
     *                              correlation key. For a JMS message this could be the
     *                              expression <code>header("JMSDestination")</code> or
     *                              <code>header("JMSCorrelationID")</code>
     * @param aggregationStrategy the strategy used for the aggregation
     * @return the builder
     */
    public AggregateDefinition aggregate(Expression correlationExpression, AggregationStrategy aggregationStrategy) {
        AggregateDefinition answer = new AggregateDefinition(correlationExpression, aggregationStrategy);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
     * Creates a delayer allowing you to delay the delivery of messages to some destination.
     *
     * @param delay  an expression to calculate the delay time in millis
     * @return the builder
     */
    public DelayDefinition delay(Expression delay) {
        DelayDefinition answer = new DelayDefinition(delay);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
     * Creates a delayer allowing you to delay the delivery of messages to some destination.
     *
     * @return the expression clause to create the expression
     */
    public ExpressionClause<DelayDefinition> delay() {
        DelayDefinition answer = new DelayDefinition();
        addOutput(answer);
        return ExpressionClause.createAndSetExpression(answer);
    }

    /**
     * <a href="http://camel.apache.org/delayer.html">Delayer EIP:</a>
     * Creates a delayer allowing you to delay the delivery of messages to some destination.
     *
     * @param delay  the delay in millis
     * @return the builder
     */
    public DelayDefinition delay(long delay) {
        return delay(ExpressionBuilder.constantExpression(delay));
    }

    /**
     * <a href="http://camel.apache.org/throttler.html">Throttler EIP:</a>
     * Creates a throttler allowing you to ensure that a specific endpoint does not get overloaded,
     * or that we don't exceed an agreed SLA with some external service.
     * <p/>
     * Will default use a time period of 1 second, so setting the maximumRequestCount to eg 10
     * will default ensure at most 10 messages per second. 
     *
     * @param maximumRequestCount  the maximum messages 
     * @return the builder
     */
    public ThrottleDefinition throttle(long maximumRequestCount) {
        ThrottleDefinition answer = new ThrottleDefinition(maximumRequestCount);
        addOutput(answer);
        return answer;
    }

    /**
     * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
     * Creates a loop allowing to process the a message a number of times and possibly process them
     * in a different way. Useful mostly for testing.
     *
     * @return the clause used to create the loop expression
     */
    public ExpressionClause<LoopDefinition> loop() {
        LoopDefinition loop = new LoopDefinition();
        addOutput(loop);
        return ExpressionClause.createAndSetExpression(loop);
    }

    /**
     * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
     * Creates a loop allowing to process the a message a number of times and possibly process them
     * in a different way. Useful mostly for testing.
     *
     * @param expression the loop expression
     * @return the builder
     */
    public LoopDefinition loop(Expression expression) {
        LoopDefinition loop = getNodeFactory().createLoop();
        loop.setExpression(expression);
        addOutput(loop);
        return loop;
    }

    /**
     * <a href="http://camel.apache.org/loop.html">Loop EIP:</a>
     * Creates a loop allowing to process the a message a number of times and possibly process them
     * in a different way. Useful mostly for testing.
     *
     * @param count  the number of times
     * @return the builder
     */
    public LoopDefinition loop(int count) {
        LoopDefinition loop = getNodeFactory().createLoop();
        loop.setExpression(new ConstantExpression(Integer.toString(count)));
        addOutput(loop);
        return loop;
    }

    /**
     * Sets the exception on the {@link org.apache.camel.Exchange}
     *
     * @param exception the exception to throw
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type throwException(Exception exception) {
        ThrowExceptionDefinition answer = new ThrowExceptionDefinition();
        answer.setException(exception);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Marks the exchange for rollback only.
     * <p/>
     * Does <b>not</b> set any exception as opposed to {@link #rollback()} methods.
     *
     * @return the builder
     * @see #rollback()
     * @see #rollback(String)
     * @see #markRollbackOnlyLast()
     */
    @SuppressWarnings("unchecked")
    public Type markRollbackOnly() {
        RollbackDefinition answer = new RollbackDefinition();
        answer.setMarkRollbackOnly(true);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Marks the exchange for rollback only, but only for the last (current) transaction.
     * <p/>
     * A last rollback is used when you have nested transactions and only want the last local transaction to rollback,
     * where as the outer transaction can still be completed
     * <p/>
     * Does <b>not</b> set any exception as opposed to {@link #rollback()} methods.
     *
     * @return the builder
     * @see #rollback()
     * @see #rollback(String)
     * @see #markRollbackOnly()
     */
    @SuppressWarnings("unchecked")
    public Type markRollbackOnlyLast() {
        RollbackDefinition answer = new RollbackDefinition();
        answer.setMarkRollbackOnlyLast(true);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Marks the exchange for rollback only and sets an exception with a default message.
     * <p/>
     * This is done by setting a {@link org.apache.camel.RollbackExchangeException} on the Exchange
     * and mark it for rollback.
     *
     * @return the builder
     * @see #markRollbackOnly()
     */
    public Type rollback() {
        return rollback(null);
    }

    /**
     * Marks the exchange for rollback and sets an exception with the provided message.
     * <p/>
     * This is done by setting a {@link org.apache.camel.RollbackExchangeException} on the Exchange
     * and mark it for rollback.
     *
     * @param message an optional message used for logging purpose why the rollback was triggered
     * @return the builder
     * @see #markRollbackOnly()
     */
    @SuppressWarnings("unchecked")
    public Type rollback(String message) {
        RollbackDefinition answer = new RollbackDefinition(message);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends messages to all its child outputs; so that each processor and
     * destination gets a copy of the original message to avoid the processors
     * interfering with each other using {@link ExchangePattern#InOnly}.
     *
     * @param uri  the destination
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type wireTap(String uri) {
        WireTapDefinition answer = new WireTapDefinition();
        answer.setUri(uri);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends messages to all its child outputs; so that each processor and
     * destination gets a copy of the original message to avoid the processors
     * interfering with each other using {@link ExchangePattern#InOnly}.
     *
     * @param uri  the destination
     * @param      executorService a custom {@link ExecutorService} to use as thread pool
     *             for sending tapped exchanges
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type wireTap(String uri, ExecutorService executorService) {
        WireTapDefinition answer = new WireTapDefinition();
        answer.setUri(uri);
        answer.setExecutorService(executorService);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends messages to all its child outputs; so that each processor and
     * destination gets a copy of the original message to avoid the processors
     * interfering with each other using {@link ExchangePattern#InOnly}.
     *
     * @param uri  the destination
     * @param      executorServiceRef reference to lookup a custom {@link ExecutorService}
     *             to use as thread pool for sending tapped exchanges
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type wireTap(String uri, String executorServiceRef) {
        WireTapDefinition answer = new WireTapDefinition();
        answer.setUri(uri);
        answer.setExecutorServiceRef(executorServiceRef);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends a new {@link org.apache.camel.Exchange} to the destination
     * using {@link ExchangePattern#InOnly}.
     * <p/>
     * Will use a copy of the original Exchange which is passed in as argument
     * to the given expression
     *
     * @param uri  the destination
     * @param body expression that creates the body to send
     * @return the builder
     */
    public Type wireTap(String uri, Expression body) {
        return wireTap(uri, true, body);
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends a new {@link org.apache.camel.Exchange} to the destination
     * using {@link ExchangePattern#InOnly}.
     *
     * @param uri  the destination
     * @param copy whether or not use a copy of the original exchange or a new empty exchange
     * @param body expression that creates the body to send
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type wireTap(String uri, boolean copy, Expression body) {
        WireTapDefinition answer = new WireTapDefinition();
        answer.setUri(uri);
        answer.setCopy(copy);
        answer.setNewExchangeExpression(body);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends a new {@link org.apache.camel.Exchange} to the destination
     * using {@link ExchangePattern#InOnly}.
     * <p/>
     * Will use a copy of the original Exchange which is passed in as argument
     * to the given processor
     *
     * @param uri  the destination
     * @param processor  processor preparing the new exchange to send
     * @return the builder
     */
    public Type wireTap(String uri, Processor processor) {
        return wireTap(uri, true, processor);
    }

    /**
     * <a href="http://camel.apache.org/wiretap.html">WireTap EIP:</a>
     * Sends a new {@link org.apache.camel.Exchange} to the destination
     * using {@link ExchangePattern#InOnly}.
     *
     * @param uri  the destination
     * @param copy whether or not use a copy of the original exchange or a new empty exchange
     * @param processor  processor preparing the new exchange to send
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type wireTap(String uri, boolean copy, Processor processor) {
        WireTapDefinition answer = new WireTapDefinition();
        answer.setUri(uri);
        answer.setCopy(copy);
        answer.setNewExchangeProcessor(processor);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Pushes the given block on the stack as current block
     * @param block  the block
     */
    void pushBlock(Block block) {
        blocks.add(block);
    }

    /**
     * Pops the block off the stack as current block
     * @return the block
     */
    Block popBlock() {
        return blocks.isEmpty() ? null : blocks.removeLast();
    }

    /**
     * Stops continue routing the current {@link org.apache.camel.Exchange} and marks it as completed.
     *
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type stop() {
        StopDefinition stop = new StopDefinition();
        addOutput(stop);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/exception-clause.html">Exception clause</a>
     * for cathing certain exceptions and handling them.
     *
     * @param exceptionType  the exception to catch
     * @return the exception builder to configure
     */
    public OnExceptionDefinition onException(Class exceptionType) {
        OnExceptionDefinition answer = new OnExceptionDefinition(exceptionType);
        addOutput(answer);
        return answer;
    }

    /**
     * Apply a {@link Policy}.
     * <p/>
     * Policy can be used for transactional policies.
     *
     * @param policy  the policy to apply
     * @return the policy builder to configure
     */
    public PolicyDefinition policy(Policy policy) {
        PolicyDefinition answer = new PolicyDefinition(policy);
        addOutput(answer);
        return answer;
    }

    /**
     * Apply a {@link Policy}.
     * <p/>
     * Policy can be used for transactional policies.
     *
     * @param ref  reference to lookup a policy in the registry
     * @return the policy builder to configure
     */
    public PolicyDefinition policy(String ref) {
        PolicyDefinition answer = new PolicyDefinition();
        answer.setRef(ref);
        addOutput(answer);
        return answer;
    }

    /**
     * Marks this route as transacted and uses the default transacted policy found in the registry.
     *
     * @return the policy builder to configure
     */
    public PolicyDefinition transacted() {
        PolicyDefinition answer = new PolicyDefinition();
        answer.setType(TransactedPolicy.class);
        addOutput(answer);
        return answer;
    }

    /**
     * Marks this route as transacted.
     *
     * @param ref  reference to lookup a transacted policy in the registry
     * @return the policy builder to configure
     */
    public PolicyDefinition transacted(String ref) {
        PolicyDefinition answer = new PolicyDefinition();
        answer.setType(TransactedPolicy.class);
        answer.setRef(ref);
        addOutput(answer);
        return answer;
    }

    // Transformers
    // -------------------------------------------------------------------------

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds the custom processor to this destination which could be a final
     * destination, or could be a transformation in a pipeline
     *
     * @param processor  the custom {@link Processor}
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type process(Processor processor) {
        ProcessDefinition answer = new ProcessDefinition(processor);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds the custom processor reference to this destination which could be a final
     * destination, or could be a transformation in a pipeline
     *
     * @param ref   reference to a {@link Processor} to lookup in the registry
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type processRef(String ref) {
        ProcessDefinition answer = new ProcessDefinition();
        answer.setRef(ref);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
     *
     * @param bean  the bean to invoke
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type bean(Object bean) {
        BeanDefinition answer = new BeanDefinition();
        answer.setBean(bean);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
     *
     * @param bean  the bean to invoke
     * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type bean(Object bean, String method) {
        BeanDefinition answer = new BeanDefinition();
        answer.setBean(bean);
        answer.setMethod(method);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
     *
     * @param  beanType  the bean class, Camel will instantiate an object at runtime
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type bean(Class beanType) {
        BeanDefinition answer = new BeanDefinition();
        answer.setBeanType(beanType);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
     *
     * @param  beanType  the bean class, Camel will instantiate an object at runtime
     * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type bean(Class beanType, String method) {
        BeanDefinition answer = new BeanDefinition();
        answer.setBeanType(beanType);
        answer.setMethod(method);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
     *
     * @param ref  reference to a bean to lookup in the registry
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type beanRef(String ref) {
        BeanDefinition answer = new BeanDefinition(ref);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a bean which is invoked which could be a final destination, or could be a transformation in a pipeline
     *
     * @param ref  reference to a bean to lookup in the registry
     * @param method  the method name to invoke on the bean (can be used to avoid ambiguty)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type beanRef(String ref, String method) {
        BeanDefinition answer = new BeanDefinition(ref, method);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a processor which sets the body on the IN message
     *
     * @return a expression builder clause to set the body
     */
    public ExpressionClause<ProcessorDefinition<Type>> setBody() {
        ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
        SetBodyDefinition answer = new SetBodyDefinition(clause);
        addOutput(answer);
        return clause;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a processor which sets the body on the IN message
     *
     * @param expression   the expression used to set the body
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type setBody(Expression expression) {
        SetBodyDefinition answer = new SetBodyDefinition(expression);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a processor which sets the body on the OUT message
     *
     * @param expression   the expression used to set the body
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type transform(Expression expression) {
        TransformDefinition answer = new TransformDefinition(expression);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/message-translator.html">Message Translator EIP:</a>
     * Adds a processor which sets the body on the OUT message
     *
     * @return a expression builder clause to set the body
     */
    public ExpressionClause<ProcessorDefinition<Type>> transform() {
        ExpressionClause<ProcessorDefinition<Type>> clause = 
            new ExpressionClause<ProcessorDefinition<Type>>((ProcessorDefinition<Type>) this);
        TransformDefinition answer = new TransformDefinition(clause);
        addOutput(answer);
        return clause;
    }

    /**
     * Adds a processor which sets the body on the FAULT message
     *
     * @param expression   the expression used to set the body
     * @return the builder
     */
    public Type setFaultBody(Expression expression) {
        return process(ProcessorBuilder.setFaultBody(expression));
    }

    /**
     * Adds a processor which sets the header on the IN message
     *
     * @param name  the header name
     * @return a expression builder clause to set the header
     */
    public ExpressionClause<ProcessorDefinition<Type>> setHeader(String name) {
        ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
        SetHeaderDefinition answer = new SetHeaderDefinition(name, clause);
        addOutput(answer);
        return clause;
    }

    /**
     * Adds a processor which sets the header on the IN message
     *
     * @param name  the header name
     * @param expression  the expression used to set the header
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type setHeader(String name, Expression expression) {
        SetHeaderDefinition answer = new SetHeaderDefinition(name, expression);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Adds a processor which sets the header on the OUT message
     *
     * @param name  the header name
     * @return a expression builder clause to set the header
     */
    public ExpressionClause<ProcessorDefinition<Type>> setOutHeader(String name) {
        ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
        SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, clause);
        addOutput(answer);
        return clause;
    }

    /**
     * Adds a processor which sets the header on the OUT message
     *
     * @param name  the header name
     * @param expression  the expression used to set the header
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type setOutHeader(String name, Expression expression) {
        SetOutHeaderDefinition answer = new SetOutHeaderDefinition(name, expression);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Adds a processor which sets the header on the FAULT message
     *
     * @param name  the header name
     * @param expression  the expression used to set the header
     * @return the builder
     */
    public Type setFaultHeader(String name, Expression expression) {
        return process(ProcessorBuilder.setFaultHeader(name, expression));
    }

    /**
     * Adds a processor which sets the exchange property
     *
     * @param name  the property name
     * @param expression  the expression used to set the property
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type setProperty(String name, Expression expression) {
        SetPropertyDefinition answer = new SetPropertyDefinition(name, expression);
        addOutput(answer);
        return (Type) this;
    }


    /**
     * Adds a processor which sets the exchange property
     *
     * @param name  the property name
     * @return a expression builder clause to set the property
     */
    public ExpressionClause<ProcessorDefinition<Type>> setProperty(String name) {
        ExpressionClause<ProcessorDefinition<Type>> clause = new ExpressionClause<ProcessorDefinition<Type>>(this);
        SetPropertyDefinition answer = new SetPropertyDefinition(name, clause);
        addOutput(answer);
        return clause;
    }

    /**
     * Adds a processor which removes the header on the IN message
     *
     * @param name  the header name
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type removeHeader(String name) {
        RemoveHeaderDefinition answer = new RemoveHeaderDefinition(name);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Adds a processor which removes the headers on the IN message
     *
     * @param pattern  a pattern to match header names to be removed
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type removeHeaders(String pattern) {
        RemoveHeadersDefinition answer = new RemoveHeadersDefinition(pattern);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Adds a processor which removes the header on the FAULT message
     *
     * @param name  the header name
     * @return the builder
     */
    public Type removeFaultHeader(String name) {
        return process(ProcessorBuilder.removeFaultHeader(name));
    }

    /**
     * Adds a processor which removes the exchange property
     *
     * @param name  the property name
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type removeProperty(String name) {
        RemovePropertyDefinition answer = new RemovePropertyDefinition(name);
        addOutput(answer);
        return (Type) this;
    }

    /**
     * Converts the IN message body to the specified type
     *
     * @param type the type to convert to
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type convertBodyTo(Class type) {
        addOutput(new ConvertBodyDefinition(type));
        return (Type) this;
    }
    
    /**
     * Converts the IN message body to the specified type
     *
     * @param type the type to convert to
     * @param charset the charset to use by type converters (not all converters support specifc charset)
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type convertBodyTo(Class type, String charset) {
        addOutput(new ConvertBodyDefinition(type, charset));
        return (Type) this;
    }

    /**
     * Sorts the IN message body using the given comparator.
     * The IN body mut be convertable to {@link List}.
     *
     * @param comparator  the comparator to use for sorting
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type sortBody(Comparator comparator) {
        addOutput(new SortDefinition(body(), comparator));
        return (Type) this;
    }

    /**
     * Sorts the IN message body using a default sorting based on toString representation.
     * The IN body mut be convertable to {@link List}.
     *
     * @return the builder
     */
    public Type sortBody() {
        return sortBody(null);
    }

    /**
     * Sorts the expression using the given comparator
     *
     * @param expression  the expression, must be convertable to {@link List}
     * @param comparator  the comparator to use for sorting
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type sort(Expression expression, Comparator comparator) {
        addOutput(new SortDefinition(expression, comparator));
        return (Type) this;
    }

    /**
     * Sorts the expression using a default sorting based on toString representation. 
     *
     * @param expression  the expression, must be convertable to {@link List}
     * @return the builder
     */
    public Type sort(Expression expression) {
        return sort(expression, null);
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
     * 
     * @param resourceUri           URI of resource endpoint for obtaining additional data.
     * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
     * @return the builder
     * @see org.apache.camel.processor.Enricher
     */
    @SuppressWarnings("unchecked")
    public Type enrich(String resourceUri, AggregationStrategy aggregationStrategy) {
        addOutput(new EnrichDefinition(aggregationStrategy, resourceUri));
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
     * <p/>
     * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
     * to obatin the additional data, where as pollEnrich uses a polling consumer.
     *
     * @param resourceUri           URI of resource endpoint for obtaining additional data.
     * @return the builder
     * @see org.apache.camel.processor.Enricher
     */
    @SuppressWarnings("unchecked")
    public Type enrich(String resourceUri) {
        addOutput(new EnrichDefinition(resourceUri));
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>.
     * <p/>
     * The difference between this and {@link #pollEnrich(String)} is that this uses a producer
     * to obatin the additional data, where as pollEnrich uses a polling consumer.
     *
     * @param resourceRef            Reference of resource endpoint for obtaining additional data.
     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
     * @return the builder
     * @see org.apache.camel.processor.Enricher
     */
    @SuppressWarnings("unchecked")
    public Type enrichRef(String resourceRef, String aggregationStrategyRef) {
        EnrichDefinition enrich = new EnrichDefinition();
        enrich.setResourceRef(resourceRef);
        enrich.setAggregationStrategyRef(aggregationStrategyRef);
        addOutput(enrich);
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
     * <p/>
     * The difference between this and {@link #enrich(String)} is that this uses a consumer
     * to obatin the additional data, where as enrich uses a producer.
     * <p/>
     * This method will block until data is avialable, use the method with timeout if you do not
     * want to risk waiting a long time before data is available from the resourceUri.
     *
     * @param resourceUri           URI of resource endpoint for obtaining additional data.
     * @return the builder
     * @see org.apache.camel.processor.PollEnricher
     */
    @SuppressWarnings("unchecked")
    public Type pollEnrich(String resourceUri) {
        addOutput(new PollEnrichDefinition(null, resourceUri, 0));
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
     * <p/>
     * The difference between this and {@link #enrich(String)} is that this uses a consumer
     * to obatin the additional data, where as enrich uses a producer.
     * <p/>
     * This method will block until data is avialable, use the method with timeout if you do not
     * want to risk waiting a long time before data is available from the resourceUri.
     *
     * @param resourceUri           URI of resource endpoint for obtaining additional data.
     * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
     * @return the builder
     * @see org.apache.camel.processor.PollEnricher
     */
    @SuppressWarnings("unchecked")
    public Type pollEnrich(String resourceUri, AggregationStrategy aggregationStrategy) {
        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, 0));
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
     * <p/>
     * The difference between this and {@link #enrich(String)} is that this uses a consumer
     * to obatin the additional data, where as enrich uses a producer.
     * <p/>
     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
     * otherwise we use <tt>receive(timeout)</tt>.
     *
     * @param resourceUri           URI of resource endpoint for obtaining additional data.
     * @param timeout               timeout in millis to wait at most for data to be available.
     * @param aggregationStrategy   aggregation strategy to aggregate input data and additional data.
     * @return the builder
     * @see org.apache.camel.processor.PollEnricher
     */
    @SuppressWarnings("unchecked")
    public Type pollEnrich(String resourceUri, long timeout, AggregationStrategy aggregationStrategy) {
        addOutput(new PollEnrichDefinition(aggregationStrategy, resourceUri, timeout));
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
     * <p/>
     * The difference between this and {@link #enrich(String)} is that this uses a consumer
     * to obatin the additional data, where as enrich uses a producer.
     * <p/>
     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
     * otherwise we use <tt>receive(timeout)</tt>.
     *
     * @param resourceUri           URI of resource endpoint for obtaining additional data.
     * @param timeout               timeout in millis to wait at most for data to be available.
     * @return the builder
     * @see org.apache.camel.processor.PollEnricher
     */
    @SuppressWarnings("unchecked")
    public Type pollEnrich(String resourceUri, long timeout) {
        addOutput(new PollEnrichDefinition(null, resourceUri, timeout));
        return (Type) this;
    }

    /**
     * The <a href="http://camel.apache.org/content-enricher.html">Content Enricher EIP</a>
     * enriches an exchange with additional data obtained from a <code>resourceUri</code>
     * using a {@link org.apache.camel.PollingConsumer} to poll the endpoint.
     * <p/>
     * The difference between this and {@link #enrich(String)} is that this uses a consumer
     * to obatin the additional data, where as enrich uses a producer.
     * <p/>
     * The timeout controls which operation to use on {@link org.apache.camel.PollingConsumer}.
     * If timeout is negative, we use <tt>receive</tt>. If timeout is 0 then we use <tt>receiveNoWait</tt>
     * otherwise we use <tt>receive(timeout)</tt>.
     *
     * @param resourceRef            Reference of resource endpoint for obtaining additional data.
     * @param timeout                timeout in millis to wait at most for data to be available.
     * @param aggregationStrategyRef Reference of aggregation strategy to aggregate input data and additional data.
     * @return the builder
     * @see org.apache.camel.processor.PollEnricher
     */
    @SuppressWarnings("unchecked")
    public Type pollEnrichRef(String resourceRef, long timeout, String aggregationStrategyRef) {
        PollEnrichDefinition pollEnrich = new PollEnrichDefinition();
        pollEnrich.setResourceRef(resourceRef);
        pollEnrich.setTimeout(timeout);
        pollEnrich.setAggregationStrategyRef(aggregationStrategyRef);
        addOutput(pollEnrich);
        return (Type) this;
    }

    /**
     * Adds a onComplection {@link org.apache.camel.spi.Synchronization} hook that invoke this route as
     * a callback when the {@link org.apache.camel.Exchange} has finished being processed.
     * The hook invoke callbacks for either onComplete or onFailure.
     * <p/>
     * Will by default always trigger when the {@link org.apache.camel.Exchange} is complete
     * (either with success or failed).
     * <br/>
     * You can limit the callback to either onComplete or onFailure but invoking the nested
     * builder method.
     * <p/>
     * For onFailure the caused exception is stored as a property on the {@link org.apache.camel.Exchange}
     * with the key {@link org.apache.camel.Exchange#EXCEPTION_CAUGHT}.
     *
     * @return the builder
     */
    public OnCompletionDefinition onCompletion() {
        OnCompletionDefinition answer = new OnCompletionDefinition();
        // we must remove all existing on completion definition (as they are global)
        // and thus we are the only one as route scoped should override any global scoped
        answer.removeAllOnCompletionDefinition(this);
        popBlock();
        addOutput(answer);
        pushBlock(answer);
        return answer;
    }

    // DataFormat support
    // -------------------------------------------------------------------------

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Unmarshals the in body using a {@link DataFormat} expression to define
     * the format of the input message and the output will be set on the out message body.
     *
     * @return the expression to create the {@link DataFormat}
     */
    public DataFormatClause<ProcessorDefinition<Type>> unmarshal() {
        return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Unmarshal);
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Unmarshals the in body using the specified {@link DataFormat}
     * and sets the output on the out message body.
     *
     * @param dataFormatType  the dataformat
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type unmarshal(DataFormatDefinition dataFormatType) {
        addOutput(new UnmarshalDefinition(dataFormatType));
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Unmarshals the in body using the specified {@link DataFormat}
     * and sets the output on the out message body.
     *
     * @param dataFormat  the dataformat
     * @return the builder
     */
    public Type unmarshal(DataFormat dataFormat) {
        return unmarshal(new DataFormatDefinition(dataFormat));
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Unmarshals the in body using the specified {@link DataFormat}
     * reference in the {@link org.apache.camel.spi.Registry} and sets
     * the output on the out message body.
     *
     * @param dataTypeRef  reference to a {@link DataFormat} to lookup in the registry
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type unmarshal(String dataTypeRef) {
        addOutput(new UnmarshalDefinition(dataTypeRef));
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Marshals the in body using a {@link DataFormat} expression to define
     * the format of the output which will be added to the out body.
     *
     * @return the expression to create the {@link DataFormat}
     */
    public DataFormatClause<ProcessorDefinition<Type>> marshal() {
        return new DataFormatClause<ProcessorDefinition<Type>>(this, DataFormatClause.Operation.Marshal);
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Marshals the in body using the specified {@link DataFormat}
     * and sets the output on the out message body.
     *
     * @param dataFormatType  the dataformat
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type marshal(DataFormatDefinition dataFormatType) {
        addOutput(new MarshalDefinition(dataFormatType));
        return (Type) this;
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Marshals the in body using the specified {@link DataFormat}
     * and sets the output on the out message body.
     *
     * @param dataFormat  the dataformat
     * @return the builder
     */
    public Type marshal(DataFormat dataFormat) {
        return marshal(new DataFormatDefinition(dataFormat));
    }

    /**
     * <a href="http://camel.apache.org/data-format.html">DataFormat:</a>
     * Marshals the in body the specified {@link DataFormat}
     * reference in the {@link org.apache.camel.spi.Registry} and sets
     * the output on the out message body.
     *
     * @param dataTypeRef  reference to a {@link DataFormat} to lookup in the registry
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type marshal(String dataTypeRef) {
        addOutput(new MarshalDefinition(dataTypeRef));
        return (Type) this;
    }

    /**
     * Sets whether or not to inherit the configured error handler.
     * <br/>
     * The default value is <tt>true</tt>.
     * <p/>
     * You can use this to disable using the inherited error handler for a given
     * DSL such as a load balancer where you want to use a custom error handler strategy.
     *
     * @param inheritErrorHandler whether to not to inherit the error handler for this node
     * @return the builder
     */
    @SuppressWarnings("unchecked")
    public Type inheritErrorHandler(boolean inheritErrorHandler) {
        // set on last output
        int size = getOutputs().size();
        if (size == 0) {
            // if no outputs then configure this DSL
            setInheritErrorHandler(inheritErrorHandler);
        } else {
            // configure on last output as its the intended
            ProcessorDefinition output = getOutputs().get(size - 1);
            if (output != null) {
                output.setInheritErrorHandler(inheritErrorHandler);
            }
        }
        return (Type) this;
    }

    // Properties
    // -------------------------------------------------------------------------
    @XmlTransient
    public ProcessorDefinition getParent() {
        return parent;
    }

    public void setParent(ProcessorDefinition parent) {
        this.parent = parent;
    }

    @XmlTransient
    public ErrorHandlerBuilder getErrorHandlerBuilder() {
        if (errorHandlerBuilder == null) {
            errorHandlerBuilder = createErrorHandlerBuilder();
        }
        return errorHandlerBuilder;
    }

    /**
     * Sets the error handler to use with processors created by this builder
     */
    public void setErrorHandlerBuilder(ErrorHandlerBuilder errorHandlerBuilder) {
        this.errorHandlerBuilder = errorHandlerBuilder;
    }

    @XmlTransient
    public NodeFactory getNodeFactory() {
        if (nodeFactory == null) {
            nodeFactory = new NodeFactory();
        }
        return nodeFactory;
    }

    public void setNodeFactory(NodeFactory nodeFactory) {
        this.nodeFactory = nodeFactory;
    }

    @XmlTransient
    public List<InterceptStrategy> getInterceptStrategies() {
        return interceptStrategies;
    }

    public void addInterceptStrategy(InterceptStrategy strategy) {
        this.interceptStrategies.add(strategy);
    }

    public Boolean isInheritErrorHandler() {
        return inheritErrorHandler;
    }

    @XmlAttribute
    public void setInheritErrorHandler(Boolean inheritErrorHandler) {
        this.inheritErrorHandler = inheritErrorHandler;
    }

    /**
     * Returns a label to describe this node such as the expression if some kind of expression node
     */
    public String getLabel() {
        return "";
    }

}
