blob: f5f6228e26e92a710051058129c150ddd6dfcd87 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.builder;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.CamelContext;
import org.apache.camel.Channel;
import org.apache.camel.DelegateProcessor;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.Route;
import org.apache.camel.TestSupport;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.engine.DefaultRoute;
import org.apache.camel.processor.ChoiceProcessor;
import org.apache.camel.processor.EvaluateExpressionProcessor;
import org.apache.camel.processor.FilterProcessor;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.SendProcessor;
import org.apache.camel.processor.Splitter;
import org.apache.camel.processor.ThreadsProcessor;
import org.apache.camel.processor.errorhandler.DeadLetterChannel;
import org.apache.camel.processor.idempotent.IdempotentConsumer;
import org.apache.camel.support.processor.idempotent.MemoryIdempotentRepository;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class RouteBuilderTest extends TestSupport {
protected Processor myProcessor = new MyProcessor();
protected DelegateProcessor interceptor1;
protected DelegateProcessor interceptor2;
protected CamelContext createCamelContext() {
// disable stream cache otherwise to much hazzle in this unit test to
// filter the stream cache
// in all the assertion codes
DefaultCamelContext ctx = new DefaultCamelContext();
ctx.setStreamCaching(Boolean.FALSE);
return ctx;
}
protected List<Route> buildSimpleRoute() throws Exception {
// START SNIPPET: e1
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").to("direct:b");
}
};
// END SNIPPET: e1
return getRouteList(builder);
}
@Test
public void testSimpleRoute() throws Exception {
List<Route> routes = buildSimpleRoute();
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, channel.getNextProcessor());
assertEquals("direct://b", sendProcessor.getDestination().getEndpointUri(), "Endpoint URI");
}
}
protected List<Route> buildSimpleRouteWithHeaderPredicate() throws Exception {
// START SNIPPET: e2
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").filter(header("foo").isEqualTo("bar")).to("direct:b");
}
};
// END SNIPPET: e2
return getRouteList(builder);
}
@Test
public void testSimpleRouteWithHeaderPredicate() throws Exception {
List<Route> routes = buildSimpleRouteWithHeaderPredicate();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
FilterProcessor filterProcessor = assertIsInstanceOf(FilterProcessor.class, channel.getNextProcessor());
SendProcessor sendProcessor
= assertIsInstanceOf(SendProcessor.class, unwrapChannel(filterProcessor).getNextProcessor());
assertEquals("direct://b", sendProcessor.getDestination().getEndpointUri(), "Endpoint URI");
}
}
protected List<Route> buildSimpleRouteWithChoice() throws Exception {
// START SNIPPET: e3
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").choice().when(header("foo").isEqualTo("bar")).to("direct:b")
.when(header("foo").isEqualTo("cheese")).to("direct:c").otherwise().to("direct:d");
}
};
// END SNIPPET: e3
return getRouteList(builder);
}
@Test
public void testSimpleRouteWithChoice() throws Exception {
List<Route> routes = buildSimpleRouteWithChoice();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
ChoiceProcessor choiceProcessor = assertIsInstanceOf(ChoiceProcessor.class, channel.getNextProcessor());
List<FilterProcessor> filters = choiceProcessor.getFilters();
assertEquals(2, filters.size(), "Should be two when clauses");
Processor filter1 = filters.get(0);
assertSendTo(unwrapChannel(((FilterProcessor) filter1).getProcessor()).getNextProcessor(), "direct://b");
Processor filter2 = filters.get(1);
assertSendTo(unwrapChannel(((FilterProcessor) filter2).getProcessor()).getNextProcessor(), "direct://c");
assertSendTo(unwrapChannel(choiceProcessor.getOtherwise()).getNextProcessor(), "direct://d");
}
}
protected List<Route> buildCustomProcessor() throws Exception {
// START SNIPPET: e4
myProcessor = new Processor() {
public void process(Exchange exchange) {
log.debug("Called with exchange: " + exchange);
}
};
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").process(myProcessor);
}
};
// END SNIPPET: e4
return getRouteList(builder);
}
@Test
public void testCustomProcessor() throws Exception {
List<Route> routes = buildCustomProcessor();
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
}
}
protected List<Route> buildCustomProcessorWithFilter() throws Exception {
// START SNIPPET: e5
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").filter(header("foo").isEqualTo("bar")).process(myProcessor);
}
};
// END SNIPPET: e5
return getRouteList(builder);
}
@Test
public void testCustomProcessorWithFilter() throws Exception {
List<Route> routes = buildCustomProcessorWithFilter();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
}
}
protected List<Route> buildWireTap() throws Exception {
// START SNIPPET: e6
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").multicast().to("direct:tap", "direct:b");
}
};
// END SNIPPET: e6
return getRouteList(builder);
}
@Test
public void testWireTap() throws Exception {
List<Route> routes = buildWireTap();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
MulticastProcessor multicastProcessor = assertIsInstanceOf(MulticastProcessor.class, channel.getNextProcessor());
List<Processor> endpoints = new ArrayList<>(multicastProcessor.getProcessors());
assertEquals(2, endpoints.size(), "Should have 2 endpoints");
assertSendToProcessor(unwrapChannel(endpoints.get(0)).getNextProcessor(), "direct://tap");
assertSendToProcessor(unwrapChannel(endpoints.get(1)).getNextProcessor(), "direct://b");
}
}
protected List<Route> buildRouteWithInterceptor() throws Exception {
interceptor1 = new org.apache.camel.support.processor.DelegateProcessor() {
};
interceptor2 = new MyInterceptorProcessor();
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").process(interceptor1).process(interceptor2).to("direct:d");
}
};
return getRouteList(builder);
}
@Test
public void testRouteWithInterceptor() throws Exception {
List<Route> routes = buildRouteWithInterceptor();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Pipeline line = assertIsInstanceOf(Pipeline.class, unwrap(consumer.getProcessor()));
assertEquals(3, line.next().size());
// last should be our seda
List<Processor> processors = new ArrayList<>(line.next());
Processor sendTo = assertIsInstanceOf(SendProcessor.class, unwrapChannel(processors.get(2)).getNextProcessor());
assertSendTo(sendTo, "direct://d");
}
}
@Test
public void testComplexExpressions() throws Exception {
// START SNIPPET: e7
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").filter(header("foo").isEqualTo(123)).to("direct:b");
}
};
// END SNIPPET: e7
List<Route> routes = getRouteList(builder);
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
}
}
protected List<Route> buildStaticRecipientList() throws Exception {
// START SNIPPET: multicast
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").multicast().to("direct:b", "direct:c", "direct:d");
}
};
// END SNIPPET: multicast
return getRouteList(builder);
}
protected List<Route> buildDynamicRecipientList() throws Exception {
// START SNIPPET: e9
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").recipientList(header("foo"));
}
};
// END SNIPPET: e9
return getRouteList(builder);
}
@Test
public void testRouteDynamicReceipentList() throws Exception {
List<Route> routes = buildDynamicRecipientList();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
Pipeline line = assertIsInstanceOf(Pipeline.class, channel.getNextProcessor());
Iterator<?> it = line.next().iterator();
// EvaluateExpressionProcessor should be wrapped in error handler
Object first = it.next();
first = assertIsInstanceOf(DeadLetterChannel.class, first).getOutput();
assertIsInstanceOf(EvaluateExpressionProcessor.class, first);
// and the second should NOT be wrapped in error handler
Object second = it.next();
assertIsInstanceOf(RecipientList.class, second);
}
}
protected List<Route> buildSplitter() throws Exception {
// START SNIPPET: splitter
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").split(bodyAs(String.class).tokenize("\n")).to("direct:b");
}
};
// END SNIPPET: splitter
return getRouteList(builder);
}
@Test
public void testSplitter() throws Exception {
List<Route> routes = buildSplitter();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
assertIsInstanceOf(Splitter.class, channel.getNextProcessor());
}
}
protected List<Route> buildIdempotentConsumer() throws Exception {
// START SNIPPET: idempotent
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a")
.idempotentConsumer(header("myMessageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200))
.to("direct:b");
}
};
// END SNIPPET: idempotent
return getRouteList(builder);
}
@Test
public void testIdempotentConsumer() throws Exception {
List<Route> routes = buildIdempotentConsumer();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
IdempotentConsumer idempotentConsumer = assertIsInstanceOf(IdempotentConsumer.class, channel.getNextProcessor());
assertEquals("header(myMessageId)", idempotentConsumer.getMessageIdExpression().toString(), "messageIdExpression");
assertIsInstanceOf(MemoryIdempotentRepository.class, idempotentConsumer.getIdempotentRepository());
SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class,
unwrapChannel(idempotentConsumer.getProcessor()).getNextProcessor());
assertEquals("direct://b", sendProcessor.getDestination().getEndpointUri(), "Endpoint URI");
}
}
protected List<Route> buildThreads() throws Exception {
// START SNIPPET: e10
RouteBuilder builder = new RouteBuilder() {
public void configure() {
errorHandler(deadLetterChannel("mock:error"));
from("direct:a").threads(5, 10).to("mock:a").to("mock:b");
}
};
// END SNIPPET: e10
return getRouteList(builder);
}
@Test
public void testThreads() throws Exception {
List<Route> routes = buildThreads();
log.debug("Created routes: " + routes);
assertEquals(1, routes.size(), "Number routes created");
for (Route route : routes) {
Endpoint key = route.getEndpoint();
assertEquals("direct://a", key.getEndpointUri(), "From endpoint");
DefaultRoute consumer = assertIsInstanceOf(DefaultRoute.class, route);
Pipeline line = assertIsInstanceOf(Pipeline.class, unwrap(consumer.getProcessor()));
Iterator<Processor> it = line.next().iterator();
assertIsInstanceOf(ThreadsProcessor.class, unwrapChannel(it.next()).getNextProcessor());
assertIsInstanceOf(SendProcessor.class, unwrapChannel(it.next()).getNextProcessor());
assertIsInstanceOf(SendProcessor.class, unwrapChannel(it.next()).getNextProcessor());
}
}
protected void assertSendTo(Processor processor, String uri) {
if (!(processor instanceof SendProcessor)) {
processor = unwrapErrorHandler(processor);
}
SendProcessor sendProcessor = assertIsInstanceOf(SendProcessor.class, processor);
assertEquals(uri, sendProcessor.getDestination().getEndpointUri(), "Endpoint URI");
}
protected void assertSendToProcessor(Processor processor, String uri) {
if (!(processor instanceof Producer)) {
processor = unwrapErrorHandler(processor);
}
if (processor instanceof SendProcessor) {
assertSendTo(processor, uri);
} else {
Producer producer = assertIsInstanceOf(Producer.class, processor);
assertEquals(uri, producer.getEndpoint().getEndpointUri(), "Endpoint URI");
}
}
/**
* By default routes should be wrapped in the {@link DeadLetterChannel} so lets unwrap that and return the actual
* processor
*/
protected Processor getProcessorWithoutErrorHandler(Route route) {
DefaultRoute consumerRoute = assertIsInstanceOf(DefaultRoute.class, route);
Processor processor = unwrap(consumerRoute.getProcessor());
return unwrapErrorHandler(processor);
}
protected Processor unwrapErrorHandler(Processor processor) {
if (processor instanceof DeadLetterChannel) {
DeadLetterChannel deadLetter = (DeadLetterChannel) processor;
return deadLetter.getOutput();
} else {
return processor;
}
}
protected Processor unwrapDelegateProcessor(Processor processor) {
if (processor instanceof DelegateProcessor) {
DelegateProcessor delegate = (DelegateProcessor) processor;
return delegate.getProcessor();
} else {
return processor;
}
}
@Test
public void testCorrectNumberOfRoutes() throws Exception {
RouteBuilder builder = new RouteBuilder() {
public void configure() throws Exception {
errorHandler(deadLetterChannel("mock:error"));
from("direct:start").to("direct:in");
from("direct:in").to("mock:result");
}
};
List<Route> routes = getRouteList(builder);
assertEquals(2, routes.size());
}
@Test
public void testLifecycleInterceptor() throws Exception {
AtomicInteger before = new AtomicInteger();
AtomicInteger after = new AtomicInteger();
RouteBuilder builder = new RouteBuilder() {
public void configure() throws Exception {
}
};
builder.addLifecycleInterceptor(new RouteBuilderLifecycleStrategy() {
@Override
public void beforeConfigure(RouteBuilder builder) {
before.incrementAndGet();
}
@Override
public void afterConfigure(RouteBuilder builder) {
after.incrementAndGet();
}
});
DefaultCamelContext context = new DefaultCamelContext();
context.addRoutes(builder);
assertEquals(1, before.get());
assertEquals(1, after.get());
}
}