| [[GracefulShutdown-GracefulShutdown]] |
| = Graceful Shutdown |
| |
| *Since Camel 2.2* |
| |
| Camel supports a pluggable shutdown strategy using |
| `org.apache.camel.spi.ShutdownStrategy`. It is responsible for shutting |
| down routes in a graceful manner. The other resources are still shut |
| down by xref:camelcontext.adoc[CamelContext]. This leaves the |
| `ShutdownStrategy` with the task of shutting down all the routes in a |
| reliable manner. |
| |
| Camel provides a default strategy in the |
| `org.apache.camel.impl.DefaultShutdownStrategy` which is capable of |
| doing that. |
| |
| [[GracefulShutdown-DefaultShutdownStrategy]] |
| == DefaultShutdownStrategy |
| |
| The default strategy gracefully shuts down routes: |
| |
| * *Camel 2.2:* in the same order they were started |
| * *Camel 2.3:* in the reverse order they were started. The option |
| `shutdownRoutesInReverseOrder` can be used to restore the old behavior. |
| * letting pending and current inflight exchanges run to completion before |
| shutting down |
| * using a timeout of 300 seconds, after which a shutdown is forced |
| |
| You can configure the timeout, and whether the remaining routes should |
| be shut down immediately when the timeout occurs or the timeout should |
| be ignored. See the setters on the class. |
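
As a minimal sketch of those setters, assuming a `CamelContext` named `context` is already available (as in the other snippets on this page), the timeout and the behavior on timeout could be configured as follows. The class name `ShutdownConfigExample` and the chosen values are only illustrative.

```java
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.spi.ShutdownStrategy;

public final class ShutdownConfigExample {

    // Illustrative helper (not part of Camel): tunes the shutdown strategy of an existing context.
    public static void configure(CamelContext context) {
        ShutdownStrategy strategy = context.getShutdownStrategy();

        // wait at most 30 seconds instead of the default 300
        strategy.setTimeout(30);
        strategy.setTimeUnit(TimeUnit.SECONDS);

        // force a shutdown of the remaining routes when the timeout occurs
        strategy.setShutdownNowOnTimeout(true);

        // Camel 2.3 onwards: false restores shutting down routes in startup order
        strategy.setShutdownRoutesInReverseOrder(false);
    }
}
```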
| |
| The default strategy logs its progress during graceful shutdown, as |
| shown in the example below: |
| |
| [source,bash] |
| --------------------------------- |
| 2009-12-20 10:56:53,055 [main ] INFO DefaultCamelContext - Apache Camel (CamelContext:camel-1) is stopping |
| 2009-12-20 10:56:53,056 [main ] INFO DefaultShutdownStrategy - Starting to graceful shutdown routes (timeout 300 seconds) |
| 2009-12-20 10:56:53,059 [1: ShutdownTask] INFO DefaultShutdownStrategy - Waiting as there are still 5 inflight exchanges to complete before we can shutdown |
| 2009-12-20 10:56:54,060 [1: ShutdownTask] INFO DefaultShutdownStrategy - Waiting as there are still 4 inflight exchanges to complete before we can shutdown |
| 2009-12-20 10:56:55,061 [1: ShutdownTask] INFO DefaultShutdownStrategy - Waiting as there are still 3 inflight exchanges to complete before we can shutdown |
| 2009-12-20 10:56:56,065 [1: ShutdownTask] INFO DefaultShutdownStrategy - Waiting as there are still 2 inflight exchanges to complete before we can shutdown |
| 2009-12-20 10:56:57,066 [1: ShutdownTask] INFO DefaultShutdownStrategy - Waiting as there are still 1 inflight exchanges to complete before we can shutdown |
| 2009-12-20 10:56:58,069 [main ] INFO DefaultShutdownStrategy - Graceful shutdown of routes complete in 5 seconds. |
| 2009-12-20 10:56:58,072 [main ] INFO DefaultInflightRepository - Shutting down with no inflight exchanges. |
| 2009-12-20 10:56:58,077 [main ] INFO DefaultCamelContext - Apache Camel (CamelContext:camel-1) stopped |
| --------------------------------- |
| |
| Notice how it waits while there are inflight exchanges still being |
| processed before it can shut down. |
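
The waiting behavior visible in the log can be pictured with a small, plain Java model. This is only a sketch under stated assumptions, not Camel's implementation: `InflightWaitSketch`, `awaitDrained`, and the poll interval are hypothetical names, and the `AtomicInteger` merely stands in for Camel's bookkeeping of inflight exchanges.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of "wait for inflight exchanges, give up after a timeout".
final class InflightWaitSketch {

    /**
     * Polls the inflight counter until it reaches zero or the timeout elapses.
     *
     * @return true if everything completed in time, false on timeout or interruption
     */
    static boolean awaitDrained(AtomicInteger inflight, long timeout, TimeUnit unit, long pollMillis) {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (inflight.get() > 0) {
            if (System.nanoTime() - deadline >= 0) {
                return false; // timeout: the caller would now force the shutdown
            }
            System.out.println("Waiting as there are still " + inflight.get() + " inflight exchanges");
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger inflight = new AtomicInteger(3);

        // simulate exchanges completing in the background, one every 20 ms
        Thread worker = new Thread(() -> {
            try {
                while (inflight.get() > 0) {
                    Thread.sleep(20);
                    inflight.decrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        boolean graceful = awaitDrained(inflight, 5, TimeUnit.SECONDS, 10);
        worker.join();
        System.out.println(graceful ? "Graceful shutdown complete" : "Timeout, forcing shutdown");
    }
}
```

The boolean result mirrors the two outcomes described above: either all inflight work completes within the timeout, or the timeout is hit and a forced shutdown would follow.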
| |
| [[GracefulShutdown-Suppressingloggingduetotimeoutnotallowingallinflightmessagestocomplete]] |
| == Suppressing logging due to timeout not allowing all inflight messages to complete |
| |
| |
| *Since Camel 2.12* |
| |
| If a graceful shutdown could not shut down cleanly within the given |
| timeout period, then Camel performs a more aggressive shutdown by |
| forcing routes, thread pools, etc. to shut down. In addition, the routing |
| engine rejects further processing of xref:exchange.adoc[Exchange]s. If |
| this happens you may see WARN logs about xref:exchange.adoc[Exchange]s |
| being rejected and other failures due to the forced shutdown. |
| |
| If you do not want to see these logs, you can suppress them by setting |
| the option `suppressLoggingOnTimeout` to true. |
| |
| [source,java] |
| --------------------------------- |
| context.getShutdownStrategy().setSuppressLoggingOnTimeout(true); |
| --------------------------------- |
| |
| Notice that the suppression is "best effort": there may still be some |
| logs coming from 3rd party libraries, which Camel cannot |
| control. |
| |
| [[GracefulShutdown-Logginginflightexchangeinformationontimeout]] |
| == Logging inflight exchange information on timeout |
| |
| *Since Camel 2.15* |
| |
| If a graceful shutdown could not shut down cleanly within the given |
| timeout period, then Camel performs a more aggressive shutdown by |
| forcing routes, thread pools, etc. to shut down. When the timeout |
| occurs, Camel logs information about the current inflight |
| exchanges, showing the route each exchange originates from and where |
| it is currently being routed. For example, the logging below shows that |
| there is 1 inflight exchange, which originates from route1 and is currently |
| still in route1 at the "delay1" node. `elapsed` is the time in millis |
| spent at the current node (e.g. delay1), and `duration` is the total time in |
| millis. |
| |
| If you enable the DEBUG logging level |
| on `org.apache.camel.impl.DefaultShutdownStrategy`, then it logs the same |
| inflight exchange information during graceful shutdown. |
| |
| [source,bash] |
| --------------------------------- |
| 2015-01-12 13:23:23,656 [ - ShutdownTask] INFO DefaultShutdownStrategy - There are 1 inflight exchanges: |
| InflightExchange: [exchangeId=ID-davsclaus-air-62213-1421065401253-0-3, fromRouteId=route1, routeId=route1, nodeId=delay1, elapsed=2007, duration=2017] |
| --------------------------------- |
| |
| If you do not want to see these logs, you can turn this off by setting |
| the option `logInflightExchangesOnTimeout` to false. |
| |
| [source,java] |
| --------------------------------- |
| context.getShutdownStrategy().setLogInflightExchangesOnTimeout(false); |
| --------------------------------- |
| |
| [[GracefulShutdown-Controllingorderingofroutes]] |
| == Controlling ordering of routes |
| |
| You can configure the order in which routes are started, and thereby |
| also the order in which they are shut down (by default the reverse of |
| the startup order since Camel 2.3). |
| See more at |
| xref:configuring-route-startup-ordering-and-autostartup.adoc[Configuring |
| route startup ordering and autostartup]. |
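
To make the relationship between startup order and shutdown order concrete, here is a plain Java sketch. It does not use any Camel API: `RouteOrderSketch` and its methods are hypothetical, and it models only the rule described on this page, namely that routes are shut down in the reverse of their startup order unless the reverse-order option is turned off.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: derives a shutdown order from configured startup orders.
final class RouteOrderSketch {

    /** Returns the route ids sorted by ascending startup order. */
    static List<String> startupOrder(Map<String, Integer> startupOrders) {
        List<String> ids = new ArrayList<>(startupOrders.keySet());
        ids.sort((a, b) -> Integer.compare(startupOrders.get(a), startupOrders.get(b)));
        return ids;
    }

    /** Returns the shutdown order: reversed startup order if requested, else the same order. */
    static List<String> shutdownOrder(Map<String, Integer> startupOrders, boolean reverse) {
        List<String> ids = startupOrder(startupOrders);
        if (reverse) {
            Collections.reverse(ids);
        }
        return ids;
    }

    public static void main(String[] args) {
        // route id -> startup order (illustrative values)
        Map<String, Integer> orders = new LinkedHashMap<>();
        orders.put("route-a", 2);
        orders.put("route-b", 1);
        orders.put("route-c", 3);

        System.out.println("startup:  " + startupOrder(orders));
        System.out.println("shutdown: " + shutdownOrder(orders, true));
    }
}
```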
| |
| [[GracefulShutdown-Finegrainedconfiguration]] |
| == Fine grained configuration |
| |
| You can control two areas that influence graceful shutdown in the Camel |
| routing: |
| |
| * `ShutdownRoute` |
| * `ShutdownRunningTask` |
| |
| These options can be configured on two scopes: `context` and `route`. |
| A route falls back to the `context` scoped option if it is not |
| explicitly configured (the same principle as xref:error-handler.adoc[Error |
| Handler], etc.). |
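
The two scopes could be combined as in the following sketch. It assumes the Camel 2.x Java API; the class name `ShutdownScopeExample` and the endpoint URIs are made up for illustration.

```java
import org.apache.camel.CamelContext;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.builder.RouteBuilder;

public final class ShutdownScopeExample {

    // Illustrative helper (not part of Camel): sets a context scoped default and a route scoped override.
    public static void configure(CamelContext context) throws Exception {
        // context scope: used by every route that does not configure its own value
        context.setShutdownRunningTask(ShutdownRunningTask.CompleteAllTasks);

        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // route scope: overrides the context scoped option for this route only
                from("file:target/inbox")
                    .shutdownRunningTask(ShutdownRunningTask.CompleteCurrentTaskOnly)
                    .to("mock:inbox");

                // not explicitly configured: falls back to the context scoped option
                from("file:target/other")
                    .to("mock:other");
            }
        });
    }
}
```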
| |
| [[GracefulShutdown-ShutdownRoute]] |
| == ShutdownRoute |
| |
| This option controls how a given route should act during graceful |
| shutdown. It has two values: `Default` and `Defer`. `Default` is the |
| default option, which lets Camel shut down the route as early as |
| possible. `Defer` is used to defer shutting down the route to a later |
| stage. This is useful when other routes depend on it, for example an |
| internal route which other routes reuse. |
| |
| The example below has two routes, where route 1 depends on route 2. At |
| shutdown we want route 1 to complete all its current messages, and we |
| want the 2nd route to do the same. So we could mark both routes as |
| `Defer`, but since route 1 is a |
| xref:components::seda-component.adoc[SEDA] based route it is `Defer` by default (it uses |
| `ShutdownAware`). |
| |
| A Java DSL based example that defers shutting down the 2nd route: |
| |
| [source,java] |
| --------------------------------- |
| package org.apache.camel.processor; |
| |
| import java.io.File; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.camel.Component; |
| import org.apache.camel.ContextTestSupport; |
| import org.apache.camel.Processor; |
| import org.apache.camel.builder.RouteBuilder; |
| import org.apache.camel.component.file.FileConsumer; |
| import org.apache.camel.component.file.FileEndpoint; |
| import org.apache.camel.component.file.GenericFileOperations; |
| import org.apache.camel.component.mock.MockEndpoint; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.camel.ShutdownRoute.Defer; |
| |
| public class ShutdownDeferTest extends ContextTestSupport { |
| |
|     private static final AtomicBoolean CONSUMER_SUSPENDED = new AtomicBoolean(); |
| |
|     @Override |
|     @Before |
|     public void setUp() throws Exception { |
|         deleteDirectory("target/deferred"); |
|         super.setUp(); |
|     } |
| |
|     @Test |
|     public void testShutdownDeferred() throws Exception { |
|         MockEndpoint bar = getMockEndpoint("mock:bar"); |
|         bar.expectedMinimumMessageCount(1); |
| |
|         template.sendBody("seda:foo", "A"); |
|         template.sendBody("seda:foo", "B"); |
|         template.sendBody("seda:foo", "C"); |
|         template.sendBody("seda:foo", "D"); |
|         template.sendBody("seda:foo", "E"); |
| |
|         assertMockEndpointsSatisfied(); |
| |
|         Thread.sleep(50); |
| |
|         context.stop(); |
| |
|         assertFalse("Should not have been suspended", CONSUMER_SUSPENDED.get()); |
|     } |
| |
|     @Override |
|     protected RouteBuilder createRouteBuilder() throws Exception { |
|         return new RouteBuilder() { |
|             @Override |
|             // START SNIPPET: e1 |
|             public void configure() throws Exception { |
|                 from("seda:foo") |
|                     .startupOrder(1) |
|                     .to("file://target/deferred"); |
| |
|                 // use file component to transfer files from route 1 -> route 2 as it |
|                 // will normally suspend, but by deferring this we can let route 1 |
|                 // complete while shutting down |
|                 MyDeferFileEndpoint defer = new MyDeferFileEndpoint("file://target/deferred?initialDelay=0&delay=10", getContext().getComponent("file")); |
|                 defer.setFile(new File("target/deferred")); |
| |
|                 from(defer) |
|                     // defer shutting down this route as the 1st route depends upon it |
|                     .startupOrder(2).shutdownRoute(Defer) |
|                     .to("mock:bar"); |
|             } |
|             // END SNIPPET: e1 |
|         }; |
|     } |
| |
|     private static final class MyDeferFileEndpoint extends FileEndpoint { |
| |
|         private MyDeferFileEndpoint(String endpointUri, Component component) { |
|             super(endpointUri, component); |
|         } |
| |
|         @Override |
|         protected FileConsumer newFileConsumer(Processor processor, GenericFileOperations<File> operations) { |
|             return new FileConsumer(this, processor, operations, createGenericFileStrategy()) { |
|                 @Override |
|                 protected void doSuspend() throws Exception { |
|                     CONSUMER_SUSPENDED.set(true); |
|                     super.doSuspend(); |
|                 } |
|             }; |
|         } |
|     } |
| } |
| --------------------------------- |
| |
| *Defer shutting down internal routes only* |
| |
| It is best to defer shutting down internal routes only. *Public* |
| routes should shut down as quickly as possible; otherwise they keep |
| taking in new messages, which delays the shutdown process or even |
| causes it to time out if a lot of new messages keep coming in. |
| |
| [[GracefulShutdown-ShutdownRunningTask]] |
| == ShutdownRunningTask |
| |
| This option controls how a given route consumer acts during shutdown. |
| Most route consumers only operate on a single task (message), |
| however the xref:batch-consumer.adoc[Batch Consumer] can operate on many |
| messages (in a batch). This option is for that kind of consumer. By |
| default it uses the option `CompleteCurrentTaskOnly`, which means that the |
| current _in progress_ task (message) will be completed and then the |
| consumer will shut down. The other option, `CompleteAllTasks`, allows the |
| consumer to complete all the tasks (messages) before shutting down. For |
| example a xref:components::file-component.adoc[File] consumer will process all the pending |
| files it has picked up before shutting down. |
| |
| [source,java] |
| --------------------------------- |
| package org.apache.camel.processor; |
| |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.camel.ContextTestSupport; |
| import org.apache.camel.Exchange; |
| import org.apache.camel.Processor; |
| import org.apache.camel.ShutdownRunningTask; |
| import org.apache.camel.builder.RouteBuilder; |
| import org.apache.camel.component.mock.MockEndpoint; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class ShutdownCompleteAllTasksTest extends ContextTestSupport { |
| |
|     private static String url = "file:target/pending?initialDelay=0&delay=10"; |
|     private static AtomicInteger counter = new AtomicInteger(); |
|     private static CountDownLatch latch = new CountDownLatch(2); |
| |
|     @Override |
|     @Before |
|     public void setUp() throws Exception { |
|         deleteDirectory("target/pending"); |
|         super.setUp(); |
| |
|         template.sendBodyAndHeader(url, "A", Exchange.FILE_NAME, "a.txt"); |
|         template.sendBodyAndHeader(url, "B", Exchange.FILE_NAME, "b.txt"); |
|         template.sendBodyAndHeader(url, "C", Exchange.FILE_NAME, "c.txt"); |
|         template.sendBodyAndHeader(url, "D", Exchange.FILE_NAME, "d.txt"); |
|         template.sendBodyAndHeader(url, "E", Exchange.FILE_NAME, "e.txt"); |
|     } |
| |
|     @Test |
|     public void testShutdownCompleteAllTasks() throws Exception { |
|         // give it 30 seconds to shutdown |
|         context.getShutdownStrategy().setTimeout(30); |
| |
|         // start route |
|         context.startRoute("foo"); |
| |
|         MockEndpoint bar = getMockEndpoint("mock:bar"); |
|         bar.expectedMinimumMessageCount(1); |
| |
|         assertMockEndpointsSatisfied(); |
| |
|         int batch = bar.getReceivedExchanges().get(0).getProperty(Exchange.BATCH_SIZE, int.class); |
| |
|         // wait for latch |
|         latch.await(10, TimeUnit.SECONDS); |
| |
|         // shutdown during processing |
|         context.stop(); |
| |
|         // should route all |
|         assertEquals("Should complete all messages", batch, counter.get()); |
|     } |
| |
|     @Override |
|     protected RouteBuilder createRouteBuilder() throws Exception { |
|         return new RouteBuilder() { |
|             @Override |
|             // START SNIPPET: e1 |
|             public void configure() throws Exception { |
|                 from(url).routeId("foo").noAutoStartup() |
|                     // let it complete all tasks during shutdown |
|                     .shutdownRunningTask(ShutdownRunningTask.CompleteAllTasks) |
|                     .process(new MyProcessor()) |
|                     .to("mock:bar"); |
|             } |
|             // END SNIPPET: e1 |
|         }; |
|     } |
| |
|     public static class MyProcessor implements Processor { |
| |
|         public void process(Exchange exchange) throws Exception { |
|             counter.incrementAndGet(); |
|             latch.countDown(); |
|         } |
|     } |
| } |
| --------------------------------- |
| |
| [[GracefulShutdown-JMXmanaged]] |
| == JMX managed |
| |
| The `ShutdownStrategy` is JMX aware as well so you can manage it from a |
| JMX console. For example you can change the timeout value. |
| |
| [[GracefulShutdown-Shuttingdownindividualroutes]] |
| == Shutting down individual routes |
| |
| *Since Camel 2.3* |
| |
| It is possible to gracefully shut down an individual route using the |
| `shutdownRoute(routeId)` method on `CamelContext`. It is also possible to |
| provide a specific timeout to use instead of the default timeout |
| settings using `shutdownRoute(routeId, timeout, timeUnit)`. |
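
A minimal sketch of both variants, assuming the Camel 2.x `CamelContext` API and two routes with the made-up ids `foo` and `bar`:

```java
import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;

public final class ShutdownSingleRouteExample {

    // Illustrative helper (not part of Camel): shuts down two routes of an existing context.
    public static void shutdown(CamelContext context) throws Exception {
        // graceful shutdown of one route using the default timeout settings
        context.shutdownRoute("foo");

        // graceful shutdown of another route, waiting at most 10 seconds
        context.shutdownRoute("bar", 10, TimeUnit.SECONDS);
    }
}
```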
| |
| [[GracefulShutdown-Developerrelated]] |
| == Developer related |
| |
| If you develop your own Camel component or want to implement your own |
| shutdown strategy then read this section for details. |
| |
| [[GracefulShutdown-ShutdownStrategy]] |
| == ShutdownStrategy |
| |
| You can implement your own strategy to control the shutdown by |
| implementing the `org.apache.camel.spi.ShutdownStrategy` and then setting it |
| on the `CamelContext` using the `setShutdownStrategy` method. |
| |
| When using Spring XML you just define a Spring bean which |
| implements the `org.apache.camel.spi.ShutdownStrategy`, and Camel will |
| look it up at startup and use it instead of its default. See more at |
| xref:advanced-configuration-of-camelcontext-using-spring.adoc[Advanced |
| configuration of CamelContext using Spring]. |
| |
| [[GracefulShutdown-ShutdownAware]] |
| == ShutdownAware |
| |
| The interface `org.apache.camel.spi.ShutdownAware` is an optional |
| interface that consumers can implement to have fine grained control during |
| shutdown. The `ShutdownStrategy` must be able to deal with consumers |
| which implement this interface. The interface was introduced to cater |
| for in memory consumers such as xref:components::seda-component.adoc[SEDA], which potentially |
| have a number of pending messages on their internal in memory queues. It |
| lets such a consumer control the shutdown process so that it can complete |
| its pending messages. |
| |
| The method `getPendingExchangesSize` should return the number of pending |
| messages which reside on the in memory queues. + |
| The method `deferShutdown` should return `true` to defer the shutdown |
| to a later stage, when there are no more pending and inflight messages. |
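
The interplay of these two methods can be illustrated with a plain Java model. Everything in it is hypothetical: `PendingAwareSketch` is a simplified stand-in for `ShutdownAware` (the real interface's method signatures may differ), and `QueueConsumerSketch` only loosely imitates an in memory consumer such as SEDA.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical, simplified stand-in for the two ShutdownAware methods described above.
interface PendingAwareSketch {
    boolean deferShutdown();

    int getPendingExchangesSize();
}

// In memory consumer with an internal queue of pending messages.
final class QueueConsumerSketch implements PendingAwareSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final List<String> processed = new ArrayList<>();

    void offer(String body) {
        queue.add(body);
    }

    // processes one pending message, if there is any
    void poll() {
        String body = queue.poll();
        if (body != null) {
            processed.add(body);
        }
    }

    List<String> processed() {
        return processed;
    }

    @Override
    public boolean deferShutdown() {
        return true; // ask to be shut down later, once the queue is empty
    }

    @Override
    public int getPendingExchangesSize() {
        return queue.size();
    }
}

final class ShutdownAwareDemo {

    // Strategy-like helper: lets a deferring consumer drain its queue before it is stopped.
    static int shutdown(QueueConsumerSketch consumer) {
        int drained = 0;
        if (consumer.deferShutdown()) {
            while (consumer.getPendingExchangesSize() > 0) {
                consumer.poll();
                drained++;
            }
        }
        return drained;
    }

    public static void main(String[] args) {
        QueueConsumerSketch consumer = new QueueConsumerSketch();
        consumer.offer("A");
        consumer.offer("B");
        consumer.offer("C");

        int drained = shutdown(consumer);
        System.out.println("Drained " + drained + " pending messages: " + consumer.processed());
    }
}
```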
| |
| A xref:batch-consumer.adoc[Batch Consumer] should implement |
| `ShutdownAware` so that it properly supports the `ShutdownRunningTask` |
| option. See `GenericFileConsumer` for an example. |
| |