blob: f44e7c275a1b286d1cae7a615cb4290dccc2af24 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.mock;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
import org.apache.camel.Component;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Expression;
import org.apache.camel.Handler;
import org.apache.camel.Message;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.ProcessorBuilder;
import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.camel.impl.DefaultEndpoint;
import org.apache.camel.impl.InterceptSendToEndpoint;
import org.apache.camel.spi.BrowsableEndpoint;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.CaseInsensitiveMap;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ExpressionComparator;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Mock endpoint which provides a literate, fluent API for testing routes
* using a <a href="http://jmock.org/">JMock style</a> API.
* <p/>
* The mock endpoint have two set of methods
* <ul>
* <li>expectedXXX or expectsXXX - To set pre conditions, before the test is executed</li>
* <li>assertXXX - To assert assertions, after the test has been executed</li>
* </ul>
* Its <b>important</b> to know the difference between the two set. The former is used to
* set expectations before the test is being started (eg before the mock receives messages).
* The latter is used after the test has been executed, to verify the expectations; or
* other assertions which you can perform after the test has been completed.
*
* @version
*/
public class MockEndpoint extends DefaultEndpoint implements BrowsableEndpoint {
private static final transient Logger LOG = LoggerFactory.getLogger(MockEndpoint.class);
// must be volatile so changes is visible between the thread which performs the assertions
// and the threads which process the exchanges when routing messages in Camel
private volatile int expectedCount;
private volatile int counter;
private volatile Processor defaultProcessor;
private volatile Map<Integer, Processor> processors;
private volatile List<Exchange> receivedExchanges;
private volatile List<Throwable> failures;
private volatile List<Runnable> tests;
private volatile CountDownLatch latch;
private volatile long sleepForEmptyTest;
private volatile long resultWaitTime;
private volatile long resultMinimumWaitTime;
private volatile long assertPeriod;
private volatile int expectedMinimumCount;
private volatile List<Object> expectedBodyValues;
private volatile List<Object> actualBodyValues;
private volatile Map<String, Object> expectedHeaderValues;
private volatile Map<String, Object> actualHeaderValues;
private volatile Map<String, Object> expectedPropertyValues;
private volatile Map<String, Object> actualPropertyValues;
private volatile Processor reporter;
public MockEndpoint(String endpointUri, Component component) {
super(endpointUri, component);
init();
}
public MockEndpoint(String endpointUri) {
super(endpointUri);
init();
}
public MockEndpoint() {
this(null);
}
/**
* A helper method to resolve the mock endpoint of the given URI on the given context
*
* @param context the camel context to try resolve the mock endpoint from
* @param uri the uri of the endpoint to resolve
* @return the endpoint
*/
public static MockEndpoint resolve(CamelContext context, String uri) {
return CamelContextHelper.getMandatoryEndpoint(context, uri, MockEndpoint.class);
}
public static void assertWait(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
long start = System.currentTimeMillis();
long left = unit.toMillis(timeout);
long end = start + left;
for (MockEndpoint endpoint : endpoints) {
if (!endpoint.await(left, TimeUnit.MILLISECONDS)) {
throw new AssertionError("Timeout waiting for endpoints to receive enough messages. " + endpoint.getEndpointUri() + " timed out.");
}
left = end - System.currentTimeMillis();
if (left <= 0) {
left = 0;
}
}
}
public static void assertIsSatisfied(long timeout, TimeUnit unit, MockEndpoint... endpoints) throws InterruptedException {
assertWait(timeout, unit, endpoints);
for (MockEndpoint endpoint : endpoints) {
endpoint.assertIsSatisfied();
}
}
public static void assertIsSatisfied(MockEndpoint... endpoints) throws InterruptedException {
for (MockEndpoint endpoint : endpoints) {
endpoint.assertIsSatisfied();
}
}
/**
* Asserts that all the expectations on any {@link MockEndpoint} instances registered
* in the given context are valid
*
* @param context the camel context used to find all the available endpoints to be asserted
*/
public static void assertIsSatisfied(CamelContext context) throws InterruptedException {
ObjectHelper.notNull(context, "camelContext");
Collection<Endpoint> endpoints = context.getEndpoints();
for (Endpoint endpoint : endpoints) {
// if the endpoint was intercepted we should get the delegate
if (endpoint instanceof InterceptSendToEndpoint) {
endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
}
if (endpoint instanceof MockEndpoint) {
MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
mockEndpoint.assertIsSatisfied();
}
}
}
/**
* Asserts that all the expectations on any {@link MockEndpoint} instances registered
* in the given context are valid
*
* @param context the camel context used to find all the available endpoints to be asserted
* @param timeout timeout
* @param unit time unit
*/
public static void assertIsSatisfied(CamelContext context, long timeout, TimeUnit unit) throws InterruptedException {
ObjectHelper.notNull(context, "camelContext");
ObjectHelper.notNull(unit, "unit");
Collection<Endpoint> endpoints = context.getEndpoints();
long millis = unit.toMillis(timeout);
for (Endpoint endpoint : endpoints) {
// if the endpoint was intercepted we should get the delegate
if (endpoint instanceof InterceptSendToEndpoint) {
endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
}
if (endpoint instanceof MockEndpoint) {
MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
mockEndpoint.setResultWaitTime(millis);
mockEndpoint.assertIsSatisfied();
}
}
}
/**
* Sets the assert period on all the expectations on any {@link MockEndpoint} instances registered
* in the given context.
*
* @param context the camel context used to find all the available endpoints
* @param period the period in millis
*/
public static void setAssertPeriod(CamelContext context, long period) {
ObjectHelper.notNull(context, "camelContext");
Collection<Endpoint> endpoints = context.getEndpoints();
for (Endpoint endpoint : endpoints) {
// if the endpoint was intercepted we should get the delegate
if (endpoint instanceof InterceptSendToEndpoint) {
endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
}
if (endpoint instanceof MockEndpoint) {
MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
mockEndpoint.setAssertPeriod(period);
}
}
}
/**
* Reset all mock endpoints
*
* @param context the camel context used to find all the available endpoints to reset
*/
public static void resetMocks(CamelContext context) {
ObjectHelper.notNull(context, "camelContext");
Collection<Endpoint> endpoints = context.getEndpoints();
for (Endpoint endpoint : endpoints) {
// if the endpoint was intercepted we should get the delegate
if (endpoint instanceof InterceptSendToEndpoint) {
endpoint = ((InterceptSendToEndpoint) endpoint).getDelegate();
}
if (endpoint instanceof MockEndpoint) {
MockEndpoint mockEndpoint = (MockEndpoint) endpoint;
mockEndpoint.reset();
}
}
}
public static void expectsMessageCount(int count, MockEndpoint... endpoints) throws InterruptedException {
for (MockEndpoint endpoint : endpoints) {
endpoint.setExpectedMessageCount(count);
}
}
public List<Exchange> getExchanges() {
return getReceivedExchanges();
}
public Consumer createConsumer(Processor processor) throws Exception {
throw new UnsupportedOperationException("You cannot consume from this endpoint");
}
public Producer createProducer() throws Exception {
return new DefaultAsyncProducer(this) {
public boolean process(Exchange exchange, AsyncCallback callback) {
onExchange(exchange);
callback.done(true);
return true;
}
};
}
public void reset() {
init();
}
// Testing API
// -------------------------------------------------------------------------
/**
* Handles the incoming exchange.
* <p/>
* This method turns this mock endpoint into a bean which you can use
* in the Camel routes, which allows you to inject MockEndpoint as beans
* in your routes and use the features of the mock to control the bean.
*
* @param exchange the exchange
* @throws Exception can be thrown
*/
@Handler
public void handle(Exchange exchange) throws Exception {
onExchange(exchange);
}
/**
* Set the processor that will be invoked when the index
* message is received.
*/
public void whenExchangeReceived(int index, Processor processor) {
this.processors.put(index, processor);
}
/**
* Set the processor that will be invoked when the some message
* is received.
*
* This processor could be overwritten by
* {@link #whenExchangeReceived(int, Processor)} method.
*/
public void whenAnyExchangeReceived(Processor processor) {
this.defaultProcessor = processor;
}
/**
* Set the expression which value will be set to the message body
* @param expression which is use to set the message body
*/
public void returnReplyBody(Expression expression) {
this.defaultProcessor = ProcessorBuilder.setBody(expression);
}
/**
* Set the expression which value will be set to the message header
* @param headerName that will be set value
* @param expression which is use to set the message header
*/
public void returnReplyHeader(String headerName, Expression expression) {
this.defaultProcessor = ProcessorBuilder.setHeader(headerName, expression);
}
/**
* Validates that all the available expectations on this endpoint are
* satisfied; or throw an exception
*/
public void assertIsSatisfied() throws InterruptedException {
assertIsSatisfied(sleepForEmptyTest);
}
/**
* Validates that all the available expectations on this endpoint are
* satisfied; or throw an exception
*
* @param timeoutForEmptyEndpoints the timeout in milliseconds that we
* should wait for the test to be true
*/
public void assertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
LOG.info("Asserting: " + this + " is satisfied");
doAssertIsSatisfied(timeoutForEmptyEndpoints);
if (assertPeriod > 0) {
// if an assert period was set then re-assert again to ensure the assertion is still valid
Thread.sleep(assertPeriod);
LOG.info("Re-asserting: " + this + " is satisfied after " + assertPeriod + " millis");
// do not use timeout when we re-assert
doAssertIsSatisfied(0);
}
}
protected void doAssertIsSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
if (expectedCount == 0) {
if (timeoutForEmptyEndpoints > 0) {
LOG.debug("Sleeping for: " + timeoutForEmptyEndpoints + " millis to check there really are no messages received");
Thread.sleep(timeoutForEmptyEndpoints);
}
assertEquals("Received message count", expectedCount, getReceivedCounter());
} else if (expectedCount > 0) {
if (expectedCount != getReceivedCounter()) {
waitForCompleteLatch();
}
assertEquals("Received message count", expectedCount, getReceivedCounter());
} else if (expectedMinimumCount > 0 && getReceivedCounter() < expectedMinimumCount) {
waitForCompleteLatch();
}
if (expectedMinimumCount >= 0) {
int receivedCounter = getReceivedCounter();
assertTrue("Received message count " + receivedCounter + ", expected at least " + expectedMinimumCount, expectedMinimumCount <= receivedCounter);
}
for (Runnable test : tests) {
test.run();
}
for (Throwable failure : failures) {
if (failure != null) {
LOG.error("Caught on " + getEndpointUri() + " Exception: " + failure, failure);
fail("Failed due to caught exception: " + failure);
}
}
}
/**
* Validates that the assertions fail on this endpoint
*/
public void assertIsNotSatisfied() throws InterruptedException {
boolean failed = false;
try {
assertIsSatisfied();
// did not throw expected error... fail!
failed = true;
} catch (AssertionError e) {
LOG.info("Caught expected failure: " + e);
}
if (failed) {
// fail() throws the AssertionError to indicate the test failed.
fail("Expected assertion failure but test succeeded!");
}
}
/**
* Validates that the assertions fail on this endpoint
* @param timeoutForEmptyEndpoints the timeout in milliseconds that we
* should wait for the test to be true
*/
public void assertIsNotSatisfied(long timeoutForEmptyEndpoints) throws InterruptedException {
boolean failed = false;
try {
assertIsSatisfied(timeoutForEmptyEndpoints);
// did not throw expected error... fail!
failed = true;
} catch (AssertionError e) {
LOG.info("Caught expected failure: " + e);
}
if (failed) {
// fail() throws the AssertionError to indicate the test failed.
fail("Expected assertion failure but test succeeded!");
}
}
/**
* Specifies the expected number of message exchanges that should be
* received by this endpoint
*
* @param expectedCount the number of message exchanges that should be
* expected by this endpoint
*/
public void expectedMessageCount(int expectedCount) {
setExpectedMessageCount(expectedCount);
}
/**
* Sets a grace period after which the mock endpoint will re-assert
* to ensure the preliminary assertion is still valid.
* <p/>
* By default this period is disabled
*
* @param period grace period in millis
*/
public void setAssertPeriod(long period) {
this.assertPeriod = period;
}
/**
* Specifies the minimum number of expected message exchanges that should be
* received by this endpoint
*
* @param expectedCount the number of message exchanges that should be
* expected by this endpoint
*/
public void expectedMinimumMessageCount(int expectedCount) {
setMinimumExpectedMessageCount(expectedCount);
}
/**
* Sets an expectation that the given header name & value are received by this endpoint
* <p/>
* You can set multiple expectations for different header names.
* If you set a value of <tt>null</tt> that means we accept either the header is absent, or its value is <tt>null</tt>
*/
public void expectedHeaderReceived(final String name, final Object value) {
if (expectedHeaderValues == null) {
expectedHeaderValues = new CaseInsensitiveMap();
}
expectedHeaderValues.put(name, value);
expects(new Runnable() {
public void run() {
for (int i = 0; i < getReceivedExchanges().size(); i++) {
Exchange exchange = getReceivedExchanges().get(i);
for (Map.Entry<String, Object> entry : expectedHeaderValues.entrySet()) {
String key = entry.getKey();
Object expectedValue = entry.getValue();
// we accept that an expectedValue of null also means that the header may be absent
if (expectedValue != null) {
assertTrue("Exchange " + i + " has no headers", exchange.getIn().hasHeaders());
boolean hasKey = exchange.getIn().getHeaders().containsKey(key);
assertTrue("No header with name " + key + " found for message: " + i, hasKey);
}
Object actualValue = exchange.getIn().getHeader(key);
actualValue = extractActualValue(exchange, actualValue, expectedValue);
assertEquals("Header with name " + key + " for message: " + i, expectedValue, actualValue);
}
}
}
});
}
/**
* Sets an expectation that the given property name & value are received by this endpoint
* <p/>
* You can set multiple expectations for different property names.
* If you set a value of <tt>null</tt> that means we accept either the property is absent, or its value is <tt>null</tt>
*/
public void expectedPropertyReceived(final String name, final Object value) {
if (expectedPropertyValues == null) {
expectedPropertyValues = new ConcurrentHashMap<String, Object>();
}
if (value != null) {
// ConcurrentHashMap cannot store null values
expectedPropertyValues.put(name, value);
}
expects(new Runnable() {
public void run() {
for (int i = 0; i < getReceivedExchanges().size(); i++) {
Exchange exchange = getReceivedExchanges().get(i);
for (Map.Entry<String, Object> entry : expectedPropertyValues.entrySet()) {
String key = entry.getKey();
Object expectedValue = entry.getValue();
// we accept that an expectedValue of null also means that the header may be absent
if (expectedValue != null) {
assertTrue("Exchange " + i + " has no properties", !exchange.getProperties().isEmpty());
boolean hasKey = exchange.getProperties().containsKey(key);
assertTrue("No property with name " + key + " found for message: " + i, hasKey);
}
Object actualValue = exchange.getProperty(key);
actualValue = extractActualValue(exchange, actualValue, expectedValue);
assertEquals("Property with name " + key + " for message: " + i, expectedValue, actualValue);
}
}
}
});
}
/**
* Adds an expectation that the given body values are received by this
* endpoint in the specified order
*/
@SuppressWarnings("unchecked")
public void expectedBodiesReceived(final List bodies) {
expectedMessageCount(bodies.size());
this.expectedBodyValues = bodies;
this.actualBodyValues = new ArrayList<Object>();
expects(new Runnable() {
public void run() {
for (int i = 0; i < expectedBodyValues.size(); i++) {
Exchange exchange = getReceivedExchanges().get(i);
assertTrue("No exchange received for counter: " + i, exchange != null);
Object expectedBody = expectedBodyValues.get(i);
Object actualBody = null;
if (i < actualBodyValues.size()) {
actualBody = actualBodyValues.get(i);
}
actualBody = extractActualValue(exchange, actualBody, expectedBody);
assertEquals("Body of message: " + i, expectedBody, actualBody);
}
}
});
}
private Object extractActualValue(Exchange exchange, Object actualValue, Object expectedValue) {
if (actualValue == null) {
return null;
}
if (actualValue instanceof Expression) {
actualValue = ((Expression)actualValue).evaluate(exchange, expectedValue != null ? expectedValue.getClass() : Object.class);
} else if (actualValue instanceof Predicate) {
actualValue = ((Predicate)actualValue).matches(exchange);
} else if (expectedValue != null) {
String from = actualValue.getClass().getName();
String to = expectedValue.getClass().getName();
actualValue = getCamelContext().getTypeConverter().convertTo(expectedValue.getClass(), actualValue);
assertTrue("There is no type conversion possible from " + from + " to " + to, actualValue != null);
}
return actualValue;
}
/**
* Sets an expectation that the given predicates matches the received messages by this endpoint
*/
public void expectedMessagesMatches(Predicate... predicates) {
for (int i = 0; i < predicates.length; i++) {
final int messageIndex = i;
final Predicate predicate = predicates[i];
final AssertionClause clause = new AssertionClause(this) {
public void run() {
addPredicate(predicate);
applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
}
};
expects(clause);
}
}
/**
* Sets an expectation that the given body values are received by this endpoint
*/
public void expectedBodiesReceived(Object... bodies) {
List<Object> bodyList = new ArrayList<Object>();
bodyList.addAll(Arrays.asList(bodies));
expectedBodiesReceived(bodyList);
}
/**
* Adds an expectation that the given body value are received by this endpoint
*/
public AssertionClause expectedBodyReceived() {
expectedMessageCount(1);
final AssertionClause clause = new AssertionClause(this) {
public void run() {
Exchange exchange = getReceivedExchanges().get(0);
assertTrue("No exchange received for counter: " + 0, exchange != null);
Object actualBody = exchange.getIn().getBody();
Expression exp = createExpression(getCamelContext());
Object expectedBody = exp.evaluate(exchange, Object.class);
assertEquals("Body of message: " + 0, expectedBody, actualBody);
}
};
expects(clause);
return clause;
}
/**
* Adds an expectation that the given body values are received by this
* endpoint in any order
*/
@SuppressWarnings("unchecked")
public void expectedBodiesReceivedInAnyOrder(final List bodies) {
expectedMessageCount(bodies.size());
this.expectedBodyValues = bodies;
this.actualBodyValues = new ArrayList<Object>();
expects(new Runnable() {
public void run() {
Set<Object> actualBodyValuesSet = new HashSet<Object>(actualBodyValues);
for (int i = 0; i < expectedBodyValues.size(); i++) {
Exchange exchange = getReceivedExchanges().get(i);
assertTrue("No exchange received for counter: " + i, exchange != null);
Object expectedBody = expectedBodyValues.get(i);
assertTrue("Message with body " + expectedBody
+ " was expected but not found in " + actualBodyValuesSet,
actualBodyValuesSet.remove(expectedBody));
}
}
});
}
/**
* Adds an expectation that the given body values are received by this
* endpoint in any order
*/
public void expectedBodiesReceivedInAnyOrder(Object... bodies) {
List<Object> bodyList = new ArrayList<Object>();
bodyList.addAll(Arrays.asList(bodies));
expectedBodiesReceivedInAnyOrder(bodyList);
}
/**
* Adds an expectation that a file exists with the given name
*
* @param name name of file, will cater for / and \ on different OS platforms
*/
public void expectedFileExists(final String name) {
expectedFileExists(name, null);
}
/**
* Adds an expectation that a file exists with the given name
* <p/>
* Will wait at most 5 seconds while checking for the existence of the file.
*
* @param name name of file, will cater for / and \ on different OS platforms
* @param content content of file to compare, can be <tt>null</tt> to not compare content
*/
public void expectedFileExists(final String name, final String content) {
final File file = new File(FileUtil.normalizePath(name)).getAbsoluteFile();
expects(new Runnable() {
public void run() {
// wait at most 5 seconds for the file to exists
final long timeout = System.currentTimeMillis() + 5000;
boolean stop = false;
while (!stop && !file.exists()) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
// ignore
}
stop = System.currentTimeMillis() > timeout;
}
assertTrue("The file should exists: " + name, file.exists());
if (content != null) {
String body = getCamelContext().getTypeConverter().convertTo(String.class, file);
assertEquals("Content of file: " + name, content, body);
}
}
});
}
/**
* Adds an expectation that messages received should have the given exchange pattern
*/
public void expectedExchangePattern(final ExchangePattern exchangePattern) {
expectedMessagesMatches(new Predicate() {
public boolean matches(Exchange exchange) {
return exchange.getPattern().equals(exchangePattern);
}
});
}
/**
* Adds an expectation that messages received should have ascending values
* of the given expression such as a user generated counter value
*/
public void expectsAscending(final Expression expression) {
expects(new Runnable() {
public void run() {
assertMessagesAscending(expression);
}
});
}
/**
* Adds an expectation that messages received should have ascending values
* of the given expression such as a user generated counter value
*/
public AssertionClause expectsAscending() {
final AssertionClause clause = new AssertionClause(this) {
public void run() {
assertMessagesAscending(createExpression(getCamelContext()));
}
};
expects(clause);
return clause;
}
/**
* Adds an expectation that messages received should have descending values
* of the given expression such as a user generated counter value
*/
public void expectsDescending(final Expression expression) {
expects(new Runnable() {
public void run() {
assertMessagesDescending(expression);
}
});
}
/**
* Adds an expectation that messages received should have descending values
* of the given expression such as a user generated counter value
*/
public AssertionClause expectsDescending() {
final AssertionClause clause = new AssertionClause(this) {
public void run() {
assertMessagesDescending(createExpression(getCamelContext()));
}
};
expects(clause);
return clause;
}
/**
* Adds an expectation that no duplicate messages should be received using
* the expression to determine the message ID
*
* @param expression the expression used to create a unique message ID for
* message comparison (which could just be the message
* payload if the payload can be tested for uniqueness using
* {@link Object#equals(Object)} and
* {@link Object#hashCode()}
*/
public void expectsNoDuplicates(final Expression expression) {
expects(new Runnable() {
public void run() {
assertNoDuplicates(expression);
}
});
}
/**
* Adds an expectation that no duplicate messages should be received using
* the expression to determine the message ID
*/
public AssertionClause expectsNoDuplicates() {
final AssertionClause clause = new AssertionClause(this) {
public void run() {
assertNoDuplicates(createExpression(getCamelContext()));
}
};
expects(clause);
return clause;
}
/**
* Asserts that the messages have ascending values of the given expression
*/
public void assertMessagesAscending(Expression expression) {
assertMessagesSorted(expression, true);
}
/**
* Asserts that the messages have descending values of the given expression
*/
public void assertMessagesDescending(Expression expression) {
assertMessagesSorted(expression, false);
}
protected void assertMessagesSorted(Expression expression, boolean ascending) {
String type = ascending ? "ascending" : "descending";
ExpressionComparator comparator = new ExpressionComparator(expression);
List<Exchange> list = getReceivedExchanges();
for (int i = 1; i < list.size(); i++) {
int j = i - 1;
Exchange e1 = list.get(j);
Exchange e2 = list.get(i);
int result = comparator.compare(e1, e2);
if (result == 0) {
fail("Messages not " + type + ". Messages" + j + " and " + i + " are equal with value: "
+ expression.evaluate(e1, Object.class) + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
} else {
if (!ascending) {
result = result * -1;
}
if (result > 0) {
fail("Messages not " + type + ". Message " + j + " has value: " + expression.evaluate(e1, Object.class)
+ " and message " + i + " has value: " + expression.evaluate(e2, Object.class) + " for expression: "
+ expression + ". Exchanges: " + e1 + " and " + e2);
}
}
}
}
public void assertNoDuplicates(Expression expression) {
Map<Object, Exchange> map = new HashMap<Object, Exchange>();
List<Exchange> list = getReceivedExchanges();
for (int i = 0; i < list.size(); i++) {
Exchange e2 = list.get(i);
Object key = expression.evaluate(e2, Object.class);
Exchange e1 = map.get(key);
if (e1 != null) {
fail("Duplicate message found on message " + i + " has value: " + key + " for expression: " + expression + ". Exchanges: " + e1 + " and " + e2);
} else {
map.put(key, e2);
}
}
}
/**
* Adds the expectation which will be invoked when enough messages are received
*/
public void expects(Runnable runnable) {
tests.add(runnable);
}
/**
* Adds an assertion to the given message index
*
* @param messageIndex the number of the message
* @return the assertion clause
*/
public AssertionClause message(final int messageIndex) {
final AssertionClause clause = new AssertionClause(this) {
public void run() {
applyAssertionOn(MockEndpoint.this, messageIndex, assertExchangeReceived(messageIndex));
}
};
expects(clause);
return clause;
}
/**
* Adds an assertion to all the received messages
*
* @return the assertion clause
*/
public AssertionClause allMessages() {
final AssertionClause clause = new AssertionClause(this) {
public void run() {
List<Exchange> list = getReceivedExchanges();
int index = 0;
for (Exchange exchange : list) {
applyAssertionOn(MockEndpoint.this, index++, exchange);
}
}
};
expects(clause);
return clause;
}
/**
* Asserts that the given index of message is received (starting at zero)
*/
public Exchange assertExchangeReceived(int index) {
int count = getReceivedCounter();
assertTrue("Not enough messages received. Was: " + count, count > index);
return getReceivedExchanges().get(index);
}
// Properties
// -------------------------------------------------------------------------
public List<Throwable> getFailures() {
return failures;
}
public int getReceivedCounter() {
return receivedExchanges.size();
}
public List<Exchange> getReceivedExchanges() {
return receivedExchanges;
}
public int getExpectedCount() {
return expectedCount;
}
public long getSleepForEmptyTest() {
return sleepForEmptyTest;
}
/**
* Allows a sleep to be specified to wait to check that this endpoint really
* is empty when {@link #expectedMessageCount(int)} is called with zero
*
* @param sleepForEmptyTest the milliseconds to sleep for to determine that
* this endpoint really is empty
*/
public void setSleepForEmptyTest(long sleepForEmptyTest) {
this.sleepForEmptyTest = sleepForEmptyTest;
}
public long getResultWaitTime() {
return resultWaitTime;
}
/**
* Sets the maximum amount of time (in millis) the {@link #assertIsSatisfied()} will
* wait on a latch until it is satisfied
*/
public void setResultWaitTime(long resultWaitTime) {
this.resultWaitTime = resultWaitTime;
}
/**
* Sets the minimum expected amount of time (in millis) the {@link #assertIsSatisfied()} will
* wait on a latch until it is satisfied
*/
public void setMinimumResultWaitTime(long resultMinimumWaitTime) {
this.resultMinimumWaitTime = resultMinimumWaitTime;
}
/**
* Specifies the expected number of message exchanges that should be
* received by this endpoint
*
* @param expectedCount the number of message exchanges that should be
* expected by this endpoint
*/
public void setExpectedMessageCount(int expectedCount) {
this.expectedCount = expectedCount;
if (expectedCount <= 0) {
latch = null;
} else {
latch = new CountDownLatch(expectedCount);
}
}
/**
* Specifies the minimum number of expected message exchanges that should be
* received by this endpoint
*
* @param expectedCount the number of message exchanges that should be
* expected by this endpoint
*/
public void setMinimumExpectedMessageCount(int expectedCount) {
this.expectedMinimumCount = expectedCount;
if (expectedCount <= 0) {
latch = null;
} else {
latch = new CountDownLatch(expectedMinimumCount);
}
}
public Processor getReporter() {
return reporter;
}
/**
* Allows a processor to added to the endpoint to report on progress of the test
*/
public void setReporter(Processor reporter) {
this.reporter = reporter;
}
// Implementation methods
// -------------------------------------------------------------------------
private void init() {
expectedCount = -1;
counter = 0;
processors = new HashMap<Integer, Processor>();
receivedExchanges = new CopyOnWriteArrayList<Exchange>();
failures = new CopyOnWriteArrayList<Throwable>();
tests = new CopyOnWriteArrayList<Runnable>();
latch = null;
sleepForEmptyTest = 0;
resultWaitTime = 0;
resultMinimumWaitTime = 0L;
assertPeriod = 0L;
expectedMinimumCount = -1;
expectedBodyValues = null;
actualBodyValues = new ArrayList<Object>();
expectedHeaderValues = null;
actualHeaderValues = null;
expectedPropertyValues = null;
actualPropertyValues = null;
}
protected synchronized void onExchange(Exchange exchange) {
try {
if (reporter != null) {
reporter.process(exchange);
}
// copy the exchange so the mock stores the copy and not the actual exchange
Exchange copy = ExchangeHelper.createCopy(exchange, true);
performAssertions(exchange, copy);
} catch (Throwable e) {
// must catch java.lang.Throwable as AssertionException extends java.lang.Error
failures.add(e);
} finally {
// make sure latch is counted down to avoid test hanging forever
if (latch != null) {
latch.countDown();
}
}
}
/**
* Performs the assertions on the incoming exchange.
*
* @param exchange the actual exchange
* @param copy a copy of the exchange (only store this)
* @throws Exception can be thrown if something went wrong
*/
protected void performAssertions(Exchange exchange, Exchange copy) throws Exception {
Message in = copy.getIn();
Object actualBody = in.getBody();
if (expectedHeaderValues != null) {
if (actualHeaderValues == null) {
actualHeaderValues = new CaseInsensitiveMap();
}
if (in.hasHeaders()) {
actualHeaderValues.putAll(in.getHeaders());
}
}
if (expectedPropertyValues != null) {
if (actualPropertyValues == null) {
actualPropertyValues = new ConcurrentHashMap<String, Object>();
}
actualPropertyValues.putAll(copy.getProperties());
}
if (expectedBodyValues != null) {
int index = actualBodyValues.size();
if (expectedBodyValues.size() > index) {
Object expectedBody = expectedBodyValues.get(index);
if (expectedBody != null) {
// prefer to convert body early, for example when using files
// we need to read the content at this time
Object body = in.getBody(expectedBody.getClass());
if (body != null) {
actualBody = body;
}
}
actualBodyValues.add(actualBody);
}
}
// let counter be 0 index-based in the logs
if (LOG.isDebugEnabled()) {
String msg = getEndpointUri() + " >>>> " + counter + " : " + copy + " with body: " + actualBody;
if (copy.getIn().hasHeaders()) {
msg += " and headers:" + copy.getIn().getHeaders();
}
LOG.debug(msg);
}
++counter;
// record timestamp when exchange was received
copy.setProperty(Exchange.RECEIVED_TIMESTAMP, new Date());
receivedExchanges.add(copy);
Processor processor = processors.get(getReceivedCounter()) != null
? processors.get(getReceivedCounter()) : defaultProcessor;
if (processor != null) {
try {
// must process the incoming exchange and NOT the copy as the idea
// is the end user can manipulate the exchange
processor.process(exchange);
} catch (Exception e) {
// set exceptions on exchange so we can throw exceptions to simulate errors
exchange.setException(e);
}
}
}
protected void waitForCompleteLatch() throws InterruptedException {
if (latch == null) {
fail("Should have a latch!");
}
StopWatch watch = new StopWatch();
waitForCompleteLatch(resultWaitTime);
long delta = watch.stop();
LOG.debug("Took {} millis to complete latch", delta);
if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
fail("Expected minimum " + resultMinimumWaitTime
+ " millis waiting on the result, but was faster with " + delta + " millis.");
}
}
protected void waitForCompleteLatch(long timeout) throws InterruptedException {
// Wait for a default 10 seconds if resultWaitTime is not set
long waitTime = timeout == 0 ? 10000L : timeout;
// now lets wait for the results
LOG.debug("Waiting on the latch for: " + timeout + " millis");
latch.await(waitTime, TimeUnit.MILLISECONDS);
}
protected void assertEquals(String message, Object expectedValue, Object actualValue) {
if (!ObjectHelper.equal(expectedValue, actualValue)) {
fail(message + ". Expected: <" + expectedValue + "> but was: <" + actualValue + ">");
}
}
protected void assertTrue(String message, boolean predicate) {
if (!predicate) {
fail(message);
}
}
protected void fail(Object message) {
if (LOG.isDebugEnabled()) {
List<Exchange> list = getReceivedExchanges();
int index = 0;
for (Exchange exchange : list) {
LOG.debug("{} failed and received[{}]: {}", new Object[]{getEndpointUri(), ++index, exchange});
}
}
throw new AssertionError(getEndpointUri() + " " + message);
}
public int getExpectedMinimumCount() {
return expectedMinimumCount;
}
public void await() throws InterruptedException {
if (latch != null) {
latch.await();
}
}
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
if (latch != null) {
return latch.await(timeout, unit);
}
return true;
}
public boolean isSingleton() {
return true;
}
public boolean isLenientProperties() {
return true;
}
}