| [[Disruptor-DisruptorComponent]] |
| Disruptor Component |
| ~~~~~~~~~~~~~~~~~~~ |
| |
| *Available as of Camel 2.12* |
| |
| The *disruptor:* component provides asynchronous |
| http://www.eecs.harvard.edu/~mdw/proj/seda/[SEDA] behavior much as the |
| standard SEDA Component, but utilizes a |
| https://github.com/LMAX-Exchange/disruptor[Disruptor] instead of a |
| http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html[BlockingQueue] |
| utilized by the standard link:seda.html[SEDA]. Alternatively, a |
| |
| *disruptor-vm:* endpoint is supported by this component, providing an |
| alternative to the standard link:vm.html[VM]. As with the SEDA |
| component, buffers of the *disruptor:* endpoints are only visible within |
| a *single* link:camelcontext.html[CamelContext] and no support is |
| provided for persistence or recovery. The buffers of the |
| **disruptor-vm:** endpoints also provides support for communication |
| across CamelContexts instances so you can use this mechanism to |
| communicate across web applications (provided that *camel-disruptor.jar* |
| is on the *system/boot* classpath). |
| |
| The main advantage of choosing to use the Disruptor Component over the |
| SEDA or the VM Component is performance in use cases where there is high |
| contention between producer(s) and/or multicasted or concurrent |
| Consumers. In those cases, significant increases of throughput and |
| reduction of latency has been observed. Performance in scenarios without |
| contention is comparable to the SEDA and VM Components. |
| |
| The Disruptor is implemented with the intention of mimicing the |
| behaviour and options of the SEDA and VM Components as much as possible. |
| The main differences with the them are the following: |
| |
| * The buffer used is always bounded in size (default 1024 exchanges). |
| * As a the buffer is always bouded, the default behaviour for the |
| Disruptor is to block while the buffer is full instead of throwing an |
| exception. This default behaviour may be configured on the component |
| (see options). |
| * The Disruptor enpoints don't implement the BrowsableEndpoint |
| interface. As such, the exchanges currently in the Disruptor can't be |
| retrieved, only the amount of exchanges. |
| * The Disruptor requires its consumers (multicasted or otherwise) to be |
| statically configured. Adding or removing consumers on the fly requires |
| complete flushing of all pending exchanges in the Disruptor. |
| * As a result of the reconfiguration: Data sent over a Disruptor is |
| directly processed and 'gone' if there is at least one consumer, late |
| joiners only get new exchanges published after they've joined. |
| * The *pollTimeout* option is not supported by the Disruptor Component. |
| * When a producer blocks on a full Disruptor, it does not respond to |
| thread interrupts. |
| |
| Maven users will need to add the following dependency to their `pom.xml` |
| for this component: |
| |
| [source,java] |
| ------------------------------------------------------------ |
| <dependency> |
| <groupId>org.apache.camel</groupId> |
| <artifactId>camel-disruptor</artifactId> |
| <version>x.x.x</version> |
| <!-- use the same version as your Camel core version --> |
| </dependency> |
| ------------------------------------------------------------ |
| |
| [[Disruptor-URIformat]] |
| URI format |
| ^^^^^^^^^^ |
| |
| [source,java] |
| ----------------------------- |
| disruptor:someName[?options] |
| ----------------------------- |
| |
| or |
| |
| [source,java] |
| -------------------------------- |
| disruptor-vm:someName[?options] |
| -------------------------------- |
| |
| Where **someName** can be any string that uniquely identifies the |
| endpoint within the current link:camelcontext.html[CamelContext] (or |
| across contexts in case of + |
| **disruptor-vm:**). + |
| You can append query options to the URI in the following format: |
| |
| [source,java] |
| ------------------------------ |
| ?option=value&option=value&… |
| ------------------------------ |
| |
| [[Disruptor-Options]] |
| Options |
| ^^^^^^^ |
| |
| All the following options are valid for both the **disruptor:** and |
| **disruptor-vm:** components. |
| |
| [width="100%",cols="10%,10%,80%",options="header",] |
| |======================================================================= |
| |Name | Default | Description |
| |
| |size |1024 |The maximum capacity of the Disruptors ringbuffer. Will be effectively |
| increased to the nearest power of two. *Notice:* Mind if you use this |
| option, then its the first endpoint being created with the queue name, |
| that determines the size. To make sure all endpoints use same size, then |
| configure the size option on all of them, or the first endpoint being |
| created. |
| |
| |bufferSize | | *Component only:* The maximum default size (capacity of the number of |
| messages it can hold) of the Disruptors ringbuffer. This option is used |
| if size is not in use. |
| |
| |queueSize | | *Component only:* Additional option to specify the <em>bufferSize</em> |
| to maintain maximum compatibility with the link:seda.html[SEDA] |
| Component. |
| |
| |concurrentConsumers |1 |Number of concurrent threads processing exchanges. |
| |
| |waitForTaskToComplete |IfReplyExpected |Option to specify whether the caller should wait for the async task to |
| complete or not before continuing. The following three options are |
| supported: _Always_, _Never_ or _IfReplyExpected_. The first two values |
| are self-explanatory. The last value, _IfReplyExpected_, will only wait |
| if the message is link:request-reply.html[Request Reply] based. See more |
| information about link:async.html[Async] messaging. |
| |
| |timeout |30000 |Timeout (in milliseconds) before a producer will stop waiting for an |
| asynchronous task to complete. See _waitForTaskToComplete_ and |
| link:async.html[Async] for more details. You can disable timeout by |
| using 0 or a negative value. |
| |
| |defaultMultipleConsumers | | *Component only:* Allows to set the default allowance of multiple |
| consumers for endpoints created by this component used when |
| _multipleConsumers_ is not provided. |
| |
| |multipleConsumers |false |Specifies whether multiple consumers are allowed. If enabled, you can |
| use Disruptor for http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern[Publish-Subscribe] messaging. |
| That is, you can send a message to the SEDA queue and have each consumer |
| receive a copy of the message. When enabled, this option should be |
| specified on every consumer endpoint. |
| |
| |limitConcurrentConsumers |true |Whether to limit the number of concurrentConsumers to the maximum of |
| 500. By default, an exception will be thrown if a Disruptor endpoint is |
| configured with a greater number. You can disable that check by turning |
| this option off. |
| |
| |blockWhenFull |true |Whether a thread that sends messages to a full Disruptor will block |
| until the ringbuffer's capacity is no longer exhausted. By default, the |
| calling thread will block and wait until the message can be accepted. By |
| disabling this option, an exception will be thrown stating that the |
| queue is full. |
| |
| |defaultBlockWhenFull | | *Component only:* Allows to set the default producer behaviour when the |
| ringbuffer is full for endpoints created by this comonent used when |
| _blockWhenFull_ is not provided. |
| |
| |waitStrategy |Blocking |Defines the strategy used by consumer threads to wait on new exchanges |
| to be published. The options allowed are:_Blocking_, _Sleeping_, |
| _BusySpin_ and _Yielding_. Refer to the section below for more |
| information on this subject |
| |
| |defaultWaitStrategy | | *Component only:* Allows to set the default wait strategy for endpoints |
| created by this comonent used when _waitStrategy_ is not provided. |
| |
| |producerType |Multi | Defines the producers allowed on the Disruptor. The options allowed are: |
| _Multi_ to allow multiple producers and _Single_ to enable certain |
| optimizations only allowed when one concurrent producer (on one thread |
| or otherwise synchronized) is active. |
| |
| |defaultProducerType | | *Component only:* Allows to set the default producer type for endpoints |
| created by this comonent used when _producerType_ is not provided. |
| |======================================================================= |
| |
| [[Disruptor-Waitstrategies]] |
| Wait strategies |
| ^^^^^^^^^^^^^^^ |
| |
| The wait strategy effects the type of waiting performed by the consumer |
| threads that are currently waiting for the next exchange to be |
| published. The following strategies can be chosen: |
| |
| [width="100%",cols="10%,45%,45%",options="header",] |
| |======================================================================= |
| |Name |Description |Advice |
| |
| |Blocking | Blocking strategy that uses a lock and condition variable for Consumers |
| waiting on a barrier. | This strategy can be used when throughput and low-latency are not as |
| important as CPU resource. |
| |
| |Sleeping |Sleeping strategy that initially spins, then uses a Thread.yield(), and |
| eventually for the minimum number of nanos the OS and JVM will allow |
| while the Consumers are waiting on a barrier. |This strategy is a good compromise between performance and CPU resource. |
| Latency spikes can occur after quiet periods. |
| |
| |BusySpin |Busy Spin strategy that uses a busy spin loop for Consumers waiting on a |
| barrier. |This strategy will use CPU resource to avoid syscalls which can |
| introduce latency jitter. It is best used when threads can be bound to |
| specific CPU cores. |
| |
| |Yielding |Yielding strategy that uses a Thread.yield() for Consumers waiting on a |
| barrier after an initially spinning. |This strategy is a good compromise between performance and CPU resource |
| without incurring significant latency spikes. |
| |======================================================================= |
| |
| [[Disruptor-UseofRequestReply]] |
| Use of Request Reply |
| ^^^^^^^^^^^^^^^^^^^^ |
| |
| The Disruptor component supports using link:request-reply.html[Request |
| Reply], where the caller will wait for the Async route to complete. For |
| instance: |
| |
| [source,java] |
| ------------------------------------------------------------------------------ |
| from("mina:tcp://0.0.0.0:9876?textline=true&sync=true").to("disruptor:input"); |
| from("disruptor:input").to("bean:processInput").to("bean:createResponse"); |
| ------------------------------------------------------------------------------ |
| |
| In the route above, we have a TCP listener on port 9876 that accepts |
| incoming requests. The request is routed to the _disruptor:input_ |
| buffer. As it is a link:request-reply.html[Request Reply] message, we |
| wait for the response. When the consumer on the _disruptor:input_ buffer |
| is complete, it copies the response to the original message response. |
| |
| [[Disruptor-Concurrentconsumers]] |
| Concurrent consumers |
| ^^^^^^^^^^^^^^^^^^^^ |
| |
| By default, the Disruptor endpoint uses a single consumer thread, but |
| you can configure it to use concurrent consumer threads. So instead of |
| thread pools you can use: |
| |
| [source,java] |
| -------------------------------------------------------------- |
| from("disruptor:stageName?concurrentConsumers=5").process(...) |
| -------------------------------------------------------------- |
| |
| As for the difference between the two, note a thread pool can |
| increase/shrink dynamically at runtime depending on load, whereas the |
| number of concurrent consumers is always fixed and supported by the |
| Disruptor internally so performance will be higher. |
| |
| [[Disruptor-Threadpools]] |
| Thread pools |
| ^^^^^^^^^^^^ |
| |
| Be aware that adding a thread pool to a Disruptor endpoint by doing |
| something like: |
| |
| [source,java] |
| -------------------------------------------------- |
| from("disruptor:stageName").thread(5).process(...) |
| -------------------------------------------------- |
| |
| Can wind up with adding a normal |
| http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/BlockingQueue.html[BlockingQueue] |
| to be used in conjunction with the Disruptor, effectively negating part |
| of the performance gains achieved by using the Disruptor. Instead, it is |
| advices to directly configure number of threads that process messages on |
| a Disruptor endpoint using the concurrentConsumers option. |
| |
| [[Disruptor-Sample]] |
| Sample |
| ^^^^^^ |
| |
| In the route below we use the Disruptor to send the request to this |
| async queue to be able to send a fire-and-forget message for further |
| processing in another thread, and return a constant reply in this thread |
| to the original caller. |
| |
| [source,java] |
| ------------------------------------------------- |
| public void configure() throws Exception { |
| from("direct:start") |
| // send it to the disruptor that is async |
| .to("disruptor:next") |
| // return a constant response |
| .transform(constant("OK")); |
| |
| from("disruptor:next").to("mock:result"); |
| } |
| ------------------------------------------------- |
| |
| Here we send a Hello World message and expects the reply to be OK. |
| |
| [source,java] |
| ----------------------------------------------------------------- |
| Object out = template.requestBody("direct:start", "Hello World"); |
| assertEquals("OK", out); |
| ----------------------------------------------------------------- |
| |
| The "Hello World" message will be consumed from the Disruptor from |
| another thread for further processing. Since this is from a unit test, |
| it will be sent to a mock endpoint where we can do assertions in the |
| unit test. |
| |
| [[Disruptor-UsingmultipleConsumers]] |
| Using multipleConsumers |
| ^^^^^^^^^^^^^^^^^^^^^^^ |
| |
| In this example we have defined two consumers and registered them as |
| spring beans. |
| |
| [source,java] |
| ------------------------------------------------------------------------------------------- |
| <!-- define the consumers as spring beans --> |
| <bean id="consumer1" class="org.apache.camel.spring.example.FooEventConsumer"/> |
| |
| <bean id="consumer2" class="org.apache.camel.spring.example.AnotherFooEventConsumer"/> |
| |
| <camelContext xmlns="http://camel.apache.org/schema/spring"> |
| <!-- define a shared endpoint which the consumers can refer to instead of using url --> |
| <endpoint id="foo" uri="disruptor:foo?multipleConsumers=true"/> |
| </camelContext> |
| ------------------------------------------------------------------------------------------- |
| |
| Since we have specified multipleConsumers=true on the Disruptor foo |
| endpoint we can have those two or more consumers receive their own copy |
| of the message as a kind of pub-sub style messaging. As the beans are |
| part of an unit test they simply send the message to a mock endpoint, |
| but notice how we can use @Consume to consume from the Disruptor. |
| |
| [source,java] |
| ------------------------------------------- |
| public class FooEventConsumer { |
| |
| @EndpointInject(uri = "mock:result") |
| private ProducerTemplate destination; |
| |
| @Consume(ref = "foo") |
| public void doSomething(String body) { |
| destination.sendBody("foo" + body); |
| } |
| |
| } |
| ------------------------------------------- |
| |
| [[Disruptor-Extractingdisruptorinformation]] |
| Extracting disruptor information |
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
| |
| If needed, information such as buffer size, etc. can be obtained without |
| using JMX in this fashion: |
| |
| [source,java] |
| -------------------------------------------------------------------- |
| DisruptorEndpoint disruptor = context.getEndpoint("disruptor:xxxx"); |
| int size = disruptor.getBufferSize(); |
| -------------------------------------------------------------------- |