| [[Competing-Consumers]] |
| = Competing Consumers |
| |
| Camel supports the |
| https://www.enterpriseintegrationpatterns.com/patterns/messaging/CompetingConsumers.html[Competing Consumers] |
| from the xref:enterprise-integration-patterns.adoc[EIP patterns] book. |
| |
| Camel supports the Competing Consumers from the EIP patterns directly from components that can do this. |
| For example from SEDA, JMS, Kafka, and various AWS components. |
| |
| image::eip/CompetingConsumers.gif[image] |
| |
| - SEDA for SEDA based concurrent processing using a thread pool |
| - JMS for distributed SEDA based concurrent processing with queues which support reliable load balancing, failover and clustering. |
| |
| For components which does not allow concurrent consumers, then Camel allows to route from the consumer |
| to a thread-pool which can then further process the message concurrently, |
| which then simulates a _quasi like_ competing consumers. |
| |
| == Competing Consumers with JMS |
| |
| To enable Competing Consumers you just need to set the `concurrentConsumers` property on the JMS endpoint. |
| |
| For example |
| |
| [source,java] |
| ---- |
| from("jms:MyQueue?concurrentConsumers=5") |
| .to("bean:someBean"); |
| ---- |
| |
| or in XML DSL |
| |
| [source,xml] |
| ---- |
| <route> |
| <from uri="jms:MyQueue?concurrentConsumers=5"/> |
| <to uri="bean:someBean"/> |
| </route> |
| ---- |
| |
| == Competing Consumers with Thread Pool |
| |
| You can simulate competing consumers by using a thread pool which then continue processing the messages concurrently. |
| Then the single thread consumer can quickly continue and pickup new messages to process and offload them to the thread-pool |
| (and its task queue). |
| |
| Suppose we have this simple route where we poll a folder for new files, |
| process the files and afterwards move the files to a backup folder when complete. |
| |
| [source,java] |
| ---- |
| from("file://inbox?move=../backup-${date:now:yyyyMMdd}") |
| .to("bean:calculateBean"); |
| ---- |
| |
| The route is synchronous and there is only a single consumer running at any given time. |
| This scenario is well known and it doesn't affect thread safety as we only have one active thread |
| involved at any given time. |
| |
| Now imagine that the inbox folder is filled with filers quicker than we can process. |
| So we want to speed up this process. How can we do this? |
| |
| Well we could try adding a 2nd route with the same route path. |
| Well that doesn't work so well as we have competing consumers for the same files. |
| That requires however that we use file locking so we wont have two consumers compete for the same file. |
| By default Camel support this with its file locking option on the file component. |
| |
| But what if the component doesn't support this, or its not possible to add a 2nd consumer |
| for the same endpoint? And yes its a bit of a hack and the route logic code is duplicated. |
| And what if we need more, then we need to add a 3rd, a 4th and so on. |
| |
| What if the processing of the file itself is the bottleneck? That is the calculateBean is slow. |
| So how can we process messages with this bean concurrently? |
| |
| Yeah we can use the xref:threads-eip.adoc[Threads EIP], so if we insert it in the route we get: |
| |
| [source,java] |
| ---- |
| from("file://inbox?move=../backup-${date:now:yyyyMMdd}") |
| .threads(10) |
| .to("bean:calculateBean"); |
| ---- |
| |
| So by inserting `threads(10)` we have instructed Camel that from this point forward in the route |
| it should use a thread pool with up till 10 concurrent threads. |
| So when the file consumer delivers a message to the threads, then the threads take it from there |
| and the file consumer can return and continue to poll the next file. |
| |
| By leveraging this fact we can still use a single file consumer to poll new files. |
| And polling a directory to just grab the file handle is very fast. |
| And we wont have problem with file locking, sorting, filtering and whatnot. |
| And at the same time we can leverage the fact that we can process the file messages concurrently |
| by the calculate bean. |
| |
| Here at the end lets take a closer look what happens with the synchronous thread and the |
| asynchronous thread. The synchronous thread hands over the exchange to the new asynchronous thread and as |
| such the synchronous thread completes. The asynchronous thread is then routing and processing the message. |
| And when this thread finishes it will take care of the file completion strategy to move the file |
| into the backup folder. This is an important note, that the on completion is done by the asynchronous thread. |
| |
| This ensures the file is not moved before the file is processed successfully. Suppose the calculate bean |
| could not process one of the files. If it was the asynchronous thread that should do the on completion strategy |
| then the file would have been moved to early into the backup folder. By handing over this to the asynchronous |
| thread we do it after we have processed the message completely |