blob: f563cb1b6466295d4f6a3cc64c23cfa1f9c30d00 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.spring.boot;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.StartupListener;
import org.apache.camel.main.MainDurationEventNotifier;
import org.apache.camel.main.MainShutdownStrategy;
import org.apache.camel.main.RoutesCollector;
import org.apache.camel.main.RoutesConfigurer;
import org.apache.camel.main.SimpleMainShutdownStrategy;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.CamelEvent.Type;
import org.apache.camel.spi.EventNotifier;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.LifecycleStrategySupport;
import org.apache.camel.support.service.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.core.Ordered;
/**
* A spring application listener that when spring boot is starting (refresh event) will setup Camel by:
* <p>
* 1. collecting routes and rests from the various sources (like Spring application context beans registry or opinionated
* classpath locations) and injects these into the Camel context.
* 2. setting up Camel main controller if enabled.
* 3. setting up run duration if in use.
*/
public class CamelSpringBootApplicationListener implements ApplicationListener<ContextRefreshedEvent>, Ordered {
// Static collaborators
private static final Logger LOG = LoggerFactory.getLogger(CamelSpringBootApplicationListener.class);
// Collaborators
private final ApplicationContext applicationContext;
private final List<CamelContextConfiguration> camelContextConfigurations;
private final CamelConfigurationProperties configurationProperties;
private final RoutesCollector springBootRoutesCollector;
// Constructors
public CamelSpringBootApplicationListener(ApplicationContext applicationContext, List<CamelContextConfiguration> camelContextConfigurations,
CamelConfigurationProperties configurationProperties,
RoutesCollector springBootRoutesCollector) {
this.applicationContext = applicationContext;
this.camelContextConfigurations = new ArrayList<>(camelContextConfigurations);
this.configurationProperties = configurationProperties;
this.springBootRoutesCollector = springBootRoutesCollector;
}
// Overridden
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
CamelContext camelContext = applicationContext.getBean(CamelContext.class);
// only add and start Camel if its stopped (initial state)
if (event.getApplicationContext() == this.applicationContext
&& camelContext.getStatus().isStopped()) {
LOG.debug("Post-processing CamelContext bean: {}", camelContext.getName());
try {
// we can use the default routes configurer
RoutesConfigurer configurer = new RoutesConfigurer();
if (configurationProperties.isRoutesCollectorEnabled()) {
configurer.setRoutesCollector(springBootRoutesCollector);
}
configurer.setBeanPostProcessor(camelContext.adapt(ExtendedCamelContext.class).getBeanPostProcessor());
configurer.setJavaRoutesExcludePattern(configurationProperties.getJavaRoutesExcludePattern());
configurer.setJavaRoutesIncludePattern(configurationProperties.getJavaRoutesIncludePattern());
configurer.setRoutesExcludePattern(configurationProperties.getRoutesExcludePattern());
configurer.setRoutesIncludePattern(configurationProperties.getRoutesIncludePattern());
configurer.configureRoutes(camelContext);
for (CamelContextConfiguration camelContextConfiguration : camelContextConfigurations) {
LOG.debug("CamelContextConfiguration found. Invoking beforeApplicationStart: {}", camelContextConfiguration);
camelContextConfiguration.beforeApplicationStart(camelContext);
}
if (configurationProperties.isMainRunController()) {
CamelMainRunController controller = new CamelMainRunController(applicationContext, camelContext);
if (configurationProperties.getDurationMaxMessages() > 0 || configurationProperties.getDurationMaxIdleSeconds() > 0) {
if (configurationProperties.getDurationMaxMessages() > 0) {
LOG.info("CamelSpringBoot will terminate after processing {} messages", configurationProperties.getDurationMaxMessages());
}
if (configurationProperties.getDurationMaxIdleSeconds() > 0) {
LOG.info("CamelSpringBoot will terminate after being idle for more {} seconds", configurationProperties.getDurationMaxIdleSeconds());
}
// register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed
EventNotifier notifier = new MainDurationEventNotifier(camelContext,
configurationProperties.getDurationMaxMessages(), configurationProperties.getDurationMaxIdleSeconds(),
controller.getMainShutdownStrategy(), true, configurationProperties.isRoutesReloadRestartDuration(),
configurationProperties.getDurationMaxAction());
// register our event notifier
ServiceHelper.startService(notifier);
camelContext.getManagementStrategy().addEventNotifier(notifier);
}
if (configurationProperties.getDurationMaxSeconds() > 0) {
LOG.info("CamelSpringBoot will terminate after {} seconds", configurationProperties.getDurationMaxSeconds());
terminateMainControllerAfter(camelContext, configurationProperties.getDurationMaxSeconds(),
controller.getMainShutdownStrategy(), controller.getMainCompleteTask());
}
camelContext.addStartupListener(new StartupListener() {
@Override
public void onCamelContextStarted(CamelContext context, boolean alreadyStarted) throws Exception {
// run the CamelMainRunController after the context has been started
// this way we ensure that NO_START flag is honoured as it's set as
// a thread local variable of the thread CamelMainRunController is
// not running on
if (!alreadyStarted) {
LOG.info("Starting CamelMainRunController to ensure the main thread keeps running");
controller.start();
}
}
});
} else {
if (applicationContext instanceof ConfigurableApplicationContext) {
ConfigurableApplicationContext cac = (ConfigurableApplicationContext) applicationContext;
if (configurationProperties.getDurationMaxSeconds() > 0) {
LOG.info("CamelSpringBoot will terminate after {} seconds", configurationProperties.getDurationMaxSeconds());
terminateApplicationContext(cac, camelContext, configurationProperties.getDurationMaxSeconds());
}
if (configurationProperties.getDurationMaxMessages() > 0 || configurationProperties.getDurationMaxIdleSeconds() > 0) {
if (configurationProperties.getDurationMaxMessages() > 0) {
LOG.info("CamelSpringBoot will terminate after processing {} messages", configurationProperties.getDurationMaxMessages());
}
if (configurationProperties.getDurationMaxIdleSeconds() > 0) {
LOG.info("CamelSpringBoot will terminate after being idle for more {} seconds", configurationProperties.getDurationMaxIdleSeconds());
}
// needed by MainDurationEventNotifier to signal when we have processed the max messages
final MainShutdownStrategy strategy = new SimpleMainShutdownStrategy();
// register lifecycle so we can trigger to shutdown the JVM when maximum number of messages has been processed
EventNotifier notifier = new MainDurationEventNotifier(camelContext,
configurationProperties.getDurationMaxMessages(),
configurationProperties.getDurationMaxIdleSeconds(),
strategy, false,
configurationProperties.isRoutesReloadRestartDuration(),
configurationProperties.getDurationMaxAction());
// register our event notifier
ServiceHelper.startService(notifier);
camelContext.getManagementStrategy().addEventNotifier(notifier);
terminateApplicationContext(cac, camelContext, strategy);
}
}
}
if (!camelContextConfigurations.isEmpty()) {
// we want to call these notifications just after CamelContext has been fully started
// so use an event notifier to trigger when this happens
camelContext.getManagementStrategy().addEventNotifier(new EventNotifierSupport() {
@Override
public void notify(CamelEvent eventObject) throws Exception {
for (CamelContextConfiguration camelContextConfiguration : camelContextConfigurations) {
LOG.debug("CamelContextConfiguration found. Invoking afterApplicationStart: {}", camelContextConfiguration);
try {
camelContextConfiguration.afterApplicationStart(camelContext);
} catch (Exception e) {
LOG.warn("Error during calling afterApplicationStart due " + e.getMessage() + ". This exception is ignored", e);
}
}
}
@Override
public boolean isEnabled(CamelEvent eventObject) {
return eventObject.getType() == Type.CamelContextStarted;
}
});
}
} catch (Exception e) {
throw new CamelSpringBootInitializationException(e);
}
} else {
LOG.debug("Camel already started, not adding routes.");
}
}
@Override
public int getOrder() {
// RoutesCollector implements Ordered so that it's the
// first Camel ApplicationListener to receive events,
// SpringCamelContext should be the last one,
// CamelContextFactoryBean should be second to last and then
// RoutesCollector. This is important for startup as we want
// all resources to be ready and all routes added to the
// context before we start CamelContext.
// So the order should be:
// 1. RoutesCollector (LOWEST_PRECEDENCE - 2)
// 2. CamelContextFactoryBean (LOWEST_PRECEDENCE -1)
// 3. SpringCamelContext (LOWEST_PRECEDENCE)
return LOWEST_PRECEDENCE - 2;
}
// Helpers
private void terminateMainControllerAfter(final CamelContext camelContext, int seconds,
final MainShutdownStrategy shutdownStrategy, final Runnable mainCompletedTask) {
ScheduledExecutorService executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelSpringBootTerminateTask");
final AtomicBoolean running = new AtomicBoolean();
Runnable task = () -> {
// need to spin up as separate thread so we can terminate this thread pool without problems
Runnable stop = () -> {
running.set(true);
LOG.info("CamelSpringBoot triggering shutdown of the JVM.");
try {
camelContext.stop();
} catch (Throwable e) {
LOG.warn("Error during stopping CamelContext", e);
} finally {
shutdownStrategy.shutdown();
mainCompletedTask.run();
}
running.set(false);
};
new Thread(stop, "CamelSpringBootTerminateTaskWorker").start();
};
final ScheduledFuture<?> future = executorService.schedule(task, seconds, TimeUnit.SECONDS);
camelContext.addLifecycleStrategy(new LifecycleStrategySupport() {
@Override
public void onContextStop(CamelContext context) {
// we are stopping then cancel the task so we can shutdown quicker
if (!running.get()) {
future.cancel(true);
// trigger shutdown
shutdownStrategy.shutdown();
mainCompletedTask.run();
}
}
});
}
private void terminateApplicationContext(final ConfigurableApplicationContext applicationContext, final CamelContext camelContext, int seconds) {
ScheduledExecutorService executorService = camelContext.getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "CamelSpringBootTerminateTask");
final AtomicBoolean running = new AtomicBoolean();
Runnable task = () -> {
// need to spin up as separate thread so we can terminate this thread pool without problems
Runnable stop = () -> {
running.set(true);
LOG.info("CamelSpringBoot triggering shutdown of the JVM.");
// we need to run a daemon thread to stop ourselves so this thread pool can be stopped nice also
new Thread(applicationContext::close).start();
running.set(false);
};
new Thread(stop, "CamelSpringBootTerminateTaskWorker").start();
};
final ScheduledFuture<?> future = executorService.schedule(task, seconds, TimeUnit.SECONDS);
camelContext.addLifecycleStrategy(new LifecycleStrategySupport() {
@Override
public void onContextStop(CamelContext context) {
// we are stopping then cancel the task so we can shutdown quicker
if (!running.get()) {
future.cancel(true);
}
}
});
}
private void terminateApplicationContext(final ConfigurableApplicationContext applicationContext, final CamelContext camelContext,
final MainShutdownStrategy shutdownStrategy) {
ExecutorService executorService = camelContext.getExecutorServiceManager().newSingleThreadExecutor(this, "CamelSpringBootTerminateTask");
final AtomicBoolean running = new AtomicBoolean();
Runnable task = () -> {
try {
shutdownStrategy.await();
// only mark as running after the latch
running.set(true);
LOG.info("CamelSpringBoot triggering shutdown of the JVM.");
// we need to run a daemon thread to stop ourselves so this thread pool can be stopped nice also
new Thread(applicationContext::close).start();
} catch (Throwable e) {
// ignore
}
running.set(false);
};
final Future<?> future = executorService.submit(task);
camelContext.addLifecycleStrategy(new LifecycleStrategySupport() {
@Override
public void onContextStop(CamelContext context) {
// we are stopping then cancel the task so we can shutdown quicker
if (!running.get()) {
future.cancel(true);
} else {
// trigger shutdown
shutdownStrategy.shutdown();
}
}
});
}
}