blob: 457820911e6ed053888ad659ccc67b6f3209ac42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import org.apache.camel.AggregationStrategy;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.AsyncProducer;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.ExtendedExchange;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.impl.engine.DefaultProducerCache;
import org.apache.camel.impl.engine.EmptyProducerCache;
import org.apache.camel.spi.EndpointUtilizationStatistics;
import org.apache.camel.spi.IdAware;
import org.apache.camel.spi.NormalizedEndpointUri;
import org.apache.camel.spi.ProducerCache;
import org.apache.camel.spi.RouteIdAware;
import org.apache.camel.support.AsyncProcessorConverterHelper;
import org.apache.camel.support.AsyncProcessorSupport;
import org.apache.camel.support.DefaultExchange;
import org.apache.camel.support.EventHelper;
import org.apache.camel.support.ExchangeHelper;
import org.apache.camel.support.MessageHelper;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.support.ExchangeHelper.copyResultsPreservePattern;
/**
 * A content enricher that enriches input data by first obtaining additional data from a <i>resource</i> represented by
 * an endpoint <code>producer</code> and second by aggregating input data and additional data. Aggregation of input data
 * and additional data is delegated to an {@link AggregationStrategy} object.
 * <p/>
 * The resource endpoint is computed per exchange from the configured {@link Expression}, and producers are acquired
 * from (and released back to) a {@link ProducerCache} that is created when this processor is started.
 * <p/>
 * Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher} that uses a
 * {@link org.apache.camel.PollingConsumer}.
 *
 * @see PollEnricher
 */
public class Enricher extends AsyncProcessorSupport implements IdAware, RouteIdAware, CamelContextAware {

    private static final Logger LOG = LoggerFactory.getLogger(Enricher.class);

    private CamelContext camelContext;
    private String id;
    private String routeId;
    // created in doStart(); null until this processor has been started
    private ProducerCache producerCache;
    private final Expression expression;
    private AggregationStrategy aggregationStrategy;
    private boolean aggregateOnException;
    private boolean shareUnitOfWork;
    // a negative value disables producer caching and makes dynamically resolved endpoints prototype scoped
    private int cacheSize;
    private boolean ignoreInvalidEndpoint;

    /**
     * Creates an enricher whose resource endpoint is computed from the given expression.
     *
     * @param expression expression evaluated against each exchange to determine the resource endpoint.
     */
    public Enricher(Expression expression) {
        this.expression = expression;
    }

    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    @Override
    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    @Override
    public String getId() {
        return id;
    }

    @Override
    public void setId(String id) {
        this.id = id;
    }

    @Override
    public String getRouteId() {
        return routeId;
    }

    @Override
    public void setRouteId(String routeId) {
        this.routeId = routeId;
    }

    public Expression getExpression() {
        return expression;
    }

    /**
     * Gets the endpoint utilization statistics of the underlying producer cache.
     *
     * @return the statistics reported by the producer cache, or <code>null</code> if the producer cache has not been
     *         created yet (it is created when this processor is started).
     */
    public EndpointUtilizationStatistics getEndpointUtilizationStatistics() {
        // the cache only exists after doStart(), so guard against being called on a not yet started processor
        return producerCache != null ? producerCache.getEndpointUtilizationStatistics() : null;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public AggregationStrategy getAggregationStrategy() {
        return aggregationStrategy;
    }

    public boolean isAggregateOnException() {
        return aggregateOnException;
    }

    public void setAggregateOnException(boolean aggregateOnException) {
        this.aggregateOnException = aggregateOnException;
    }

    public boolean isShareUnitOfWork() {
        return shareUnitOfWork;
    }

    public void setShareUnitOfWork(boolean shareUnitOfWork) {
        this.shareUnitOfWork = shareUnitOfWork;
    }

    public int getCacheSize() {
        return cacheSize;
    }

    public void setCacheSize(int cacheSize) {
        this.cacheSize = cacheSize;
    }

    public boolean isIgnoreInvalidEndpoint() {
        return ignoreInvalidEndpoint;
    }

    public void setIgnoreInvalidEndpoint(boolean ignoreInvalidEndpoint) {
        this.ignoreInvalidEndpoint = ignoreInvalidEndpoint;
    }

    /**
     * Enriches the input data (<code>exchange</code>) by first obtaining additional data from an endpoint represented
     * by an endpoint <code>producer</code> and second by aggregating input data and additional data. Aggregation of
     * input data and additional data is delegated to the configured {@link AggregationStrategy} (a default strategy
     * that copies the resource result onto the original exchange is installed on start if none was set).
     * <p/>
     * If the message exchange with the resource endpoint fails and {@link #isAggregateOnException()} is
     * <code>false</code>, then no aggregation will be done and the failed exchange content is copied over to the
     * original message exchange.
     * <p/>
     * If the endpoint cannot be determined or its producer cannot be acquired, the failure is either set as exception
     * on the exchange, or only logged when {@link #isIgnoreInvalidEndpoint()} is <code>true</code>.
     *
     * @param  exchange input data.
     * @param  callback callback notified when processing of the exchange is done.
     * @return          <code>true</code> if the exchange was completed synchronously, <code>false</code> if it is
     *                  completed asynchronously via the callback.
     */
    @Override
    public boolean process(final Exchange exchange, final AsyncCallback callback) {
        // which producer to use
        final AsyncProducer producer;
        final Endpoint endpoint;

        // use dynamic endpoint so calculate the endpoint to use
        Object recipient = null;
        boolean prototype = cacheSize < 0;
        try {
            recipient = expression.evaluate(exchange, Object.class);
            recipient = prepareRecipient(exchange, recipient);
            Endpoint existing = getExistingEndpoint(exchange, recipient);
            if (existing == null) {
                endpoint = resolveEndpoint(exchange, recipient, prototype);
            } else {
                endpoint = existing;
                // we have an existing endpoint then its not a prototype scope
                prototype = false;
            }
            // acquire the producer from the cache
            producer = producerCache.acquireProducer(endpoint);
        } catch (Throwable e) {
            if (isIgnoreInvalidEndpoint()) {
                LOG.debug("Endpoint uri is invalid: {}. This exception will be ignored.", recipient, e);
            } else {
                exchange.setException(e);
            }
            callback.done(true);
            return true;
        }

        final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
        final Endpoint destination = producer.getEndpoint();

        // record timing for sending the exchange using the producer (only when the sending event was emitted)
        final boolean sending = EventHelper.notifyExchangeSending(exchange.getContext(), resourceExchange, destination);
        final StopWatch watch = sending ? new StopWatch() : null;
        final boolean prototypeEndpoint = prototype;

        AsyncProcessor ap = AsyncProcessorConverterHelper.convert(producer);
        boolean sync = ap.process(resourceExchange, new AsyncCallback() {
            @Override
            public void done(boolean doneSync) {
                // we only have to handle async completion
                if (doneSync) {
                    return;
                }
                completeEnrichment(exchange, resourceExchange, endpoint, producer, destination, watch, prototypeEndpoint);
                callback.done(false);
            }
        });

        if (!sync) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
            }
            // the remainder of the routing slip will be completed async
            // so we break out now, then the callback will be invoked which then continue routing from where we left here
            return false;
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
        }

        completeEnrichment(exchange, resourceExchange, endpoint, producer, destination, watch, prototypeEndpoint);
        callback.done(true);
        return true;
    }

    /**
     * Completes the enrichment after the resource exchange has been processed by the producer: emits the sent event,
     * merges the resource result into the original exchange, and finally hands the producer back to the cache.
     * <p/>
     * This is shared by the synchronous and asynchronous completion paths. The producer is always released (and a
     * prototype endpoint always stopped), also when the aggregation strategy fails, so a failing aggregation does not
     * leak the acquired producer. The caller is responsible for invoking the callback afterwards.
     *
     * @param exchange          the original exchange being enriched.
     * @param resourceExchange  the exchange that was sent to the resource endpoint.
     * @param endpoint          the endpoint the producer was acquired for.
     * @param producer          the producer to release back to the cache.
     * @param destination       the endpoint reported in the exchange sent event.
     * @param watch             stop watch started when the sending event was emitted, or <code>null</code> if no sent
     *                          event should be emitted.
     * @param prototypeEndpoint whether the endpoint is prototype scoped and must be stopped after use.
     */
    private void completeEnrichment(
            Exchange exchange, Exchange resourceExchange, Endpoint endpoint, AsyncProducer producer,
            Endpoint destination, StopWatch watch, boolean prototypeEndpoint) {
        try {
            // emit event that the exchange was sent to the endpoint
            if (watch != null) {
                long timeTaken = watch.taken();
                EventHelper.notifyExchangeSent(resourceExchange.getContext(), resourceExchange, destination, timeTaken);
            }

            if (!isAggregateOnException() && resourceExchange.isFailed()) {
                // copy resource exchange onto original exchange (preserving pattern)
                copyResultsPreservePattern(exchange, resourceExchange);
            } else {
                prepareResult(exchange);
                try {
                    // prepare the exchanges for aggregation
                    ExchangeHelper.prepareAggregation(exchange, resourceExchange);
                    MessageHelper.resetStreamCache(exchange.getIn());
                    Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
                    if (aggregatedExchange != null) {
                        // copy aggregation result onto original exchange (preserving pattern)
                        copyResultsPreservePattern(exchange, aggregatedExchange);
                    }
                } catch (Throwable e) {
                    // if the aggregationStrategy threw an exception, set it on the original exchange
                    exchange.setException(new CamelExchangeException("Error occurred during aggregation", exchange, e));
                    // we failed so break out now (the finally block still releases the producer)
                    return;
                }
            }

            // set property with the uri of the endpoint enriched so we can use that for tracing etc
            exchange.setProperty(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
        } finally {
            // return the producer back to the cache
            try {
                producerCache.releaseProducer(endpoint, producer);
            } catch (Exception e) {
                // releasing is best effort and must not fail the exchange, but leave a trace for troubleshooting
                LOG.debug("Error releasing producer: {}. This exception will be ignored.", producer, e);
            }
            // and stop prototype endpoints
            if (prototypeEndpoint) {
                ServiceHelper.stopAndShutdownService(endpoint);
            }
        }
    }

    /**
     * Prepares the evaluated recipient so it can be used for endpoint lookup: {@link Endpoint} and
     * {@link NormalizedEndpointUri} instances are returned as-is, strings are trimmed, and any other non-null value is
     * converted to a string. The resulting uri is normalized by the camel context.
     *
     * @param  exchange  the current exchange (provides the camel context and type converter).
     * @param  recipient the evaluated recipient, may be <code>null</code>.
     * @return           the prepared recipient, or <code>null</code> if the recipient was <code>null</code>.
     * @throws NoTypeConversionAvailableException if the recipient cannot be converted to a string.
     */
    protected static Object prepareRecipient(Exchange exchange, Object recipient) throws NoTypeConversionAvailableException {
        if (recipient instanceof Endpoint || recipient instanceof NormalizedEndpointUri) {
            return recipient;
        } else if (recipient instanceof String) {
            // trim strings as end users might have added spaces between separators
            recipient = ((String) recipient).trim();
        }
        if (recipient != null) {
            ExtendedCamelContext ecc = (ExtendedCamelContext) exchange.getContext();
            String uri;
            if (recipient instanceof String) {
                uri = (String) recipient;
            } else {
                // convert to a string type we can work with
                uri = ecc.getTypeConverter().mandatoryConvertTo(String.class, exchange, recipient);
            }
            // optimize and normalize endpoint
            return ecc.normalizeUri(uri);
        }
        return null;
    }

    /**
     * Looks up an endpoint that already exists for the given recipient.
     *
     * @param  exchange  the current exchange (provides the camel context).
     * @param  recipient the prepared recipient, may be <code>null</code>.
     * @return           the recipient itself if it is an {@link Endpoint}, otherwise the result of the camel context
     *                   <code>hasEndpoint</code> lookup, or <code>null</code> if the recipient is <code>null</code>.
     */
    protected static Endpoint getExistingEndpoint(Exchange exchange, Object recipient) {
        if (recipient instanceof Endpoint) {
            return (Endpoint) recipient;
        }
        if (recipient != null) {
            if (recipient instanceof NormalizedEndpointUri) {
                NormalizedEndpointUri nu = (NormalizedEndpointUri) recipient;
                ExtendedCamelContext ecc = exchange.getContext().adapt(ExtendedCamelContext.class);
                return ecc.hasEndpoint(nu);
            } else {
                String uri = recipient.toString();
                return exchange.getContext().hasEndpoint(uri);
            }
        }
        return null;
    }

    /**
     * Resolves the endpoint for the given recipient, either as a prototype endpoint or as a regular endpoint.
     *
     * @param  exchange  the current exchange.
     * @param  recipient the prepared recipient.
     * @param  prototype whether to resolve the endpoint via the prototype resolution of {@link ExchangeHelper}.
     * @return           the resolved endpoint.
     */
    protected static Endpoint resolveEndpoint(Exchange exchange, Object recipient, boolean prototype) {
        return prototype
                ? ExchangeHelper.resolvePrototypeEndpoint(exchange, recipient)
                : ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Creates the exchange to send to the resource endpoint as a correlated copy of the given <code>source</code>
     * exchange. The resulting exchange's pattern is defined by <code>pattern</code>. When
     * {@link #isShareUnitOfWork()} is enabled, the copy is set up to use the unit of work of the source exchange.
     *
     * @param  source  exchange to copy from.
     * @param  pattern exchange pattern to set.
     * @return         created exchange.
     */
    protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
        // copy exchange, and do not share the unit of work
        Exchange target = ExchangeHelper.createCorrelatedCopy(source, false);
        target.setPattern(pattern);

        // if we share unit of work, we need to prepare the resource exchange
        if (isShareUnitOfWork()) {
            target.setProperty(Exchange.PARENT_UNIT_OF_WORK, source.getUnitOfWork());
            // and then share the unit of work
            target.adapt(ExtendedExchange.class).setUnitOfWork(source.getUnitOfWork());
        }
        return target;
    }

    /**
     * Copies the IN message onto the OUT message when the exchange pattern is out capable.
     */
    private static void prepareResult(Exchange exchange) {
        if (exchange.getPattern().isOutCapable()) {
            exchange.getOut().copyFrom(exchange.getIn());
        }
    }

    /**
     * Creates the aggregation strategy used when none has been configured.
     */
    private static AggregationStrategy defaultAggregationStrategy() {
        return new CopyAggregationStrategy();
    }

    @Override
    public String toString() {
        return id;
    }

    @Override
    protected void doStart() throws Exception {
        if (aggregationStrategy == null) {
            aggregationStrategy = defaultAggregationStrategy();
        }
        if (aggregationStrategy instanceof CamelContextAware) {
            ((CamelContextAware) aggregationStrategy).setCamelContext(camelContext);
        }

        if (producerCache == null) {
            if (cacheSize < 0) {
                producerCache = new EmptyProducerCache(this, camelContext);
                LOG.debug("Enricher {} is not using ProducerCache", this);
            } else {
                producerCache = new DefaultProducerCache(this, camelContext, cacheSize);
                LOG.debug("Enricher {} using ProducerCache with cacheSize={}", this, cacheSize);
            }
        }

        ServiceHelper.startService(producerCache, aggregationStrategy);
    }

    @Override
    protected void doStop() throws Exception {
        ServiceHelper.stopService(aggregationStrategy, producerCache);
    }

    /**
     * Default aggregation strategy that copies the results of the new (resource) exchange onto the old (original)
     * exchange and returns the old exchange.
     */
    private static class CopyAggregationStrategy implements AggregationStrategy {

        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (newExchange != null) {
                copyResultsPreservePattern(oldExchange, newExchange);
            }
            return oldExchange;
        }
    }
}