| [[Scatter-Gather]] |
| = Scatter Gather |
| |
| Camel supports the |
| https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html[Scatter Gather] |
| from the xref:enterprise-integration-patterns.adoc[EIP patterns] book. |
| |
| The Scatter Gather pattern lets you route a message to a number of dynamically |
| specified recipients and re-aggregate the responses into a single message. |
| |
| image::eip/BroadcastAggregate.gif[image] |
| |
| With Camel this pattern is implemented by using the xref:recipientList-eip.adoc[Recipient List] |
| and the xref:aggregate-eip.adoc[Aggregate] patterns. |
| |
| == Sample |
| |
| In this example we want to get the best quote for beer from several different vendors. |
| We use a dynamic Recipient List to get the request for a quote to all vendors and an Aggregator |
| to pick the best quote out of all the responses. The routes for this are defined as: |
| |
| [source,xml] |
| ---- |
| <camelContext xmlns="http://camel.apache.org/schema/spring"> |
|   <route> |
|     <from uri="direct:start"/> |
|     <recipientList> |
|       <header>listOfVendors</header> |
|     </recipientList> |
|   </route> |
| |
|   <route> |
|     <from uri="seda:quoteAggregator"/> |
|     <aggregate strategyRef="aggregatorStrategy" completionTimeout="1000"> |
|       <correlationExpression> |
|         <header>quoteRequestId</header> |
|       </correlationExpression> |
|       <to uri="mock:result"/> |
|     </aggregate> |
|   </route> |
| </camelContext> |
| ---- |
| |
| In the first route, the Recipient List reads the list of recipients from the `listOfVendors` header, |
| so we need to send a message like this: |
| |
| [source,java] |
| ---- |
| Map<String, Object> headers = new HashMap<>(); |
| headers.put("listOfVendors", "bean:vendor1, bean:vendor2, bean:vendor3"); |
| headers.put("quoteRequestId", "quoteRequest-1"); |
| template.sendBodyAndHeaders("direct:start", "<quote_request item=\"beer\"/>", headers); |
| ---- |
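
To make the header format concrete, here is a minimal plain-Java sketch of how a comma-separated header value maps to individual endpoint URIs. It is an illustration only, not Camel's internal code: the class `RecipientHeaderSketch` and the method `splitRecipients` are hypothetical names, and the trimming of whitespace around each entry is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; Camel performs its own splitting internally.
public class RecipientHeaderSketch {

    // Splits a comma-separated header value into endpoint URIs,
    // trimming whitespace and dropping empty entries (assumed behavior).
    public static List<String> splitRecipients(String headerValue) {
        return Arrays.stream(headerValue.split(","))
                .map(String::trim)
                .filter(uri -> !uri.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Same header value as in the snippet above
        System.out.println(splitRecipients("bean:vendor1, bean:vendor2, bean:vendor3"));
        // prints [bean:vendor1, bean:vendor2, bean:vendor3]
    }
}
```

Each resulting entry is one recipient, so the example header addresses three endpoints.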
| |
| This message will be distributed to the following Endpoints: bean:vendor1, bean:vendor2, and bean:vendor3. |
| These are all beans which look like: |
| |
| [source,java] |
| ---- |
| public class MyVendor { |
|     private int beerPrice; |
| |
|     @Produce("seda:quoteAggregator") |
|     private ProducerTemplate quoteAggregator; |
| |
|     public MyVendor(int beerPrice) { |
|         this.beerPrice = beerPrice; |
|     } |
| |
|     public void quote(@XPath("/quote_request/@item") String item, Exchange exchange) { |
|         if ("beer".equals(item)) { |
|             // reply with this vendor's price and forward it to the aggregator route |
|             exchange.getIn().setBody(beerPrice); |
|             quoteAggregator.send(exchange); |
|         } else { |
|             // not a request for beer, so this vendor sends no quote |
|         } |
|     } |
| } |
| ---- |
| |
| The vendor beans and the aggregation strategy are defined in Spring XML like this: |
| |
| [source,xml] |
| ---- |
| <bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.scattergather.LowestQuoteAggregationStrategy"/> |
| |
| <bean id="vendor1" class="org.apache.camel.spring.processor.scattergather.MyVendor"> |
|   <constructor-arg> |
|     <value>1</value> |
|   </constructor-arg> |
| </bean> |
| |
| <bean id="vendor2" class="org.apache.camel.spring.processor.scattergather.MyVendor"> |
|   <constructor-arg> |
|     <value>2</value> |
|   </constructor-arg> |
| </bean> |
| |
| <bean id="vendor3" class="org.apache.camel.spring.processor.scattergather.MyVendor"> |
|   <constructor-arg> |
|     <value>3</value> |
|   </constructor-arg> |
| </bean> |
| ---- |
| |
| Each bean is loaded with a different price for beer. When the message is sent to each bean endpoint, |
| it arrives at the `MyVendor.quote` method. This method checks whether the quote request is for beer |
| and, if so, sets the price of beer as the message body for retrieval at a later step. |
| The message is then forwarded to the next step using POJO Producing (see the `@Produce` annotation). |
| |
| At the next step we want to take the beer quotes from all vendors and find out which one is the best |
| (i.e. the lowest). To do this we use an Aggregator with a custom aggregation strategy. |
| The Aggregator must compare only the messages that belong to this particular quote request; |
| this is done by specifying a correlationExpression that evaluates the `quoteRequestId` header. |
| As shown above in the message sending snippet, we set this header to `quoteRequest-1`. |
| This correlation value should be unique per request, otherwise responses that are not part of this quote may be included. |
| The aggregation completes once no new quote has arrived for 1000 milliseconds (`completionTimeout="1000"`). |
| To pick the lowest quote out of the set, we use a custom aggregation strategy like this: |
| |
| [source,java] |
| ---- |
| import org.apache.camel.AggregationStrategy; |
| import org.apache.camel.Exchange; |
| |
| public class LowestQuoteAggregationStrategy implements AggregationStrategy { |
|     @Override |
|     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { |
|         // the first time we only have the new exchange |
|         if (oldExchange == null) { |
|             return newExchange; |
|         } |
| |
|         // keep whichever exchange carries the lower price in its body |
|         if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) { |
|             return oldExchange; |
|         } else { |
|             return newExchange; |
|         } |
|     } |
| } |
| ---- |
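
The combined effect of correlation and pairwise aggregation can be sketched in plain Java, without any Camel classes. This is a simplified model under stated assumptions, not how Camel implements the Aggregator: the `LowestQuoteSketch` class, its nested `Quote` type, and the `gather` method are hypothetical names introduced here, and completion handling (such as the timeout) is left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the gather step; no Camel API is used.
public class LowestQuoteSketch {

    // Stand-in for a vendor reply: the correlation id plus the quoted price.
    public static final class Quote {
        public final String requestId;
        public final int price;

        public Quote(String requestId, int price) {
            this.requestId = requestId;
            this.price = price;
        }
    }

    // Mirrors the pairwise contract of the strategy above:
    // the first reply for a correlation id has no old value to compare against.
    public static Quote aggregate(Quote oldQuote, Quote newQuote) {
        if (oldQuote == null) {
            return newQuote;
        }
        return oldQuote.price < newQuote.price ? oldQuote : newQuote;
    }

    // Groups replies by correlation id and reduces each group to its lowest quote.
    public static Map<String, Quote> gather(List<Quote> replies) {
        Map<String, Quote> best = new LinkedHashMap<>();
        for (Quote reply : replies) {
            best.put(reply.requestId, aggregate(best.get(reply.requestId), reply));
        }
        return best;
    }

    public static void main(String[] args) {
        List<Quote> replies = List.of(
                new Quote("quoteRequest-1", 3),
                new Quote("quoteRequest-1", 1),
                new Quote("quoteRequest-1", 2));
        System.out.println(gather(replies).get("quoteRequest-1").price); // prints 1
    }
}
```

Because replies are keyed by the correlation id, quotes that belong to a different request never compete with each other, which is why the id should be unique per request.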
| |