| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.impl.engine; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.LinkedHashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.CamelContextAware; |
| import org.apache.camel.Component; |
| import org.apache.camel.Consumer; |
| import org.apache.camel.ConsumerTemplate; |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.ErrorHandlerFactory; |
| import org.apache.camel.ExtendedCamelContext; |
| import org.apache.camel.ExtendedStartupListener; |
| import org.apache.camel.FailedToStartRouteException; |
| import org.apache.camel.FluentProducerTemplate; |
| import org.apache.camel.GlobalEndpointConfiguration; |
| import org.apache.camel.IsSingleton; |
| import org.apache.camel.MultipleConsumersSupport; |
| import org.apache.camel.NoFactoryAvailableException; |
| import org.apache.camel.NoSuchEndpointException; |
| import org.apache.camel.PollingConsumer; |
| import org.apache.camel.Processor; |
| import org.apache.camel.Producer; |
| import org.apache.camel.ProducerTemplate; |
| import org.apache.camel.ResolveEndpointFailedException; |
| import org.apache.camel.Route; |
| import org.apache.camel.RoutesBuilder; |
| import org.apache.camel.RuntimeCamelException; |
| import org.apache.camel.Service; |
| import org.apache.camel.ServiceStatus; |
| import org.apache.camel.ShutdownRoute; |
| import org.apache.camel.ShutdownRunningTask; |
| import org.apache.camel.StartupListener; |
| import org.apache.camel.StatefulService; |
| import org.apache.camel.Suspendable; |
| import org.apache.camel.SuspendableService; |
| import org.apache.camel.TypeConverter; |
| import org.apache.camel.VetoCamelContextStartException; |
| import org.apache.camel.impl.transformer.TransformerKey; |
| import org.apache.camel.impl.validator.ValidatorKey; |
| import org.apache.camel.spi.AnnotationBasedProcessorFactory; |
| import org.apache.camel.spi.AsyncProcessorAwaitManager; |
| import org.apache.camel.spi.BeanIntrospection; |
| import org.apache.camel.spi.BeanProcessorFactory; |
| import org.apache.camel.spi.BeanProxyFactory; |
| import org.apache.camel.spi.CamelBeanPostProcessor; |
| import org.apache.camel.spi.CamelContextNameStrategy; |
| import org.apache.camel.spi.CamelContextTracker; |
| import org.apache.camel.spi.ClassResolver; |
| import org.apache.camel.spi.ComponentResolver; |
| import org.apache.camel.spi.DataFormat; |
| import org.apache.camel.spi.DataFormatResolver; |
| import org.apache.camel.spi.DataType; |
| import org.apache.camel.spi.Debugger; |
| import org.apache.camel.spi.DeferServiceFactory; |
| import org.apache.camel.spi.EndpointRegistry; |
| import org.apache.camel.spi.EndpointStrategy; |
| import org.apache.camel.spi.EventNotifier; |
| import org.apache.camel.spi.ExecutorServiceManager; |
| import org.apache.camel.spi.FactoryFinder; |
| import org.apache.camel.spi.FactoryFinderResolver; |
| import org.apache.camel.spi.HeadersMapFactory; |
| import org.apache.camel.spi.InflightRepository; |
| import org.apache.camel.spi.Injector; |
| import org.apache.camel.spi.InterceptSendToEndpoint; |
| import org.apache.camel.spi.InterceptStrategy; |
| import org.apache.camel.spi.Language; |
| import org.apache.camel.spi.LanguageResolver; |
| import org.apache.camel.spi.LifecycleStrategy; |
| import org.apache.camel.spi.LogListener; |
| import org.apache.camel.spi.ManagementMBeanAssembler; |
| import org.apache.camel.spi.ManagementNameStrategy; |
| import org.apache.camel.spi.ManagementStrategy; |
| import org.apache.camel.spi.ManagementStrategyFactory; |
| import org.apache.camel.spi.MessageHistoryFactory; |
| import org.apache.camel.spi.ModelJAXBContextFactory; |
| import org.apache.camel.spi.NodeIdFactory; |
| import org.apache.camel.spi.PackageScanClassResolver; |
| import org.apache.camel.spi.PackageScanResourceResolver; |
| import org.apache.camel.spi.ProcessorFactory; |
| import org.apache.camel.spi.PropertiesComponent; |
| import org.apache.camel.spi.ReactiveExecutor; |
| import org.apache.camel.spi.Registry; |
| import org.apache.camel.spi.RestConfiguration; |
| import org.apache.camel.spi.RestRegistry; |
| import org.apache.camel.spi.RestRegistryFactory; |
| import org.apache.camel.spi.RouteController; |
| import org.apache.camel.spi.RouteError.Phase; |
| import org.apache.camel.spi.RoutePolicyFactory; |
| import org.apache.camel.spi.RouteStartupOrder; |
| import org.apache.camel.spi.RuntimeEndpointRegistry; |
| import org.apache.camel.spi.ShutdownStrategy; |
| import org.apache.camel.spi.StreamCachingStrategy; |
| import org.apache.camel.spi.Tracer; |
| import org.apache.camel.spi.Transformer; |
| import org.apache.camel.spi.TransformerRegistry; |
| import org.apache.camel.spi.TypeConverterRegistry; |
| import org.apache.camel.spi.UnitOfWorkFactory; |
| import org.apache.camel.spi.UuidGenerator; |
| import org.apache.camel.spi.Validator; |
| import org.apache.camel.spi.ValidatorRegistry; |
| import org.apache.camel.support.CamelContextHelper; |
| import org.apache.camel.support.EndpointHelper; |
| import org.apache.camel.support.EventHelper; |
| import org.apache.camel.support.OrderedComparator; |
| import org.apache.camel.support.ProcessorEndpoint; |
| import org.apache.camel.support.jsse.SSLContextParameters; |
| import org.apache.camel.support.service.ServiceHelper; |
| import org.apache.camel.support.service.ServiceSupport; |
| import org.apache.camel.util.IOHelper; |
| import org.apache.camel.util.ObjectHelper; |
| import org.apache.camel.util.StopWatch; |
| import org.apache.camel.util.StringHelper; |
| import org.apache.camel.util.TimeUtils; |
| import org.apache.camel.util.URISupport; |
| import org.apache.camel.util.function.ThrowingRunnable; |
| import org.slf4j.MDC; |
| |
| import static org.apache.camel.spi.UnitOfWork.MDC_CAMEL_CONTEXT_ID; |
| |
| /** |
| * Represents the context used to configure routes and the policies to use. |
| */ |
| public abstract class AbstractCamelContext extends ServiceSupport implements ExtendedCamelContext, Suspendable { |
| |
| public enum Initialization { |
| Eager, Default, Lazy |
| } |
| |
| private String version; |
| private final AtomicBoolean vetoStarted = new AtomicBoolean(); |
| private String managementName; |
| private ClassLoader applicationContextClassLoader; |
| private final AtomicInteger endpointKeyCounter = new AtomicInteger(); |
| private final List<EndpointStrategy> endpointStrategies = new ArrayList<>(); |
| private final GlobalEndpointConfiguration globalEndpointConfiguration = new DefaultGlobalEndpointConfiguration(); |
| private final Map<String, Component> components = new ConcurrentHashMap<>(); |
| private final Set<Route> routes = new LinkedHashSet<>(); |
| private final List<Service> servicesToStop = new CopyOnWriteArrayList<>(); |
| private final List<StartupListener> startupListeners = new CopyOnWriteArrayList<>(); |
| private final DeferServiceStartupListener deferStartupListener = new DeferServiceStartupListener(); |
| private boolean autoCreateComponents = true; |
| private final Map<String, Language> languages = new ConcurrentHashMap<>(); |
| private final List<LifecycleStrategy> lifecycleStrategies = new CopyOnWriteArrayList<>(); |
| private Map<String, RestConfiguration> restConfigurations = new ConcurrentHashMap<>(); |
| private List<InterceptStrategy> interceptStrategies = new ArrayList<>(); |
| private List<RoutePolicyFactory> routePolicyFactories = new ArrayList<>(); |
| private Set<LogListener> logListeners = new LinkedHashSet<>(); |
| // special flags to control the first startup which can are special |
| private volatile boolean firstStartDone; |
| private volatile boolean doNotStartRoutesOnFirstStart; |
| private final ThreadLocal<Boolean> isStartingRoutes = new ThreadLocal<>(); |
| private final ThreadLocal<Boolean> isSetupRoutes = new ThreadLocal<>(); |
| private Initialization initialization = Initialization.Default; |
| private Boolean autoStartup = Boolean.TRUE; |
| private Boolean backlogTrace = Boolean.FALSE; |
| private Boolean trace = Boolean.FALSE; |
| private String tracePattern; |
| private Boolean debug = Boolean.FALSE; |
| private Boolean messageHistory = Boolean.TRUE; |
| private Boolean logMask = Boolean.FALSE; |
| private Boolean logExhaustedMessageBody = Boolean.FALSE; |
| private Boolean streamCache = Boolean.FALSE; |
| private Boolean disableJMX = Boolean.FALSE; |
| private Boolean loadTypeConverters = Boolean.TRUE; |
| private Boolean typeConverterStatisticsEnabled = Boolean.FALSE; |
| private Boolean useMDCLogging = Boolean.FALSE; |
| private String mdcLoggingKeysPattern; |
| private Boolean useDataType = Boolean.FALSE; |
| private Boolean useBreadcrumb = Boolean.FALSE; |
| private Boolean allowUseOriginalMessage = Boolean.FALSE; |
| private Long delay; |
| private ErrorHandlerFactory errorHandlerFactory; |
| private Map<String, String> globalOptions = new HashMap<>(); |
| private PropertiesComponent propertiesComponent; |
| private final Map<String, FactoryFinder> factories = new ConcurrentHashMap<>(); |
| private final Map<String, BaseRouteService> routeServices = new LinkedHashMap<>(); |
| private final Map<String, BaseRouteService> suspendedRouteServices = new LinkedHashMap<>(); |
| |
| private final Object lock = new Object(); |
| private volatile CamelContextNameStrategy nameStrategy; |
| private volatile ReactiveExecutor reactiveExecutor; |
| private volatile ManagementNameStrategy managementNameStrategy; |
| private volatile Registry registry; |
| private volatile TypeConverter typeConverter; |
| private volatile TypeConverterRegistry typeConverterRegistry; |
| private volatile Injector injector; |
| private volatile CamelBeanPostProcessor beanPostProcessor; |
| private volatile ComponentResolver componentResolver; |
| private volatile LanguageResolver languageResolver; |
| private volatile DataFormatResolver dataFormatResolver; |
| private volatile ManagementStrategy managementStrategy; |
| private volatile ManagementMBeanAssembler managementMBeanAssembler; |
| private volatile RestRegistryFactory restRegistryFactory; |
| private volatile RestRegistry restRegistry; |
| private volatile HeadersMapFactory headersMapFactory; |
| private volatile BeanProxyFactory beanProxyFactory; |
| private volatile BeanProcessorFactory beanProcessorFactory; |
| private volatile ClassResolver classResolver; |
| private volatile PackageScanClassResolver packageScanClassResolver; |
| private volatile PackageScanResourceResolver packageScanResourceResolver; |
| private volatile ServicePool<Producer> producerServicePool; |
| private volatile ServicePool<PollingConsumer> pollingConsumerServicePool; |
| private volatile NodeIdFactory nodeIdFactory; |
| private volatile ProcessorFactory processorFactory; |
| private volatile MessageHistoryFactory messageHistoryFactory; |
| private volatile FactoryFinderResolver factoryFinderResolver; |
| private volatile FactoryFinder defaultFactoryFinder; |
| private volatile StreamCachingStrategy streamCachingStrategy; |
| private volatile InflightRepository inflightRepository; |
| private volatile AsyncProcessorAwaitManager asyncProcessorAwaitManager; |
| private volatile ShutdownStrategy shutdownStrategy; |
| private volatile ModelJAXBContextFactory modelJAXBContextFactory; |
| private volatile ExecutorServiceManager executorServiceManager; |
| private volatile UuidGenerator uuidGenerator; |
| private volatile UnitOfWorkFactory unitOfWorkFactory; |
| private volatile RouteController routeController; |
| private volatile ScheduledExecutorService errorHandlerExecutorService; |
| private volatile BeanIntrospection beanIntrospection; |
| private final DeferServiceFactory deferServiceFactory = new DefaultDeferServiceFactory(); |
| private final AnnotationBasedProcessorFactory annotationBasedProcessorFactory = new DefaultAnnotationBasedProcessorFactory(); |
| |
| private TransformerRegistry<TransformerKey> transformerRegistry; |
| private ValidatorRegistry<ValidatorKey> validatorRegistry; |
| private EndpointRegistry<EndpointKey> endpoints; |
| private RuntimeEndpointRegistry runtimeEndpointRegistry; |
| |
| private final List<RouteStartupOrder> routeStartupOrder = new ArrayList<>(); |
| // start auto assigning route ids using numbering 1000 and upwards |
| private int defaultRouteStartupOrder = 1000; |
| private ShutdownRoute shutdownRoute = ShutdownRoute.Default; |
| private ShutdownRunningTask shutdownRunningTask = ShutdownRunningTask.CompleteCurrentTaskOnly; |
| private Debugger debugger; |
| private Tracer tracer; |
| private final StopWatch stopWatch = new StopWatch(false); |
| private Date startDate; |
| |
| private SSLContextParameters sslContextParameters; |
| private final ThreadLocal<Set<String>> componentsInCreation = new ThreadLocal<Set<String>>() { |
| @Override |
| public Set<String> initialValue() { |
| return new HashSet<>(); |
| } |
| }; |
| private Map<Class<?>, Object> extensions = new ConcurrentHashMap<>(); |
| |
| /** |
| * Creates the {@link CamelContext} using |
| * {@link org.apache.camel.support.DefaultRegistry} as registry. |
| * <p/> |
| * Use one of the other constructors to force use an explicit registry. |
| */ |
| public AbstractCamelContext() { |
| this(true); |
| } |
| |
| /** |
| * Creates the {@link CamelContext} using the given registry |
| * |
| * @param registry the registry |
| */ |
| public AbstractCamelContext(Registry registry) { |
| this(); |
| setRegistry(registry); |
| } |
| |
| public AbstractCamelContext(boolean init) { |
| // create a provisional (temporary) endpoint registry at first since end |
| // users may access endpoints before CamelContext is started |
| // we will later transfer the endpoints to the actual |
| // DefaultEndpointRegistry later, but we do this to starup Camel faster. |
| this.endpoints = new ProvisionalEndpointRegistry(); |
| |
| // add the defer service startup listener |
| this.startupListeners.add(deferStartupListener); |
| |
| if (init) { |
| try { |
| init(); |
| } catch (Exception e) { |
| throw new RuntimeException("Error initializing CamelContext", e); |
| } |
| } |
| } |
| |
| @Override |
| public void doInit() throws Exception { |
| // setup management first since end users may use it to add event |
| // notifiers using the management strategy before the CamelContext has been started |
| setupManagement(null); |
| |
| // Call all registered trackers with this context |
| // Note, this may use a partially constructed object |
| CamelContextTracker.notifyContextCreated(this); |
| } |
| |
| @Override |
| public <T extends CamelContext> T adapt(Class<T> type) { |
| return type.cast(this); |
| } |
| |
| @Override |
| public <T> T getExtension(Class<T> type) { |
| if (type.isInstance(this)) { |
| return type.cast(this); |
| } |
| Object extension = extensions.get(type); |
| if (extension instanceof Supplier) { |
| extension = ((Supplier)extension).get(); |
| setExtension(type, (T)extension); |
| } |
| return (T)extension; |
| } |
| |
| @Override |
| public <T> void setExtension(Class<T> type, T module) { |
| try { |
| extensions.put(type, doAddService(module)); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| } |
| |
| public <T> void setDefaultExtension(Class<T> type, Supplier<T> module) { |
| extensions.putIfAbsent(type, module); |
| } |
| |
| @Override |
| public boolean isVetoStarted() { |
| return vetoStarted.get(); |
| } |
| |
| public Initialization getInitialization() { |
| return initialization; |
| } |
| |
| public void setInitialization(Initialization initialization) { |
| this.initialization = initialization; |
| } |
| |
| @Override |
| public String getName() { |
| return getNameStrategy().getName(); |
| } |
| |
| @Override |
| public void setName(String name) { |
| // use an explicit name strategy since an explicit name was provided to be used |
| setNameStrategy(new ExplicitCamelContextNameStrategy(name)); |
| } |
| |
| @Override |
| public CamelContextNameStrategy getNameStrategy() { |
| if (nameStrategy == null) { |
| synchronized (lock) { |
| if (nameStrategy == null) { |
| setNameStrategy(createCamelContextNameStrategy()); |
| } |
| } |
| } |
| return nameStrategy; |
| } |
| |
| @Override |
| public void setNameStrategy(CamelContextNameStrategy nameStrategy) { |
| this.nameStrategy = doAddService(nameStrategy); |
| } |
| |
| @Override |
| public ManagementNameStrategy getManagementNameStrategy() { |
| if (managementNameStrategy == null) { |
| synchronized (lock) { |
| if (managementNameStrategy == null) { |
| setManagementNameStrategy(createManagementNameStrategy()); |
| } |
| } |
| } |
| return managementNameStrategy; |
| } |
| |
| @Override |
| public void setManagementNameStrategy(ManagementNameStrategy managementNameStrategy) { |
| this.managementNameStrategy = doAddService(managementNameStrategy); |
| } |
| |
| @Override |
| public String getManagementName() { |
| return managementName; |
| } |
| |
| @Override |
| public void setManagementName(String managementName) { |
| this.managementName = managementName; |
| } |
| |
| @Override |
| public Component hasComponent(String componentName) { |
| return components.get(componentName); |
| } |
| |
| @Override |
| public void addComponent(String componentName, final Component component) { |
| ObjectHelper.notNull(component, "component"); |
| component.setCamelContext(this); |
| Component oldValue = components.putIfAbsent(componentName, component); |
| if (oldValue != null) { |
| throw new IllegalArgumentException("Cannot add component as its already previously added: " + componentName); |
| } |
| postInitComponent(componentName, component); |
| } |
| |
| private void postInitComponent(String componentName, final Component component) { |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onComponentAdd(componentName, component); |
| } |
| |
| // keep reference to properties component up to date |
| if (component instanceof PropertiesComponent && "properties".equals(componentName)) { |
| propertiesComponent = (PropertiesComponent)component; |
| // ensure properties component is initialize early |
| ServiceHelper.initService(propertiesComponent); |
| } |
| } |
| |
| @Override |
| public Component getComponent(String name) { |
| return getComponent(name, autoCreateComponents, true); |
| } |
| |
| @Override |
| public Component getComponent(String name, boolean autoCreateComponents) { |
| return getComponent(name, autoCreateComponents, true); |
| } |
| |
| @Override |
| public Component getComponent(String name, boolean autoCreateComponents, boolean autoStart) { |
| init(); |
| |
| // Check if the named component is already being created, that would |
| // mean |
| // that the initComponent has triggered a new getComponent |
| if (componentsInCreation.get().contains(name)) { |
| throw new IllegalStateException("Circular dependency detected, the component " + name + " is already being created"); |
| } |
| |
| try { |
| // Flag used to mark a component of being created. |
| final AtomicBoolean created = new AtomicBoolean(false); |
| |
| // atomic operation to get/create a component. Avoid global locks. |
| final Component component = components.computeIfAbsent(name, new Function<String, Component>() { |
| @Override |
| public Component apply(String comp) { |
| created.set(true); |
| return AbstractCamelContext.this.initComponent(name, autoCreateComponents); |
| } |
| }); |
| |
| // Start the component after its creation as if it is a component |
| // proxy |
| // that creates/start a delegated component, we may end up in a |
| // deadlock |
| if (component != null && created.get() && autoStart && (isStarted() || isStarting())) { |
| // If the component is looked up after the context is started, |
| // lets start it up. |
| startService(component); |
| } |
| |
| return component; |
| } catch (Exception e) { |
| throw new RuntimeCamelException("Cannot auto create component: " + name, e); |
| } finally { |
| // remove the reference to the component being created |
| componentsInCreation.get().remove(name); |
| } |
| } |
| |
| /** |
| * Function to initialize a component and auto start. Returns null if the |
| * autoCreateComponents is disabled |
| */ |
| private Component initComponent(String name, boolean autoCreateComponents) { |
| Component component = null; |
| if (autoCreateComponents) { |
| try { |
| if (log.isDebugEnabled()) { |
| log.debug("Using ComponentResolver: {} to resolve component with name: {}", getComponentResolver(), name); |
| } |
| |
| // Mark the component as being created so we can detect circular |
| // requests. |
| // |
| // In spring apps, the component resolver may trigger a new |
| // getComponent |
| // because of the underlying bean factory and as the endpoints |
| // are |
| // registered as singleton, the spring factory creates the bean |
| // and then check the type so the getComponent is always |
| // triggered. |
| // |
| // Simple circular dependency: |
| // |
| // <camelContext id="camel" |
| // xmlns="http://camel.apache.org/schema/spring"> |
| // <route> |
| // <from id="twitter" |
| // uri="twitter://timeline/home?type=polling"/> |
| // <log message="Got ${body}"/> |
| // </route> |
| // </camelContext> |
| // |
| // Complex circular dependency: |
| // |
| // <camelContext id="camel" |
| // xmlns="http://camel.apache.org/schema/spring"> |
| // <route> |
| // <from id="log" uri="seda:test"/> |
| // <to id="seda" uri="log:test"/> |
| // </route> |
| // </camelContext> |
| // |
| // This would freeze the app (lock or infinite loop). |
| // |
| // See https://issues.apache.org/jira/browse/CAMEL-11225 |
| componentsInCreation.get().add(name); |
| |
| component = getComponentResolver().resolveComponent(name, this); |
| if (component != null) { |
| component.setCamelContext(this); |
| postInitComponent(name, component); |
| } |
| } catch (Exception e) { |
| throw new RuntimeCamelException("Cannot auto create component: " + name, e); |
| } |
| } |
| return component; |
| } |
| |
| @Override |
| public <T extends Component> T getComponent(String name, Class<T> componentType) { |
| Component component = getComponent(name); |
| if (componentType.isInstance(component)) { |
| return componentType.cast(component); |
| } else { |
| String message; |
| if (component == null) { |
| message = "Did not find component given by the name: " + name; |
| } else { |
| message = "Found component of type: " + component.getClass() + " instead of expected: " + componentType; |
| } |
| throw new IllegalArgumentException(message); |
| } |
| } |
| |
| public Component resolveComponent(String name) { |
| Component answer = hasComponent(name); |
| if (answer == null) { |
| try { |
| answer = getComponentResolver().resolveComponent(name, this); |
| } catch (Exception e) { |
| throw new RuntimeCamelException("Cannot resolve component: " + name, e); |
| } |
| } |
| return answer; |
| } |
| |
| @Override |
| public Component removeComponent(String componentName) { |
| Component oldComponent = components.remove(componentName); |
| if (oldComponent != null) { |
| try { |
| stopServices(oldComponent); |
| } catch (Exception e) { |
| log.warn("Error stopping component " + oldComponent + ". This exception will be ignored.", e); |
| } |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onComponentRemove(componentName, oldComponent); |
| } |
| } |
| // keep reference to properties component up to date |
| if (oldComponent != null && "properties".equals(componentName)) { |
| propertiesComponent = null; |
| } |
| return oldComponent; |
| } |
| |
| // Endpoint Management Methods |
| // ----------------------------------------------------------------------- |
| |
| @Override |
| public EndpointRegistry<EndpointKey> getEndpointRegistry() { |
| return endpoints; |
| } |
| |
| @Override |
| public Collection<Endpoint> getEndpoints() { |
| return new ArrayList<>(endpoints.values()); |
| } |
| |
| @Override |
| public Map<String, Endpoint> getEndpointMap() { |
| Map<String, Endpoint> answer = new TreeMap<>(); |
| for (Map.Entry<EndpointKey, Endpoint> entry : endpoints.entrySet()) { |
| answer.put(entry.getKey().get(), entry.getValue()); |
| } |
| return answer; |
| } |
| |
| @Override |
| public Endpoint hasEndpoint(String uri) { |
| return endpoints.get(getEndpointKey(uri)); |
| } |
| |
| @Override |
| public Endpoint addEndpoint(String uri, Endpoint endpoint) throws Exception { |
| Endpoint oldEndpoint; |
| |
| startService(endpoint); |
| oldEndpoint = endpoints.remove(getEndpointKey(uri)); |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onEndpointAdd(endpoint); |
| } |
| addEndpointToRegistry(uri, endpoint); |
| if (oldEndpoint != null && oldEndpoint != endpoint) { |
| stopServices(oldEndpoint); |
| } |
| |
| return oldEndpoint; |
| } |
| |
| @Override |
| public void removeEndpoint(Endpoint endpoint) throws Exception { |
| removeEndpoints(endpoint.getEndpointUri()); |
| } |
| |
| @Override |
| public Collection<Endpoint> removeEndpoints(String uri) throws Exception { |
| Collection<Endpoint> answer = new ArrayList<>(); |
| Endpoint oldEndpoint = endpoints.remove(getEndpointKey(uri)); |
| if (oldEndpoint != null) { |
| answer.add(oldEndpoint); |
| stopServices(oldEndpoint); |
| } else { |
| for (Map.Entry<EndpointKey, Endpoint> entry : endpoints.entrySet()) { |
| oldEndpoint = entry.getValue(); |
| if (EndpointHelper.matchEndpoint(this, oldEndpoint.getEndpointUri(), uri)) { |
| try { |
| stopServices(oldEndpoint); |
| } catch (Exception e) { |
| log.warn("Error stopping endpoint " + oldEndpoint + ". This exception will be ignored.", e); |
| } |
| answer.add(oldEndpoint); |
| endpoints.remove(entry.getKey()); |
| } |
| } |
| } |
| |
| // notify lifecycle its being removed |
| for (Endpoint endpoint : answer) { |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onEndpointRemove(endpoint); |
| } |
| } |
| |
| return answer; |
| } |
| |
| @Override |
| public Endpoint getEndpoint(String uri) { |
| init(); |
| |
| StringHelper.notEmpty(uri, "uri"); |
| |
| log.trace("Getting endpoint with uri: {}", uri); |
| |
| // in case path has property placeholders then try to let property |
| // component resolve those |
| try { |
| uri = resolvePropertyPlaceholders(uri); |
| } catch (Exception e) { |
| throw new ResolveEndpointFailedException(uri, e); |
| } |
| |
| final String rawUri = uri; |
| |
| // normalize uri so we can do endpoint hits with minor mistakes and |
| // parameters is not in the same order |
| uri = normalizeEndpointUri(uri); |
| |
| log.trace("Getting endpoint with raw uri: {}, normalized uri: {}", rawUri, uri); |
| |
| Endpoint answer; |
| String scheme = null; |
| // use optimized method to get the endpoint uri |
| EndpointKey key = getEndpointKeyPreNormalized(uri); |
| answer = endpoints.get(key); |
| if (answer == null) { |
| try { |
| // Use the URI prefix to find the component. |
| String[] splitURI = StringHelper.splitOnCharacter(uri, ":", 2); |
| if (splitURI[1] != null) { |
| scheme = splitURI[0]; |
| log.trace("Endpoint uri: {} is from component with name: {}", uri, scheme); |
| Component component = getComponent(scheme); |
| |
| // Ask the component to resolve the endpoint. |
| if (component != null) { |
| log.trace("Creating endpoint from uri: {} using component: {}", uri, component); |
| |
| // Have the component create the endpoint if it can. |
| if (component.useRawUri()) { |
| answer = component.createEndpoint(rawUri); |
| } else { |
| answer = component.createEndpoint(uri); |
| } |
| |
| if (answer != null && log.isDebugEnabled()) { |
| log.debug("{} converted to endpoint: {} by component: {}", URISupport.sanitizeUri(uri), answer, component); |
| } |
| } |
| } |
| |
| if (answer == null) { |
| // no component then try in registry and elsewhere |
| answer = createEndpoint(uri); |
| log.trace("No component to create endpoint from uri: {} fallback lookup in registry -> {}", uri, answer); |
| } |
| |
| if (answer == null && splitURI[1] == null) { |
| // the uri has no context-path which is rare and it was not |
| // referring to an endpoint in the registry |
| // so try to see if it can be created by a component |
| |
| int pos = uri.indexOf('?'); |
| String componentName = pos > 0 ? uri.substring(0, pos) : uri; |
| |
| Component component = getComponent(componentName); |
| |
| // Ask the component to resolve the endpoint. |
| if (component != null) { |
| log.trace("Creating endpoint from uri: {} using component: {}", uri, component); |
| |
| // Have the component create the endpoint if it can. |
| if (component.useRawUri()) { |
| answer = component.createEndpoint(rawUri); |
| } else { |
| answer = component.createEndpoint(uri); |
| } |
| |
| if (answer != null && log.isDebugEnabled()) { |
| log.debug("{} converted to endpoint: {} by component: {}", URISupport.sanitizeUri(uri), answer, component); |
| } |
| } |
| |
| } |
| |
| if (answer != null) { |
| addService(answer); |
| answer = addEndpointToRegistry(uri, answer); |
| } |
| } catch (Exception e) { |
| throw new ResolveEndpointFailedException(uri, e); |
| } |
| } |
| |
| // unknown scheme |
| if (answer == null && scheme != null) { |
| throw new ResolveEndpointFailedException(uri, "No component found with scheme: " + scheme); |
| } |
| |
| return answer; |
| } |
| |
| @Override |
| public Endpoint getEndpoint(String uri, Map<String, Object> parameters) { |
| init(); |
| |
| StringHelper.notEmpty(uri, "uri"); |
| |
| log.trace("Getting endpoint with uri: {} and parameters: {}", uri, parameters); |
| |
| // in case path has property placeholders then try to let property |
| // component resolve those |
| try { |
| uri = resolvePropertyPlaceholders(uri); |
| } catch (Exception e) { |
| throw new ResolveEndpointFailedException(uri, e); |
| } |
| |
| final String rawUri = uri; |
| |
| // normalize uri so we can do endpoint hits with minor mistakes and |
| // parameters is not in the same order |
| uri = normalizeEndpointUri(uri); |
| |
| log.trace("Getting endpoint with raw uri: {}, normalized uri: {}", rawUri, uri); |
| |
| Endpoint answer; |
| String scheme = null; |
| // use optimized method to get the endpoint uri |
| EndpointKey key = getEndpointKeyPreNormalized(uri); |
| answer = endpoints.get(key); |
| if (answer == null) { |
| try { |
| // Use the URI prefix to find the component. |
| String[] splitURI = StringHelper.splitOnCharacter(uri, ":", 2); |
| if (splitURI[1] != null) { |
| scheme = splitURI[0]; |
| log.trace("Endpoint uri: {} is from component with name: {}", uri, scheme); |
| Component component = getComponent(scheme); |
| |
| // Ask the component to resolve the endpoint. |
| if (component != null) { |
| log.trace("Creating endpoint from uri: {} using component: {}", uri, component); |
| |
| // Have the component create the endpoint if it can. |
| if (component.useRawUri()) { |
| answer = component.createEndpoint(rawUri, parameters); |
| } else { |
| answer = component.createEndpoint(uri, parameters); |
| } |
| |
| if (answer != null && log.isDebugEnabled()) { |
| log.debug("{} converted to endpoint: {} by component: {}", URISupport.sanitizeUri(uri), answer, component); |
| } |
| } |
| } |
| |
| if (answer == null) { |
| // no component then try in registry and elsewhere |
| answer = createEndpoint(uri); |
| log.trace("No component to create endpoint from uri: {} fallback lookup in registry -> {}", uri, answer); |
| } |
| |
| if (answer == null && splitURI[1] == null) { |
| // the uri has no context-path which is rare and it was not |
| // referring to an endpoint in the registry |
| // so try to see if it can be created by a component |
| |
| int pos = uri.indexOf('?'); |
| String componentName = pos > 0 ? uri.substring(0, pos) : uri; |
| |
| Component component = getComponent(componentName); |
| |
| // Ask the component to resolve the endpoint. |
| if (component != null) { |
| log.trace("Creating endpoint from uri: {} using component: {}", uri, component); |
| |
| // Have the component create the endpoint if it can. |
| if (component.useRawUri()) { |
| answer = component.createEndpoint(rawUri, parameters); |
| } else { |
| answer = component.createEndpoint(uri, parameters); |
| } |
| |
| if (answer != null && log.isDebugEnabled()) { |
| log.debug("{} converted to endpoint: {} by component: {}", URISupport.sanitizeUri(uri), answer, component); |
| } |
| } |
| |
| } |
| |
| if (answer != null) { |
| addService(answer); |
| answer = addEndpointToRegistry(uri, answer); |
| } |
| } catch (Exception e) { |
| throw new ResolveEndpointFailedException(uri, e); |
| } |
| } |
| |
| // unknown scheme |
| if (answer == null && scheme != null) { |
| throw new ResolveEndpointFailedException(uri, "No component found with scheme: " + scheme); |
| } |
| |
| return answer; |
| } |
| |
| @Override |
| public <T extends Endpoint> T getEndpoint(String name, Class<T> endpointType) { |
| Endpoint endpoint = getEndpoint(name); |
| if (endpoint == null) { |
| throw new NoSuchEndpointException(name); |
| } |
| if (endpoint instanceof InterceptSendToEndpoint) { |
| endpoint = ((InterceptSendToEndpoint)endpoint).getOriginalEndpoint(); |
| } |
| if (endpointType.isInstance(endpoint)) { |
| return endpointType.cast(endpoint); |
| } else { |
| throw new IllegalArgumentException("The endpoint is not of type: " + endpointType + " but is: " + endpoint.getClass().getCanonicalName()); |
| } |
| } |
| |
| @Override |
| public void registerEndpointCallback(EndpointStrategy strategy) { |
| if (!endpointStrategies.contains(strategy)) { |
| // let it be invoked for already registered endpoints so it can |
| // catch-up. |
| endpointStrategies.add(strategy); |
| for (Endpoint endpoint : getEndpoints()) { |
| Endpoint newEndpoint = strategy.registerEndpoint(endpoint.getEndpointUri(), endpoint); |
| if (newEndpoint != null) { |
| // put will replace existing endpoint with the new endpoint |
| endpoints.put(getEndpointKey(endpoint.getEndpointUri()), newEndpoint); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Strategy to add the given endpoint to the internal endpoint registry |
| * |
| * @param uri uri of the endpoint |
| * @param endpoint the endpoint to add |
| * @return the added endpoint |
| */ |
| protected Endpoint addEndpointToRegistry(String uri, Endpoint endpoint) { |
| StringHelper.notEmpty(uri, "uri"); |
| ObjectHelper.notNull(endpoint, "endpoint"); |
| |
| // if there is endpoint strategies, then use the endpoints they return |
| // as this allows to intercept endpoints etc. |
| for (EndpointStrategy strategy : endpointStrategies) { |
| endpoint = strategy.registerEndpoint(uri, endpoint); |
| } |
| endpoints.put(getEndpointKey(uri, endpoint), endpoint); |
| return endpoint; |
| } |
| |
| /** |
| * Normalize uri so we can do endpoint hits with minor mistakes and |
| * parameters is not in the same order. |
| * |
| * @param uri the uri |
| * @return normalized uri |
| * @throws ResolveEndpointFailedException if uri cannot be normalized |
| */ |
| protected static String normalizeEndpointUri(String uri) { |
| try { |
| uri = URISupport.normalizeUri(uri); |
| } catch (Exception e) { |
| throw new ResolveEndpointFailedException(uri, e); |
| } |
| return uri; |
| } |
| |
| /** |
| * Gets the endpoint key to use for lookup or whe adding endpoints to the |
| * {@link DefaultEndpointRegistry} |
| * |
| * @param uri the endpoint uri |
| * @return the key |
| */ |
| protected EndpointKey getEndpointKey(String uri) { |
| return new EndpointKey(uri); |
| } |
| |
| /** |
| * Gets the endpoint key to use for lookup or whe adding endpoints to the |
| * {@link DefaultEndpointRegistry} |
| * |
| * @param uri the endpoint uri which is pre normalized |
| * @return the key |
| */ |
| protected EndpointKey getEndpointKeyPreNormalized(String uri) { |
| return new EndpointKey(uri, true); |
| } |
| |
| /** |
| * Gets the endpoint key to use for lookup or whe adding endpoints to the |
| * {@link DefaultEndpointRegistry} |
| * |
| * @param uri the endpoint uri |
| * @param endpoint the endpoint |
| * @return the key |
| */ |
| protected EndpointKey getEndpointKey(String uri, Endpoint endpoint) { |
| if (endpoint != null && !endpoint.isSingleton()) { |
| int counter = endpointKeyCounter.incrementAndGet(); |
| return new EndpointKey(uri + ":" + counter); |
| } else { |
| return new EndpointKey(uri); |
| } |
| } |
| |
| @Override |
| public GlobalEndpointConfiguration getGlobalEndpointConfiguration() { |
| return globalEndpointConfiguration; |
| } |
| |
| // Route Management Methods |
| // ----------------------------------------------------------------------- |
| |
| @Override |
| public void setRouteController(RouteController routeController) { |
| this.routeController = routeController; |
| doAddService(routeController); |
| } |
| |
| @Override |
| public RouteController getRouteController() { |
| if (routeController == null) { |
| synchronized (lock) { |
| if (routeController == null) { |
| setRouteController(createRouteController()); |
| } |
| } |
| } |
| return routeController; |
| } |
| |
| @Override |
| public List<RouteStartupOrder> getRouteStartupOrder() { |
| return routeStartupOrder; |
| } |
| |
| @Override |
| public List<Route> getRoutes() { |
| // lets return a copy of the collection as objects are removed later |
| // when services are stopped |
| if (routes.isEmpty()) { |
| return Collections.emptyList(); |
| } else { |
| synchronized (routes) { |
| return new ArrayList<>(routes); |
| } |
| } |
| } |
| |
| @Override |
| public Route getRoute(String id) { |
| if (id != null) { |
| for (Route route : getRoutes()) { |
| if (route.getId().equals(id)) { |
| return route; |
| } |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public Processor getProcessor(String id) { |
| for (Route route : getRoutes()) { |
| List<Processor> list = route.filter(id); |
| if (list.size() == 1) { |
| return list.get(0); |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public <T extends Processor> T getProcessor(String id, Class<T> type) { |
| Processor answer = getProcessor(id); |
| if (answer != null) { |
| return type.cast(answer); |
| } |
| return null; |
| } |
| |
| public void removeRoute(Route route) { |
| synchronized (this.routes) { |
| this.routes.remove(route); |
| } |
| } |
| |
| public void addRoute(Route route) { |
| synchronized (this.routes) { |
| this.routes.add(route); |
| } |
| } |
| |
| @Override |
| public void addRoutes(final RoutesBuilder builder) throws Exception { |
| init(); |
| log.debug("Adding routes from builder: {}", builder); |
| doWithDefinedClassLoader(() -> builder.addRoutesToCamelContext(AbstractCamelContext.this)); |
| } |
| |
| public ServiceStatus getRouteStatus(String key) { |
| BaseRouteService routeService = routeServices.get(key); |
| if (routeService != null) { |
| return routeService.getStatus(); |
| } |
| return null; |
| } |
| |
| public boolean isStartingRoutes() { |
| Boolean answer = isStartingRoutes.get(); |
| return answer != null && answer; |
| } |
| |
| public void setStartingRoutes(boolean starting) { |
| if (starting) { |
| isStartingRoutes.set(true); |
| } else { |
| isStartingRoutes.remove(); |
| } |
| } |
| |
| @Override |
| public boolean isSetupRoutes() { |
| Boolean answer = isSetupRoutes.get(); |
| return answer != null && answer; |
| } |
| |
| public void startAllRoutes() throws Exception { |
| doStartOrResumeRoutes(routeServices, true, true, false, false); |
| } |
| |
| public synchronized void startRoute(String routeId) throws Exception { |
| DefaultRouteError.reset(this, routeId); |
| |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| try { |
| startRouteService(routeService, false); |
| } catch (Exception e) { |
| DefaultRouteError.set(this, routeId, Phase.START, e); |
| throw e; |
| } |
| } |
| } |
| |
| public synchronized void resumeRoute(String routeId) throws Exception { |
| DefaultRouteError.reset(this, routeId); |
| |
| try { |
| if (!routeSupportsSuspension(routeId)) { |
| // start route if suspension is not supported |
| startRoute(routeId); |
| return; |
| } |
| |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| resumeRouteService(routeService); |
| // must resume the route as well |
| Route route = getRoute(routeId); |
| ServiceHelper.resumeService(route); |
| } |
| } catch (Exception e) { |
| DefaultRouteError.set(this, routeId, Phase.RESUME, e); |
| throw e; |
| } |
| } |
| |
| public synchronized boolean stopRoute(String routeId, long timeout, TimeUnit timeUnit, boolean abortAfterTimeout) throws Exception { |
| DefaultRouteError.reset(this, routeId); |
| |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| try { |
| RouteStartupOrder route = new DefaultRouteStartupOrder(1, routeService.getRoute(), routeService); |
| |
| boolean completed = getShutdownStrategy().shutdown(this, route, timeout, timeUnit, abortAfterTimeout); |
| if (completed) { |
| // must stop route service as well |
| stopRouteService(routeService, false); |
| } else { |
| // shutdown was aborted, make sure route is re-started |
| // properly |
| startRouteService(routeService, false); |
| } |
| return completed; |
| } catch (Exception e) { |
| DefaultRouteError.set(this, routeId, Phase.STOP, e); |
| throw e; |
| } |
| } |
| |
| return false; |
| } |
| |
| public void stopRoute(String routeId) throws Exception { |
| doShutdownRoute(routeId, getShutdownStrategy().getTimeout(), getShutdownStrategy().getTimeUnit(), false); |
| } |
| |
| public void stopRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { |
| doShutdownRoute(routeId, timeout, timeUnit, false); |
| } |
| |
| protected synchronized void doShutdownRoute(String routeId, long timeout, TimeUnit timeUnit, boolean removingRoutes) throws Exception { |
| DefaultRouteError.reset(this, routeId); |
| |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| try { |
| List<RouteStartupOrder> routes = new ArrayList<>(1); |
| RouteStartupOrder order = new DefaultRouteStartupOrder(1, routeService.getRoute(), routeService); |
| routes.add(order); |
| |
| getShutdownStrategy().shutdown(this, routes, timeout, timeUnit); |
| // must stop route service as well (and remove the routes from |
| // management) |
| stopRouteService(routeService, removingRoutes); |
| } catch (Exception e) { |
| DefaultRouteError.set(this, routeId, removingRoutes ? Phase.SHUTDOWN : Phase.STOP, e); |
| throw e; |
| } |
| } |
| } |
| |
| @Override |
| public synchronized boolean removeRoute(String routeId) throws Exception { |
| DefaultRouteError.reset(this, routeId); |
| |
| // gather a map of all the endpoints in use by the routes, so we can |
| // known if a given endpoints is in use |
| // by one or more routes, when we remove the route |
| Map<String, Set<Endpoint>> endpointsInUse = new HashMap<>(); |
| for (Map.Entry<String, BaseRouteService> entry : routeServices.entrySet()) { |
| endpointsInUse.put(entry.getKey(), entry.getValue().gatherEndpoints()); |
| } |
| |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| if (getRouteStatus(routeId).isStopped()) { |
| try { |
| routeService.setRemovingRoutes(true); |
| shutdownRouteService(routeService); |
| routeServices.remove(routeId); |
| // remove route from startup order as well, as it was |
| // removed |
| routeStartupOrder.removeIf(order -> order.getRoute().getId().equals(routeId)); |
| |
| // from the route which we have removed, then remove all its |
| // private endpoints |
| // (eg the endpoints which are not in use by other routes) |
| Set<Endpoint> toRemove = new LinkedHashSet<>(); |
| for (Endpoint endpoint : endpointsInUse.get(routeId)) { |
| // how many times is the endpoint in use |
| int count = 0; |
| for (Set<Endpoint> endpoints : endpointsInUse.values()) { |
| if (endpoints.contains(endpoint)) { |
| count++; |
| } |
| } |
| // notice we will count ourselves so if there is only 1 |
| // then its safe to remove |
| if (count <= 1) { |
| toRemove.add(endpoint); |
| } |
| } |
| for (Endpoint endpoint : toRemove) { |
| log.debug("Removing: {} which was only in use by route: {}", endpoint, routeId); |
| removeEndpoint(endpoint); |
| } |
| } catch (Exception e) { |
| DefaultRouteError.set(this, routeId, Phase.REMOVE, e); |
| throw e; |
| } |
| |
| return true; |
| } else { |
| return false; |
| } |
| } |
| return false; |
| } |
| |
| public void suspendRoute(String routeId) throws Exception { |
| suspendRoute(routeId, getShutdownStrategy().getTimeout(), getShutdownStrategy().getTimeUnit()); |
| } |
| |
| public synchronized void suspendRoute(String routeId, long timeout, TimeUnit timeUnit) throws Exception { |
| DefaultRouteError.reset(this, routeId); |
| |
| try { |
| if (!routeSupportsSuspension(routeId)) { |
| stopRoute(routeId, timeout, timeUnit); |
| return; |
| } |
| |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| List<RouteStartupOrder> routes = new ArrayList<>(1); |
| Route route = routeService.getRoute(); |
| RouteStartupOrder order = new DefaultRouteStartupOrder(1, route, routeService); |
| routes.add(order); |
| |
| getShutdownStrategy().suspend(this, routes, timeout, timeUnit); |
| // must suspend route service as well |
| suspendRouteService(routeService); |
| // must suspend the route as well |
| if (route instanceof SuspendableService) { |
| ((SuspendableService)route).suspend(); |
| } |
| } |
| } catch (Exception e) { |
| DefaultRouteError.set(this, routeId, Phase.SUSPEND, e); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void addService(Object object) throws Exception { |
| addService(object, true); |
| } |
| |
| @Override |
| public void addService(Object object, boolean stopOnShutdown) throws Exception { |
| addService(object, stopOnShutdown, false); |
| } |
| |
| @Override |
| public void addService(Object object, boolean stopOnShutdown, boolean forceStart) throws Exception { |
| internalAddService(object, stopOnShutdown, forceStart); |
| } |
| |
| protected <T> T doAddService(T object) { |
| return doAddService(object, true); |
| } |
| |
| protected <T> T doAddService(T object, boolean stopOnShutdown) { |
| return doAddService(object, stopOnShutdown, true); |
| } |
| |
| protected <T> T doAddService(T object, boolean stopOnShutdown, boolean forceStart) { |
| try { |
| internalAddService(object, stopOnShutdown, forceStart); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| return object; |
| } |
| |
| private void internalAddService(Object object, boolean stopOnShutdown, boolean forceStart) throws Exception { |
| |
| // inject CamelContext |
| if (object instanceof CamelContextAware) { |
| CamelContextAware aware = (CamelContextAware)object; |
| aware.setCamelContext(this); |
| } |
| |
| if (object instanceof Service) { |
| Service service = (Service)object; |
| |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| if (service instanceof Endpoint) { |
| // use specialized endpoint add |
| strategy.onEndpointAdd((Endpoint)service); |
| } else { |
| strategy.onServiceAdd(this, service, null); |
| } |
| } |
| |
| if (!forceStart) { |
| // now start the service (and defer starting if CamelContext is |
| // starting up itself) |
| deferStartService(object, stopOnShutdown); |
| } else { |
| // only add to services to close if its a singleton |
| // otherwise we could for example end up with a lot of prototype |
| // scope endpoints |
| boolean singleton = true; // assume singleton by default |
| if (object instanceof IsSingleton) { |
| singleton = ((IsSingleton)service).isSingleton(); |
| } |
| // do not add endpoints as they have their own list |
| if (singleton && !(service instanceof Endpoint)) { |
| // only add to list of services to stop if its not already there |
| if (stopOnShutdown && !hasService(service)) { |
| // special for type converter / type converter registry which is stopped manual later |
| boolean tc = service instanceof TypeConverter || service instanceof TypeConverterRegistry; |
| if (!tc) { |
| servicesToStop.add(service); |
| } |
| } |
| } |
| ServiceHelper.startService(service); |
| } |
| } |
| } |
| |
| @Override |
| public boolean removeService(Object object) throws Exception { |
| if (object instanceof Endpoint) { |
| removeEndpoint((Endpoint)object); |
| return true; |
| } |
| if (object instanceof Service) { |
| Service service = (Service)object; |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onServiceRemove(this, service, null); |
| } |
| return servicesToStop.remove(service); |
| } |
| return false; |
| } |
| |
| @Override |
| public boolean hasService(Object object) { |
| if (object instanceof Service) { |
| Service service = (Service)object; |
| return servicesToStop.contains(service); |
| } |
| return false; |
| } |
| |
| @Override |
| public <T> T hasService(Class<T> type) { |
| for (Service service : servicesToStop) { |
| if (type.isInstance(service)) { |
| return type.cast(service); |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public <T> Set<T> hasServices(Class<T> type) { |
| Set<T> set = new HashSet<>(); |
| for (Service service : servicesToStop) { |
| if (type.isInstance(service)) { |
| set.add((T)service); |
| } |
| } |
| return set; |
| } |
| |
| @Override |
| public void deferStartService(Object object, boolean stopOnShutdown) throws Exception { |
| if (object instanceof Service) { |
| Service service = (Service)object; |
| |
| // only add to services to close if its a singleton |
| // otherwise we could for example end up with a lot of prototype |
| // scope endpoints |
| boolean singleton = true; // assume singleton by default |
| if (object instanceof IsSingleton) { |
| singleton = ((IsSingleton)service).isSingleton(); |
| } |
| // do not add endpoints as they have their own list |
| if (singleton && !(service instanceof Endpoint)) { |
| // only add to list of services to stop if its not already there |
| if (stopOnShutdown && !hasService(service)) { |
| servicesToStop.add(service); |
| } |
| } |
| // are we already started? |
| if (isStarted()) { |
| ServiceHelper.startService(service); |
| } else { |
| deferStartupListener.addService(service); |
| } |
| } |
| } |
| |
| @Override |
| public void addStartupListener(StartupListener listener) throws Exception { |
| // either add to listener so we can invoke then later when CamelContext |
| // has been started |
| // or invoke the callback right now |
| if (isStarted()) { |
| listener.onCamelContextStarted(this, true); |
| } else { |
| startupListeners.add(listener); |
| } |
| } |
| |
| public String getComponentParameterJsonSchema(String componentName) throws IOException { |
| // use the component factory finder to find the package name of the |
| // component class, which is the location |
| // where the documentation exists as well |
| FactoryFinder finder = getFactoryFinder(DefaultComponentResolver.RESOURCE_PATH); |
| Class<?> clazz = finder.findClass(componentName).orElse(null); |
| if (clazz == null) { |
| // fallback and find existing component |
| Component existing = hasComponent(componentName); |
| if (existing != null) { |
| clazz = existing.getClass(); |
| } else { |
| return null; |
| } |
| } |
| |
| String packageName = clazz.getPackage().getName(); |
| packageName = packageName.replace('.', '/'); |
| String path = packageName + "/" + componentName + ".json"; |
| |
| ClassResolver resolver = getClassResolver(); |
| InputStream inputStream = resolver.loadResourceAsStream(path); |
| log.debug("Loading component JSON Schema for: {} using class resolver: {} -> {}", componentName, resolver, inputStream); |
| if (inputStream != null) { |
| try { |
| return IOHelper.loadText(inputStream); |
| } finally { |
| IOHelper.close(inputStream); |
| } |
| } |
| // special for ActiveMQ as it is really just JMS |
| if ("ActiveMQComponent".equals(clazz.getSimpleName())) { |
| return getComponentParameterJsonSchema("jms"); |
| } else { |
| return null; |
| } |
| } |
| |
| public String getDataFormatParameterJsonSchema(String dataFormatName) throws IOException { |
| // use the dataformat factory finder to find the package name of the |
| // dataformat class, which is the location |
| // where the documentation exists as well |
| FactoryFinder finder = getFactoryFinder(DefaultDataFormatResolver.DATAFORMAT_RESOURCE_PATH); |
| Class<?> clazz = finder.findClass(dataFormatName).orElse(null); |
| if (clazz == null) { |
| return null; |
| } |
| |
| String packageName = clazz.getPackage().getName(); |
| packageName = packageName.replace('.', '/'); |
| String path = packageName + "/" + dataFormatName + ".json"; |
| |
| ClassResolver resolver = getClassResolver(); |
| InputStream inputStream = resolver.loadResourceAsStream(path); |
| log.debug("Loading dataformat JSON Schema for: {} using class resolver: {} -> {}", dataFormatName, resolver, inputStream); |
| if (inputStream != null) { |
| try { |
| return IOHelper.loadText(inputStream); |
| } finally { |
| IOHelper.close(inputStream); |
| } |
| } |
| return null; |
| } |
| |
| public String getLanguageParameterJsonSchema(String languageName) throws IOException { |
| // use the language factory finder to find the package name of the |
| // language class, which is the location |
| // where the documentation exists as well |
| FactoryFinder finder = getFactoryFinder(DefaultLanguageResolver.LANGUAGE_RESOURCE_PATH); |
| Class<?> clazz = finder.findClass(languageName).orElse(null); |
| if (clazz == null) { |
| return null; |
| } |
| |
| String packageName = clazz.getPackage().getName(); |
| packageName = packageName.replace('.', '/'); |
| String path = packageName + "/" + languageName + ".json"; |
| |
| ClassResolver resolver = getClassResolver(); |
| InputStream inputStream = resolver.loadResourceAsStream(path); |
| log.debug("Loading language JSON Schema for: {} using class resolver: {} -> {}", languageName, resolver, inputStream); |
| if (inputStream != null) { |
| try { |
| return IOHelper.loadText(inputStream); |
| } finally { |
| IOHelper.close(inputStream); |
| } |
| } |
| return null; |
| } |
| |
| public String getEipParameterJsonSchema(String eipName) throws IOException { |
| // the eip json schema may be in some of the sub-packages so look until |
| // we find it |
| String[] subPackages = new String[] {"", "/config", "/dataformat", "/language", "/loadbalancer", "/rest"}; |
| for (String sub : subPackages) { |
| String path = CamelContextHelper.MODEL_DOCUMENTATION_PREFIX + sub + "/" + eipName + ".json"; |
| ClassResolver resolver = getClassResolver(); |
| InputStream inputStream = resolver.loadResourceAsStream(path); |
| if (inputStream != null) { |
| log.debug("Loading eip JSON Schema for: {} using class resolver: {} -> {}", eipName, resolver, inputStream); |
| try { |
| return IOHelper.loadText(inputStream); |
| } finally { |
| IOHelper.close(inputStream); |
| } |
| } |
| } |
| return null; |
| } |
| |
| // Helper methods |
| // ----------------------------------------------------------------------- |
| |
| @Override |
| public Language resolveLanguage(String language) { |
| Language answer; |
| synchronized (languages) { |
| answer = languages.get(language); |
| |
| // check if the language is singleton, if so return the shared |
| // instance |
| if (answer instanceof IsSingleton) { |
| boolean singleton = ((IsSingleton)answer).isSingleton(); |
| if (singleton) { |
| return answer; |
| } |
| } |
| |
| // language not known or not singleton, then use resolver |
| answer = getLanguageResolver().resolveLanguage(language, this); |
| |
| // inject CamelContext if aware |
| if (answer != null) { |
| if (answer instanceof CamelContextAware) { |
| ((CamelContextAware)answer).setCamelContext(this); |
| } |
| if (answer instanceof Service) { |
| try { |
| startService((Service)answer); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| } |
| |
| languages.put(language, answer); |
| } |
| } |
| |
| return answer; |
| } |
| |
| @Override |
| public String resolvePropertyPlaceholders(String text) { |
| if (text != null && text.contains(PropertiesComponent.PREFIX_TOKEN)) { |
| // the parser will throw exception if property key was not found |
| String answer = getPropertiesComponent().parseUri(text); |
| log.debug("Resolved text: {} -> {}", text, answer); |
| return answer; |
| } |
| |
| // return original text as is |
| return text; |
| } |
| |
| // Properties |
| // ----------------------------------------------------------------------- |
| |
| @Override |
| public TypeConverter getTypeConverter() { |
| if (typeConverter == null) { |
| synchronized (lock) { |
| if (typeConverter == null) { |
| setTypeConverter(createTypeConverter()); |
| } |
| } |
| } |
| return typeConverter; |
| } |
| |
| public void setTypeConverter(TypeConverter typeConverter) { |
| this.typeConverter = doAddService(typeConverter); |
| } |
| |
| @Override |
| public TypeConverterRegistry getTypeConverterRegistry() { |
| if (typeConverterRegistry == null) { |
| synchronized (lock) { |
| if (typeConverterRegistry == null) { |
| setTypeConverterRegistry(createTypeConverterRegistry()); |
| } |
| } |
| } |
| return typeConverterRegistry; |
| } |
| |
| @Override |
| public void setTypeConverterRegistry(TypeConverterRegistry typeConverterRegistry) { |
| this.typeConverterRegistry = doAddService(typeConverterRegistry); |
| // some registries are also a type converter implementation |
| if (typeConverterRegistry instanceof TypeConverter) { |
| this.typeConverter = (TypeConverter)typeConverterRegistry; |
| } |
| } |
| |
| @Override |
| public Injector getInjector() { |
| if (injector == null) { |
| synchronized (lock) { |
| if (injector == null) { |
| setInjector(createInjector()); |
| } |
| } |
| } |
| return injector; |
| } |
| |
| @Override |
| public void setInjector(Injector injector) { |
| this.injector = doAddService(injector); |
| } |
| |
| @Override |
| public PropertiesComponent getPropertiesComponent() { |
| if (propertiesComponent == null) { |
| synchronized (lock) { |
| if (propertiesComponent == null) { |
| setPropertiesComponent(createPropertiesComponent()); |
| } |
| } |
| } |
| return propertiesComponent; |
| } |
| |
| @Override |
| public void setPropertiesComponent(PropertiesComponent propertiesComponent) { |
| this.propertiesComponent = doAddService(propertiesComponent); |
| } |
| |
| @Override |
| public CamelBeanPostProcessor getBeanPostProcessor() { |
| if (beanPostProcessor == null) { |
| synchronized (lock) { |
| if (beanPostProcessor == null) { |
| setBeanPostProcessor(createBeanPostProcessor()); |
| } |
| } |
| } |
| return beanPostProcessor; |
| } |
| |
| public void setBeanPostProcessor(CamelBeanPostProcessor beanPostProcessor) { |
| this.beanPostProcessor = doAddService(beanPostProcessor); |
| } |
| |
| @Override |
| public ManagementMBeanAssembler getManagementMBeanAssembler() { |
| return managementMBeanAssembler; |
| } |
| |
| public void setManagementMBeanAssembler(ManagementMBeanAssembler managementMBeanAssembler) { |
| this.managementMBeanAssembler = doAddService(managementMBeanAssembler, false); |
| } |
| |
| public ComponentResolver getComponentResolver() { |
| if (componentResolver == null) { |
| synchronized (lock) { |
| if (componentResolver == null) { |
| setComponentResolver(createComponentResolver()); |
| } |
| } |
| } |
| return componentResolver; |
| } |
| |
| public void setComponentResolver(ComponentResolver componentResolver) { |
| this.componentResolver = doAddService(componentResolver); |
| } |
| |
| public LanguageResolver getLanguageResolver() { |
| if (languageResolver == null) { |
| synchronized (lock) { |
| if (languageResolver == null) { |
| setLanguageResolver(createLanguageResolver()); |
| } |
| } |
| } |
| return languageResolver; |
| } |
| |
| public void setLanguageResolver(LanguageResolver languageResolver) { |
| this.languageResolver = doAddService(languageResolver); |
| } |
| |
| public boolean isAutoCreateComponents() { |
| return autoCreateComponents; |
| } |
| |
| public void setAutoCreateComponents(boolean autoCreateComponents) { |
| this.autoCreateComponents = autoCreateComponents; |
| } |
| |
| @Override |
| public Registry getRegistry() { |
| if (registry == null) { |
| synchronized (lock) { |
| if (registry == null) { |
| setRegistry(createRegistry()); |
| } |
| } |
| } |
| return registry; |
| } |
| |
| @Override |
| public <T> T getRegistry(Class<T> type) { |
| Registry reg = getRegistry(); |
| |
| if (type.isAssignableFrom(reg.getClass())) { |
| return type.cast(reg); |
| } |
| return null; |
| } |
| |
| @Override |
| public void setRegistry(Registry registry) { |
| if (registry instanceof CamelContextAware) { |
| ((CamelContextAware)registry).setCamelContext(this); |
| } |
| this.registry = registry; |
| } |
| |
| @Override |
| public List<LifecycleStrategy> getLifecycleStrategies() { |
| return lifecycleStrategies; |
| } |
| |
| @Override |
| public void addLifecycleStrategy(LifecycleStrategy lifecycleStrategy) { |
| getLifecycleStrategies().add(lifecycleStrategy); |
| } |
| |
| @Override |
| public void setupRoutes(boolean done) { |
| if (done) { |
| isSetupRoutes.remove(); |
| } else { |
| isSetupRoutes.set(true); |
| } |
| } |
| |
| @Override |
| public RestConfiguration getRestConfiguration() { |
| return restConfigurations.get(""); |
| } |
| |
| @Override |
| public void setRestConfiguration(RestConfiguration restConfiguration) { |
| restConfigurations.put("", restConfiguration); |
| } |
| |
| @Override |
| public Collection<RestConfiguration> getRestConfigurations() { |
| return restConfigurations.values(); |
| } |
| |
| @Override |
| public void addRestConfiguration(RestConfiguration restConfiguration) { |
| restConfigurations.put(restConfiguration.getComponent(), restConfiguration); |
| } |
| |
| @Override |
| public RestConfiguration getRestConfiguration(String component, boolean defaultIfNotExist) { |
| if (component == null) { |
| component = ""; |
| } |
| RestConfiguration config = restConfigurations.get(component); |
| if (config == null && defaultIfNotExist) { |
| // grab the default configuration |
| config = getRestConfiguration(); |
| if (config == null || (config.getComponent() != null && !config.getComponent().equals(component))) { |
| config = new RestConfiguration(); |
| restConfigurations.put(component, config); |
| } |
| } |
| return config; |
| } |
| |
| @Override |
| public List<InterceptStrategy> getInterceptStrategies() { |
| return interceptStrategies; |
| } |
| |
| public void setInterceptStrategies(List<InterceptStrategy> interceptStrategies) { |
| this.interceptStrategies = interceptStrategies; |
| } |
| |
| @Override |
| public void addInterceptStrategy(InterceptStrategy interceptStrategy) { |
| getInterceptStrategies().add(interceptStrategy); |
| } |
| |
| @Override |
| public List<RoutePolicyFactory> getRoutePolicyFactories() { |
| return routePolicyFactories; |
| } |
| |
| public void setRoutePolicyFactories(List<RoutePolicyFactory> routePolicyFactories) { |
| this.routePolicyFactories = routePolicyFactories; |
| } |
| |
| @Override |
| public void addRoutePolicyFactory(RoutePolicyFactory routePolicyFactory) { |
| getRoutePolicyFactories().add(routePolicyFactory); |
| } |
| |
| @Override |
| public Set<LogListener> getLogListeners() { |
| return logListeners; |
| } |
| |
| @Override |
| public void addLogListener(LogListener listener) { |
| logListeners.add(listener); |
| } |
| |
| @Override |
| public void setStreamCaching(Boolean cache) { |
| this.streamCache = cache; |
| } |
| |
| @Override |
| public Boolean isStreamCaching() { |
| return streamCache; |
| } |
| |
| @Override |
| public void setTracing(Boolean tracing) { |
| this.trace = tracing; |
| } |
| |
| @Override |
| public Boolean isTracing() { |
| return trace; |
| } |
| |
| @Override |
| public String getTracingPattern() { |
| return tracePattern; |
| } |
| |
| @Override |
| public void setTracingPattern(String tracePattern) { |
| this.tracePattern = tracePattern; |
| } |
| |
| @Override |
| public Boolean isBacklogTracing() { |
| return backlogTrace; |
| } |
| |
| @Override |
| public void setBacklogTracing(Boolean backlogTrace) { |
| this.backlogTrace = backlogTrace; |
| } |
| |
| @Override |
| public void setDebugging(Boolean debug) { |
| this.debug = debug; |
| } |
| |
| @Override |
| public Boolean isDebugging() { |
| return debug; |
| } |
| |
| @Override |
| public void setMessageHistory(Boolean messageHistory) { |
| this.messageHistory = messageHistory; |
| } |
| |
| @Override |
| public Boolean isMessageHistory() { |
| return messageHistory; |
| } |
| |
| @Override |
| public void setLogMask(Boolean logMask) { |
| this.logMask = logMask; |
| } |
| |
| @Override |
| public Boolean isLogMask() { |
| return logMask != null && logMask; |
| } |
| |
| @Override |
| public Boolean isLogExhaustedMessageBody() { |
| return logExhaustedMessageBody; |
| } |
| |
| @Override |
| public void setLogExhaustedMessageBody(Boolean logExhaustedMessageBody) { |
| this.logExhaustedMessageBody = logExhaustedMessageBody; |
| } |
| |
| @Override |
| public Long getDelayer() { |
| return delay; |
| } |
| |
| @Override |
| public void setDelayer(Long delay) { |
| this.delay = delay; |
| } |
| |
| @Override |
| public ProducerTemplate createProducerTemplate() { |
| return createProducerTemplate(0); |
| } |
| |
| @Override |
| public ProducerTemplate createProducerTemplate(int maximumCacheSize) { |
| DefaultProducerTemplate answer = new DefaultProducerTemplate(this); |
| answer.setMaximumCacheSize(maximumCacheSize); |
| // start it so its ready to use |
| try { |
| startService(answer); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| return answer; |
| } |
| |
| @Override |
| public FluentProducerTemplate createFluentProducerTemplate() { |
| return createFluentProducerTemplate(0); |
| } |
| |
| @Override |
| public FluentProducerTemplate createFluentProducerTemplate(int maximumCacheSize) { |
| DefaultFluentProducerTemplate answer = new DefaultFluentProducerTemplate(this); |
| answer.setMaximumCacheSize(maximumCacheSize); |
| // start it so its ready to use |
| try { |
| startService(answer); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| return answer; |
| } |
| |
| @Override |
| public ConsumerTemplate createConsumerTemplate() { |
| return createConsumerTemplate(0); |
| } |
| |
| @Override |
| public ConsumerTemplate createConsumerTemplate(int maximumCacheSize) { |
| DefaultConsumerTemplate answer = new DefaultConsumerTemplate(this); |
| answer.setMaximumCacheSize(maximumCacheSize); |
| // start it so its ready to use |
| try { |
| startService(answer); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| return answer; |
| } |
| |
| @Override |
| public ErrorHandlerFactory getErrorHandlerFactory() { |
| return errorHandlerFactory; |
| } |
| |
| @Override |
| public void setErrorHandlerFactory(ErrorHandlerFactory errorHandlerFactory) { |
| this.errorHandlerFactory = errorHandlerFactory; |
| } |
| |
| @Override |
| public ScheduledExecutorService getErrorHandlerExecutorService() { |
| if (errorHandlerExecutorService == null) { |
| synchronized (lock) { |
| if (errorHandlerExecutorService == null) { |
| // setup default thread pool for error handler |
| errorHandlerExecutorService = createErrorHandlerExecutorService(); |
| } |
| } |
| } |
| return errorHandlerExecutorService; |
| } |
| |
| protected ScheduledExecutorService createErrorHandlerExecutorService() { |
| return getExecutorServiceManager().newDefaultScheduledThreadPool("ErrorHandlerRedeliveryThreadPool", "ErrorHandlerRedeliveryTask"); |
| } |
| |
| public void setErrorHandlerExecutorService(ScheduledExecutorService errorHandlerExecutorService) { |
| this.errorHandlerExecutorService = errorHandlerExecutorService; |
| } |
| |
| public void setProducerServicePool(ServicePool<Producer> producerServicePool) { |
| this.producerServicePool = doAddService(producerServicePool); |
| } |
| |
| public ServicePool<Producer> getProducerServicePool() { |
| if (producerServicePool == null) { |
| synchronized (lock) { |
| if (producerServicePool == null) { |
| setProducerServicePool(createProducerServicePool()); |
| } |
| } |
| } |
| return producerServicePool; |
| } |
| |
| public ServicePool<PollingConsumer> getPollingConsumerServicePool() { |
| if (pollingConsumerServicePool == null) { |
| synchronized (lock) { |
| if (pollingConsumerServicePool == null) { |
| setPollingConsumerServicePool(createPollingConsumerServicePool()); |
| } |
| } |
| } |
| return pollingConsumerServicePool; |
| } |
| |
| public void setPollingConsumerServicePool(ServicePool<PollingConsumer> pollingConsumerServicePool) { |
| this.pollingConsumerServicePool = doAddService(pollingConsumerServicePool); |
| } |
| |
| @Override |
| public UnitOfWorkFactory getUnitOfWorkFactory() { |
| if (unitOfWorkFactory == null) { |
| synchronized (lock) { |
| if (unitOfWorkFactory == null) { |
| setUnitOfWorkFactory(createUnitOfWorkFactory()); |
| } |
| } |
| } |
| return unitOfWorkFactory; |
| } |
| |
| @Override |
| public void setUnitOfWorkFactory(UnitOfWorkFactory unitOfWorkFactory) { |
| this.unitOfWorkFactory = doAddService(unitOfWorkFactory); |
| } |
| |
| @Override |
| public RuntimeEndpointRegistry getRuntimeEndpointRegistry() { |
| return runtimeEndpointRegistry; |
| } |
| |
| @Override |
| public void setRuntimeEndpointRegistry(RuntimeEndpointRegistry runtimeEndpointRegistry) { |
| this.runtimeEndpointRegistry = doAddService(runtimeEndpointRegistry); |
| } |
| |
| @Override |
| public String getUptime() { |
| long delta = getUptimeMillis(); |
| if (delta == 0) { |
| return ""; |
| } |
| return TimeUtils.printDuration(delta); |
| } |
| |
| @Override |
| public long getUptimeMillis() { |
| if (startDate == null) { |
| return 0; |
| } |
| return new Date().getTime() - startDate.getTime(); |
| } |
| |
| @Override |
| public Date getStartDate() { |
| return startDate; |
| } |
| |
| @Override |
| public String getVersion() { |
| if (version == null) { |
| synchronized (lock) { |
| if (version == null) { |
| version = doGetVersion(); |
| } |
| } |
| } |
| return version; |
| } |
| |
| private String doGetVersion() { |
| String version = null; |
| |
| InputStream is = null; |
| // try to load from maven properties first |
| try { |
| Properties p = new Properties(); |
| is = getClass().getResourceAsStream("/META-INF/maven/org.apache.camel/camel-base/pom.properties"); |
| if (is != null) { |
| p.load(is); |
| version = p.getProperty("version", ""); |
| } |
| } catch (Exception e) { |
| // ignore |
| } finally { |
| if (is != null) { |
| IOHelper.close(is); |
| } |
| } |
| |
| // fallback to using Java API |
| if (version == null) { |
| Package aPackage = getClass().getPackage(); |
| if (aPackage != null) { |
| version = aPackage.getImplementationVersion(); |
| if (version == null) { |
| version = aPackage.getSpecificationVersion(); |
| } |
| } |
| } |
| |
| if (version == null) { |
| // we could not compute the version so use a blank |
| version = ""; |
| } |
| |
| return version; |
| } |
| |
| @Override |
| protected void doSuspend() throws Exception { |
| EventHelper.notifyCamelContextSuspending(this); |
| |
| log.info("Apache Camel {} (CamelContext: {}) is suspending", getVersion(), getName()); |
| StopWatch watch = new StopWatch(); |
| |
| // update list of started routes to be suspended |
| // because we only want to suspend started routes |
| // (so when we resume we only resume the routes which actually was |
| // suspended) |
| for (Map.Entry<String, BaseRouteService> entry : getRouteServices().entrySet()) { |
| if (entry.getValue().getStatus().isStarted()) { |
| suspendedRouteServices.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| // assemble list of startup ordering so routes can be shutdown |
| // accordingly |
| List<RouteStartupOrder> orders = new ArrayList<>(); |
| for (Map.Entry<String, BaseRouteService> entry : suspendedRouteServices.entrySet()) { |
| Route route = entry.getValue().getRoute(); |
| Integer order = entry.getValue().getStartupOrder(); |
| if (order == null) { |
| order = defaultRouteStartupOrder++; |
| } |
| orders.add(new DefaultRouteStartupOrder(order, route, entry.getValue())); |
| } |
| |
| // suspend routes using the shutdown strategy so it can shutdown in |
| // correct order |
| // routes which doesn't support suspension will be stopped instead |
| getShutdownStrategy().suspend(this, orders); |
| |
| // mark the route services as suspended or stopped |
| for (BaseRouteService service : suspendedRouteServices.values()) { |
| if (routeSupportsSuspension(service.getId())) { |
| service.suspend(); |
| } else { |
| service.stop(); |
| } |
| } |
| |
| watch.taken(); |
| if (log.isInfoEnabled()) { |
| log.info("Apache Camel {} (CamelContext: {}) is suspended in {}", getVersion(), getName(), TimeUtils.printDuration(watch.taken())); |
| } |
| |
| EventHelper.notifyCamelContextSuspended(this); |
| } |
| |
| @Override |
| protected void doResume() throws Exception { |
| try { |
| EventHelper.notifyCamelContextResuming(this); |
| |
| log.info("Apache Camel {} (CamelContext: {}) is resuming", getVersion(), getName()); |
| StopWatch watch = new StopWatch(); |
| |
| // start the suspended routes (do not check for route clashes, and |
| // indicate) |
| doStartOrResumeRoutes(suspendedRouteServices, false, true, true, false); |
| |
| // mark the route services as resumed (will be marked as started) as |
| // well |
| for (BaseRouteService service : suspendedRouteServices.values()) { |
| if (routeSupportsSuspension(service.getId())) { |
| service.resume(); |
| } else { |
| service.start(); |
| } |
| } |
| |
| if (log.isInfoEnabled()) { |
| log.info("Resumed {} routes", suspendedRouteServices.size()); |
| log.info("Apache Camel {} (CamelContext: {}) resumed in {}", getVersion(), getName(), TimeUtils.printDuration(watch.taken())); |
| } |
| |
| // and clear the list as they have been resumed |
| suspendedRouteServices.clear(); |
| |
| EventHelper.notifyCamelContextResumed(this); |
| } catch (Exception e) { |
| EventHelper.notifyCamelContextResumeFailed(this, e); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void start() { |
| try (MDCHelper mdcHelper = new MDCHelper()) { |
| init(); |
| vetoStarted.set(false); |
| startDate = new Date(); |
| stopWatch.restart(); |
| log.info("Apache Camel {} (CamelContext: {}) is starting", getVersion(), getName()); |
| |
| // Start the route controller |
| ServiceHelper.startService(this.routeController); |
| |
| doNotStartRoutesOnFirstStart = !firstStartDone && !isAutoStartup(); |
| |
| // if the context was configured with auto startup = false, and we |
| // are already started, |
| // then we may need to start the routes on the 2nd start call |
| if (firstStartDone && !isAutoStartup() && isStarted()) { |
| // invoke this logic to warm up the routes and if possible also |
| // start the routes |
| try { |
| doStartOrResumeRoutes(routeServices, true, true, false, true); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeException(e); |
| } |
| } |
| |
| // super will invoke doStart which will prepare internal services |
| // and start routes etc. |
| try { |
| firstStartDone = true; |
| super.start(); |
| } catch (Exception e) { |
| VetoCamelContextStartException veto = ObjectHelper.getException(VetoCamelContextStartException.class, e); |
| if (veto != null) { |
| // mark we veto against starting Camel |
| vetoStarted.set(true); |
| if (veto.isRethrowException()) { |
| throw e; |
| } else { |
| log.info("CamelContext ({}) vetoed to not start due {}", getName(), e.getMessage()); |
| // swallow exception and change state of this camel context |
| // to stopped |
| stop(); |
| return; |
| } |
| } else { |
| log.error("Error starting CamelContext (" + getName() + ") due to exception thrown: " + e.getMessage(), e); |
| throw RuntimeCamelException.wrapRuntimeException(e); |
| } |
| } |
| |
| if (log.isInfoEnabled()) { |
| // count how many routes are actually started |
| int started = 0; |
| for (Route route : getRoutes()) { |
| ServiceStatus status = getRouteStatus(route.getId()); |
| if (status != null && status.isStarted()) { |
| started++; |
| } |
| } |
| |
| final Collection<Route> controlledRoutes = getRouteController().getControlledRoutes(); |
| if (controlledRoutes.isEmpty()) { |
| log.info("Total {} routes, of which {} are started", getRoutes().size(), started); |
| } else { |
| log.info("Total {} routes, of which {} are started, and {} are managed by RouteController: {}", getRoutes().size(), started, controlledRoutes.size(), |
| getRouteController().getClass().getName()); |
| } |
| log.info("Apache Camel {} (CamelContext: {}) started in {}", getVersion(), getName(), TimeUtils.printDuration(stopWatch.taken())); |
| } |
| |
| // okay the routes has been started so emit event that CamelContext |
| // has started (here at the end) |
| EventHelper.notifyCamelContextStarted(this); |
| |
| // now call the startup listeners where the routes has been started |
| for (StartupListener startup : startupListeners) { |
| if (startup instanceof ExtendedStartupListener) { |
| try { |
| ((ExtendedStartupListener)startup).onCamelContextFullyStarted(this, isStarted()); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeException(e); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void stop() { |
| try (MDCHelper mdcHelper = new MDCHelper()) { |
| super.stop(); |
| } |
| } |
| |
| @Override |
| public void suspend() { |
| try (MDCHelper mdcHelper = new MDCHelper()) { |
| super.suspend(); |
| } |
| } |
| |
| @Override |
| public void resume() { |
| try (MDCHelper mdcHelper = new MDCHelper()) { |
| super.resume(); |
| } |
| } |
| |
| @Override |
| public void shutdown() { |
| try (MDCHelper mdcHelper = new MDCHelper()) { |
| super.shutdown(); |
| } |
| } |
| |
| // Implementation methods |
| // ----------------------------------------------------------------------- |
| |
| @Override |
| protected synchronized void doStart() throws Exception { |
| doWithDefinedClassLoader(() -> { |
| try { |
| doStartCamel(); |
| } catch (Exception e) { |
| // fire event that we failed to start |
| EventHelper.notifyCamelContextStartupFailed(AbstractCamelContext.this, e); |
| // rethrow cause |
| throw e; |
| } |
| }); |
| } |
| |
| private <T extends Throwable> void doWithDefinedClassLoader(ThrowingRunnable<T> callable) throws T { |
| ClassLoader tccl = Thread.currentThread().getContextClassLoader(); |
| try { |
| // Using the ApplicationClassLoader as the default for TCCL |
| if (applicationContextClassLoader != null) { |
| Thread.currentThread().setContextClassLoader(applicationContextClassLoader); |
| } |
| callable.run(); |
| } finally { |
| Thread.currentThread().setContextClassLoader(tccl); |
| } |
| } |
| |
| protected void doStartCamel() throws Exception { |
| |
| // custom properties may use property placeholders so resolve those |
| // early on |
| if (globalOptions != null && !globalOptions.isEmpty()) { |
| for (Map.Entry<String, String> entry : globalOptions.entrySet()) { |
| String key = entry.getKey(); |
| String value = entry.getValue(); |
| if (value != null) { |
| String replaced = resolvePropertyPlaceholders(value); |
| if (!value.equals(replaced)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Camel property with key {} replaced value from {} -> {}", key, value, replaced); |
| } |
| entry.setValue(replaced); |
| } |
| } |
| } |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Using ClassResolver={}, PackageScanClassResolver={}, ApplicationContextClassLoader={}, RouteController={}", getClassResolver(), |
| getPackageScanClassResolver(), getApplicationContextClassLoader(), getRouteController()); |
| } |
| |
| if (isStreamCaching()) { |
| log.info("StreamCaching is enabled on CamelContext: {}", getName()); |
| } |
| |
| if (isBacklogTracing()) { |
| // tracing is added in the DefaultChannel so we can enable it on the fly |
| log.info("Backlog Tracing is enabled on CamelContext: {}", getName()); |
| } |
| if (isTracing()) { |
| // tracing is added in the DefaultChannel so we can enable it on the fly |
| log.info("Tracing is enabled on CamelContext: {}", getName()); |
| } |
| |
| if (isUseMDCLogging()) { |
| // log if MDC has been enabled |
| String pattern = getMDCLoggingKeysPattern(); |
| if (pattern != null) { |
| log.info("MDC logging (keys-pattern: {}) is enabled on CamelContext: {}", pattern, getName()); |
| } else { |
| log.info("MDC logging is enabled on CamelContext: {}", getName()); |
| } |
| } |
| |
| if (getDelayer() != null && getDelayer() > 0) { |
| log.info("Delayer is enabled with: {} ms. on CamelContext: {}", getDelayer(), getName()); |
| } |
| |
| // start management strategy before lifecycles are started |
| ManagementStrategy managementStrategy = getManagementStrategy(); |
| startService(managementStrategy); |
| |
| // start lifecycle strategies |
| ServiceHelper.startService(lifecycleStrategies); |
| Iterator<LifecycleStrategy> it = lifecycleStrategies.iterator(); |
| while (it.hasNext()) { |
| LifecycleStrategy strategy = it.next(); |
| try { |
| strategy.onContextStart(this); |
| } catch (VetoCamelContextStartException e) { |
| // okay we should not start Camel since it was vetoed |
| log.warn("Lifecycle strategy vetoed starting CamelContext ({}) due: {}", getName(), e.getMessage()); |
| throw e; |
| } catch (Exception e) { |
| log.warn("Lifecycle strategy " + strategy + " failed starting CamelContext ({}) due: {}", getName(), e.getMessage()); |
| throw e; |
| } |
| } |
| |
| // start notifiers as services |
| for (EventNotifier notifier : getManagementStrategy().getEventNotifiers()) { |
| if (notifier instanceof Service) { |
| Service service = (Service)notifier; |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onServiceAdd(this, service, null); |
| } |
| startService((Service)notifier); |
| } |
| } |
| |
| // must let some bootstrap service be started before we can notify the |
| // starting event |
| EventHelper.notifyCamelContextStarting(this); |
| |
| forceLazyInitialization(); |
| |
| // if camel-bean is on classpath then we can load its bean proxy factory |
| BeanProxyFactory beanProxyFactory = new BeanProxyFactoryResolver().resolve(this); |
| if (beanProxyFactory != null) { |
| addService(beanProxyFactory); |
| } |
| |
| // re-create endpoint registry as the cache size limit may be set after the constructor of this instance was called. |
| // and we needed to create endpoints up-front as it may be accessed before this context is started |
| endpoints = doAddService(createEndpointRegistry(endpoints)); |
| // Initialize declarative transformer registry |
| transformerRegistry = doAddService(createTransformerRegistry()); |
| // Initialize declarative validator registry |
| validatorRegistry = doAddService(createValidatorRegistry()); |
| |
| // optimised to not include runtimeEndpointRegistry unless startServices |
| // its enabled or JMX statistics is in extended mode |
| if (runtimeEndpointRegistry == null && getManagementStrategy() != null && getManagementStrategy().getManagementAgent() != null) { |
| Boolean isEnabled = getManagementStrategy().getManagementAgent().getEndpointRuntimeStatisticsEnabled(); |
| boolean isExtended = getManagementStrategy().getManagementAgent().getStatisticsLevel().isExtended(); |
| // extended mode is either if we use Extended statistics level or |
| // the option is explicit enabled |
| boolean extended = isExtended || isEnabled != null && isEnabled; |
| if (extended) { |
| runtimeEndpointRegistry = new DefaultRuntimeEndpointRegistry(); |
| } |
| } |
| if (runtimeEndpointRegistry != null) { |
| if (runtimeEndpointRegistry instanceof EventNotifier && getManagementStrategy() != null) { |
| getManagementStrategy().addEventNotifier((EventNotifier)runtimeEndpointRegistry); |
| } |
| addService(runtimeEndpointRegistry, true, true); |
| } |
| |
| bindDataFormats(); |
| |
| // start components |
| startServices(components.values()); |
| |
| // start the route definitions before the routes is started |
| startRouteDefinitions(); |
| |
| if (isUseDataType()) { |
| // log if DataType has been enabled |
| log.info("Message DataType is enabled on CamelContext: {}", getName()); |
| } |
| |
| // is there any stream caching enabled then log an info about this and |
| // its limit of spooling to disk, so people is aware of this |
| if (isStreamCachingInUse()) { |
| // stream caching is in use so enable the strategy |
| getStreamCachingStrategy().setEnabled(true); |
| } else { |
| // log if stream caching is not in use as this can help people to |
| // enable it if they use streams |
| log.info("StreamCaching is not in use. If using streams then its recommended to enable stream caching." |
| + " See more details at http://camel.apache.org/stream-caching.html"); |
| } |
| |
| if (isAllowUseOriginalMessage()) { |
| log.debug("AllowUseOriginalMessage enabled because UseOriginalMessage is in use"); |
| } |
| |
| log.debug("Using HeadersMapFactory: {}", getHeadersMapFactory()); |
| if (!getHeadersMapFactory().isCaseInsensitive()) { |
| log.info("HeadersMapFactory: {} is case-sensitive which can cause problems for protocols such as HTTP based, which rely on case-insensitive headers.", |
| getHeadersMapFactory()); |
| } |
| |
| // lets log at INFO level if we are not using the default reactive executor |
| if (!getReactiveExecutor().getClass().getSimpleName().equals("DefaultReactiveExecutor")) { |
| log.info("Using ReactiveExecutor: {}", getReactiveExecutor()); |
| } else { |
| log.debug("Using ReactiveExecutor: {}", getReactiveExecutor()); |
| } |
| |
| // start routes |
| if (doNotStartRoutesOnFirstStart) { |
| log.debug("Skip starting routes as CamelContext has been configured with autoStartup=false"); |
| } |
| |
| // invoke this logic to warmup the routes and if possible also start the routes |
| EventHelper.notifyCamelContextRoutesStarting(this); |
| doStartOrResumeRoutes(routeServices, true, !doNotStartRoutesOnFirstStart, false, true); |
| EventHelper.notifyCamelContextRoutesStarted(this); |
| |
| long cacheCounter = getBeanIntrospection().getCachedClassesCounter(); |
| if (cacheCounter > 0) { |
| log.debug("Clearing BeanIntrospection cache with {} objects using during starting Camel", cacheCounter); |
| getBeanIntrospection().clearCache(); |
| } |
| |
| // starting will continue in the start method |
| } |
| |
| protected void startRouteDefinitions() throws Exception { |
| } |
| |
| protected boolean isStreamCachingInUse() throws Exception { |
| return isStreamCaching(); |
| } |
| |
| protected void bindDataFormats() throws Exception { |
| } |
| |
| @Override |
| protected synchronized void doStop() throws Exception { |
| stopWatch.restart(); |
| log.info("Apache Camel {} (CamelContext: {}) is shutting down", getVersion(), getName()); |
| EventHelper.notifyCamelContextStopping(this); |
| EventHelper.notifyCamelContextRoutesStopping(this); |
| |
| // Stop the route controller |
| ServiceHelper.stopAndShutdownService(this.routeController); |
| |
| // stop route inputs in the same order as they was started so we stop |
| // the very first inputs first |
| try { |
| // force shutting down routes as they may otherwise cause shutdown to hang |
| if (shutdownStrategy != null) { |
| shutdownStrategy.shutdownForced(this, getRouteStartupOrder()); |
| } |
| } catch (Throwable e) { |
| log.warn("Error occurred while shutting down routes. This exception will be ignored.", e); |
| } |
| |
| // shutdown await manager to trigger interrupt of blocked threads to |
| // attempt to free these threads graceful |
| shutdownServices(asyncProcessorAwaitManager); |
| |
| // we need also to include routes which failed to start to ensure all resources get stopped when stopping Camel |
| for (BaseRouteService routeService : routeServices.values()) { |
| boolean found = routeStartupOrder.stream().anyMatch(o -> o.getRoute().getId().equals(routeService.getId())); |
| if (!found) { |
| log.debug("Route: {} which failed to startup will be stopped", routeService.getId()); |
| routeStartupOrder.add(doPrepareRouteToBeStarted(routeService)); |
| } |
| } |
| |
| routeStartupOrder.sort(Comparator.comparingInt(RouteStartupOrder::getStartupOrder).reversed()); |
| List<BaseRouteService> list = new ArrayList<>(); |
| for (RouteStartupOrder startupOrder : routeStartupOrder) { |
| DefaultRouteStartupOrder order = (DefaultRouteStartupOrder)startupOrder; |
| BaseRouteService routeService = order.getRouteService(); |
| list.add(routeService); |
| } |
| shutdownServices(list, false); |
| // do not clear route services or startup listeners as we can start |
| // Camel again and get the route back as before |
| routeStartupOrder.clear(); |
| |
| EventHelper.notifyCamelContextRoutesStopped(this); |
| |
| // but clear any suspend routes |
| suspendedRouteServices.clear(); |
| |
| // stop consumers from the services to close first, such as POJO |
| // consumer (eg @Consumer) |
| // which we need to stop after the routes, as a POJO consumer is |
| // essentially a route also |
| for (Service service : servicesToStop) { |
| if (service instanceof Consumer) { |
| shutdownServices(service); |
| } |
| } |
| |
| // the stop order is important |
| |
| // shutdown default error handler thread pool |
| if (errorHandlerExecutorService != null) { |
| // force shutting down the thread pool |
| getExecutorServiceManager().shutdownNow(errorHandlerExecutorService); |
| errorHandlerExecutorService = null; |
| } |
| |
| // shutdown debugger |
| ServiceHelper.stopAndShutdownService(getDebugger()); |
| |
| shutdownServices(endpoints.values()); |
| endpoints.clear(); |
| |
| shutdownServices(components.values()); |
| components.clear(); |
| |
| shutdownServices(languages.values()); |
| languages.clear(); |
| |
| try { |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onContextStop(this); |
| } |
| } catch (Throwable e) { |
| log.warn("Error occurred while stopping lifecycle strategies. This exception will be ignored.", e); |
| } |
| |
| // shutdown services as late as possible (except type converters as they may be needed during the remainder of the stopping) |
| shutdownServices(servicesToStop); |
| servicesToStop.clear(); |
| |
| // must notify that we are stopped before stopping the management strategy |
| EventHelper.notifyCamelContextStopped(this); |
| |
| // stop the notifier service |
| for (EventNotifier notifier : getManagementStrategy().getEventNotifiers()) { |
| shutdownServices(notifier); |
| } |
| |
| // shutdown executor service, reactive executor and management as the last one |
| shutdownServices(executorServiceManager); |
| shutdownServices(reactiveExecutor); |
| shutdownServices(managementStrategy); |
| shutdownServices(managementMBeanAssembler); |
| shutdownServices(lifecycleStrategies); |
| // do not clear lifecycleStrategies as we can start Camel again and get |
| // the route back as before |
| |
| // shutdown type converter as late as possible |
| ServiceHelper.stopService(typeConverter); |
| ServiceHelper.stopService(typeConverterRegistry); |
| |
| // stop the lazy created so they can be re-created on restart |
| forceStopLazyInitialization(); |
| |
| if (log.isInfoEnabled()) { |
| log.info("Apache Camel " + getVersion() + " (CamelContext: " + getName() + ") uptime {}", getUptime()); |
| log.info("Apache Camel {} (CamelContext: {}) is shutdown in {}", getVersion(), getName(), TimeUtils.printDuration(stopWatch.taken())); |
| } |
| |
| // and clear start date |
| startDate = null; |
| |
| // Call all registered trackers with this context |
| // Note, this may use a partially constructed object |
| CamelContextTracker.notifyContextDestroyed(this); |
| } |
| |
| /** |
| * Starts or resumes the routes |
| * |
| * @param routeServices the routes to start (will only start a route if its |
| * not already started) |
| * @param checkClash whether to check for startup ordering clash |
| * @param startConsumer whether the route consumer should be started. Can be |
| * used to warmup the route without starting the consumer. |
| * @param resumeConsumer whether the route consumer should be resumed. |
| * @param addingRoutes whether we are adding new routes |
| * @throws Exception is thrown if error starting routes |
| */ |
| protected void doStartOrResumeRoutes(Map<String, BaseRouteService> routeServices, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes) |
| throws Exception { |
| setStartingRoutes(true); |
| try { |
| // filter out already started routes |
| Map<String, BaseRouteService> filtered = new LinkedHashMap<>(); |
| for (Map.Entry<String, BaseRouteService> entry : routeServices.entrySet()) { |
| boolean startable = false; |
| |
| Consumer consumer = entry.getValue().getRoute().getConsumer(); |
| if (consumer instanceof SuspendableService) { |
| // consumer could be suspended, which is not reflected in |
| // the BaseRouteService status |
| startable = ((SuspendableService)consumer).isSuspended(); |
| } |
| |
| if (!startable && consumer instanceof StatefulService) { |
| // consumer could be stopped, which is not reflected in the |
| // BaseRouteService status |
| startable = ((StatefulService)consumer).getStatus().isStartable(); |
| } else if (!startable) { |
| // no consumer so use state from route service |
| startable = entry.getValue().getStatus().isStartable(); |
| } |
| |
| if (startable) { |
| filtered.put(entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| // the context is in last phase of staring, so lets start the routes |
| safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, filtered.values()); |
| |
| } finally { |
| setStartingRoutes(false); |
| } |
| } |
| |
| protected boolean routeSupportsSuspension(String routeId) { |
| BaseRouteService routeService = routeServices.get(routeId); |
| if (routeService != null) { |
| return routeService.getRoute().supportsSuspension(); |
| } |
| return false; |
| } |
| |
| private void shutdownServices(Object service) { |
| // do not rethrow exception as we want to keep shutting down in case of |
| // problems |
| |
| // allow us to do custom work before delegating to service helper |
| try { |
| if (service instanceof Service) { |
| ServiceHelper.stopAndShutdownService(service); |
| } else if (service instanceof Collection) { |
| ServiceHelper.stopAndShutdownServices((Collection<?>)service); |
| } |
| } catch (Throwable e) { |
| log.warn("Error occurred while shutting down service: " + service + ". This exception will be ignored.", e); |
| // fire event |
| EventHelper.notifyServiceStopFailure(this, service, e); |
| } |
| } |
| |
| private void shutdownServices(Collection<?> services) { |
| // reverse stopping by default |
| shutdownServices(services, true); |
| } |
| |
| private void shutdownServices(Collection<?> services, boolean reverse) { |
| Collection<?> list = services; |
| if (reverse) { |
| List<Object> reverseList = new ArrayList<>(services); |
| Collections.reverse(reverseList); |
| list = reverseList; |
| } |
| |
| for (Object service : list) { |
| shutdownServices(service); |
| } |
| } |
| |
| private void startService(Service service) throws Exception { |
| // and register startup aware so they can be notified when |
| // camel context has been started |
| if (service instanceof StartupListener) { |
| StartupListener listener = (StartupListener)service; |
| addStartupListener(listener); |
| } |
| |
| if (service instanceof CamelContextAware) { |
| CamelContextAware aware = (CamelContextAware)service; |
| aware.setCamelContext(this); |
| } |
| |
| service.start(); |
| } |
| |
| private void startServices(Collection<?> services) throws Exception { |
| for (Object element : services) { |
| if (element instanceof Service) { |
| startService((Service)element); |
| } |
| } |
| } |
| |
| private void stopServices(Object service) throws Exception { |
| // allow us to do custom work before delegating to service helper |
| try { |
| ServiceHelper.stopService(service); |
| } catch (Exception e) { |
| // fire event |
| EventHelper.notifyServiceStopFailure(this, service, e); |
| // rethrow to signal error with stopping |
| throw e; |
| } |
| } |
| |
| /** |
| * Starts the given route service |
| */ |
| public synchronized void startRouteService(BaseRouteService routeService, boolean addingRoutes) throws Exception { |
| // we may already be starting routes so remember this, so we can unset |
| // accordingly in finally block |
| boolean alreadyStartingRoutes = isStartingRoutes(); |
| if (!alreadyStartingRoutes) { |
| setStartingRoutes(true); |
| } |
| |
| try { |
| // the route service could have been suspended, and if so then |
| // resume it instead |
| if (routeService.getStatus().isSuspended()) { |
| resumeRouteService(routeService); |
| } else { |
| // start the route service |
| routeServices.put(routeService.getId(), routeService); |
| if (shouldStartRoutes()) { |
| // this method will log the routes being started |
| safelyStartRouteServices(true, true, true, false, addingRoutes, routeService); |
| // start route services if it was configured to auto startup |
| // and we are not adding routes |
| boolean autoStartup = routeService.isAutoStartup(); |
| if (!addingRoutes || autoStartup) { |
| // start the route since auto start is enabled or we are |
| // starting a route (not adding new routes) |
| routeService.start(); |
| } |
| } |
| } |
| } finally { |
| if (!alreadyStartingRoutes) { |
| setStartingRoutes(false); |
| } |
| } |
| } |
| |
| /** |
| * Resumes the given route service |
| */ |
| protected synchronized void resumeRouteService(BaseRouteService routeService) throws Exception { |
| // the route service could have been stopped, and if so then start it |
| // instead |
| if (!routeService.getStatus().isSuspended()) { |
| startRouteService(routeService, false); |
| } else { |
| // resume the route service |
| if (shouldStartRoutes()) { |
| // this method will log the routes being started |
| safelyStartRouteServices(true, false, true, true, false, routeService); |
| // must resume route service as well |
| routeService.resume(); |
| } |
| } |
| } |
| |
| protected synchronized void stopRouteService(BaseRouteService routeService, boolean removingRoutes) throws Exception { |
| routeService.setRemovingRoutes(removingRoutes); |
| stopRouteService(routeService); |
| } |
| |
| protected void logRouteState(Route route, String state) { |
| if (log.isInfoEnabled()) { |
| if (route.getConsumer() != null) { |
| log.info("Route: {} is {}, was consuming from: {}", route.getId(), state, route.getConsumer().getEndpoint()); |
| } else { |
| log.info("Route: {} is {}.", route.getId(), state); |
| } |
| } |
| } |
| |
| protected synchronized void stopRouteService(BaseRouteService routeService) throws Exception { |
| routeService.stop(); |
| logRouteState(routeService.getRoute(), "stopped"); |
| } |
| |
| protected synchronized void shutdownRouteService(BaseRouteService routeService) throws Exception { |
| routeService.shutdown(); |
| logRouteState(routeService.getRoute(), "shutdown and removed"); |
| } |
| |
| protected synchronized void suspendRouteService(BaseRouteService routeService) throws Exception { |
| routeService.setRemovingRoutes(false); |
| routeService.suspend(); |
| logRouteState(routeService.getRoute(), "suspended"); |
| } |
| |
| /** |
| * Starts the routes services in a proper manner which ensures the routes |
| * will be started in correct order, check for clash and that the routes |
| * will also be shutdown in correct order as well. |
| * <p/> |
| * This method <b>must</b> be used to start routes in a safe manner. |
| * |
| * @param checkClash whether to check for startup order clash |
| * @param startConsumer whether the route consumer should be started. Can be |
| * used to warmup the route without starting the consumer. |
| * @param resumeConsumer whether the route consumer should be resumed. |
| * @param addingRoutes whether we are adding new routes |
| * @param routeServices the routes |
| * @throws Exception is thrown if error starting the routes |
| */ |
| protected synchronized void safelyStartRouteServices(boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes, |
| Collection<BaseRouteService> routeServices) |
| throws Exception { |
| // list of inputs to start when all the routes have been prepared for |
| // starting |
| // we use a tree map so the routes will be ordered according to startup |
| // order defined on the route |
| Map<Integer, DefaultRouteStartupOrder> inputs = new TreeMap<>(); |
| |
| // figure out the order in which the routes should be started |
| for (BaseRouteService routeService : routeServices) { |
| DefaultRouteStartupOrder order = doPrepareRouteToBeStarted(routeService); |
| // check for clash before we add it as input |
| if (checkClash) { |
| doCheckStartupOrderClash(order, inputs); |
| } |
| inputs.put(order.getStartupOrder(), order); |
| } |
| |
| // warm up routes before we start them |
| doWarmUpRoutes(inputs, startConsumer); |
| |
| // sort the startup listeners so they are started in the right order |
| startupListeners.sort(OrderedComparator.get()); |
| // now call the startup listeners where the routes has been warmed up |
| // (only the actual route consumer has not yet been started) |
| for (StartupListener startup : startupListeners) { |
| startup.onCamelContextStarted(this, isStarted()); |
| } |
| // because the consumers may also register startup listeners we need to |
| // reset |
| // the already started listeners |
| List<StartupListener> backup = new ArrayList<>(startupListeners); |
| startupListeners.clear(); |
| |
| // now start the consumers |
| if (startConsumer) { |
| if (resumeConsumer) { |
| // and now resume the routes |
| doResumeRouteConsumers(inputs, addingRoutes); |
| } else { |
| // and now start the routes |
| // and check for clash with multiple consumers of the same |
| // endpoints which is not allowed |
| doStartRouteConsumers(inputs, addingRoutes); |
| } |
| } |
| |
| // sort the startup listeners so they are started in the right order |
| startupListeners.sort(OrderedComparator.get()); |
| // now the consumers that was just started may also add new |
| // StartupListeners (such as timer) |
| // so we need to ensure they get started as well |
| for (StartupListener startup : startupListeners) { |
| startup.onCamelContextStarted(this, isStarted()); |
| } |
| // and add the previous started startup listeners to the list so we have |
| // them all |
| startupListeners.addAll(0, backup); |
| |
| // inputs no longer needed |
| inputs.clear(); |
| } |
| |
| /** |
| * @see #safelyStartRouteServices(boolean,boolean,boolean,boolean,Collection) |
| */ |
| protected synchronized void safelyStartRouteServices(boolean forceAutoStart, boolean checkClash, boolean startConsumer, boolean resumeConsumer, boolean addingRoutes, |
| BaseRouteService... routeServices) |
| throws Exception { |
| safelyStartRouteServices(checkClash, startConsumer, resumeConsumer, addingRoutes, Arrays.asList(routeServices)); |
| } |
| |
| private DefaultRouteStartupOrder doPrepareRouteToBeStarted(BaseRouteService routeService) { |
| // add the inputs from this route service to the list to start |
| // afterwards |
| // should be ordered according to the startup number |
| Integer startupOrder = routeService.getStartupOrder(); |
| if (startupOrder == null) { |
| // auto assign a default startup order |
| startupOrder = defaultRouteStartupOrder++; |
| } |
| |
| // create holder object that contains information about this route to be |
| // started |
| Route route = routeService.getRoute(); |
| return new DefaultRouteStartupOrder(startupOrder, route, routeService); |
| } |
| |
| private boolean doCheckStartupOrderClash(DefaultRouteStartupOrder answer, Map<Integer, DefaultRouteStartupOrder> inputs) throws FailedToStartRouteException { |
| // check for clash by startupOrder id |
| DefaultRouteStartupOrder other = inputs.get(answer.getStartupOrder()); |
| if (other != null && answer != other) { |
| String otherId = other.getRoute().getId(); |
| throw new FailedToStartRouteException(answer.getRoute().getId(), "startupOrder clash. Route " + otherId + " already has startupOrder " + answer |
| .getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes."); |
| } |
| // check in existing already started as well |
| for (RouteStartupOrder order : routeStartupOrder) { |
| String otherId = order.getRoute().getId(); |
| if (answer.getRoute().getId().equals(otherId)) { |
| // its the same route id so skip clash check as its the same |
| // route (can happen when using suspend/resume) |
| } else if (answer.getStartupOrder() == order.getStartupOrder()) { |
| throw new FailedToStartRouteException(answer.getRoute().getId(), "startupOrder clash. Route " + otherId + " already has startupOrder " + answer |
| .getStartupOrder() + " configured which this route have as well. Please correct startupOrder to be unique among all your routes."); |
| } |
| } |
| return true; |
| } |
| |
| private void doWarmUpRoutes(Map<Integer, DefaultRouteStartupOrder> inputs, boolean autoStartup) throws FailedToStartRouteException { |
| // now prepare the routes by starting its services before we start the |
| // input |
| for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { |
| // defer starting inputs till later as we want to prepare the routes |
| // by starting |
| // all their processors and child services etc. |
| // then later we open the floods to Camel by starting the inputs |
| // what this does is to ensure Camel is more robust on starting |
| // routes as all routes |
| // will then be prepared in time before we start inputs which will |
| // consume messages to be routed |
| BaseRouteService routeService = entry.getValue().getRouteService(); |
| log.debug("Warming up route id: {} having autoStartup={}", routeService.getId(), autoStartup); |
| routeService.warmUp(); |
| } |
| } |
| |
| private void doResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception { |
| doStartOrResumeRouteConsumers(inputs, true, addingRoutes); |
| } |
| |
| private void doStartRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean addingRoutes) throws Exception { |
| doStartOrResumeRouteConsumers(inputs, false, addingRoutes); |
| } |
| |
| private void doStartOrResumeRouteConsumers(Map<Integer, DefaultRouteStartupOrder> inputs, boolean resumeOnly, boolean addingRoute) throws Exception { |
| List<Endpoint> routeInputs = new ArrayList<>(); |
| |
| for (Map.Entry<Integer, DefaultRouteStartupOrder> entry : inputs.entrySet()) { |
| Integer order = entry.getKey(); |
| Route route = entry.getValue().getRoute(); |
| BaseRouteService routeService = entry.getValue().getRouteService(); |
| |
| // if we are starting camel, then skip routes which are configured |
| // to not be auto started |
| boolean autoStartup = routeService.isAutoStartup(); |
| if (addingRoute && !autoStartup) { |
| log.info("Skipping starting of route {} as its configured with autoStartup=false", routeService.getId()); |
| continue; |
| } |
| |
| // start the service |
| for (Consumer consumer : routeService.getInputs().values()) { |
| Endpoint endpoint = consumer.getEndpoint(); |
| |
| // check multiple consumer violation, with the other routes to |
| // be started |
| if (!doCheckMultipleConsumerSupportClash(endpoint, routeInputs)) { |
| throw new FailedToStartRouteException(routeService.getId(), "Multiple consumers for the same endpoint is not allowed: " + endpoint); |
| } |
| |
| // check for multiple consumer violations with existing routes |
| // which |
| // have already been started, or is currently starting |
| List<Endpoint> existingEndpoints = new ArrayList<>(); |
| for (Route existingRoute : getRoutes()) { |
| if (route.getId().equals(existingRoute.getId())) { |
| // skip ourselves |
| continue; |
| } |
| Endpoint existing = existingRoute.getEndpoint(); |
| ServiceStatus status = getRouteStatus(existingRoute.getId()); |
| if (status != null && (status.isStarted() || status.isStarting())) { |
| existingEndpoints.add(existing); |
| } |
| } |
| if (!doCheckMultipleConsumerSupportClash(endpoint, existingEndpoints)) { |
| throw new FailedToStartRouteException(routeService.getId(), "Multiple consumers for the same endpoint is not allowed: " + endpoint); |
| } |
| |
| // start the consumer on the route |
| log.debug("Route: {} >>> {}", route.getId(), route); |
| if (resumeOnly) { |
| log.debug("Resuming consumer (order: {}) on route: {}", order, route.getId()); |
| } else { |
| log.debug("Starting consumer (order: {}) on route: {}", order, route.getId()); |
| } |
| |
| if (resumeOnly && route.supportsSuspension()) { |
| // if we are resuming and the route can be resumed |
| ServiceHelper.resumeService(consumer); |
| log.info("Route: {} resumed and consuming from: {}", route.getId(), endpoint); |
| } else { |
| // when starting we should invoke the lifecycle strategies |
| for (LifecycleStrategy strategy : lifecycleStrategies) { |
| strategy.onServiceAdd(this, consumer, route); |
| } |
| try { |
| startService(consumer); |
| route.getProperties().remove("route.start.exception"); |
| } catch (Exception e) { |
| route.getProperties().put("route.start.exception", e); |
| throw e; |
| } |
| |
| log.info("Route: {} started and consuming from: {}", route.getId(), endpoint); |
| } |
| |
| routeInputs.add(endpoint); |
| |
| // add to the order which they was started, so we know how to |
| // stop them in reverse order |
| // but only add if we haven't already registered it before (we |
| // dont want to double add when restarting) |
| boolean found = false; |
| for (RouteStartupOrder other : routeStartupOrder) { |
| if (other.getRoute().getId().equals(route.getId())) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) { |
| routeStartupOrder.add(entry.getValue()); |
| } |
| } |
| |
| if (resumeOnly) { |
| routeService.resume(); |
| } else { |
| // and start the route service (no need to start children as |
| // they are already warmed up) |
| try { |
| routeService.start(); |
| route.getProperties().remove("route.start.exception"); |
| } catch (Exception e) { |
| route.getProperties().put("route.start.exception", e); |
| throw e; |
| } |
| } |
| } |
| } |
| |
| private boolean doCheckMultipleConsumerSupportClash(Endpoint endpoint, List<Endpoint> routeInputs) { |
| // is multiple consumers supported |
| boolean multipleConsumersSupported = false; |
| if (endpoint instanceof MultipleConsumersSupport) { |
| multipleConsumersSupported = ((MultipleConsumersSupport)endpoint).isMultipleConsumersSupported(); |
| } |
| |
| if (multipleConsumersSupported) { |
| // multiple consumer allowed, so return true |
| return true; |
| } |
| |
| // check in progress list |
| if (routeInputs.contains(endpoint)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Force some lazy initialization to occur upfront before we start any |
| * components and create routes |
| */ |
| protected void forceLazyInitialization() { |
| if (initialization != Initialization.Lazy) { |
| doStartStandardServices(); |
| |
| if (initialization == Initialization.Eager) { |
| doStartEagerServices(); |
| } |
| } |
| } |
| |
| protected void doStartStandardServices() { |
| getVersion(); |
| getTypeConverter(); |
| getTypeConverterRegistry(); |
| getInjector(); |
| getRegistry(); |
| getLanguageResolver(); |
| getExecutorServiceManager(); |
| getInflightRepository(); |
| getAsyncProcessorAwaitManager(); |
| getShutdownStrategy(); |
| getPackageScanClassResolver(); |
| getProducerServicePool(); |
| getPollingConsumerServicePool(); |
| getRestRegistryFactory(); |
| getReactiveExecutor(); |
| getBeanIntrospection(); |
| getPropertiesComponent(); |
| |
| if (isTypeConverterStatisticsEnabled() != null) { |
| getTypeConverterRegistry().getStatistics().setStatisticsEnabled(isTypeConverterStatisticsEnabled()); |
| } |
| |
| // resolve simple language to initialize it |
| resolveLanguage("simple"); |
| } |
| |
| protected void doStartEagerServices() { |
| getFactoryFinderResolver(); |
| getDefaultFactoryFinder(); |
| getComponentResolver(); |
| getDataFormatResolver(); |
| getManagementStrategy(); |
| getHeadersMapFactory(); |
| getClassResolver(); |
| getNodeIdFactory(); |
| getProcessorFactory(); |
| getMessageHistoryFactory(); |
| getStreamCachingStrategy(); |
| getModelJAXBContextFactory(); |
| getUuidGenerator(); |
| getUnitOfWorkFactory(); |
| getRouteController(); |
| getBeanProxyFactory(); |
| getBeanProcessorFactory(); |
| getBeanPostProcessor(); |
| } |
| |
| /** |
| * Force clear lazy initialization so they can be re-created on restart |
| */ |
| protected void forceStopLazyInitialization() { |
| injector = null; |
| languageResolver = null; |
| typeConverterRegistry = null; |
| typeConverter = null; |
| } |
| |
| /** |
| * A pluggable strategy to allow an endpoint to be created without requiring |
| * a component to be its factory, such as for looking up the URI inside some |
| * {@link Registry} |
| * |
| * @param uri the uri for the endpoint to be created |
| * @return the newly created endpoint or null if it could not be resolved |
| */ |
| protected Endpoint createEndpoint(String uri) { |
| Object value = getRegistry().lookupByName(uri); |
| if (value instanceof Endpoint) { |
| return (Endpoint)value; |
| } else if (value instanceof Processor) { |
| return new ProcessorEndpoint(uri, this, (Processor)value); |
| } else if (value != null) { |
| return convertBeanToEndpoint(uri, value); |
| } |
| return null; |
| } |
| |
| /** |
| * Strategy method for attempting to convert the bean from a |
| * {@link Registry} to an endpoint using some kind of transformation or |
| * wrapper |
| * |
| * @param uri the uri for the endpoint (and name in the registry) |
| * @param bean the bean to be converted to an endpoint, which will be not |
| * null |
| * @return a new endpoint |
| */ |
| protected Endpoint convertBeanToEndpoint(String uri, Object bean) { |
| throw new IllegalArgumentException("uri: " + uri + " bean: " + bean + " could not be converted to an Endpoint"); |
| } |
| |
| /** |
| * Should we start newly added routes? |
| */ |
| protected boolean shouldStartRoutes() { |
| return isStarted() && !isStarting(); |
| } |
| |
| @Override |
| public Map<String, String> getGlobalOptions() { |
| return globalOptions; |
| } |
| |
| @Override |
| public void setGlobalOptions(Map<String, String> globalOptions) { |
| this.globalOptions = globalOptions; |
| } |
| |
| @Override |
| public FactoryFinder getDefaultFactoryFinder() { |
| if (defaultFactoryFinder == null) { |
| synchronized (lock) { |
| if (defaultFactoryFinder == null) { |
| defaultFactoryFinder = getFactoryFinderResolver().resolveDefaultFactoryFinder(getClassResolver()); |
| } |
| } |
| } |
| return defaultFactoryFinder; |
| } |
| |
| @Override |
| public FactoryFinderResolver getFactoryFinderResolver() { |
| if (factoryFinderResolver == null) { |
| synchronized (lock) { |
| if (factoryFinderResolver == null) { |
| factoryFinderResolver = createFactoryFinderResolver(); |
| } |
| } |
| } |
| return factoryFinderResolver; |
| } |
| |
| @Override |
| public void setFactoryFinderResolver(FactoryFinderResolver factoryFinderResolver) { |
| this.factoryFinderResolver = doAddService(factoryFinderResolver); |
| } |
| |
| @Override |
| public FactoryFinder getFactoryFinder(String path) throws NoFactoryAvailableException { |
| return factories.computeIfAbsent(path, this::createFactoryFinder); |
| } |
| |
| protected FactoryFinder createFactoryFinder(String path) { |
| return getFactoryFinderResolver().resolveFactoryFinder(getClassResolver(), path); |
| } |
| |
| @Override |
| public ClassResolver getClassResolver() { |
| if (classResolver == null) { |
| synchronized (lock) { |
| if (classResolver == null) { |
| setClassResolver(createClassResolver()); |
| } |
| } |
| } |
| return classResolver; |
| } |
| |
| @Override |
| public void setClassResolver(ClassResolver classResolver) { |
| this.classResolver = doAddService(classResolver); |
| } |
| |
| @Override |
| public PackageScanClassResolver getPackageScanClassResolver() { |
| if (packageScanClassResolver == null) { |
| synchronized (lock) { |
| if (packageScanClassResolver == null) { |
| setPackageScanClassResolver(createPackageScanClassResolver()); |
| } |
| } |
| } |
| return packageScanClassResolver; |
| } |
| |
| @Override |
| public void setPackageScanClassResolver(PackageScanClassResolver packageScanClassResolver) { |
| this.packageScanClassResolver = doAddService(packageScanClassResolver); |
| } |
| |
| @Override |
| public PackageScanResourceResolver getPackageScanResourceResolver() { |
| if (packageScanResourceResolver == null) { |
| synchronized (lock) { |
| if (packageScanResourceResolver == null) { |
| setPackageScanResourceResolver(createPackageScanResourceResolver()); |
| } |
| } |
| } |
| return packageScanResourceResolver; |
| } |
| |
| @Override |
| public void setPackageScanResourceResolver(PackageScanResourceResolver packageScanResourceResolver) { |
| this.packageScanResourceResolver = doAddService(packageScanResourceResolver); |
| } |
| |
| @Override |
| public List<String> getComponentNames() { |
| return new ArrayList<>(components.keySet()); |
| } |
| |
| @Override |
| public List<String> getLanguageNames() { |
| return new ArrayList<>(languages.keySet()); |
| } |
| |
| @Override |
| public ModelJAXBContextFactory getModelJAXBContextFactory() { |
| if (modelJAXBContextFactory == null) { |
| synchronized (lock) { |
| if (modelJAXBContextFactory == null) { |
| setModelJAXBContextFactory(createModelJAXBContextFactory()); |
| } |
| } |
| } |
| return modelJAXBContextFactory; |
| } |
| |
| @Override |
| public void setModelJAXBContextFactory(final ModelJAXBContextFactory modelJAXBContextFactory) { |
| this.modelJAXBContextFactory = doAddService(modelJAXBContextFactory); |
| } |
| |
| @Override |
| public NodeIdFactory getNodeIdFactory() { |
| if (nodeIdFactory == null) { |
| synchronized (lock) { |
| if (nodeIdFactory == null) { |
| setNodeIdFactory(createNodeIdFactory()); |
| } |
| } |
| } |
| return nodeIdFactory; |
| } |
| |
| @Override |
| public void setNodeIdFactory(NodeIdFactory idFactory) { |
| this.nodeIdFactory = doAddService(idFactory); |
| } |
| |
| @Override |
| public ManagementStrategy getManagementStrategy() { |
| return managementStrategy; |
| } |
| |
| @Override |
| public void setManagementStrategy(ManagementStrategy managementStrategy) { |
| this.managementStrategy = managementStrategy; |
| } |
| |
| @Override |
| public void disableJMX() { |
| if (isNew()) { |
| disableJMX = true; |
| } else if (isInit()) { |
| disableJMX = true; |
| // we are still in initializing mode, so we can disable JMX, by |
| // setting up management again |
| setupManagement(null); |
| } else { |
| throw new IllegalStateException("Disabling JMX can only be done when CamelContext has not been started"); |
| } |
| } |
| |
| public boolean isJMXDisabled() { |
| return disableJMX; |
| } |
| |
| @Override |
| public void setupManagement(Map<String, Object> options) { |
| ManagementStrategyFactory factory = new DefaultManagementStrategyFactory(); |
| if (!isJMXDisabled()) { |
| try { |
| FactoryFinder finder = getFactoryFinder("META-INF/services/org/apache/camel/management/"); |
| if (finder != null) { |
| Object object = finder.newInstance("ManagementStrategyFactory").orElse(null); |
| if (object instanceof ManagementStrategyFactory) { |
| factory = (ManagementStrategyFactory)object; |
| } |
| } |
| } catch (Exception e) { |
| log.warn("Cannot create JMX lifecycle strategy. Will fallback and disable JMX.", e); |
| } |
| } |
| |
| // preserve any existing event notifiers that may have been already added |
| List<EventNotifier> notifiers = null; |
| if (managementStrategy != null) { |
| notifiers = managementStrategy.getEventNotifiers(); |
| } |
| |
| log.debug("Setting up management with factory: {}", factory); |
| try { |
| ManagementStrategy strategy = factory.create(this, options); |
| if (notifiers != null) { |
| notifiers.forEach(strategy::addEventNotifier); |
| } |
| LifecycleStrategy lifecycle = factory.createLifecycle(this); |
| factory.setupManagement(this, strategy, lifecycle); |
| } catch (Exception e) { |
| log.warn("Error setting up management due " + e.getMessage()); |
| throw RuntimeCamelException.wrapRuntimeCamelException(e); |
| } |
| } |
| |
| @Override |
| public InflightRepository getInflightRepository() { |
| if (inflightRepository == null) { |
| synchronized (lock) { |
| if (inflightRepository == null) { |
| setInflightRepository(createInflightRepository()); |
| } |
| } |
| } |
| return inflightRepository; |
| } |
| |
| @Override |
| public void setInflightRepository(InflightRepository repository) { |
| this.inflightRepository = doAddService(repository); |
| } |
| |
| @Override |
| public AsyncProcessorAwaitManager getAsyncProcessorAwaitManager() { |
| if (asyncProcessorAwaitManager == null) { |
| synchronized (lock) { |
| if (asyncProcessorAwaitManager == null) { |
| setAsyncProcessorAwaitManager(createAsyncProcessorAwaitManager()); |
| } |
| } |
| } |
| return asyncProcessorAwaitManager; |
| } |
| |
| @Override |
| public void setAsyncProcessorAwaitManager(AsyncProcessorAwaitManager asyncProcessorAwaitManager) { |
| this.asyncProcessorAwaitManager = doAddService(asyncProcessorAwaitManager); |
| } |
| |
| @Override |
| public BeanIntrospection getBeanIntrospection() { |
| if (beanIntrospection == null) { |
| synchronized (lock) { |
| if (beanIntrospection == null) { |
| setBeanIntrospection(createBeanIntrospection()); |
| } |
| } |
| } |
| return beanIntrospection; |
| } |
| |
| @Override |
| public void setBeanIntrospection(BeanIntrospection beanIntrospection) { |
| this.beanIntrospection = doAddService(beanIntrospection); |
| } |
| |
| @Override |
| public void setAutoStartup(Boolean autoStartup) { |
| this.autoStartup = autoStartup; |
| } |
| |
| @Override |
| public Boolean isAutoStartup() { |
| return autoStartup != null && autoStartup; |
| } |
| |
| @Override |
| public Boolean isLoadTypeConverters() { |
| return loadTypeConverters != null && loadTypeConverters; |
| } |
| |
| @Override |
| public void setLoadTypeConverters(Boolean loadTypeConverters) { |
| this.loadTypeConverters = loadTypeConverters; |
| } |
| |
| @Override |
| public Boolean isTypeConverterStatisticsEnabled() { |
| return typeConverterStatisticsEnabled != null && typeConverterStatisticsEnabled; |
| } |
| |
| @Override |
| public void setTypeConverterStatisticsEnabled(Boolean typeConverterStatisticsEnabled) { |
| this.typeConverterStatisticsEnabled = typeConverterStatisticsEnabled; |
| } |
| |
| @Override |
| public Boolean isUseMDCLogging() { |
| return useMDCLogging != null && useMDCLogging; |
| } |
| |
| @Override |
| public void setUseMDCLogging(Boolean useMDCLogging) { |
| this.useMDCLogging = useMDCLogging; |
| } |
| |
| @Override |
| public String getMDCLoggingKeysPattern() { |
| return mdcLoggingKeysPattern; |
| } |
| |
| @Override |
| public void setMDCLoggingKeysPattern(String pattern) { |
| this.mdcLoggingKeysPattern = pattern; |
| } |
| |
| @Override |
| public Boolean isUseDataType() { |
| return useDataType; |
| } |
| |
| @Override |
| public void setUseDataType(Boolean useDataType) { |
| this.useDataType = useDataType; |
| } |
| |
| @Override |
| public Boolean isUseBreadcrumb() { |
| return useBreadcrumb != null && useBreadcrumb; |
| } |
| |
| @Override |
| public void setUseBreadcrumb(Boolean useBreadcrumb) { |
| this.useBreadcrumb = useBreadcrumb; |
| } |
| |
| @Override |
| public ClassLoader getApplicationContextClassLoader() { |
| return applicationContextClassLoader; |
| } |
| |
| @Override |
| public void setApplicationContextClassLoader(ClassLoader classLoader) { |
| applicationContextClassLoader = classLoader; |
| } |
| |
| @Override |
| public DataFormatResolver getDataFormatResolver() { |
| if (dataFormatResolver == null) { |
| synchronized (lock) { |
| if (dataFormatResolver == null) { |
| setDataFormatResolver(createDataFormatResolver()); |
| } |
| } |
| } |
| return dataFormatResolver; |
| } |
| |
| @Override |
| public void setDataFormatResolver(DataFormatResolver dataFormatResolver) { |
| this.dataFormatResolver = doAddService(dataFormatResolver); |
| } |
| |
| @Override |
| public DataFormat resolveDataFormat(String name) { |
| DataFormat answer = getDataFormatResolver().resolveDataFormat(name, this); |
| |
| // inject CamelContext if aware |
| if (answer instanceof CamelContextAware) { |
| ((CamelContextAware)answer).setCamelContext(this); |
| } |
| |
| return answer; |
| } |
| |
| @Override |
| public DataFormat createDataFormat(String name) { |
| DataFormat answer = getDataFormatResolver().createDataFormat(name, this); |
| |
| // inject CamelContext if aware |
| if (answer instanceof CamelContextAware) { |
| ((CamelContextAware)answer).setCamelContext(this); |
| } |
| |
| return answer; |
| } |
| |
| protected static <T> T lookup(CamelContext context, String ref, Class<T> type) { |
| try { |
| return context.getRegistry().lookupByNameAndType(ref, type); |
| } catch (Exception e) { |
| // need to ignore not same type and return it as null |
| return null; |
| } |
| } |
| |
| @Override |
| public ShutdownStrategy getShutdownStrategy() { |
| if (shutdownStrategy == null) { |
| synchronized (lock) { |
| if (shutdownStrategy == null) { |
| setShutdownStrategy(createShutdownStrategy()); |
| } |
| } |
| } |
| return shutdownStrategy; |
| } |
| |
| @Override |
| public void setShutdownStrategy(ShutdownStrategy shutdownStrategy) { |
| this.shutdownStrategy = doAddService(shutdownStrategy); |
| } |
| |
| @Override |
| public ShutdownRoute getShutdownRoute() { |
| return shutdownRoute; |
| } |
| |
| @Override |
| public void setShutdownRoute(ShutdownRoute shutdownRoute) { |
| this.shutdownRoute = shutdownRoute; |
| } |
| |
| @Override |
| public ShutdownRunningTask getShutdownRunningTask() { |
| return shutdownRunningTask; |
| } |
| |
| @Override |
| public void setShutdownRunningTask(ShutdownRunningTask shutdownRunningTask) { |
| this.shutdownRunningTask = shutdownRunningTask; |
| } |
| |
| @Override |
| public void setAllowUseOriginalMessage(Boolean allowUseOriginalMessage) { |
| this.allowUseOriginalMessage = allowUseOriginalMessage; |
| } |
| |
| @Override |
| public Boolean isAllowUseOriginalMessage() { |
| return allowUseOriginalMessage != null && allowUseOriginalMessage; |
| } |
| |
| @Override |
| public ExecutorServiceManager getExecutorServiceManager() { |
| if (executorServiceManager == null) { |
| synchronized (lock) { |
| if (executorServiceManager == null) { |
| setExecutorServiceManager(createExecutorServiceManager()); |
| } |
| } |
| } |
| return this.executorServiceManager; |
| } |
| |
| @Override |
| public void setExecutorServiceManager(ExecutorServiceManager executorServiceManager) { |
| // special for executorServiceManager as want to stop it manually so |
| // false in stopOnShutdown |
| this.executorServiceManager = doAddService(executorServiceManager, false); |
| } |
| |
| @Override |
| public ProcessorFactory getProcessorFactory() { |
| if (processorFactory == null) { |
| synchronized (lock) { |
| if (processorFactory == null) { |
| setProcessorFactory(createProcessorFactory()); |
| } |
| } |
| } |
| return processorFactory; |
| } |
| |
| @Override |
| public void setProcessorFactory(ProcessorFactory processorFactory) { |
| this.processorFactory = doAddService(processorFactory); |
| } |
| |
| @Override |
| public MessageHistoryFactory getMessageHistoryFactory() { |
| if (messageHistoryFactory == null) { |
| synchronized (lock) { |
| if (messageHistoryFactory == null) { |
| setMessageHistoryFactory(createMessageHistoryFactory()); |
| } |
| } |
| } |
| return messageHistoryFactory; |
| } |
| |
| @Override |
| public void setMessageHistoryFactory(MessageHistoryFactory messageHistoryFactory) { |
| this.messageHistoryFactory = doAddService(messageHistoryFactory); |
| } |
| |
| @Override |
| public Debugger getDebugger() { |
| return debugger; |
| } |
| |
| @Override |
| public void setDebugger(Debugger debugger) { |
| if (isStartingOrStarted()) { |
| throw new IllegalStateException("Can not set debugger on a started CamelContext"); |
| } |
| this.debugger = doAddService(debugger); |
| // enable debugging if we set a custom debugger |
| setDebugging(true); |
| } |
| |
| @Override |
| public Tracer getTracer() { |
| if (tracer == null) { |
| synchronized (lock) { |
| if (tracer == null) { |
| setTracer(createTracer()); |
| } |
| } |
| } |
| return tracer; |
| } |
| |
| @Override |
| public void setTracer(Tracer tracer) { |
| this.tracer = tracer; |
| // enable tracing if we set a custom tracer |
| setTracing(true); |
| } |
| |
| @Override |
| public UuidGenerator getUuidGenerator() { |
| if (uuidGenerator == null) { |
| synchronized (lock) { |
| if (uuidGenerator == null) { |
| setUuidGenerator(createUuidGenerator()); |
| } |
| } |
| } |
| return uuidGenerator; |
| } |
| |
| @Override |
| public void setUuidGenerator(UuidGenerator uuidGenerator) { |
| this.uuidGenerator = doAddService(uuidGenerator); |
| } |
| |
| @Override |
| public StreamCachingStrategy getStreamCachingStrategy() { |
| if (streamCachingStrategy == null) { |
| synchronized (lock) { |
| if (streamCachingStrategy == null) { |
| setStreamCachingStrategy(createStreamCachingStrategy()); |
| } |
| } |
| } |
| return streamCachingStrategy; |
| } |
| |
| @Override |
| public void setStreamCachingStrategy(StreamCachingStrategy streamCachingStrategy) { |
| this.streamCachingStrategy = doAddService(streamCachingStrategy, true, false); |
| } |
| |
| @Override |
| public RestRegistry getRestRegistry() { |
| if (restRegistry == null) { |
| synchronized (lock) { |
| if (restRegistry == null) { |
| setRestRegistry(createRestRegistry()); |
| } |
| } |
| } |
| return restRegistry; |
| } |
| |
| @Override |
| public void setRestRegistry(RestRegistry restRegistry) { |
| this.restRegistry = doAddService(restRegistry); |
| } |
| |
| protected RestRegistry createRestRegistry() { |
| RestRegistryFactory factory = getRestRegistryFactory(); |
| if (factory == null) { |
| throw new IllegalStateException("No RestRegistryFactory implementation found. You need to add camel-rest to the classpath."); |
| } |
| return factory.createRegistry(); |
| } |
| |
| public RestRegistryFactory getRestRegistryFactory() { |
| if (restRegistryFactory == null) { |
| synchronized (lock) { |
| if (restRegistryFactory == null) { |
| setRestRegistryFactory(createRestRegistryFactory()); |
| } |
| } |
| } |
| return restRegistryFactory; |
| } |
| |
| public void setRestRegistryFactory(RestRegistryFactory restRegistryFactory) { |
| this.restRegistryFactory = doAddService(restRegistryFactory); |
| } |
| |
| @Override |
| public String getGlobalOption(String key) { |
| String value = getGlobalOptions().get(key); |
| if (ObjectHelper.isNotEmpty(value)) { |
| try { |
| value = resolvePropertyPlaceholders(value); |
| } catch (Exception e) { |
| throw new RuntimeCamelException("Error getting global option: " + key, e); |
| } |
| } |
| return value; |
| } |
| |
| @Override |
| public Transformer resolveTransformer(String scheme) { |
| return transformerRegistry.resolveTransformer(new TransformerKey(scheme)); |
| } |
| |
| @Override |
| public Transformer resolveTransformer(DataType from, DataType to) { |
| return transformerRegistry.resolveTransformer(new TransformerKey(from, to)); |
| } |
| |
| @Override |
| public TransformerRegistry<TransformerKey> getTransformerRegistry() { |
| return transformerRegistry; |
| } |
| |
| @Override |
| public Validator resolveValidator(DataType type) { |
| return validatorRegistry.resolveValidator(new ValidatorKey(type)); |
| } |
| |
| @Override |
| public ValidatorRegistry<ValidatorKey> getValidatorRegistry() { |
| return validatorRegistry; |
| } |
| |
| @Override |
| public void setSSLContextParameters(SSLContextParameters sslContextParameters) { |
| this.sslContextParameters = sslContextParameters; |
| } |
| |
| @Override |
| public SSLContextParameters getSSLContextParameters() { |
| return this.sslContextParameters; |
| } |
| |
| @Override |
| public HeadersMapFactory getHeadersMapFactory() { |
| if (headersMapFactory == null) { |
| synchronized (lock) { |
| if (headersMapFactory == null) { |
| setHeadersMapFactory(createHeadersMapFactory()); |
| } |
| } |
| } |
| return headersMapFactory; |
| } |
| |
| @Override |
| public void setHeadersMapFactory(HeadersMapFactory headersMapFactory) { |
| this.headersMapFactory = doAddService(headersMapFactory); |
| } |
| |
| @Override |
| public ReactiveExecutor getReactiveExecutor() { |
| if (reactiveExecutor == null) { |
| synchronized (lock) { |
| if (reactiveExecutor == null) { |
| setReactiveExecutor(createReactiveExecutor()); |
| } |
| } |
| } |
| return reactiveExecutor; |
| } |
| |
| @Override |
| public void setReactiveExecutor(ReactiveExecutor reactiveExecutor) { |
| // special for executorServiceManager as want to stop it manually so |
| // false in stopOnShutdown |
| this.reactiveExecutor = doAddService(reactiveExecutor, false); |
| } |
| |
| @Override |
| public DeferServiceFactory getDeferServiceFactory() { |
| return deferServiceFactory; |
| } |
| |
| @Override |
| public AnnotationBasedProcessorFactory getAnnotationBasedProcessorFactory() { |
| return annotationBasedProcessorFactory; |
| } |
| |
| @Override |
| public BeanProxyFactory getBeanProxyFactory() { |
| if (beanProxyFactory == null) { |
| synchronized (lock) { |
| if (beanProxyFactory == null) { |
| beanProxyFactory = createBeanProxyFactory(); |
| } |
| } |
| } |
| return beanProxyFactory; |
| } |
| |
| @Override |
| public BeanProcessorFactory getBeanProcessorFactory() { |
| if (beanProcessorFactory == null) { |
| synchronized (lock) { |
| if (beanProcessorFactory == null) { |
| beanProcessorFactory = createBeanProcessorFactory(); |
| } |
| } |
| } |
| return beanProcessorFactory; |
| } |
| |
| protected Map<String, BaseRouteService> getRouteServices() { |
| return routeServices; |
| } |
| |
| /** |
| * Reset context counter to a preset value. Mostly used for tests to ensure |
| * a predictable getName() |
| * |
| * @param value new value for the context counter |
| */ |
| public static void setContextCounter(int value) { |
| DefaultCamelContextNameStrategy.setCounter(value); |
| DefaultManagementNameStrategy.setCounter(value); |
| } |
| |
| @Override |
| public String toString() { |
| return "CamelContext(" + getName() + ")"; |
| } |
| |
| class MDCHelper implements AutoCloseable { |
| final Map<String, String> originalContextMap; |
| |
| MDCHelper() { |
| if (isUseMDCLogging()) { |
| originalContextMap = MDC.getCopyOfContextMap(); |
| MDC.put(MDC_CAMEL_CONTEXT_ID, getName()); |
| } else { |
| originalContextMap = null; |
| } |
| } |
| |
| @Override |
| public void close() { |
| if (isUseMDCLogging()) { |
| if (originalContextMap != null) { |
| MDC.setContextMap(originalContextMap); |
| } else { |
| MDC.clear(); |
| } |
| } |
| } |
| } |
| |
| protected abstract ReactiveExecutor createReactiveExecutor(); |
| |
| protected abstract StreamCachingStrategy createStreamCachingStrategy(); |
| |
| protected abstract TypeConverter createTypeConverter(); |
| |
| protected abstract TypeConverterRegistry createTypeConverterRegistry(); |
| |
| protected abstract Injector createInjector(); |
| |
| protected abstract PropertiesComponent createPropertiesComponent(); |
| |
| protected abstract CamelBeanPostProcessor createBeanPostProcessor(); |
| |
| protected abstract ComponentResolver createComponentResolver(); |
| |
| protected abstract Registry createRegistry(); |
| |
| protected abstract UuidGenerator createUuidGenerator(); |
| |
| protected abstract ModelJAXBContextFactory createModelJAXBContextFactory(); |
| |
| protected abstract NodeIdFactory createNodeIdFactory(); |
| |
| protected abstract FactoryFinderResolver createFactoryFinderResolver(); |
| |
| protected abstract ClassResolver createClassResolver(); |
| |
| protected abstract ProcessorFactory createProcessorFactory(); |
| |
| protected abstract DataFormatResolver createDataFormatResolver(); |
| |
| protected abstract MessageHistoryFactory createMessageHistoryFactory(); |
| |
| protected abstract InflightRepository createInflightRepository(); |
| |
| protected abstract AsyncProcessorAwaitManager createAsyncProcessorAwaitManager(); |
| |
| protected abstract RouteController createRouteController(); |
| |
| protected abstract ShutdownStrategy createShutdownStrategy(); |
| |
| protected abstract PackageScanClassResolver createPackageScanClassResolver(); |
| |
| protected abstract PackageScanResourceResolver createPackageScanResourceResolver(); |
| |
| protected abstract ExecutorServiceManager createExecutorServiceManager(); |
| |
| protected abstract ServicePool<Producer> createProducerServicePool(); |
| |
| protected abstract ServicePool<PollingConsumer> createPollingConsumerServicePool(); |
| |
| protected abstract UnitOfWorkFactory createUnitOfWorkFactory(); |
| |
| protected abstract CamelContextNameStrategy createCamelContextNameStrategy(); |
| |
| protected abstract ManagementNameStrategy createManagementNameStrategy(); |
| |
| protected abstract HeadersMapFactory createHeadersMapFactory(); |
| |
| protected abstract BeanProxyFactory createBeanProxyFactory(); |
| |
| protected abstract BeanProcessorFactory createBeanProcessorFactory(); |
| |
| protected abstract BeanIntrospection createBeanIntrospection(); |
| |
| protected abstract Tracer createTracer(); |
| |
| protected abstract LanguageResolver createLanguageResolver(); |
| |
| protected abstract RestRegistryFactory createRestRegistryFactory(); |
| |
| protected abstract EndpointRegistry<EndpointKey> createEndpointRegistry(Map<EndpointKey, Endpoint> endpoints); |
| |
| protected abstract TransformerRegistry<TransformerKey> createTransformerRegistry() throws Exception; |
| |
| protected abstract ValidatorRegistry<ValidatorKey> createValidatorRegistry() throws Exception; |
| |
| } |