blob: 824a26de21b2416677006ed036d84d40cdc092e3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.processor.aggregator;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.AggregateDefinition;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @version
*/
public class AlbertoAggregatorTest extends ContextTestSupport {
private static final String SURNAME_HEADER = "surname";
private static final String TYPE_HEADER = "type";
private static final String BROTHERS_TYPE = "brothers";
private Logger log = LoggerFactory.getLogger(this.getClass());
public void testAggregator() throws Exception {
String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";
List<String> marxBrothers = new ArrayList<String>();
marxBrothers.add("Harpo");
marxBrothers.add("Chico");
marxBrothers.add("Groucho");
List<String> karamazovBrothers = new ArrayList<String>();
karamazovBrothers.add("Fiodor");
karamazovBrothers.add("Ivan");
karamazovBrothers.add("Alexei");
karamazovBrothers.add("Dimitri");
Map<String, List> allBrothers = new HashMap<String, List>();
allBrothers.put("Marx", marxBrothers);
allBrothers.put("Karamazov", karamazovBrothers);
MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
resultEndpoint.expectedMessageCount(1);
resultEndpoint.expectedBodiesReceived(allBrothers);
template.sendBody("direct:start", allNames);
assertMockEndpointsSatisfied();
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
AggregationStrategy surnameAggregator = new AggregationStrategy() {
@SuppressWarnings("unchecked")
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
debugIn("Surname Aggregator", oldExchange, newExchange);
Exchange answer = newExchange;
if (oldExchange != null) {
List<String> brothers = oldExchange.getIn().getBody(List.class);
brothers.add(newExchange.getIn().getBody(String.class));
answer = oldExchange;
} else {
List<String>brothers = new ArrayList<String>();
brothers.add(newExchange.getIn().getBody(String.class));
newExchange.getIn().setBody(brothers);
}
debugOut("Surname Aggregator", answer);
return answer;
}
};
@SuppressWarnings("unchecked")
AggregationStrategy brothersAggregator = new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
debugIn("Brothers Aggregator", oldExchange, newExchange);
Exchange answer = newExchange;
if (oldExchange != null) {
Map<String, List> brothers = oldExchange.getIn().getBody(Map.class);
brothers.put(newExchange.getIn().getHeader(SURNAME_HEADER, String.class), newExchange.getIn().getBody(List.class));
answer = oldExchange;
} else {
Map<String, List> brothers = new HashMap<String, List>();
brothers.put(newExchange.getIn().getHeader(SURNAME_HEADER, String.class), newExchange.getIn().getBody(List.class));
newExchange.getIn().setBody(brothers);
}
debugOut("Brothers Aggregator", answer);
return answer;
}
};
private void debugIn(String stringId, Exchange oldExchange, Exchange newExchange) {
if (oldExchange != null) {
log.debug(stringId + " old headers in: " + oldExchange.getIn().getHeaders());
log.debug(stringId + " old body in: " + oldExchange.getIn().getBody());
}
log.debug(stringId + " new headers in: " + newExchange.getIn().getHeaders());
log.debug(stringId + " new body in: " + newExchange.getIn().getBody());
}
private void debugOut(String stringId, Exchange exchange) {
log.debug(stringId + " old headers out: " + exchange.getIn().getHeaders());
log.debug(stringId + " old body out: " + exchange.getIn().getBody());
}
@Override
public void configure() throws Exception {
from("direct:start")
// Separate people
.split(bodyAs(String.class).tokenize(",")).process(
// Split the name, erase the surname and put it in a
// header
new Processor() {
public void process(Exchange exchange) throws Exception {
String[] parts = exchange.getIn()
.getBody(String.class).split(
" ");
exchange.getIn().setBody(parts[0]);
exchange.getIn().setHeader(
SURNAME_HEADER, parts[1]);
} // process
}) // Processor
.to("direct:joinSurnames");
from("direct:joinSurnames")
.aggregate(header(SURNAME_HEADER),
surnameAggregator).completionTimeout(2000L).setHeader(TYPE_HEADER,
constant(BROTHERS_TYPE)).to("direct:joinBrothers");
// Join all brothers lists and remove surname and type headers
AggregateDefinition agg =
from("direct:joinBrothers").aggregate(header(TYPE_HEADER),
brothersAggregator);
agg.setCompletionTimeout(2000L);
agg.removeHeader(SURNAME_HEADER)
.removeHeader(TYPE_HEADER)
.to("mock:result");
}
};
}
}