blob: cecf47d62508d68839b02640af66297470adc4ed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.routebox.strategy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.routebox.RouteboxEndpoint;
import org.apache.camel.impl.SynchronizationAdapter;
import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.RouteDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RouteboxDispatcher {
private static final transient Logger LOG = LoggerFactory.getLogger(RouteboxDispatcher.class);
private ProducerTemplate producer;
public RouteboxDispatcher(ProducerTemplate producer) {
super();
this.producer = producer;
}
public Exchange dispatchSync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
URI dispatchUri;
Exchange reply;
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching exchange {} to endpoint {}", exchange, endpoint.getEndpointUri());
}
dispatchUri = selectDispatchUri(endpoint, exchange);
if (exchange.getPattern() == ExchangePattern.InOnly) {
reply = producer.send(dispatchUri.toASCIIString(), exchange);
} else {
reply = issueRequest(endpoint, ExchangePattern.InOut, exchange.getIn().getBody(), exchange.getIn().getHeaders());
}
return reply;
}
public Exchange dispatchAsync(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
URI dispatchUri;
Exchange reply;
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatching exchange {} to endpoint {}", exchange, endpoint.getEndpointUri());
}
dispatchUri = selectDispatchUri(endpoint, exchange);
if (exchange.getPattern() == ExchangePattern.InOnly) {
producer.asyncSend(dispatchUri.toASCIIString(), exchange);
reply = exchange;
} else {
Future<Exchange> future = producer.asyncCallback(dispatchUri.toASCIIString(), exchange, new SynchronizationAdapter());
reply = future.get(endpoint.getConfig().getConnectionTimeout(), TimeUnit.MILLISECONDS);
}
return reply;
}
protected URI selectDispatchUri(RouteboxEndpoint endpoint, Exchange exchange) throws Exception {
URI dispatchUri;
List<URI> consumerUris = getInnerContextConsumerList(endpoint.getConfig().getInnerContext());
if (consumerUris.isEmpty()) {
throw new CamelExchangeException("No routes found to dispatch in Routebox at " + endpoint, exchange);
} else if (consumerUris.size() == 1) {
dispatchUri = consumerUris.get(0);
} else {
if (!endpoint.getConfig().getDispatchMap().isEmpty()) {
// apply URI string found in dispatch Map
String key = exchange.getIn().getHeader("ROUTE_DISPATCH_KEY", String.class);
if (endpoint.getConfig().getDispatchMap().containsKey(key)) {
dispatchUri = new URI(endpoint.getConfig().getDispatchMap().get(key));
} else {
throw new CamelExchangeException("No matching entry found in Dispatch Map for ROUTE_DISPATCH_KEY: " + key, exchange);
}
} else {
// apply dispatch strategy
dispatchUri = endpoint.getConfig().getDispatchStrategy().selectDestinationUri(consumerUris, exchange);
if (dispatchUri == null) {
throw new CamelExchangeException("No matching inner routes found for Operation", exchange);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Dispatch URI set to: " + dispatchUri.toASCIIString());
}
return dispatchUri;
}
protected List<URI> getInnerContextConsumerList(CamelContext context) throws URISyntaxException {
List<URI> consumerList = new ArrayList<URI>();
List<RouteDefinition> routeDefinitions = context.getRouteDefinitions();
for (RouteDefinition routeDefinition : routeDefinitions) {
List<FromDefinition> inputs = routeDefinition.getInputs();
for (FromDefinition input : inputs) {
consumerList.add(new URI(input.getUri()));
}
}
return consumerList;
}
public Exchange issueRequest(Endpoint endpoint, ExchangePattern pattern, final Object body, final Map<String, Object> headers) throws CamelExecutionException {
Exchange exchange = producer.send(endpoint, pattern, new Processor() {
public void process(Exchange exchange) throws Exception {
Message in = exchange.getIn();
in.getHeaders().putAll(headers);
in.setBody(body);
}
});
return exchange;
}
}