blob: ab9b29785398f2a09b937f5393638fc96e129975 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.util.ExchangeHelper.copyResultsPreservePattern;
/**
* A content enricher that enriches input data by first obtaining additional
* data from a <i>resource</i> represented by an endpoint <code>producer</code>
* and second by aggregating input data and additional data. Aggregation of
* input data and additional data is delegated to an {@link org.apache.camel.processor.aggregate.AggregationStrategy}
* object.
* <p/>
* Uses a {@link org.apache.camel.PollingConsumer} to obtain the additional data as opposed to {@link Enricher}
* that uses a {@link org.apache.camel.Producer}.
*
* @see Enricher
*/
public class PollEnricher extends ServiceSupport implements Processor {
private static final transient Logger LOG = LoggerFactory.getLogger(PollEnricher.class);
private AggregationStrategy aggregationStrategy;
private PollingConsumer consumer;
private long timeout;
/**
* Creates a new {@link PollEnricher}. The default aggregation strategy is to
* copy the additional data obtained from the enricher's resource over the
* input data. When using the copy aggregation strategy the enricher
* degenerates to a normal transformer.
*
* @param consumer consumer to resource endpoint.
*/
public PollEnricher(PollingConsumer consumer) {
this(defaultAggregationStrategy(), consumer, 0);
}
/**
* Creates a new {@link PollEnricher}.
*
* @param aggregationStrategy aggregation strategy to aggregate input data and additional data.
* @param consumer consumer to resource endpoint.
* @param timeout timeout in millis
*/
public PollEnricher(AggregationStrategy aggregationStrategy, PollingConsumer consumer, long timeout) {
this.aggregationStrategy = aggregationStrategy;
this.consumer = consumer;
this.timeout = timeout;
}
/**
* Sets the aggregation strategy for this poll enricher.
*
* @param aggregationStrategy the aggregationStrategy to set
*/
public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
this.aggregationStrategy = aggregationStrategy;
}
/**
* Sets the default aggregation strategy for this poll enricher.
*/
public void setDefaultAggregationStrategy() {
this.aggregationStrategy = defaultAggregationStrategy();
}
/**
* Sets the timeout to use when polling.
* <p/>
* Use 0 or negative to not use timeout and block until data is available.
*
* @param timeout timeout in millis.
*/
public void setTimeout(long timeout) {
this.timeout = timeout;
}
/**
* Enriches the input data (<code>exchange</code>) by first obtaining
* additional data from an endpoint represented by an endpoint
* <code>producer</code> and second by aggregating input data and additional
* data. Aggregation of input data and additional data is delegated to an
* {@link org.apache.camel.processor.aggregate.AggregationStrategy} object set at construction time. If the
* message exchange with the resource endpoint fails then no aggregation
* will be done and the failed exchange content is copied over to the
* original message exchange.
*
* @param exchange input data.
*/
public void process(Exchange exchange) throws Exception {
preCheckPoll(exchange);
Exchange resourceExchange;
if (timeout < 0) {
LOG.debug("Consumer receive: {}", consumer);
resourceExchange = consumer.receive();
} else if (timeout == 0) {
LOG.debug("Consumer receiveNoWait: {}", consumer);
resourceExchange = consumer.receiveNoWait();
} else {
LOG.debug("Consumer receive with timeout: {} ms. {}", timeout, consumer);
resourceExchange = consumer.receive(timeout);
}
if (resourceExchange == null) {
LOG.debug("Consumer received no exchange");
} else {
LOG.debug("Consumer received: {}", resourceExchange);
}
if (resourceExchange != null && resourceExchange.isFailed()) {
// copy resource exchange onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, resourceExchange);
} else {
prepareResult(exchange);
// prepare the exchanges for aggregation
ExchangeHelper.prepareAggregation(exchange, resourceExchange);
Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
if (aggregatedExchange != null) {
// copy aggregation result onto original exchange (preserving pattern)
copyResultsPreservePattern(exchange, aggregatedExchange);
// handover any synchronization
if (resourceExchange != null) {
resourceExchange.handoverCompletions(exchange);
}
}
}
// set header with the uri of the endpoint enriched so we can use that for tracing etc
if (exchange.hasOut()) {
exchange.getOut().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
} else {
exchange.getIn().setHeader(Exchange.TO_ENDPOINT, consumer.getEndpoint().getEndpointUri());
}
}
/**
* Strategy to pre check polling.
* <p/>
* Is currently used to prevent doing poll enrich from a file based endpoint when the current route also
* started from a file based endpoint as that is not currently supported.
*
* @param exchange the current exchange
*/
protected void preCheckPoll(Exchange exchange) throws Exception {
// noop
}
private static void prepareResult(Exchange exchange) {
if (exchange.getPattern().isOutCapable()) {
exchange.getOut().copyFrom(exchange.getIn());
}
}
private static AggregationStrategy defaultAggregationStrategy() {
return new CopyAggregationStrategy();
}
@Override
public String toString() {
return "PollEnrich[" + consumer + "]";
}
protected void doStart() throws Exception {
ServiceHelper.startService(consumer);
}
protected void doStop() throws Exception {
ServiceHelper.stopService(consumer);
}
private static class CopyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (newExchange != null) {
copyResultsPreservePattern(oldExchange, newExchange);
} else {
// if no newExchange then there was no message from the external resource
// and therefore we should set an empty body to indicate this fact
// but keep headers/attachments as we want to propagate those
oldExchange.getIn().setBody(null);
oldExchange.setOut(null);
}
return oldExchange;
}
}
}