blob: fd28244f9b61f2807e5b6ca0a18c2f157fd39063 [file] [log] [blame]
[[PollingConsumer-PollingConsumer]]
= Polling Consumer
Camel supports implementing the
http://www.enterpriseintegrationpatterns.com/PollingConsumer.html[Polling
Consumer] from the xref:enterprise-integration-patterns.adoc[EIP
patterns] using the
https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer]
interface which can be created via the
https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/Endpoint.java[Endpoint.createPollingConsumer()]
method.
image::eip/PollingConsumerSolution.gif[image]
In Java:
javaEndpoint endpoint = context.getEndpoint("activemq:my.queue");
PollingConsumer consumer = endpoint.createPollingConsumer(); Exchange
exchange = consumer.receive();
The *`ConsumerTemplate`* (discussed below) is also available.
There are three main polling methods on
https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer]
[width="100%",cols="50%,50%",options="header",]
|=======================================================================
|Method name |Description
|https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer.receive()]
|Waits until a message is available and then returns it; potentially
blocking forever
|https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer.receive(long)]
|Attempts to receive a message exchange, waiting up to the given timeout
and returning null if no message exchange could be received within the
time available
|https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer.receiveNoWait()]
|Attempts to receive a message exchange immediately without waiting and
returning null if a message exchange is not available yet
|=======================================================================
[[PollingConsumer-EventDrivenPollingConsumerOptions]]
== EventDrivenPollingConsumer Options
The *`EventDrivePollingConsumer`* (the default implementation) supports
the following options:
[width="100%",cols="34%,33%,33%",options="header",]
|=======================================================================
|Option |Default |Description
|`pollingConsumerQueueSize` |`1000` |*Camel 2.14/2.13.1/2.12.4:* The
queue size for the internal hand-off queue between the polling consumer,
and producers sending data into the queue.
|`pollingConsumerBlockWhenFull` |`true` |*Camel 2.14/2.13.1/2.12/4:*
Whether to block any producer if the internal queue is full.
|`pollingConsumerBlockTimeout` |0 |*Camel 2.16:* To use a timeout (in
milliseconds) when the producer is blocked if the internal queue is
full. If the value is *`0`* or negative then no timeout is in use. If a
timeout is triggered then a *`ExchangeTimedOutException`* is thrown.
|=======================================================================
Notice that some Camel xref:components::index.adoc[Components] has their own
implementation of *`PollingConsumer`* and therefore do not support the
options above.
You can configure these options in endpoints xref:uris.adoc[URIs], such
as shown below:
[source,java]
----
Endpoint endpoint =
context.getEndpoint("file:inbox?pollingConsumerQueueSize=50");
PollingConsumer consumer = endpoint.createPollingConsumer();
Exchange exchange = consumer.receive(5000);
----
[[PollingConsumer-ConsumerTemplate]]
== ConsumerTemplate
The *`ConsumerTemplate`* is a template much like
Spring's *`JmsTemplate`* or *`JdbcTemplate`* supporting the
xref:polling-consumer.adoc[Polling Consumer] EIP. With the template you
can consume xref:exchange.adoc[Exchange]s from an
xref:endpoint.adoc[Endpoint]. The template supports the three operations
listed above. However, it also includes convenient methods for returning
the body, etc *`consumeBody`*.
Example:
[source,java]
----
Exchange exchange = consumerTemplate.receive("activemq:my.queue");
----
Or to extract and get the body you can do:
[source,java]
----
Object body = consumerTemplate.receiveBody("activemq:my.queue");
----
And you can provide the body type as a parameter and have it returned as
the type:
[source,java]
----
String body = consumerTemplate.receiveBody("activemq:my.queue", String.class);
----
You get hold of a *`ConsumerTemplate`* from the *`CamelContext`* with
the *`createConsumerTemplate`* operation:
[source,java]
----
ConsumerTemplate consumer = context.createConsumerTemplate();
----
[[PollingConsumer-UsingConsumerTemplatewithSpringDSL]]
== Using ConsumerTemplate with Spring DSL
With the Spring DSL we can declare the consumer in the *`CamelContext`*
with the *`consumerTemplate`* tag, just like the *`ProducerTemplate`*.
The example below illustrates
[source,xml]
----
include::{examplesdir}/components/camel-spring/src/test/resources/org/apache/camel/spring/SpringConsumerTemplateTest-context.xml[tags=e1]
----
Then we can get leverage Spring to inject the *`ConsumerTemplate`* in our
java class. The code below is part of an unit test but it shows how the
consumer and producer can work
together.
[source,java]
----
include::{examplesdir}/components/camel-spring/src/test/java/org/apache/camel/spring/SpringConsumerTemplateTest.java[tags=e1]
----
[[PollingConsumer-TimerBasedPollingConsumer]]
== Timer Based Polling Consumer
In this sample we use a xref:components::timer-component.adoc[Timer] to schedule a route to be
started every 5th second and invoke our bean *`MyCoolBean`* where we
implement the business logic for the xref:polling-consumer.adoc[Polling
Consumer]. Here we want to consume all messages from a JMS queue,
process the message and send them to the next queue.
[[PollingConsumer-ScheduledPollComponents]]
== Scheduled Poll Components
Quite a few inbound Camel endpoints use a scheduled poll pattern to
receive messages and push them through the Camel processing routes. That
is to say externally from the client the endpoint appears to use an
xref:eventDrivenConsumer-eip.adoc[Event Driven Consumer] but internally a
scheduled poll is used to monitor some kind of state or resource and
then fire message exchanges.
Since this a such a common pattern, polling components can extend the
https://github.com/apache/camel/blob/master/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java[ScheduledPollConsumer]
base class which makes it simpler to implement this pattern. There is
also the xref:components::quartz-component.adoc[Quartz Component] which provides scheduled
delivery of messages using the Quartz enterprise scheduler.
For more details see:
* https://github.com/apache/camel/blob/master/core/camel-api/src/main/java/org/apache/camel/PollingConsumer.java[PollingConsumer]
* Scheduled Polling Components
** https://github.com/apache/camel/blob/master/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java[ScheduledPollConsumer]
** xref:components::scheduler-component.adoc[Scheduler]
** xref:components::atom-component.adoc[Atom]
** xref:components::beanstalk-component.adoc[Beanstalk]
** xref:components::file-component.adoc[File]
** xref:components::ftp-component.adoc[FTP]
** xref:components::hbase-component.adoc[hbase]
** xref:components::jpa-component.adoc[JPA]
** xref:components::mail-component.adoc[Mail]
** xref:components::mybatis-component.adoc[MyBatis]
** xref:components::quartz-component.adoc[Quartz]
** xref:components::snmp-component.adoc[SNMP]
** xref:components::aws-s3-component.adoc[AWS-S3]
** xref:components::aws-sqs-component.adoc[AWS-SQS]
[[PollingConsumer-ScheduledPollConsumerOptions]]
=== ScheduledPollConsumer Options
The *`ScheduledPollConsumer`* supports the following options:
confluenceTableSmall
[width="100%",cols="34%,33%,33%",options="header",]
|=======================================================================
|Option |Default |Description
|`backoffErrorThreshold` |`0` |*Camel 2.12:* The number of subsequent
error polls (failed due some error) that should happen before the
*`backoffMultipler`* should kick-in.
|`backoffIdleThreshold` |`0` |*Camel 2.12:* The number of subsequent
idle polls that should happen before the *`backoffMultipler`* should
kick-in.
|`backoffMultiplier` |`0` |*Camel 2.12:* To let the scheduled polling
consumer back-off if there has been a number of subsequent idles/errors
in a row. The multiplier is then the number of polls that will be
skipped before the next actual attempt is happening again. When this
option is in use then *`backoffIdleThreshold`* and/or
*`backoffErrorThreshold`* must also be configured.
|`delay` |`500` |Milliseconds before the next poll.
|`greedy` |`false` |*Camel 2.10.6/2.11.1:* If greedy is enabled, then
the *`ScheduledPollConsumer`* will run immediately again, if the
previous run polled 1 or more messages.
|`initialDelay` |`1000` |Milliseconds before the first poll starts.
|`pollStrategy` | a|
A pluggable *`org.apache.camel.PollingConsumerPollingStrategy`* allowing
you to provide your custom implementation to control error handling
usually occurred during the *`poll`* operation _*before*_ an
xref:exchange.adoc[Exchange] has been created and routed in Camel. In
other words the error occurred while the polling was gathering
information, for instance access to a file network failed so Camel
cannot access it to scan for files.
The default implementation will log the caused exception at *`WARN`*
level and ignore it.
|`runLoggingLevel` |`TRACE` |*Camel 2.8:* The consumer logs a
start/complete log line when it polls. This option allows you to
configure the logging level for that.
|`scheduledExecutorService` |`null` |*Camel 2.10:* Allows for
configuring a custom/shared thread pool to use for the consumer. By
default each consumer has its own single threaded thread pool. This
option allows you to share a thread pool among multiple consumers.
|`scheduler` |`null` a|
*Camel 2.12:* Allow to plugin a custom
*`org.apache.camel.spi.ScheduledPollConsumerScheduler`* to use as the
scheduler for firing when the polling consumer runs. The default
implementation uses the *`ScheduledExecutorService`* and there is a
xref:components::quartz-component.adoc[Quartz], and xref:spring.adoc[Spring] based which
supports CRON expressions. *Notice:* If using a custom scheduler then
the options for *`initialDelay`, `useFixedDelay`*, *`timeUnit`* and
*`scheduledExecutorService`* may not be in use. Use the text *`quartz`*
to refer to use the xref:components::quartz-component.adoc[Quartz] scheduler; and use the
text `spring` to use the xref:spring.adoc[Spring] based; and use the
text *`#myScheduler`* to refer to a custom scheduler by its id in the
xref:registry.adoc[Registry].
See xref:components::quartz-component.adoc[Quartz] page for an example.
|`scheduler.xxx` |`null` |*Camel 2.12:* To configure additional
properties when using a custom *`scheduler`* or any of the
xref:components::quartz-component.adoc[Quartz], xref:spring.adoc[Spring] based scheduler.
|`sendEmptyMessageWhenIdle` |`false` |*Camel 2.9:* If the polling
consumer did not poll any files, you can enable this option to send an
empty message (no body) instead.
|`startScheduler` |`true` |Whether the scheduler should be auto started.
|`timeUnit` |`TimeUnit.MILLISECONDS` |Time unit for *`initialDelay`* and
*`delay`* options.
|`useFixedDelay` | a|
Controls if fixed delay or fixed rate is used. See
http://java.sun.com/j2se/1.5.0/docs/api/java/util/concurrent/ScheduledExecutorService.html[ScheduledExecutorService]
in JDK for details. In *Camel 2.7.x* or older the default value is
*`false`*.
From *Camel 2.8*: the default value is *`true`*.
|=======================================================================
[[PollingConsumer-UsingbackofftoLettheSchedulerbeLessAggressive]]
=== Using `backoff` to Let the Scheduler be Less Aggressive
*Since Camel 2.12*
The scheduled xref:polling-consumer.adoc[Polling Consumer] is by default
static by using the same poll frequency whether or not there is messages
to pickup or not.
From *Camel 2.12*: you can configure the scheduled
xref:polling-consumer.adoc[Polling Consumer] to be more dynamic by using
*`backoff`*. This allows the scheduler to skip N number of polls when it
becomes idle, or there has been X number of errors in a row. See more
details in the table above for the *`backoffXXX`* options.
For example to let a FTP consumer back-off if its becoming idle for a
while you can do:
[source,java]
----
from("ftp://myserver?username=foo&password=secret&delete=true&delay=5s&backoffMultiplier=6&backoffIdleThreshold=5")
.to("bean:processFile");
----
In this example, the FTP consumer will poll for new FTP files every 5th
second. But if it has been idle for 5 attempts in a row, then it will
back-off using a multiplier of 6, which means it will now poll every 5 x
6 = 30th second instead. When the consumer eventually pickup a file,
then the back-off will reset, and the consumer will go back and poll
every 5th second again.
Camel will log at *`DEBUG`* level using
*`org.apache.camel.impl.ScheduledPollConsumer`* when back-off is
kicking-in.
[[PollingConsumer-AboutErrorHandlingandScheduledPollingConsumers]]
=== About Error Handling and Scheduled Polling Consumers
https://github.com/apache/camel/blob/master/core/camel-support/src/main/java/org/apache/camel/support/ScheduledPollConsumer.java[ScheduledPollConsumer]
is scheduled based and its *`run`* method is invoked periodically based
on schedule settings. But errors can also occur when a poll is being
executed. For instance if Camel should poll a file network, and this
network resource is not available then a *`java.io.IOException`* could
occur. As this error happens *before* any xref:exchange.adoc[Exchange]
has been created and prepared for routing, then the regular
xref:error-handling-in-camel.adoc[Error handling in Camel] does not
apply. So what does the consumer do then? Well the exception is
propagated back to the *`run`* method where its handled. Camel will by
default log the exception at *`WARN`* level and then ignore it. At next
schedule the error could have been resolved and thus being able to poll
the endpoint successfully.
[[PollingConsumer-UsingaCustomScheduler]]
=== Using a Custom Scheduler
*Since Camel 2.12:*
The SPI interface
*`org.apache.camel.spi.ScheduledPollConsumerScheduler`* allows to
implement a custom scheduler to control when the
xref:polling-consumer.adoc[Polling Consumer] runs. The default
implementation is based on the JDKs *`ScheduledExecutorService`* with a
single thread in the thread pool. There is a CRON based implementation
in the xref:components::quartz-component.adoc[Quartz], and xref:spring.adoc[Spring]
components.
For an example of developing and using a custom scheduler, see the unit
test *`org.apache.camel.component.file.FileConsumerCustomSchedulerTest`*
from the source code in *`camel-core`*.
[[PollingConsumer-ErrorHandlingWhenUsingPollingConsumerPollStrategy]]
=== Error Handling When Using `PollingConsumerPollStrategy`
*`org.apache.camel.PollingConsumerPollStrategy`* is a pluggable strategy
that you can configure on the *`ScheduledPollConsumer`*. The default
implementation
*`org.apache.camel.impl.DefaultPollingConsumerPollStrategy`* will log
the caused exception at *`WARN`* level and then ignore this issue.
The strategy interface provides the following three methods:
* *`begin`*
** `void begin(Consumer consumer, Endpoint endpoint)`
* *`begin`* (*Camel 2.3*)
** `boolean begin(Consumer consumer, Endpoint endpoint)`
* *`commit`*
** `void commit(Consumer consumer, Endpoint endpoint)`
* *`commit`* (*Camel 2.6*)
** `void commit(Consumer consumer, Endpoint endpoint, int polledMessages)`
* *`rollback`*
** `boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception`
In *Camel 2.3*: the begin method returns a *`boolean`* which indicates
whether or not to skipping polling. So you can implement your custom
logic and return *`false`* if you do not want to poll this time.
In *Camel 2.6*: the commit method has an additional parameter containing
the number of message that was actually polled. For example if there was
no messages polled, the value would be zero, and you can react
accordingly.
The most interesting is the *`rollback`* as it allows you do handle the
caused exception and decide what to do.
For instance if we want to provide a retry feature to a scheduled
consumer we can implement the *`PollingConsumerPollStrategy`* method and
put the retry logic in the *`rollback`* method. Lets just retry up till
three times:
[source,java]
----
public boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception {
if (retryCounter < 3) {
// return true to tell Camel that it should retry the poll immediately
return true;
}
// okay we give up do not retry anymore
return false;
}
----
Notice that we are given the *`Consumer`* as a parameter. We could use
this to _restart_ the consumer as we can invoke stop and start:
[source,java]
----
// error occurred lets restart the consumer, that could maybe
resolve the issue consumer.stop();
consumer.start();
----
*Note:* if you implement the *`begin`* operation make sure to avoid
throwing exceptions as in such a case the *`poll`* operation is not
invoked and Camel will invoke the *`rollback`* directly.
[[PollingConsumer-ConfiguringantoUsePollingConsumerPollStrategy]]
=== Configuring an xref:endpoint.adoc[Endpoint] to Use `PollingConsumerPollStrategy`
To configure an xref:endpoint.adoc[Endpoint] to use a custom
*`PollingConsumerPollStrategy`* you use the option *`pollStrategy`*. For
example in the file consumer below we want to use our custom strategy
defined in the xref:registry.adoc[Registry] with the bean id *`myPoll`*:
[source,java]
----
from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox")
----
xref:using-this-pattern.adoc[Using This Pattern]