blob: e7c893c6c54f650046da121c9f56726f43a6f7be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import java.util.Iterator;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.FailedToCreateProducerException;
import org.apache.camel.Message;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Route;
import org.apache.camel.Traceable;
import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.ObjectHelper;
import org.apache.camel.support.builder.ExpressionBuilder;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.processor.PipelineHelper.continueProcessing;
import static org.apache.camel.util.ObjectHelper.notNull;
/**
* Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a> pattern where the list of actual
* endpoints to send a message exchange to are dependent on the value of a message header.
* <p/>
* This implementation mirrors the logic from the {@link org.apache.camel.processor.Pipeline} in the async variation as
* the failover load balancer is a specialized pipeline. So the trick is to keep doing the same as the pipeline to
* ensure it works the same and the async routing engine is flawless.
*/
public class RoutingSlip extends AsyncProcessorSupport implements Traceable, IdAware, RouteIdAware {
private static final Logger LOG = LoggerFactory.getLogger(RoutingSlip.class);
protected String id;
protected String routeId;
protected ProducerCache producerCache;
protected int cacheSize;
protected boolean ignoreInvalidEndpoints;
protected String header;
protected Expression expression;
protected String uriDelimiter;
protected final CamelContext camelContext;
protected AsyncProcessor errorHandler;
/**
* The iterator to be used for retrieving the next routing slip(s) to be used.
*/
protected interface RoutingSlipIterator {
/**
* Are the more routing slip(s)?
*
* @param exchange the current exchange
* @return <tt>true</tt> if more slips, <tt>false</tt> otherwise.
*/
boolean hasNext(Exchange exchange);
/**
* Returns the next routing slip(s).
*
* @param exchange the current exchange
* @return the slip(s).
*/
Object next(Exchange exchange);
}
public RoutingSlip(CamelContext camelContext) {
notNull(camelContext, "camelContext");
this.camelContext = camelContext;
}
public RoutingSlip(CamelContext camelContext, Expression expression, String uriDelimiter) {
notNull(camelContext, "camelContext");
notNull(expression, "expression");
this.camelContext = camelContext;
this.expression = expression;
this.uriDelimiter = uriDelimiter;
this.header = null;
}
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
@Override
public String getRouteId() {
return routeId;
}
@Override
public void setRouteId(String routeId) {
this.routeId = routeId;
}
public Expression getExpression() {
return expression;
}
public String getUriDelimiter() {
return uriDelimiter;
}
public void setDelimiter(String delimiter) {
this.uriDelimiter = delimiter;
}
public boolean isIgnoreInvalidEndpoints() {
return ignoreInvalidEndpoints;
}
public void setIgnoreInvalidEndpoints(boolean ignoreInvalidEndpoints) {
this.ignoreInvalidEndpoints = ignoreInvalidEndpoints;
}
public int getCacheSize() {
return cacheSize;
}
public void setCacheSize(int cacheSize) {
this.cacheSize = cacheSize;
}
public AsyncProcessor getErrorHandler() {
return errorHandler;
}
public void setErrorHandler(AsyncProcessor errorHandler) {
this.errorHandler = errorHandler;
}
@Override
public String toString() {
return id;
}
@Override
public String getTraceLabel() {
return "routingSlip[" + expression + "]";
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
if (!isStarted()) {
exchange.setException(new IllegalStateException("RoutingSlip has not been started: " + this));
callback.done(true);
return true;
}
Expression exp = expression;
Object slip = exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
if (slip != null) {
if (slip instanceof Expression) {
exp = (Expression) slip;
} else {
exp = ExpressionBuilder.constantExpression(slip);
}
}
return doRoutingSlipWithExpression(exchange, exp, callback);
}
/**
* Creates the route slip iterator to be used.
*
* @param exchange the exchange
* @param expression the expression
* @return the iterator, should never be <tt>null</tt>
*/
protected RoutingSlipIterator createRoutingSlipIterator(final Exchange exchange, final Expression expression)
throws Exception {
Object slip = expression.evaluate(exchange, Object.class);
if (exchange.getException() != null) {
// force any exceptions occurred during evaluation to be thrown
throw exchange.getException();
}
final Iterator<?> delegate = ObjectHelper.createIterator(slip, uriDelimiter);
return new RoutingSlipIterator() {
public boolean hasNext(Exchange exchange) {
return delegate.hasNext();
}
public Object next(Exchange exchange) {
return delegate.next();
}
};
}
private boolean doRoutingSlipWithExpression(
final Exchange exchange, final Expression expression, final AsyncCallback originalCallback) {
Exchange current = exchange;
RoutingSlipIterator iter;
try {
iter = createRoutingSlipIterator(exchange, expression);
} catch (Exception e) {
exchange.setException(e);
originalCallback.done(true);
return true;
}
// ensure the slip is empty when we start
if (current.hasProperties()) {
current.setProperty(Exchange.SLIP_ENDPOINT, null);
}
while (iter.hasNext(current)) {
boolean prototype = cacheSize < 0;
Endpoint endpoint;
try {
Object recipient = iter.next(exchange);
recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
endpoint = resolveEndpoint(exchange, recipient, prototype);
} else {
endpoint = existing;
// we have an existing endpoint then its not a prototype scope
prototype = false;
}
// if no endpoint was resolved then try the next
if (endpoint == null) {
continue;
}
} catch (Exception e) {
// error resolving endpoint so we should break out
current.setException(e);
break;
}
//process and prepare the routing slip
boolean sync = processExchange(endpoint, current, exchange, originalCallback, iter, prototype);
current = prepareExchangeForRoutingSlip(current, endpoint);
if (!sync) {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously",
exchange.getExchangeId());
}
// the remainder of the routing slip will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return false;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
}
// we ignore some kind of exceptions and allow us to continue
if (isIgnoreInvalidEndpoints()) {
FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
if (e != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.", e);
}
current.setException(null);
}
}
// Decide whether to continue with the recipients or not; similar logic to the Pipeline
// check for error if so we should break out
if (!continueProcessing(current, "so breaking out of the routing slip", LOG)) {
break;
}
}
// logging nextExchange as it contains the exchange that might have altered the payload and since
// we are logging the completion if will be confusing if we log the original instead
// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
if (LOG.isTraceEnabled()) {
LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), current);
}
// copy results back to the original exchange
ExchangeHelper.copyResults(exchange, current);
// okay we are completely done with the routing slip
// so we need to signal done on the original callback so it can continue
originalCallback.done(true);
return true;
}
protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
return recipient;
} else if (recipient instanceof String) {
// trim strings as end users might have added spaces between separators
recipient = ((String) recipient).trim();
}
if (recipient != null) {
ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
String uri;
if (recipient instanceof String) {
uri = (String) recipient;
} else {
// convert to a string type we can work with
uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
}
// optimize and normalize endpoint
return ecc.normalizeUri(uri);
}
return null;
}
protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
if (recipient instanceof Endpoint) {
return (Endpoint) recipient;
}
if (recipient != null) {
if (recipient instanceof NormalizedEndpointUri) {
NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
return ecc.hasEndpoint(nu);
} else {
String uri = recipient.toString();
return exchange.getContext().hasEndpoint(uri);
}
}
return null;
}
protected Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) throws Exception {
Endpoint endpoint = null;
try {
endpoint = prototype
? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient)
: ExchangeHelper.resolveEndpoint(exchange, recipient);
} catch (Exception e) {
if (isIgnoreInvalidEndpoints()) {
LOG.debug("Endpoint uri is invalid: " + recipient + ". This exception will be ignored.", e);
} else {
throw e;
}
}
return endpoint;
}
protected Exchange prepareExchangeForRoutingSlip(Exchange current, Endpoint endpoint) {
// we must use the same id as this is a snapshot strategy where Camel copies a snapshot
// before processing the next step in the pipeline, so we have a snapshot of the exchange
// just before. This snapshot is used if Camel should do redeliveries (re try) using
// DeadLetterChannel. That is why it's important the id is the same, as it is the *same*
// exchange being routed.
Exchange copy = ExchangeHelper.createCopy(current, true);
// prepare for next run
ExchangeHelper.prepareOutToIn(copy);
// ensure stream caching is reset
MessageHelper.resetStreamCache(copy.getIn());
return copy;
}
protected AsyncProcessor createErrorHandler(Route route, Exchange exchange, AsyncProcessor processor, Endpoint endpoint) {
AsyncProcessor answer = processor;
boolean tryBlock = exchange.getProperty(Exchange.TRY_ROUTE_BLOCK, false, boolean.class);
// do not wrap in error handler if we are inside a try block
if (!tryBlock && route != null && errorHandler != null) {
// wrap the producer in error handler so we have fine grained error handling on
// the output side instead of the input side
// this is needed to support redelivery on that output alone and not doing redelivery
// for the entire routingslip/dynamic-router block again which will start from scratch again
answer = errorHandler;
}
return answer;
}
protected boolean processExchange(
final Endpoint endpoint, final Exchange exchange, final Exchange original,
final AsyncCallback originalCallback, final RoutingSlipIterator iter, final boolean prototype) {
// this does the actual processing so log at trace level
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
}
// routing slip callback which are used when
// - routing slip was routed asynchronously
// - and we are completely done with the routing slip
// so we need to signal done on the original callback so it can continue
AsyncCallback callback = doneSync -> {
if (!doneSync) {
originalCallback.done(false);
}
};
return producerCache.doInAsyncProducer(endpoint, exchange, callback, (p, ex, cb) -> {
// rework error handling to support fine grained error handling
Route route = ExchangeHelper.getRoute(ex);
AsyncProcessor target = createErrorHandler(route, ex, p, endpoint);
// set property which endpoint we send to and the producer that can do it
ex.setProperty(Exchange.TO_ENDPOINT, endpoint.getEndpointUri());
ex.setProperty(Exchange.SLIP_ENDPOINT, endpoint.getEndpointUri());
ex.setProperty(Exchange.SLIP_PRODUCER, p);
return target.process(ex, new AsyncCallback() {
public void done(boolean doneSync) {
// cleanup producer after usage
ex.removeProperty(Exchange.SLIP_PRODUCER);
// we only have to handle async completion of the routing slip
if (doneSync) {
// and stop prototype endpoints
if (prototype) {
ServiceHelper.stopAndShutdownService(endpoint);
}
cb.done(true);
return;
}
try {
// continue processing the routing slip asynchronously
Exchange current = prepareExchangeForRoutingSlip(ex, endpoint);
while (iter.hasNext(current)) {
// we ignore some kind of exceptions and allow us to continue
if (isIgnoreInvalidEndpoints()) {
FailedToCreateProducerException e = current.getException(FailedToCreateProducerException.class);
if (e != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Endpoint uri is invalid: " + endpoint + ". This exception will be ignored.",
e);
}
current.setException(null);
}
}
// Decide whether to continue with the recipients or not; similar logic to the Pipeline
// check for error if so we should break out
if (!continueProcessing(current, "so breaking out of the routing slip", LOG)) {
break;
}
Endpoint nextEndpoint;
boolean prototype = cacheSize < 0;
try {
Object recipient = iter.next(ex);
recipient = prepareRecipient(exchange, recipient);
Endpoint existing = getExistingEndpoint(exchange, recipient);
if (existing == null) {
nextEndpoint = resolveEndpoint(exchange, recipient, prototype);
} else {
nextEndpoint = existing;
// we have an existing endpoint then its not a prototype scope
prototype = false;
}
// if no endpoint was resolved then try the next
if (nextEndpoint == null) {
continue;
}
} catch (Exception e) {
// error resolving endpoint so we should break out
current.setException(e);
break;
}
// prepare and process the routing slip
final boolean prototypeEndpoint = prototype;
AsyncCallback cbNext = doneNext -> {
// and stop prototype endpoints
if (prototypeEndpoint) {
ServiceHelper.stopAndShutdownService(nextEndpoint);
}
cb.done(doneNext);
};
boolean sync = processExchange(nextEndpoint, current, original, cbNext, iter, prototype);
current = prepareExchangeForRoutingSlip(current, nextEndpoint);
if (!sync) {
if (LOG.isTraceEnabled()) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously",
original.getExchangeId());
}
return;
}
}
// logging nextExchange as it contains the exchange that might have altered the payload and since
// we are logging the completion if will be confusing if we log the original instead
// we could also consider logging the original and the nextExchange then we have *before* and *after* snapshots
if (LOG.isTraceEnabled()) {
LOG.trace("Processing complete for exchangeId: {} >>> {}", original.getExchangeId(), current);
}
// copy results back to the original exchange
ExchangeHelper.copyResults(original, current);
} catch (Throwable e) {
ex.setException(e);
}
// okay we are completely done with the routing slip
// so we need to signal done on the original callback so it can continue
cb.done(false);
}
});
});
}
@Override
protected void doStart() throws Exception {
if (producerCache == null) {
if (cacheSize < 0) {
producerCache = new EmptyProducerCache(this, camelContext);
LOG.debug("RoutingSlip {} is not using ProducerCache", this);
} else {
producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
LOG.debug("RoutingSlip {} using ProducerCache with cacheSize={}", this, cacheSize);
}
}
ServiceHelper.startService(producerCache, errorHandler);
}
@Override
protected void doStop() throws Exception {
ServiceHelper.stopService(producerCache, errorHandler);
}
@Override
protected void doShutdown() throws Exception {
ServiceHelper.stopAndShutdownServices(producerCache, errorHandler);
}
public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
return producerCache.getEndpointUtilizationStatistics();
}
/**
* Returns the outbound message if available. Otherwise return the inbound message.
*/
private Message getResultMessage(Exchange exchange) {
return exchange.getMessage();
}
/**
* Creates the embedded processor to use when wrapping this routing slip in an error handler.
*/
public AsyncProcessor newRoutingSlipProcessorForErrorHandler() {
return new RoutingSlipProcessor();
}
/**
* Embedded processor that routes to the routing slip that has been set via the exchange property
* {@link Exchange#SLIP_PRODUCER}.
*/
private final class RoutingSlipProcessor extends AsyncProcessorSupport {
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
AsyncProcessor producer = exchange.getProperty(Exchange.SLIP_PRODUCER, AsyncProcessor.class);
return producer.process(exchange, callback);
}
@Override
public String toString() {
return "RoutingSlipProcessor";
}
}
}