blob: c1c1d54503971e9db84da982428657c3669672d3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl;
import java.io.File;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
/**
 * Unit tests for {@link DefaultConsumerTemplate}, exercising its
 * <tt>receive</tt>, <tt>receiveNoWait</tt>, <tt>receiveBody</tt> and
 * <tt>receiveBodyNoWait</tt> variants (by endpoint URI and by {@link Endpoint}
 * instance), its consumer cache limit and {@link DefaultConsumerTemplate#doneUoW(Exchange)}.
 *
 * @version
 */
public class DefaultConsumerTemplateTest extends ContextTestSupport {

    /** Maximum number of cached consumers configured in the cache tests. */
    private static final int MAX_CACHE_SIZE = 500;

    /** Number of distinct endpoints polled in the cache tests; deliberately above {@link #MAX_CACHE_SIZE}. */
    private static final int ENDPOINTS_TO_POLL = 503;

    /** The consumer template under test; created and started in {@link #setUp()}. */
    private DefaultConsumerTemplate consumer;

    /**
     * Creates and starts a fresh consumer template bound to the test context.
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        consumer = new DefaultConsumerTemplate(context);
        consumer.start();
    }

    /**
     * Stops the consumer template and then tears down the parent fixture.
     * <p/>
     * The parent tear down runs even if stopping the template fails, and the
     * template is only stopped if {@link #setUp()} got far enough to create it.
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (consumer != null) {
                consumer.stop();
            }
        } finally {
            super.tearDown();
        }
    }

    /**
     * Receives a single exchange by endpoint URI.
     */
    public void testConsumeReceive() throws Exception {
        template.sendBody("seda:foo", "Hello");

        Exchange out = consumer.receive("seda:foo");
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());
    }

    /**
     * Receives two exchanges in a row from the same endpoint URI.
     */
    public void testConsumeTwiceReceive() throws Exception {
        template.sendBody("seda:foo", "Hello");

        Exchange out = consumer.receive("seda:foo");
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());

        template.sendBody("seda:foo", "Bye");

        out = consumer.receive("seda:foo");
        assertNotNull(out);
        assertEquals("Bye", out.getIn().getBody());
    }

    /**
     * Expects <tt>receiveNoWait</tt> to return <tt>null</tt> while nothing has
     * been sent, and the exchange once a message has been sent.
     */
    public void testConsumeReceiveNoWait() throws Exception {
        Exchange out = consumer.receiveNoWait("seda:foo");
        assertNull(out);

        template.sendBody("seda:foo", "Hello");

        // a little delay to let the consumer see it
        Thread.sleep(200);

        out = consumer.receiveNoWait("seda:foo");
        // fail with an assertion rather than a NullPointerException if nothing arrived
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());
    }

    /**
     * Expects a timed receive to give up with <tt>null</tt> when nothing has
     * been sent, and a later receive to still get a message sent afterwards.
     */
    public void testConsumeReceiveTimeout() throws Exception {
        long start = System.currentTimeMillis();
        Exchange out = consumer.receive("seda:foo", 1000);
        assertNull(out);
        long delta = System.currentTimeMillis() - start;
        // only the upper bound is checked here
        assertTrue("Should take about 1 sec: " + delta, delta < 1500);

        template.sendBody("seda:foo", "Hello");

        out = consumer.receive("seda:foo");
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());
    }

    /**
     * Receives a message body by endpoint URI.
     */
    public void testConsumeReceiveBody() throws Exception {
        template.sendBody("seda:foo", "Hello");

        Object body = consumer.receiveBody("seda:foo");
        assertEquals("Hello", body);
    }

    /**
     * Receives two message bodies in a row from the same endpoint URI.
     */
    public void testConsumeTwiceReceiveBody() throws Exception {
        template.sendBody("seda:foo", "Hello");

        Object body = consumer.receiveBody("seda:foo");
        assertEquals("Hello", body);

        template.sendBody("seda:foo", "Bye");

        body = consumer.receiveBody("seda:foo");
        assertEquals("Bye", body);
    }

    /**
     * Expects <tt>receiveBodyNoWait</tt> to return <tt>null</tt> while nothing
     * has been sent, and the body once a message has been sent.
     */
    public void testConsumeReceiveBodyNoWait() throws Exception {
        Object body = consumer.receiveBodyNoWait("seda:foo");
        assertNull(body);

        template.sendBody("seda:foo", "Hello");

        // a little delay to let the consumer see it
        Thread.sleep(200);

        body = consumer.receiveBodyNoWait("seda:foo");
        assertEquals("Hello", body);
    }

    /**
     * Receives a message body by endpoint URI, requested as a {@link String}.
     */
    public void testConsumeReceiveBodyString() throws Exception {
        template.sendBody("seda:foo", "Hello");

        String body = consumer.receiveBody("seda:foo", String.class);
        assertEquals("Hello", body);
    }

    /**
     * Receives two message bodies in a row, each requested as a {@link String}.
     */
    public void testConsumeTwiceReceiveBodyString() throws Exception {
        template.sendBody("seda:foo", "Hello");

        String body = consumer.receiveBody("seda:foo", String.class);
        assertEquals("Hello", body);

        template.sendBody("seda:foo", "Bye");

        body = consumer.receiveBody("seda:foo", String.class);
        assertEquals("Bye", body);
    }

    /**
     * Expects the typed <tt>receiveBodyNoWait</tt> to return <tt>null</tt>
     * while nothing has been sent, and the body once a message has been sent.
     */
    public void testConsumeReceiveBodyStringNoWait() throws Exception {
        String body = consumer.receiveBodyNoWait("seda:foo", String.class);
        assertNull(body);

        template.sendBody("seda:foo", "Hello");

        // a little delay to let the consumer see it
        Thread.sleep(200);

        body = consumer.receiveBodyNoWait("seda:foo", String.class);
        assertEquals("Hello", body);
    }

    /**
     * Receives a single exchange using an {@link Endpoint} instance.
     */
    public void testConsumeReceiveEndpoint() throws Exception {
        template.sendBody("seda:foo", "Hello");

        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        Exchange out = consumer.receive(endpoint);
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());
    }

    /**
     * Receives a single exchange using an {@link Endpoint} instance and a timeout.
     */
    public void testConsumeReceiveEndpointTimeout() throws Exception {
        template.sendBody("seda:foo", "Hello");

        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        Exchange out = consumer.receive(endpoint, 1000);
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());
    }

    /**
     * Expects <tt>receiveNoWait</tt> on an {@link Endpoint} instance to return
     * <tt>null</tt> while nothing has been sent, and the exchange afterwards.
     */
    public void testConsumeReceiveEndpointNoWait() throws Exception {
        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        Exchange out = consumer.receiveNoWait(endpoint);
        assertNull(out);

        template.sendBody("seda:foo", "Hello");

        // a little delay to let the consumer see it
        Thread.sleep(200);

        out = consumer.receiveNoWait(endpoint);
        // fail with an assertion rather than a NullPointerException if nothing arrived
        assertNotNull(out);
        assertEquals("Hello", out.getIn().getBody());
    }

    /**
     * Receives a message body using an {@link Endpoint} instance.
     */
    public void testConsumeReceiveEndpointBody() throws Exception {
        template.sendBody("seda:foo", "Hello");

        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        Object body = consumer.receiveBody(endpoint);
        assertEquals("Hello", body);
    }

    /**
     * Receives a message body using an {@link Endpoint} instance and a timeout.
     */
    public void testConsumeReceiveEndpointBodyTimeout() throws Exception {
        template.sendBody("seda:foo", "Hello");

        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        Object body = consumer.receiveBody(endpoint, 1000);
        assertEquals("Hello", body);
    }

    /**
     * Receives a message body as a {@link String} using an {@link Endpoint} instance.
     */
    public void testConsumeReceiveEndpointBodyType() throws Exception {
        template.sendBody("seda:foo", "Hello");

        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        String body = consumer.receiveBody(endpoint, String.class);
        assertEquals("Hello", body);
    }

    /**
     * Receives a message body as a {@link String} using an {@link Endpoint}
     * instance and a timeout.
     */
    public void testConsumeReceiveEndpointBodyTimeoutType() throws Exception {
        template.sendBody("seda:foo", "Hello");

        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        String body = consumer.receiveBody(endpoint, 1000, String.class);
        assertEquals("Hello", body);
    }

    /**
     * Receives a message body as a {@link String} by endpoint URI with a timeout.
     */
    public void testConsumeReceiveBodyTimeoutType() throws Exception {
        template.sendBody("seda:foo", "Hello");

        String body = consumer.receiveBody("seda:foo", 1000, String.class);
        assertEquals("Hello", body);
    }

    /**
     * Expects the typed <tt>receiveBodyNoWait</tt> on an {@link Endpoint}
     * instance to return <tt>null</tt> while nothing has been sent, and the
     * body afterwards.
     */
    public void testConsumeReceiveEndpointBodyTypeNoWait() throws Exception {
        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        String out = consumer.receiveBodyNoWait(endpoint, String.class);
        assertNull(out);

        template.sendBody("seda:foo", "Hello");

        // a little delay to let the consumer see it
        Thread.sleep(200);

        out = consumer.receiveBodyNoWait(endpoint, String.class);
        assertEquals("Hello", out);
    }

    /**
     * Expects <tt>receiveBodyNoWait</tt> on an {@link Endpoint} instance to
     * return <tt>null</tt> while nothing has been sent, and the body afterwards.
     */
    public void testConsumeReceiveEndpointBodyNoWait() throws Exception {
        assertNotNull(consumer.getCamelContext());
        Endpoint endpoint = context.getEndpoint("seda:foo");

        Object out = consumer.receiveBodyNoWait(endpoint);
        assertNull(out);

        template.sendBody("seda:foo", "Hello");

        // a little delay to let the consumer see it
        Thread.sleep(200);

        out = consumer.receiveBodyNoWait(endpoint);
        assertEquals("Hello", out);
    }

    /**
     * Expects <tt>receiveBody</tt> to throw a {@link RuntimeCamelException}
     * wrapping the original exception when the received exchange carries one.
     */
    public void testReceiveException() throws Exception {
        Exchange exchange = new DefaultExchange(context);
        exchange.setException(new IllegalArgumentException("Damn"));

        Exchange out = template.send("seda:foo", exchange);
        assertTrue(out.isFailed());
        assertNotNull(out.getException());

        try {
            consumer.receiveBody("seda:foo", String.class);
            fail("Should have thrown an exception");
        } catch (RuntimeCamelException e) {
            assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
            assertEquals("Damn", e.getCause().getMessage());
        }
    }

    /**
     * Expects <tt>receiveBody</tt> to return the OUT body when the sent
     * exchange only has an OUT message set.
     */
    public void testReceiveOut() throws Exception {
        Exchange exchange = new DefaultExchange(context);
        exchange.getOut().setBody("Bye World");

        template.send("seda:foo", exchange);

        String out = consumer.receiveBody("seda:foo", String.class);
        assertEquals("Bye World", out);
    }

    /**
     * Checks the consumer cache limit on a template that is created,
     * configured and started by hand.
     */
    public void testCacheConsumers() throws Exception {
        // named so it does not shadow the inherited producer "template" field
        ConsumerTemplate cachingTemplate = new DefaultConsumerTemplate(context);
        cachingTemplate.setMaximumCacheSize(MAX_CACHE_SIZE);
        cachingTemplate.start();

        assertCacheIsBounded(cachingTemplate);
    }

    /**
     * Checks the consumer cache limit on a template obtained from the context
     * with an explicit maximum cache size.
     */
    public void testCacheConsumersFromContext() throws Exception {
        // named so it does not shadow the inherited producer "template" field
        ConsumerTemplate cachingTemplate = context.createConsumerTemplate(MAX_CACHE_SIZE);

        assertCacheIsBounded(cachingTemplate);
    }

    /**
     * Polls more distinct endpoints than the cache limit and asserts that the
     * cache starts empty, is capped at {@link #MAX_CACHE_SIZE} and is empty
     * again after the template has been stopped.
     * <p/>
     * The template is always stopped, even when an assertion fails, so a
     * failing test does not leave a running template behind.
     *
     * @param cachingTemplate an already started template with a maximum cache
     *                        size of {@link #MAX_CACHE_SIZE}
     */
    private void assertCacheIsBounded(ConsumerTemplate cachingTemplate) throws Exception {
        try {
            assertEquals("Size should be 0", 0, cachingTemplate.getCurrentCacheSize());

            // test that we cache at most 500 consumers to avoid it eating too much memory
            for (int i = 0; i < ENDPOINTS_TO_POLL; i++) {
                Endpoint e = context.getEndpoint("direct:queue:" + i);
                cachingTemplate.receiveNoWait(e);
            }

            assertEquals("Size should be " + MAX_CACHE_SIZE, MAX_CACHE_SIZE, cachingTemplate.getCurrentCacheSize());
        } finally {
            cachingTemplate.stop();
        }

        // should be 0
        assertEquals("Size should be 0", 0, cachingTemplate.getCurrentCacheSize());
    }

    /**
     * Expects a file consumed with <tt>delete=true</tt> to stay on disk until
     * {@link DefaultConsumerTemplate#doneUoW(Exchange)} is called for the
     * received exchange, and to be gone afterwards.
     */
    public void testDoneUoW() throws Exception {
        deleteDirectory("target/foo");
        template.sendBodyAndHeader("file:target/foo", "Hello World", Exchange.FILE_NAME, "hello.txt");

        Exchange exchange = consumer.receive("file:target/foo?delete=true");
        assertNotNull(exchange);
        assertEquals("Hello World", exchange.getIn().getBody(String.class));

        // file should still exist
        File file = new File("target/foo/hello.txt").getAbsoluteFile();
        assertTrue("File should exist " + file, file.exists());

        // done the exchange
        consumer.doneUoW(exchange);

        assertFalse("File should have been deleted " + file, file.exists());
    }
}