blob: 27f2c1ec1188ccfc617cb850094b5f08afede4b1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.Processor;
import org.apache.camel.Route;
import org.apache.camel.ShutdownRoute;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.management.InstrumentationProcessor;
import org.apache.camel.model.FromDefinition;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.RoutePolicyProcessor;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.RoutePolicy;
import org.apache.camel.util.ObjectHelper;
/**
* The context used to activate new routing rules
*
* @version
*/
public class DefaultRouteContext implements RouteContext {
private final Map<ProcessorDefinition<?>, AtomicInteger> nodeIndex = new HashMap<ProcessorDefinition<?>, AtomicInteger>();
private final RouteDefinition route;
private FromDefinition from;
private final Collection<Route> routes;
private Endpoint endpoint;
private final List<Processor> eventDrivenProcessors = new ArrayList<Processor>();
private CamelContext camelContext;
private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
private InterceptStrategy managedInterceptStrategy;
private boolean routeAdded;
private Boolean trace;
private Boolean streamCache;
private Boolean handleFault;
private Long delay;
private Boolean autoStartup = Boolean.TRUE;
private List<RoutePolicy> routePolicyList = new ArrayList<RoutePolicy>();
private ShutdownRoute shutdownRoute;
private ShutdownRunningTask shutdownRunningTask;
public DefaultRouteContext(CamelContext camelContext, RouteDefinition route, FromDefinition from, Collection<Route> routes) {
this.camelContext = camelContext;
this.route = route;
this.from = from;
this.routes = routes;
}
/**
* Only used for lazy construction from inside ExpressionType
*/
public DefaultRouteContext(CamelContext camelContext) {
this.camelContext = camelContext;
this.routes = new ArrayList<Route>();
this.route = new RouteDefinition("temporary");
}
public Endpoint getEndpoint() {
if (endpoint == null) {
endpoint = from.resolveEndpoint(this);
}
return endpoint;
}
public FromDefinition getFrom() {
return from;
}
public RouteDefinition getRoute() {
return route;
}
public CamelContext getCamelContext() {
return camelContext;
}
public Processor createProcessor(ProcessorDefinition<?> node) throws Exception {
return node.createOutputsProcessor(this);
}
public Endpoint resolveEndpoint(String uri) {
return route.resolveEndpoint(getCamelContext(), uri);
}
public Endpoint resolveEndpoint(String uri, String ref) {
Endpoint endpoint = null;
if (uri != null) {
endpoint = resolveEndpoint(uri);
if (endpoint == null) {
throw new NoSuchEndpointException(uri);
}
}
if (ref != null) {
endpoint = lookup(ref, Endpoint.class);
if (endpoint == null) {
throw new NoSuchEndpointException("ref:" + ref, "check your camel registry with id " + ref);
}
// Check the endpoint has the right CamelContext
if (!this.getCamelContext().equals(endpoint.getCamelContext())) {
throw new NoSuchEndpointException("ref:" + ref, "make sure the endpoint has the same camel context as the route does.");
}
}
if (endpoint == null) {
throw new IllegalArgumentException("Either 'uri' or 'ref' must be specified on: " + this);
} else {
return endpoint;
}
}
public <T> T lookup(String name, Class<T> type) {
return getCamelContext().getRegistry().lookup(name, type);
}
public <T> Map<String, T> lookupByType(Class<T> type) {
return getCamelContext().getRegistry().lookupByType(type);
}
public void commit() {
// now lets turn all of the event driven consumer processors into a single route
if (!eventDrivenProcessors.isEmpty()) {
Processor target = Pipeline.newInstance(getCamelContext(), eventDrivenProcessors);
// and wrap it in a unit of work so the UoW is on the top, so the entire route will be in the same UoW
UnitOfWorkProcessor unitOfWorkProcessor = new UnitOfWorkProcessor(this, target);
// and then optionally add route policy processor if a custom policy is set
RoutePolicyProcessor routePolicyProcessor = null;
List<RoutePolicy> routePolicyList = getRoutePolicyList();
if (routePolicyList != null && !routePolicyList.isEmpty()) {
routePolicyProcessor = new RoutePolicyProcessor(unitOfWorkProcessor, routePolicyList);
// add it as service if we have not already done that (eg possible if two routes have the same service)
if (!camelContext.hasService(routePolicyProcessor)) {
try {
camelContext.addService(routePolicyProcessor);
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
}
target = routePolicyProcessor;
} else {
target = unitOfWorkProcessor;
}
// and wrap it by a instrumentation processor that is to be used for performance stats
// for this particular route
InstrumentationProcessor wrapper = new InstrumentationProcessor();
wrapper.setType("route");
wrapper.setProcessor(target);
// and create the route that wraps the UoW
Route edcr = new EventDrivenConsumerRoute(this, getEndpoint(), wrapper);
// create the route id
String routeId = route.idOrCreate(getCamelContext().getNodeIdFactory());
edcr.getProperties().put(Route.ID_PROPERTY, routeId);
edcr.getProperties().put(Route.PARENT_PROPERTY, Integer.toHexString(route.hashCode()));
if (route.getGroup() != null) {
edcr.getProperties().put(Route.GROUP_PROPERTY, route.getGroup());
}
// after the route is created then set the route on the policy processor so we get hold of it
if (routePolicyProcessor != null) {
routePolicyProcessor.setRoute(edcr);
}
// invoke init on route policy
if (routePolicyList != null && !routePolicyList.isEmpty()) {
for (RoutePolicy policy : routePolicyList) {
policy.onInit(edcr);
}
}
routes.add(edcr);
}
}
public void addEventDrivenProcessor(Processor processor) {
eventDrivenProcessors.add(processor);
}
public List<InterceptStrategy> getInterceptStrategies() {
return interceptStrategies;
}
public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) {
this.interceptStrategies = interceptStrategies;
}
public void addInterceptStrategy(InterceptStrategy interceptStrategy) {
getInterceptStrategies().add(interceptStrategy);
}
public void setManagedInterceptStrategy(InterceptStrategy interceptStrategy) {
this.managedInterceptStrategy = interceptStrategy;
}
public InterceptStrategy getManagedInterceptStrategy() {
return managedInterceptStrategy;
}
public boolean isRouteAdded() {
return routeAdded;
}
public void setIsRouteAdded(boolean routeAdded) {
this.routeAdded = routeAdded;
}
public void setTracing(Boolean tracing) {
this.trace = tracing;
}
public Boolean isTracing() {
if (trace != null) {
return trace;
} else {
// fallback to the option from camel context
return getCamelContext().isTracing();
}
}
public void setStreamCaching(Boolean cache) {
this.streamCache = cache;
}
public Boolean isStreamCaching() {
if (streamCache != null) {
return streamCache;
} else {
// fallback to the option from camel context
return getCamelContext().isStreamCaching();
}
}
public void setHandleFault(Boolean handleFault) {
this.handleFault = handleFault;
}
public Boolean isHandleFault() {
if (handleFault != null) {
return handleFault;
} else {
// fallback to the option from camel context
return getCamelContext().isHandleFault();
}
}
public void setDelayer(Long delay) {
this.delay = delay;
}
public Long getDelayer() {
if (delay != null) {
return delay;
} else {
// fallback to the option from camel context
return getCamelContext().getDelayer();
}
}
public void setAutoStartup(Boolean autoStartup) {
this.autoStartup = autoStartup;
}
public Boolean isAutoStartup() {
if (autoStartup != null) {
return autoStartup;
}
// default to true
return true;
}
public void setShutdownRoute(ShutdownRoute shutdownRoute) {
this.shutdownRoute = shutdownRoute;
}
public ShutdownRoute getShutdownRoute() {
if (shutdownRoute != null) {
return shutdownRoute;
} else {
// fallback to the option from camel context
return getCamelContext().getShutdownRoute();
}
}
public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) {
this.shutdownRunningTask = shutdownRunningTask;
}
public ShutdownRunningTask getShutdownRunningTask() {
if (shutdownRunningTask != null) {
return shutdownRunningTask;
} else {
// fallback to the option from camel context
return getCamelContext().getShutdownRunningTask();
}
}
public int getAndIncrement(ProcessorDefinition<?> node) {
AtomicInteger count = nodeIndex.get(node);
if (count == null) {
count = new AtomicInteger();
nodeIndex.put(node, count);
}
return count.getAndIncrement();
}
public void setRoutePolicyList(List<RoutePolicy> routePolicyList) {
this.routePolicyList = routePolicyList;
}
public List<RoutePolicy> getRoutePolicyList() {
return routePolicyList;
}
}