blob: fe889cb07f7002f116d1ce6d5bad8a9362a41e95 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Producer;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
* A content enricher that enriches input data by first obtaining additional
* data from a <i>resource</i> represented by an endpoint <code>producer</code>
* and second by aggregating input data and additional data. Aggregation of
* input data and additional data is delegated to an {@link AggregationStrategy}
* object.
* <p/>
* Uses a {@link org.apache.camel.Producer} to obtain the additional data as opposed to {@link PollEnricher}
* that uses a {@link org.apache.camel.PollingConsumer}.
*
* @see PollEnricher
*/
public class Enricher extends ServiceSupport implements AsyncProcessor {
private static final transient Logger LOG = LoggerFactory.getLogger(Enricher.class);
private AggregationStrategy aggregationStrategy;
private Producer producer;
/**
* Creates a new {@link Enricher}. The default aggregation strategy is to
* copy the additional data obtained from the enricher's resource over the
* input data. When using the copy aggregation strategy the enricher
* degenerates to a normal transformer.
*
* @param producer producer to resource endpoint.
*/
public Enricher(Producer producer) {
this(defaultAggregationStrategy(), producer);
}
/**
* Creates a new {@link Enricher}.
*
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
* @param producer producer to resource endpoint.
*/
public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
this.aggregationStrategy = aggregationStrategy;
this.producer = producer;
}
/**
* Sets the aggregation strategy for this enricher.
*
* @param aggregationStrategy the aggregationStrategy to set
*/
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
/**
* Sets the default aggregation strategy for this enricher.
*/
public void setDefaultAggregationStrategy() {
this.aggregationStrategy = defaultAggregationStrategy();
}
public void process(Exchange exchange) throws Exception {
AsyncProcessorHelper.process(this, exchange);
}
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
* <code>producer</code> and second by aggregating input data and additional
* data. Aggregation of input data and additional data is delegated to an
* {@link AggregationStrategy} object set at construction time. If the
* message exchange with the resource endpoint fails then no aggregation
* will be done and the failed exchange content is copied over to the
* original message exchange.
*
* @param exchange input data.
*/
public boolean process(final Exchange exchange, final AsyncCallback callback) {
final Exchange resourceExchange = createResourceExchange(exchange, ExchangePattern.InOut);
AsyncProcessor ap = AsyncProcessorTypeConverter.convert(producer);
boolean sync = AsyncProcessorHelper.process(ap, resourceExchange, new AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of the routing slip
if (doneSync) {
return;
}
if (resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
}
}
// set header with the uri of the endpoint enriched so we can use that for tracing etc
if (exchange.hasOut()) {
exchange.getOut().setHeader(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
} else {
exchange.getIn().setHeader(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
}
callback.done(false);
}
});
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
// the remainder of the routing slip will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return false;
}
LOG.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
if (resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
}
}
// set header with the uri of the endpoint enriched so we can use that for tracing etc
if (exchange.hasOut()) {
exchange.getOut().setHeader(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
} else {
exchange.getIn().setHeader(Exchange.TO_ENDPOINT, producer.getEndpoint().getEndpointUri());
}
callback.done(true);
return true;
}
/**
* Creates a new {@link DefaultExchange} instance from the given
* <code>exchange</code>. The resulting exchange's pattern is defined by
* <code>pattern</code>.
*
* @param source exchange to copy from.
* @param pattern exchange pattern to set.
* @return created exchange.
*/
protected Exchange createResourceExchange(Exchange source, ExchangePattern pattern) {
Exchange target = source.copy();
target.setPattern(pattern);
return target;
}
private static void prepareResult(Exchange exchange) {
if (exchange.getPattern().isOutCapable()) {
exchange.getOut().copyFrom(exchange.getIn());
}
}
private static AggregationStrategy defaultAggregationStrategy() {
return new CopyAggregationStrategy();
}
@Override
public String toString() {
return "Enrich[" + producer.getEndpoint().getEndpointUri() + "]";
}
protected void doStart() throws Exception {
ServiceHelper.startService(producer);
}
protected void doStop() throws Exception {
ServiceHelper.stopService(producer);
}
private static class CopyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange != null) {
copyResultsPreservePattern(oldExchange, newExchange);
}
return oldExchange;
}
}
}