blob: 04714b1980fcadbec3b7130a79ff1ea601df1e36 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Processor;
import org.apache.camel.util.ExchangeHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The processor which sends messages in a loop.
*/
public class LoopProcessor extends DelegateAsyncProcessor implements Traceable {
private static final Logger LOG = LoggerFactory.getLogger(LoopProcessor.class);
private final Expression expression;
private final boolean copy;
public LoopProcessor(Processor processor, Expression expression, boolean copy) {
super(processor);
this.expression = expression;
this.copy = copy;
}
@Override
public boolean process(Exchange exchange, AsyncCallback callback) {
// use atomic integer to be able to pass reference and keep track on the values
AtomicInteger index = new AtomicInteger();
AtomicInteger count = new AtomicInteger();
// Intermediate conversion to String is needed when direct conversion to Integer is not available
// but evaluation result is a textual representation of a numeric value.
String text = expression.evaluate(exchange, String.class);
try {
int num = ExchangeHelper.convertToMandatoryType(exchange, Integer.class, text);
count.set(num);
} catch (NoTypeConversionAvailableException e) {
exchange.setException(e);
callback.done(true);
return true;
}
Exchange target = exchange;
// set the size before we start
exchange.setProperty(Exchange.LOOP_SIZE, count);
// loop synchronously
while (index.get() < count.get()) {
// and prepare for next iteration
target = prepareExchange(exchange, index.get());
boolean sync = process(target, callback, index, count);
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
// the remainder of the routing slip will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return false;
}
LOG.trace("Processing exchangeId: {} is continued being processed synchronously", target.getExchangeId());
// increment counter before next loop
index.getAndIncrement();
}
// we are done so prepare the result
ExchangeHelper.copyResults(exchange, target);
LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
callback.done(true);
return true;
}
protected boolean process(final Exchange exchange, final AsyncCallback callback,
final AtomicInteger index, final AtomicInteger count) {
// set current index as property
LOG.debug("LoopProcessor: iteration #{}", index.get());
exchange.setProperty(Exchange.LOOP_INDEX, index.get());
boolean sync = processNext(exchange, new AsyncCallback() {
public void done(boolean doneSync) {
// we only have to handle async completion of the routing slip
if (doneSync) {
return;
}
Exchange target = exchange;
// increment index as we have just processed once
index.getAndIncrement();
// continue looping asynchronously
while (index.get() < count.get()) {
// and prepare for next iteration
target = prepareExchange(exchange, index.get());
// process again
boolean sync = process(target, callback, index, count);
if (!sync) {
LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", target.getExchangeId());
// the remainder of the routing slip will be completed async
// so we break out now, then the callback will be invoked which then continue routing from where we left here
return;
}
// increment counter before next loop
index.getAndIncrement();
}
// we are done so prepare the result
ExchangeHelper.copyResults(exchange, target);
LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
callback.done(false);
}
});
return sync;
}
/**
* Prepares the exchange for the next iteration
*
* @param exchange the exchange
* @param index the index of the next iteration
* @return the exchange to use
*/
protected Exchange prepareExchange(Exchange exchange, int index) {
if (copy) {
// create a correlated copy, and do not handover completions on copies
return ExchangeHelper.createCorrelatedCopy(exchange, false);
} else {
ExchangeHelper.prepareOutToIn(exchange);
return exchange;
}
}
public Expression getExpression() {
return expression;
}
public String getTraceLabel() {
return "loop[" + expression + "]";
}
@Override
public String toString() {
return "Loop[for: " + expression + " times do: " + getProcessor() + "]";
}
}