| [[PollingConsumer-PollingConsumer]] |
| = Polling Consumer |
| |
| Camel supports implementing the |
| http://www.enterpriseintegrationpatterns.com/PollingConsumer.html[Polling |
| Consumer] from the xref:enterprise-integration-patterns.adoc[EIP |
| patterns] using the |
| https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer] |
| interface which can be created via the |
| https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/Endpoint.java[Endpoint.createPollingConsumer()] |
| method. |
| |
| image::eip/PollingConsumerSolution.gif[image] |
| |
| In Java: |
| |
| javaEndpoint endpoint = context.getEndpoint("activemq:my.queue"); |
| PollingConsumer consumer = endpoint.createPollingConsumer(); Exchange |
| exchange = consumer.receive(); |
| |
| The *`ConsumerTemplate`* (discussed below) is also available. |
| |
| There are three main polling methods on |
| https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer] |
| |
| [width="100%",cols="50%,50%",options="header",] |
| |======================================================================= |
| |Method name |Description |
| |https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer.receive()] |
| |Waits until a message is available and then returns it; potentially |
| blocking forever |
| |
| |https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer.receive(long)] |
| |Attempts to receive a message exchange, waiting up to the given timeout |
| and returning null if no message exchange could be received within the |
| time available |
| |
| |https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer.receiveNoWait()] |
| |Attempts to receive a message exchange immediately without waiting and |
| returning null if a message exchange is not available yet |
| |======================================================================= |
| |
| [[PollingConsumer-EventDrivenPollingConsumerOptions]] |
| == EventDrivenPollingConsumer Options |
| |
| The *`EventDrivePollingConsumer`* (the default implementation) supports |
| the following options: |
| |
| [width="100%",cols="34%,33%,33%",options="header",] |
| |======================================================================= |
| |Option |Default |Description |
| |`pollingConsumerQueueSize` |`1000` |*Camel 2.14/2.13.1/2.12.4:* The |
| queue size for the internal hand-off queue between the polling consumer, |
| and producers sending data into the queue. |
| |
| |`pollingConsumerBlockWhenFull` |`true` |*Camel 2.14/2.13.1/2.12/4:* |
| Whether to block any producer if the internal queue is full. |
| |
| |`pollingConsumerBlockTimeout` |0 |*Camel 2.16:* To use a timeout (in |
| milliseconds) when the producer is blocked if the internal queue is |
| full. If the value is *`0`* or negative then no timeout is in use. If a |
| timeout is triggered then a *`ExchangeTimedOutException`* is thrown. |
| |======================================================================= |
| |
| Notice that some Camel xref:components::index.adoc[Components] has their own |
| implementation of *`PollingConsumer`* and therefore do not support the |
| options above. |
| |
| You can configure these options in endpoints xref:uris.adoc[URIs], such |
| as shown below: |
| |
| [source,java] |
| ---- |
| Endpoint endpoint = |
| context.getEndpoint("file:inbox?pollingConsumerQueueSize=50"); |
| PollingConsumer consumer = endpoint.createPollingConsumer(); |
| Exchange exchange = consumer.receive(5000); |
| ---- |
| |
| [[PollingConsumer-ConsumerTemplate]] |
| == ConsumerTemplate |
| |
| The *`ConsumerTemplate`* is a template much like |
| Spring's *`JmsTemplate`* or *`JdbcTemplate`* supporting the |
| xref:polling-consumer.adoc[Polling Consumer] EIP. With the template you |
| can consume xref:exchange.adoc[Exchange]s from an |
| xref:endpoint.adoc[Endpoint]. The template supports the three operations |
| listed above. However, it also includes convenient methods for returning |
| the body, etc *`consumeBody`*. |
| |
| Example: |
| |
| [source,java] |
| ---- |
| Exchange exchange = consumerTemplate.receive("activemq:my.queue"); |
| ---- |
| |
| Or to extract and get the body you can do: |
| |
| [source,java] |
| ---- |
| Object body = consumerTemplate.receiveBody("activemq:my.queue"); |
| ---- |
| |
| And you can provide the body type as a parameter and have it returned as |
| the type: |
| |
| [source,java] |
| ---- |
| String body = consumerTemplate.receiveBody("activemq:my.queue", String.class); |
| ---- |
| |
| You get hold of a *`ConsumerTemplate`* from the *`CamelContext`* with |
| the *`createConsumerTemplate`* operation: |
| |
| [source,java] |
| ---- |
| ConsumerTemplate consumer = context.createConsumerTemplate(); |
| ---- |
| |
| [[PollingConsumer-UsingConsumerTemplatewithSpringDSL]] |
| == Using ConsumerTemplate with Spring DSL |
| |
| With the Spring DSL we can declare the consumer in the *`CamelContext`* |
| with the *`consumerTemplate`* tag, just like the *`ProducerTemplate`*. |
| The example below illustrates |
| |
| [source,xml] |
| ---- |
| include::{examplesdir}/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringConsumerTemplateTest-context.xml[tags=e1] |
| ---- |
| |
| Then we can get leverage Spring to inject the *`ConsumerTemplate`* in our |
| java class. The code below is part of an unit test but it shows how the |
| consumer and producer can work |
| together. |
| |
| [source,java] |
| ---- |
| include::{examplesdir}/components/camel-spring/src/test/java/org/apache/camel/spring/SpringConsumerTemplateTest.java[tags=e1] |
| ---- |
| |
| [[PollingConsumer-TimerBasedPollingConsumer]] |
| == Timer Based Polling Consumer |
| |
| In this sample we use a xref:components::timer-component.adoc[Timer] to schedule a route to be |
| started every 5th second and invoke our bean *`MyCoolBean`* where we |
| implement the business logic for the xref:polling-consumer.adoc[Polling |
| Consumer]. Here we want to consume all messages from a JMS queue, |
| process the message and send them to the next queue. |
| |
| [[PollingConsumer-ScheduledPollComponents]] |
| == Scheduled Poll Components |
| |
| Quite a few inbound Camel endpoints use a scheduled poll pattern to |
| receive messages and push them through the Camel processing routes. That |
| is to say externally from the client the endpoint appears to use an |
| xref:eventDrivenConsumer-eip.adoc[Event Driven Consumer] but internally a |
| scheduled poll is used to monitor some kind of state or resource and |
| then fire message exchanges. |
| |
| Since this a such a common pattern, polling components can extend the |
| https://github.com/apache/camel/blob/master/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java[ScheduledPollConsumer] |
| base class which makes it simpler to implement this pattern. There is |
| also the xref:components::quartz-component.adoc[Quartz Component] which provides scheduled |
| delivery of messages using the Quartz enterprise scheduler. |
| |
| For more details see: |
| |
| * https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer] |
| * Scheduled Polling Components |
| ** https://github.com/apache/camel/blob/master/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java[ScheduledPollConsumer] |
| ** xref:components::scheduler-component.adoc[Scheduler] |
| ** xref:components::atom-component.adoc[Atom] |
| ** xref:components::beanstalk-component.adoc[Beanstalk] |
| ** xref:components::file-component.adoc[File] |
| ** xref:components::ftp-component.adoc[FTP] |
| ** xref:components::hbase-component.adoc[hbase] |
| ** xref:components::jpa-component.adoc[JPA] |
| ** xref:components::mail-component.adoc[Mail] |
| ** xref:components::mybatis-component.adoc[MyBatis] |
| ** xref:components::quartz-component.adoc[Quartz] |
| ** xref:components::snmp-component.adoc[SNMP] |
| ** xref:components::aws-s3-component.adoc[AWS-S3] |
| ** xref:components::aws-sqs-component.adoc[AWS-SQS] |
| |
| [[PollingConsumer-ScheduledPollConsumerOptions]] |
| === ScheduledPollConsumer Options |
| |
| The *`ScheduledPollConsumer`* supports the following options: |
| |
| confluenceTableSmall |
| |
| [width="100%",cols="34%,33%,33%",options="header",] |
| |======================================================================= |
| |Option |Default |Description |
| |`backoffErrorThreshold` |`0` |*Camel 2.12:* The number of subsequent |
| error polls (failed due some error) that should happen before the |
| *`backoffMultipler`* should kick-in. |
| |
| |`backoffIdleThreshold` |`0` |*Camel 2.12:* The number of subsequent |
| idle polls that should happen before the *`backoffMultipler`* should |
| kick-in. |
| |
| |`backoffMultiplier` |`0` |*Camel 2.12:* To let the scheduled polling |
| consumer back-off if there has been a number of subsequent idles/errors |
| in a row. The multiplier is then the number of polls that will be |
| skipped before the next actual attempt is happening again. When this |
| option is in use then *`backoffIdleThreshold`* and/or |
| *`backoffErrorThreshold`* must also be configured. |
| |
| |`delay` |`500` |Milliseconds before the next poll. |
| |
| |`greedy` |`false` |*Camel 2.10.6/2.11.1:* If greedy is enabled, then |
| the *`ScheduledPollConsumer`* will run immediately again, if the |
| previous run polled 1 or more messages. |
| |
| |`initialDelay` |`1000` |Milliseconds before the first poll starts. |
| |
| |`pollStrategy` | a| |
| A pluggable *`org.apache.camel.PollingConsumerPollingStrategy`* allowing |
| you to provide your custom implementation to control error handling |
| usually occurred during the *`poll`* operation _*before*_ an |
| xref:exchange.adoc[Exchange] has been created and routed in Camel. In |
| other words the error occurred while the polling was gathering |
| information, for instance access to a file network failed so Camel |
| cannot access it to scan for files. |
| |
| The default implementation will log the caused exception at *`WARN`* |
| level and ignore it. |
| |
| |`runLoggingLevel` |`TRACE` |*Camel 2.8:* The consumer logs a |
| start/complete log line when it polls. This option allows you to |
| configure the logging level for that. |
| |
| |`scheduledExecutorService` |`null` |*Camel 2.10:* Allows for |
| configuring a custom/shared thread pool to use for the consumer. By |
| default each consumer has its own single threaded thread pool. This |
| option allows you to share a thread pool among multiple consumers. |
| |
| |`scheduler` |`null` a| |
| *Camel 2.12:* Allow to plugin a custom |
| *`org.apache.camel.spi.ScheduledPollConsumerScheduler`* to use as the |
| scheduler for firing when the polling consumer runs. The default |
| implementation uses the *`ScheduledExecutorService`* and there is a |
| xref:components::quartz-component.adoc[Quartz], and xref:spring.adoc[Spring] based which |
| supports CRON expressions. *Notice:* If using a custom scheduler then |
| the options for *`initialDelay`, `useFixedDelay`*, *`timeUnit`* and |
| *`scheduledExecutorService`* may not be in use. Use the text *`quartz`* |
| to refer to use the xref:components::quartz-component.adoc[Quartz] scheduler; and use the |
| text `spring` to use the xref:spring.adoc[Spring] based; and use the |
| text *`#myScheduler`* to refer to a custom scheduler by its id in the |
| xref:registry.adoc[Registry]. |
| |
| See xref:components::quartz-component.adoc[Quartz] page for an example. |
| |
| |`scheduler.xxx` |`null` |*Camel 2.12:* To configure additional |
| properties when using a custom *`scheduler`* or any of the |
| xref:components::quartz-component.adoc[Quartz], xref:spring.adoc[Spring] based scheduler. |
| |
| |`sendEmptyMessageWhenIdle` |`false` |*Camel 2.9:* If the polling |
| consumer did not poll any files, you can enable this option to send an |
| empty message (no body) instead. |
| |
| |`startScheduler` |`true` |Whether the scheduler should be auto started. |
| |
| |`timeUnit` |`TimeUnit.MILLISECONDS` |Time unit for *`initialDelay`* and |
| *`delay`* options. |
| |
| |`useFixedDelay` | a| |
| Controls if fixed delay or fixed rate is used. See |
| http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html[ScheduledExecutorService] |
| in JDK for details. In *Camel 2.7.x* or older the default value is |
| *`false`*. |
| |
| From *Camel 2.8*: the default value is *`true`*. |
| |
| |======================================================================= |
| |
| [[PollingConsumer-UsingbackofftoLettheSchedulerbeLessAggressive]] |
| === Using `backoff` to Let the Scheduler be Less Aggressive |
| |
| *Since Camel 2.12* |
| |
| The scheduled xref:polling-consumer.adoc[Polling Consumer] is by default |
| static by using the same poll frequency whether or not there is messages |
| to pickup or not. |
| |
| From *Camel 2.12*: you can configure the scheduled |
| xref:polling-consumer.adoc[Polling Consumer] to be more dynamic by using |
| *`backoff`*. This allows the scheduler to skip N number of polls when it |
| becomes idle, or there has been X number of errors in a row. See more |
| details in the table above for the *`backoffXXX`* options. |
| |
| For example to let a FTP consumer back-off if its becoming idle for a |
| while you can do: |
| |
| [source,java] |
| ---- |
| from("ftp://myserver?username=foo&password=secret&delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5") |
| .to("bean:processFile"); |
| ---- |
| |
| In this example, the FTP consumer will poll for new FTP files every 5th |
| second. But if it has been idle for 5 attempts in a row, then it will |
| back-off using a multiplier of 6, which means it will now poll every 5 x |
| 6 = 30th second instead. When the consumer eventually pickup a file, |
| then the back-off will reset, and the consumer will go back and poll |
| every 5th second again. |
| |
| Camel will log at *`DEBUG`* level using |
| *`org.apache.camel.impl.ScheduledPollConsumer`* when back-off is |
| kicking-in. |
| |
| [[PollingConsumer-AboutErrorHandlingandScheduledPollingConsumers]] |
| === About Error Handling and Scheduled Polling Consumers |
| |
| https://github.com/apache/camel/blob/master/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java[ScheduledPollConsumer] |
| is scheduled based and its *`run`* method is invoked periodically based |
| on schedule settings. But errors can also occur when a poll is being |
| executed. For instance if Camel should poll a file network, and this |
| network resource is not available then a *`java.io.IOException`* could |
| occur. As this error happens *before* any xref:exchange.adoc[Exchange] |
| has been created and prepared for routing, then the regular |
| xref:error-handling-in-camel.adoc[Error handling in Camel] does not |
| apply. So what does the consumer do then? Well the exception is |
| propagated back to the *`run`* method where its handled. Camel will by |
| default log the exception at *`WARN`* level and then ignore it. At next |
| schedule the error could have been resolved and thus being able to poll |
| the endpoint successfully. |
| |
| [[PollingConsumer-UsingaCustomScheduler]] |
| === Using a Custom Scheduler |
| |
| *Since Camel 2.12:* |
| |
| The SPI interface |
| *`org.apache.camel.spi.ScheduledPollConsumerScheduler`* allows to |
| implement a custom scheduler to control when the |
| xref:polling-consumer.adoc[Polling Consumer] runs. The default |
| implementation is based on the JDKs *`ScheduledExecutorService`* with a |
| single thread in the thread pool. There is a CRON based implementation |
| in the xref:components::quartz-component.adoc[Quartz], and xref:spring.adoc[Spring] |
| components. |
| |
| For an example of developing and using a custom scheduler, see the unit |
| test *`org.apache.camel.component.file.FileConsumerCustomSchedulerTest`* |
| from the source code in *`camel-core`*. |
| |
| [[PollingConsumer-ErrorHandlingWhenUsingPollingConsumerPollStrategy]] |
| === Error Handling When Using `PollingConsumerPollStrategy` |
| |
| *`org.apache.camel.PollingConsumerPollStrategy`* is a pluggable strategy |
| that you can configure on the *`ScheduledPollConsumer`*. The default |
| implementation |
| *`org.apache.camel.impl.DefaultPollingConsumerPollStrategy`* will log |
| the caused exception at *`WARN`* level and then ignore this issue. |
| |
| The strategy interface provides the following three methods: |
| |
| * *`begin`* |
| ** `void begin(Consumer consumer, Endpoint endpoint)` |
| * *`begin`* (*Camel 2.3*) |
| ** `boolean begin(Consumer consumer, Endpoint endpoint)` |
| * *`commit`* |
| ** `void commit(Consumer consumer, Endpoint endpoint)` |
| * *`commit`* (*Camel 2.6*) |
| ** `void commit(Consumer consumer, Endpoint endpoint, int polledMessages)` |
| * *`rollback`* |
| ** `boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception` |
| |
| In *Camel 2.3*: the begin method returns a *`boolean`* which indicates |
| whether or not to skipping polling. So you can implement your custom |
| logic and return *`false`* if you do not want to poll this time. |
| |
| In *Camel 2.6*: the commit method has an additional parameter containing |
| the number of message that was actually polled. For example if there was |
| no messages polled, the value would be zero, and you can react |
| accordingly. |
| |
| The most interesting is the *`rollback`* as it allows you do handle the |
| caused exception and decide what to do. |
| |
| For instance if we want to provide a retry feature to a scheduled |
| consumer we can implement the *`PollingConsumerPollStrategy`* method and |
| put the retry logic in the *`rollback`* method. Lets just retry up till |
| three times: |
| |
| [source,java] |
| ---- |
| public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception { |
| if (retryCounter < 3) { |
| // return true to tell Camel that it should retry the poll immediately |
| return true; |
| } |
| // okay we give up do not retry anymore |
| return false; |
| } |
| ---- |
| |
| Notice that we are given the *`Consumer`* as a parameter. We could use |
| this to _restart_ the consumer as we can invoke stop and start: |
| |
| [source,java] |
| ---- |
| // error occurred lets restart the consumer, that could maybe |
| resolve the issue consumer.stop(); |
| consumer.start(); |
| ---- |
| |
| *Note:* if you implement the *`begin`* operation make sure to avoid |
| throwing exceptions as in such a case the *`poll`* operation is not |
| invoked and Camel will invoke the *`rollback`* directly. |
| |
| [[PollingConsumer-ConfiguringantoUsePollingConsumerPollStrategy]] |
| === Configuring an xref:endpoint.adoc[Endpoint] to Use `PollingConsumerPollStrategy` |
| |
| To configure an xref:endpoint.adoc[Endpoint] to use a custom |
| *`PollingConsumerPollStrategy`* you use the option *`pollStrategy`*. For |
| example in the file consumer below we want to use our custom strategy |
| defined in the xref:registry.adoc[Registry] with the bean id *`myPoll`*: |
| |
| [source,java] |
| ---- |
| from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox") |
| ---- |
| |
| xref:using-this-pattern.adoc[Using This Pattern] |
| |