| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.impl; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Consumer; |
| import org.apache.camel.Navigate; |
| import org.apache.camel.Route; |
| import org.apache.camel.Service; |
| import org.apache.camel.model.RouteDefinition; |
| import org.apache.camel.spi.LifecycleStrategy; |
| import org.apache.camel.spi.RouteContext; |
| import org.apache.camel.util.EventHelper; |
| import org.apache.camel.util.ServiceHelper; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| /** |
| * Represents the runtime objects for a given {@link RouteDefinition} so that it can be stopped independently |
| * of other routes |
| * |
| * @version $Revision$ |
| */ |
| public class RouteService extends ServiceSupport { |
| |
| private static final Log LOG = LogFactory.getLog(RouteService.class); |
| |
| private final DefaultCamelContext camelContext; |
| private final RouteDefinition routeDefinition; |
| private final List<RouteContext> routeContexts; |
| private final List<Route> routes; |
| private final String id; |
| private boolean removingRoutes; |
| private boolean startInputs = true; |
| private final Map<Route, Consumer> inputs = new HashMap<Route, Consumer>(); |
| |
| public RouteService(DefaultCamelContext camelContext, RouteDefinition routeDefinition, List<RouteContext> routeContexts, List<Route> routes) { |
| this.camelContext = camelContext; |
| this.routeDefinition = routeDefinition; |
| this.routeContexts = routeContexts; |
| this.routes = routes; |
| this.id = routeDefinition.idOrCreate(camelContext.getNodeIdFactory()); |
| } |
| |
| public String getId() { |
| return id; |
| } |
| |
| public CamelContext getCamelContext() { |
| return camelContext; |
| } |
| |
| public List<RouteContext> getRouteContexts() { |
| return routeContexts; |
| } |
| |
| public RouteDefinition getRouteDefinition() { |
| return routeDefinition; |
| } |
| |
| public Collection<Route> getRoutes() { |
| return routes; |
| } |
| |
| /** |
| * Sets whether inputs (consumers) should be started when starting the routes |
| * <p/> |
| * By default inputs are started. |
| * |
| * @param flag flag to either start inputs or not |
| */ |
| public void startInputs(boolean flag) { |
| this.startInputs = flag; |
| } |
| |
| /** |
| * Gets the inputs to the routes. |
| * |
| * @return list of {@link Consumer} as inputs for the routes |
| */ |
| public Map<Route, Consumer> getInputs() { |
| return inputs; |
| } |
| |
| public boolean isRemovingRoutes() { |
| return removingRoutes; |
| } |
| |
| public void setRemovingRoutes(boolean removingRoutes) { |
| this.removingRoutes = removingRoutes; |
| } |
| |
| protected void doStart() throws Exception { |
| camelContext.addRouteCollection(routes); |
| |
| for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { |
| strategy.onRoutesAdd(routes); |
| } |
| |
| for (Route route : routes) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Starting route services: " + route); |
| } |
| |
| // TODO: We should also consider processors which are not services then we can manage all processors as well |
| // otherwise its only the processors which is a Service |
| |
| List<Service> services = route.getServices(); |
| |
| // callback that we are staring these services |
| route.onStartingServices(services); |
| |
| // gather list of services to start as we need to start child services as well |
| List<Service> list = new ArrayList<Service>(); |
| for (Service service : services) { |
| doGetChildServices(list, service); |
| } |
| |
| // split into consumers and child services as we need to start the consumers |
| // afterwards to avoid them being active while the others start |
| List<Service> childServices = new ArrayList<Service>(); |
| for (Service service : list) { |
| if (service instanceof Consumer) { |
| inputs.put(route, (Consumer) service); |
| } else { |
| childServices.add(service); |
| } |
| } |
| startChildService(route, childServices); |
| |
| // start the route itself |
| ServiceHelper.startService(route); |
| |
| // fire event |
| EventHelper.notifyRouteStarted(camelContext, route); |
| } |
| |
| if (startInputs) { |
| // start the input consumers |
| for (Map.Entry<Route, Consumer> entry : inputs.entrySet()) { |
| Route route = entry.getKey(); |
| Consumer consumer = entry.getValue(); |
| startChildService(route, consumer); |
| } |
| } |
| } |
| |
| protected void doStop() throws Exception { |
| // clear inputs |
| inputs.clear(); |
| |
| // if we are stopping CamelContext then we are shutting down |
| boolean isShutdownCamelContext = camelContext.isStopping(); |
| |
| if (isShutdownCamelContext || isRemovingRoutes()) { |
| // need to call onRoutesRemove when the CamelContext is shutting down or Route is shutdown |
| for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { |
| strategy.onRoutesRemove(routes); |
| } |
| } |
| |
| for (Route route : routes) { |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("Stopping route: " + route); |
| } |
| // getServices will not add services again |
| List<Service> services = route.getServices(); |
| |
| // gather list of services to stop as we need to start child services as well |
| List<Service> list = new ArrayList<Service>(); |
| for (Service service : services) { |
| doGetChildServices(list, service); |
| } |
| stopChildService(route, list, isShutdownCamelContext); |
| |
| // stop the route itself |
| if (isShutdownCamelContext) { |
| ServiceHelper.stopAndShutdownService(route); |
| } else { |
| ServiceHelper.stopService(route); |
| } |
| |
| // fire event |
| EventHelper.notifyRouteStopped(camelContext, route); |
| } |
| |
| camelContext.removeRouteCollection(routes); |
| } |
| |
| protected void startChildService(Route route, Service... services) throws Exception { |
| List<Service> list = new ArrayList<Service>(Arrays.asList(services)); |
| startChildService(route, list); |
| } |
| |
| protected void startChildService(Route route, List<Service> services) throws Exception { |
| for (Service service : services) { |
| for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { |
| strategy.onServiceAdd(camelContext, service, route); |
| } |
| ServiceHelper.startService(service); |
| addChildService(service); |
| } |
| } |
| |
| protected void stopChildService(Route route, List<Service> services, boolean shutdown) throws Exception { |
| for (Service service : services) { |
| for (LifecycleStrategy strategy : camelContext.getLifecycleStrategies()) { |
| strategy.onServiceRemove(camelContext, service, route); |
| } |
| if (shutdown) { |
| ServiceHelper.stopAndShutdownService(service); |
| } else { |
| ServiceHelper.stopService(service); |
| } |
| removeChildService(service); |
| } |
| } |
| |
| /** |
| * Need to recursive start child services for routes |
| */ |
| private static void doGetChildServices(List<Service> services, Service service) throws Exception { |
| services.add(service); |
| |
| if (service instanceof Navigate) { |
| Navigate<?> nav = (Navigate<?>) service; |
| if (nav.hasNext()) { |
| List<?> children = nav.next(); |
| for (Object child : children) { |
| if (child instanceof Service) { |
| doGetChildServices(services, (Service) child); |
| } |
| } |
| } |
| } |
| } |
| |
| } |