blob: 72991289180e7d062a4578ee2c8a6f61d9d4d79b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerCallback;
import org.apache.camel.impl.ProducerCache;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.model.RoutingSlipDefinition;
import org.apache.camel.util.ExchangeHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.camel.util.ObjectHelper.notNull;
/**
* Implements a <a href="http://camel.apache.org/routing-slip.html">Routing Slip</a>
* pattern where the list of actual endpoints to send a message exchange to is
* dependent on the value of a message header.
*/
public class RoutingSlip extends ServiceSupport implements Processor {
    private static final transient Log LOG = LogFactory.getLog(RoutingSlip.class);

    // created lazily by getProducerCache(Exchange) on first use
    private ProducerCache producerCache;
    // name of the message header that carries the routing slip
    private final String header;
    // separator between the endpoint URIs inside the routing slip
    private final String uriDelimiter;

    /**
     * Creates a routing slip that reads the slip from the given header and
     * separates the endpoint URIs with
     * {@link RoutingSlipDefinition#DEFAULT_DELIMITER}.
     *
     * @param header name of the message header holding the routing slip, must not be null
     */
    public RoutingSlip(String header) {
        this(header, RoutingSlipDefinition.DEFAULT_DELIMITER);
    }

    /**
     * Creates a routing slip that reads the slip from the given header and
     * separates the endpoint URIs with the given delimiter.
     *
     * @param header       name of the message header holding the routing slip, must not be null
     * @param uriDelimiter separator between the endpoint URIs in the slip, must not be null
     */
    public RoutingSlip(String header, String uriDelimiter) {
        notNull(header, "header");
        notNull(uriDelimiter, "uriDelimiter");
        this.header = header;
        this.uriDelimiter = uriDelimiter;
    }

    @Override
    public String toString() {
        return "RoutingSlip[header=" + header + " uriDelimiter=" + uriDelimiter + "]";
    }

    /**
     * Sends the exchange to each recipient listed in the routing slip header
     * of the in message, one after the other.
     * <p/>
     * The list of recipients is read once, before the first step. For each
     * recipient a new exchange is created from the current one, the first
     * entry is removed from the slip header of the current result message,
     * and that result message becomes the in message of the new exchange,
     * which is then handed to the producer of the resolved endpoint. After
     * the last step the results are copied back onto the given exchange.
     *
     * @param exchange the exchange to route
     * @throws Exception if resolving an endpoint or a producer fails
     */
    public void process(Exchange exchange) throws Exception {
        Message message = exchange.getIn();
        String[] recipients = recipients(message);
        Exchange current = exchange;

        for (String nextRecipient : recipients) {
            Endpoint endpoint = resolveEndpoint(exchange, nextRecipient);

            Exchange copy = current.newInstance();
            // strip the recipient we are about to call from the slip before
            // the message is handed over to the next exchange
            updateRoutingSlip(current);
            copyOutToIn(copy, current);

            getProducerCache(exchange).doInProducer(endpoint, copy, null, new ProducerCallback<Object>() {
                public Object doInProducer(Producer producer, Exchange exchange, ExchangePattern exchangePattern) throws Exception {
                    producer.process(exchange);
                    return exchange;
                }
            });

            current = copy;
        }
        ExchangeHelper.copyResults(exchange, current);
    }

    /**
     * Returns the producer cache, creating and starting it on first use.
     * <p/>
     * NOTE(review): the lazy initialisation is not synchronized; confirm
     * whether this processor can be invoked by several threads at once.
     *
     * @param exchange the exchange whose camel context supplies the producer service pool
     * @return the started producer cache
     * @throws Exception if the producer cache cannot be started
     */
    protected ProducerCache getProducerCache(Exchange exchange) throws Exception {
        // setup producer cache as we need to use the pluggable service pool defined on camel context
        if (producerCache == null) {
            this.producerCache = new ProducerCache(exchange.getContext().getProducerServicePool());
            this.producerCache.start();
        }
        return this.producerCache;
    }

    /**
     * Resolves the endpoint for a single entry of the routing slip.
     *
     * @param exchange  the exchange being routed
     * @param recipient the routing slip entry to resolve
     * @return the resolved endpoint
     */
    protected Endpoint resolveEndpoint(Exchange exchange, Object recipient) {
        return ExchangeHelper.resolveEndpoint(exchange, recipient);
    }

    /**
     * Starts the producer cache if it has already been created.
     */
    protected void doStart() throws Exception {
        if (producerCache != null) {
            producerCache.start();
        }
    }

    /**
     * Stops the producer cache if it has been created.
     */
    protected void doStop() throws Exception {
        if (producerCache != null) {
            producerCache.stop();
        }
    }

    /**
     * Removes the first entry from the routing slip header of the result
     * message of the given exchange. If the header is present but contains
     * no further delimiter, it is set to the empty string. A missing header
     * is left untouched.
     */
    private void updateRoutingSlip(Exchange current) {
        Message message = getResultMessage(current);
        String oldSlip = message.getHeader(header, String.class);
        if (oldSlip != null) {
            int delimiterIndex = oldSlip.indexOf(uriDelimiter);
            // skip the complete delimiter, which may be longer than one character
            String newSlip = delimiterIndex > 0 ? oldSlip.substring(delimiterIndex + uriDelimiter.length()) : "";
            message.setHeader(header, newSlip);
        }
    }

    /**
     * Returns the outbound message if available. Otherwise return the inbound
     * message.
     */
    private Message getResultMessage(Exchange exchange) {
        if (exchange.hasOut()) {
            return exchange.getOut();
        } else {
            // if this endpoint had no out (like a mock endpoint) just take the in
            return exchange.getIn();
        }
    }

    /**
     * Return the list of recipients defined in the routing slip in the
     * specified message. A missing or empty header yields an empty array.
     */
    private String[] recipients(Message message) {
        Object headerValue = message.getHeader(header);
        if (headerValue != null && !headerValue.equals("")) {
            // NOTE(review): String.split interprets the delimiter as a regular
            // expression, whereas updateRoutingSlip searches for it literally
            return headerValue.toString().split(uriDelimiter);
        }
        return new String[] {};
    }

    /**
     * Copy the outbound data in 'source' to the inbound data in 'result'.
     * The exception, the fault (if any) and all properties of 'source' are
     * carried over as well; existing properties of 'result' are discarded.
     */
    private void copyOutToIn(Exchange result, Exchange source) {
        result.setException(source.getException());

        if (source.hasFault()) {
            result.getFault().copyFrom(source.getFault());
        }

        result.setIn(getResultMessage(source));

        result.getProperties().clear();
        result.getProperties().putAll(source.getProperties());
    }
}