blob: 1519f7bd48979cbe647c2f1ae7278d97b9a02222 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.util;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Message;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.NoSuchHeaderException;
import org.apache.camel.NoSuchPropertyException;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.TypeConverter;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.spi.UnitOfWork;
/**
* Some helper methods for working with {@link Exchange} objects
*
* @version
*/
public final class ExchangeHelper {
/**
* Utility classes should not have a public constructor.
*/
private ExchangeHelper() {
}
/**
* Extracts the Exchange.BINDING of the given type or null if not present
*
* @param exchange the message exchange
* @param type the expected binding type
* @return the binding object of the given type or null if it could not be found or converted
*/
public static <T> T getBinding(Exchange exchange, Class<T> type) {
return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
}
/**
* Attempts to resolve the endpoint for the given value
*
* @param exchange the message exchange being processed
* @param value the value which can be an {@link Endpoint} or an object
* which provides a String representation of an endpoint via
* {@link #toString()}
* @return the endpoint
* @throws NoSuchEndpointException if the endpoint cannot be resolved
*/
public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
Endpoint endpoint;
if (value instanceof Endpoint) {
endpoint = (Endpoint) value;
} else {
String uri = value.toString().trim();
endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
}
return endpoint;
}
public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
T result = exchange.getProperty(propertyName, type);
if (result != null) {
return result;
}
throw new NoSuchPropertyException(exchange, propertyName, type);
}
public static <T> T getMandatoryHeader(Exchange exchange, String propertyName, Class<T> type) throws NoSuchHeaderException {
T answer = exchange.getIn().getHeader(propertyName, type);
if (answer == null) {
throw new NoSuchHeaderException(exchange, propertyName, type);
}
return answer;
}
/**
* Returns the mandatory inbound message body of the correct type or throws
* an exception if it is not present
*/
public static Object getMandatoryInBody(Exchange exchange) throws InvalidPayloadException {
return exchange.getIn().getMandatoryBody();
}
/**
* Returns the mandatory inbound message body of the correct type or throws
* an exception if it is not present
*/
public static <T> T getMandatoryInBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
return exchange.getIn().getMandatoryBody(type);
}
/**
* Returns the mandatory outbound message body of the correct type or throws
* an exception if it is not present
*/
public static Object getMandatoryOutBody(Exchange exchange) throws InvalidPayloadException {
return exchange.getOut().getMandatoryBody();
}
/**
* Returns the mandatory outbound message body of the correct type or throws
* an exception if it is not present
*/
public static <T> T getMandatoryOutBody(Exchange exchange, Class<T> type) throws InvalidPayloadException {
return exchange.getOut().getMandatoryBody(type);
}
/**
* Converts the value to the given expected type or throws an exception
*/
public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value) throws NoTypeConversionAvailableException {
CamelContext camelContext = exchange.getContext();
ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
TypeConverter converter = camelContext.getTypeConverter();
if (converter != null) {
return converter.mandatoryConvertTo(type, exchange, value);
}
throw new NoTypeConversionAvailableException(value, type);
}
/**
* Converts the value to the given expected type returning null if it could
* not be converted
*/
public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) {
CamelContext camelContext = exchange.getContext();
ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
TypeConverter converter = camelContext.getTypeConverter();
if (converter != null) {
return converter.convertTo(type, exchange, value);
}
return null;
}
/**
* Creates a new instance and copies from the current message exchange so that it can be
* forwarded to another destination as a new instance. Unlike regular copy this operation
* will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
* for async messaging, where the original and copied exchange are independent.
*
* @param exchange original copy of the exchange
* @param handover whether the on completion callbacks should be handed over to the new copy.
*/
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
String id = exchange.getExchangeId();
Exchange copy = exchange.copy();
// do not share the unit of work
copy.setUnitOfWork(null);
// hand over on completion to the copy if we got any
UnitOfWork uow = exchange.getUnitOfWork();
if (handover && uow != null) {
uow.handoverSynchronization(copy);
}
// set a correlation id so we can track back the original exchange
copy.setProperty(Exchange.CORRELATION_ID, id);
return copy;
}
/**
* Creates a new instance and copies from the current message exchange so that it can be
* forwarded to another destination as a new instance.
*
* @param exchange original copy of the exchange
* @param preserveExchangeId whether or not the exchange id should be preserved
* @return the copy
*/
public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
Exchange copy = new DefaultExchange(exchange);
if (preserveExchangeId) {
// must preserve exchange id
copy.setExchangeId(exchange.getExchangeId());
}
copy.getProperties().putAll(exchange.getProperties());
copy.setIn(exchange.getIn().copy());
if (exchange.hasOut()) {
copy.setOut(exchange.getOut().copy());
}
return copy;
}
/**
* Copies the results of a message exchange from the source exchange to the result exchange
* which will copy the out and fault message contents and the exception
*
* @param result the result exchange which will have the output and error state added
* @param source the source exchange which is not modified
*/
public static void copyResults(Exchange result, Exchange source) {
// --------------------------------------------------------------------
// TODO: merge logic with that of copyResultsPreservePattern()
// --------------------------------------------------------------------
if (result == source) {
// we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
// and the result is not failed
if (result.getPattern() == ExchangePattern.InOptionalOut) {
// keep as is
} else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
// copy IN to OUT as we expect a OUT response
result.getOut().copyFrom(source.getIn());
}
return;
}
if (result != source) {
result.setException(source.getException());
if (source.hasOut()) {
result.getOut().copyFrom(source.getOut());
} else if (result.getPattern() == ExchangePattern.InOptionalOut) {
// special case where the result is InOptionalOut and with no OUT response
// so we should return null to indicate this fact
result.setOut(null);
} else {
// no results so lets copy the last input
// as the final processor on a pipeline might not
// have created any OUT; such as a mock:endpoint
// so lets assume the last IN is the OUT
if (result.getPattern().isOutCapable()) {
// only set OUT if its OUT capable
result.getOut().copyFrom(source.getIn());
} else {
// if not replace IN instead to keep the MEP
result.getIn().copyFrom(source.getIn());
// clear any existing OUT as the result is on the IN
if (result.hasOut()) {
result.setOut(null);
}
}
}
if (source.hasProperties()) {
result.getProperties().putAll(source.getProperties());
}
}
}
/**
* Copies the <code>source</code> exchange to <code>target</code> exchange
* preserving the {@link ExchangePattern} of <code>target</code>.
*
* @param source source exchange.
* @param result target exchange.
*/
public static void copyResultsPreservePattern(Exchange result, Exchange source) {
// --------------------------------------------------------------------
// TODO: merge logic with that of copyResults()
// --------------------------------------------------------------------
if (result == source) {
// we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
// and the result is not failed
if (result.getPattern() == ExchangePattern.InOptionalOut) {
// keep as is
} else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
// copy IN to OUT as we expect a OUT response
result.getOut().copyFrom(source.getIn());
}
return;
}
// copy in message
result.getIn().copyFrom(source.getIn());
// copy out message
if (source.hasOut()) {
// exchange pattern sensitive
Message resultMessage = source.getOut().isFault() ? result.getOut() : getResultMessage(result);
resultMessage.copyFrom(source.getOut());
}
// copy exception
result.setException(source.getException());
// copy properties
if (source.hasProperties()) {
result.getProperties().putAll(source.getProperties());
}
}
/**
* Returns the message where to write results in an
* exchange-pattern-sensitive way.
*
* @param exchange message exchange.
* @return result message.
*/
public static Message getResultMessage(Exchange exchange) {
if (exchange.getPattern().isOutCapable()) {
return exchange.getOut();
} else {
return exchange.getIn();
}
}
/**
* Returns true if the given exchange pattern (if defined) can support OUT messages
*
* @param exchange the exchange to interrogate
* @return true if the exchange is defined as an {@link ExchangePattern} which supports
* OUT messages
*/
public static boolean isOutCapable(Exchange exchange) {
ExchangePattern pattern = exchange.getPattern();
return pattern != null && pattern.isOutCapable();
}
/**
* Creates a new instance of the given type from the injector
*/
public static <T> T newInstance(Exchange exchange, Class<T> type) {
return exchange.getContext().getInjector().newInstance(type);
}
/**
* Creates a Map of the variables which are made available to a script or template
*
* @param exchange the exchange to make available
* @return a Map populated with the require variables
*/
public static Map<String, Object> createVariableMap(Exchange exchange) {
Map<String, Object> answer = new HashMap<String, Object>();
populateVariableMap(exchange, answer);
return answer;
}
/**
* Populates the Map with the variables which are made available to a script or template
*
* @param exchange the exchange to make available
* @param map the map to populate
*/
public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
map.put("exchange", exchange);
Message in = exchange.getIn();
map.put("in", in);
map.put("request", in);
map.put("headers", in.getHeaders());
map.put("body", in.getBody());
if (isOutCapable(exchange)) {
// if we are out capable then set out and response as well
// however only grab OUT if it exists, otherwise reuse IN
// this prevents side effects to alter the Exchange if we force creating an OUT message
Message msg = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
map.put("out", msg);
map.put("response", msg);
}
map.put("camelContext", exchange.getContext());
}
/**
* Returns the MIME content type on the input message or null if one is not defined
*/
public static String getContentType(Exchange exchange) {
return MessageHelper.getContentType(exchange.getIn());
}
/**
* Returns the MIME content encoding on the input message or null if one is not defined
*/
public static String getContentEncoding(Exchange exchange) {
return MessageHelper.getContentEncoding(exchange.getIn());
}
/**
* Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
*/
public static Object lookupMandatoryBean(Exchange exchange, String name) {
Object value = lookupBean(exchange, name);
if (value == null) {
throw new NoSuchBeanException(name);
}
return value;
}
/**
* Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
*/
public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
T value = lookupBean(exchange, name, type);
if (value == null) {
throw new NoSuchBeanException(name);
}
return value;
}
/**
* Performs a lookup in the registry of the bean name
*/
public static Object lookupBean(Exchange exchange, String name) {
return exchange.getContext().getRegistry().lookup(name);
}
/**
* Performs a lookup in the registry of the bean name and type
*/
public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
return exchange.getContext().getRegistry().lookup(name, type);
}
/**
* Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
* or null if none could be found
*/
public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
for (Exchange exchange : exchanges) {
String id = exchange.getExchangeId();
if (id != null && id.equals(exchangeId)) {
return exchange;
}
}
return null;
}
/**
* Prepares the exchanges for aggregation.
* <p/>
* This implementation will copy the OUT body to the IN body so when you do
* aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
*
* @param oldExchange the old exchange
* @param newExchange the new exchange
*/
public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
// move body/header from OUT to IN
if (oldExchange != null) {
if (oldExchange.hasOut()) {
oldExchange.setIn(oldExchange.getOut());
oldExchange.setOut(null);
}
}
if (newExchange != null) {
if (newExchange.hasOut()) {
newExchange.setIn(newExchange.getOut());
newExchange.setOut(null);
}
}
}
public static boolean isFailureHandled(Exchange exchange) {
return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
}
public static boolean isUnitOfWorkExhausted(Exchange exchange) {
return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
}
public static void setFailureHandled(Exchange exchange) {
exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
// clear exception since its failure handled
exchange.setException(null);
}
public static boolean isRedeliveryExhausted(Exchange exchange) {
return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
}
public static boolean isRedelivered(Exchange exchange) {
return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
}
public static boolean isInterrupted(Exchange exchange) {
return exchange.getException(InterruptedException.class) != null;
}
/**
* Extracts the body from the given exchange.
* <p/>
* If the exchange pattern is provided it will try to honor it and retrieve the body
* from either IN or OUT according to the pattern.
*
* @param exchange the exchange
* @param pattern exchange pattern if given, can be <tt>null</tt>
* @return the result body, can be <tt>null</tt>.
* @throws CamelExecutionException is thrown if the processing of the exchange failed
*/
public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
Object answer = null;
if (exchange != null) {
// rethrow if there was an exception during execution
if (exchange.getException() != null) {
throw ObjectHelper.wrapCamelExecutionException(exchange, exchange.getException());
}
// result could have a fault message
if (hasFaultMessage(exchange)) {
return exchange.getOut().getBody();
}
// okay no fault then return the response according to the pattern
// try to honor pattern if provided
boolean notOut = pattern != null && !pattern.isOutCapable();
boolean hasOut = exchange.hasOut();
if (hasOut && !notOut) {
// we have a response in out and the pattern is out capable
answer = exchange.getOut().getBody();
} else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
// special case where the result is InOptionalOut and with no OUT response
// so we should return null to indicate this fact
answer = null;
} else {
// use IN as the response
answer = exchange.getIn().getBody();
}
}
return answer;
}
/**
* Tests whether the exchange has a fault message set and that its not null.
*
* @param exchange the exchange
* @return <tt>true</tt> if fault message exists
*/
public static boolean hasFaultMessage(Exchange exchange) {
return exchange.hasOut() && exchange.getOut().isFault() && exchange.getOut().getBody() != null;
}
/**
* Tests whether the exchange has already been handled by the error handler
*
* @param exchange the exchange
* @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
*/
public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
}
/**
* Extracts the body from the given future, that represents a handle to an asynchronous exchange.
* <p/>
* Will wait until the future task is complete.
*
* @param context the camel context
* @param future the future handle
* @param type the expected body response type
* @return the result body, can be <tt>null</tt>.
* @throws CamelExecutionException is thrown if the processing of the exchange failed
*/
public static <T> T extractFutureBody(CamelContext context, Future<Object> future, Class<T> type) {
try {
return doExtractFutureBody(context, future.get(), type);
} catch (InterruptedException e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
} catch (ExecutionException e) {
// execution failed due to an exception so rethrow the cause
throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
} finally {
// its harmless to cancel if task is already completed
// and in any case we do not want to get hold of the task a 2nd time
// and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
future.cancel(true);
}
}
/**
* Extracts the body from the given future, that represents a handle to an asynchronous exchange.
* <p/>
* Will wait for the future task to complete, but waiting at most the timeout value.
*
* @param context the camel context
* @param future the future handle
* @param timeout timeout value
* @param unit timeout unit
* @param type the expected body response type
* @return the result body, can be <tt>null</tt>.
* @throws CamelExecutionException is thrown if the processing of the exchange failed
* @throws java.util.concurrent.TimeoutException
* is thrown if a timeout triggered
*/
public static <T> T extractFutureBody(CamelContext context, Future<Object> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
try {
if (timeout > 0) {
return doExtractFutureBody(context, future.get(timeout, unit), type);
} else {
return doExtractFutureBody(context, future.get(), type);
}
} catch (InterruptedException e) {
// execution failed due interruption so rethrow the cause
throw ObjectHelper.wrapCamelExecutionException(null, e);
} catch (ExecutionException e) {
// execution failed due to an exception so rethrow the cause
throw ObjectHelper.wrapCamelExecutionException(null, e.getCause());
} finally {
// its harmless to cancel if task is already completed
// and in any case we do not want to get hold of the task a 2nd time
// and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
future.cancel(true);
}
}
private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
if (result == null) {
return null;
}
if (type.isAssignableFrom(result.getClass())) {
return type.cast(result);
}
if (result instanceof Exchange) {
Exchange exchange = (Exchange) result;
Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
return context.getTypeConverter().convertTo(type, answer);
}
return context.getTypeConverter().convertTo(type, result);
}
/**
* Creates an exception message with the provided details.
* <p/>
* All fields is optional so you can pass in only an exception, or just a message etc. or any combination.
*
* @param message the message
* @param exchange the exchange
* @param cause the caused exception
* @return an error message (without stacktrace from exception)
*/
public static String createExceptionMessage(String message, Exchange exchange, Throwable cause) {
StringBuilder sb = new StringBuilder();
if (message != null) {
sb.append(message);
}
if (exchange != null) {
if (sb.length() > 0) {
sb.append(". ");
}
sb.append(exchange);
}
if (cause != null) {
if (sb.length() > 0) {
sb.append(". ");
}
sb.append("Caused by: [" + cause.getClass().getName() + " - " + cause.getMessage() + "]");
}
return sb.toString().trim();
}
/**
* Strategy to prepare results before next iterator or when we are complete,
* which is done by copying OUT to IN, so there is only an IN as input
* for the next iteration.
*
* @param exchange the exchange to prepare
*/
public static void prepareOutToIn(Exchange exchange) {
// we are routing using pipes and filters so we need to manually copy OUT to IN
if (exchange.hasOut()) {
exchange.getIn().copyFrom(exchange.getOut());
exchange.setOut(null);
}
}
}