blob: 52568cb6a04f4274aeca72af6dbfb4468a348234 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.support;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Message;
import org.apache.camel.MessageHistory;
import org.apache.camel.NoSuchBeanException;
import org.apache.camel.NoSuchEndpointException;
import org.apache.camel.NoSuchHeaderException;
import org.apache.camel.NoSuchPropertyException;
import org.apache.camel.NoTypeConversionAvailableException;
import org.apache.camel.Route;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConversionException;
import org.apache.camel.TypeConverter;
import org.apache.camel.WrappedFile;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.UnitOfWork;
import org.apache.camel.util.IOHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.Scanner;
import org.apache.camel.util.StringHelper;
/**
* Some helper methods for working with {@link Exchange} objects
*/
public final class ExchangeHelper {
/**
* Utility classes should not have a public constructor.
*/
private ExchangeHelper() {
}
/**
* Extracts the Exchange.BINDING of the given type or null if not present
*
* @param exchange the message exchange
* @param type the expected binding type
* @return the binding object of the given type or null if it could not be found or converted
*/
public static <T> T getBinding(Exchange exchange, Class<T> type) {
return exchange != null ? exchange.getProperty(Exchange.BINDING, type) : null;
}
/**
* Attempts to resolve the endpoint for the given value
*
* @param exchange the message exchange being processed
* @param value the value which can be an {@link Endpoint} or an object
* which provides a String representation of an endpoint via
* {@link #toString()}
* @return the endpoint
* @throws NoSuchEndpointException if the endpoint cannot be resolved
*/
public static Endpoint resolveEndpoint(Exchange exchange, Object value) throws NoSuchEndpointException {
Endpoint endpoint;
if (value instanceof Endpoint) {
endpoint = (Endpoint) value;
} else {
String uri = value.toString().trim();
endpoint = CamelContextHelper.getMandatoryEndpoint(exchange.getContext(), uri);
}
return endpoint;
}
/**
* Gets the mandatory property of the exchange of the correct type
*
* @param exchange the exchange
* @param propertyName the property name
* @param type the type
* @return the property value
* @throws TypeConversionException is thrown if error during type conversion
* @throws NoSuchPropertyException is thrown if no property exists
*/
public static <T> T getMandatoryProperty(Exchange exchange, String propertyName, Class<T> type) throws NoSuchPropertyException {
T result = exchange.getProperty(propertyName, type);
if (result != null) {
return result;
}
throw new NoSuchPropertyException(exchange, propertyName, type);
}
/**
* Gets the mandatory inbound header of the correct type
*
* @param exchange the exchange
* @param headerName the header name
* @param type the type
* @return the header value
* @throws TypeConversionException is thrown if error during type conversion
* @throws NoSuchHeaderException is thrown if no headers exists
*/
public static <T> T getMandatoryHeader(Exchange exchange, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
T answer = exchange.getIn().getHeader(headerName, type);
if (answer == null) {
throw new NoSuchHeaderException(exchange, headerName, type);
}
return answer;
}
/**
* Gets the mandatory inbound header of the correct type
*
* @param message the message
* @param headerName the header name
* @param type the type
* @return the header value
* @throws TypeConversionException is thrown if error during type conversion
* @throws NoSuchHeaderException is thrown if no headers exists
*/
public static <T> T getMandatoryHeader(Message message, String headerName, Class<T> type) throws TypeConversionException, NoSuchHeaderException {
T answer = message.getHeader(headerName, type);
if (answer == null) {
throw new NoSuchHeaderException(message.getExchange(), headerName, type);
}
return answer;
}
/**
* Gets an header or property of the correct type
*
* @param exchange the exchange
* @param name the name of the header or the property
* @param type the type
* @return the header or property value
* @throws TypeConversionException is thrown if error during type conversion
* @throws NoSuchHeaderException is thrown if no headers exists
*/
public static <T> T getHeaderOrProperty(Exchange exchange, String name, Class<T> type) throws TypeConversionException {
T answer = exchange.getIn().getHeader(name, type);
if (answer == null) {
answer = exchange.getProperty(name, type);
}
return answer;
}
/**
* Converts the value to the given expected type or throws an exception
*
* @return the converted value
* @throws TypeConversionException is thrown if error during type conversion
* @throws NoTypeConversionAvailableException} if no type converters exists to convert to the given type
*/
public static <T> T convertToMandatoryType(Exchange exchange, Class<T> type, Object value)
throws TypeConversionException, NoTypeConversionAvailableException {
CamelContext camelContext = exchange.getContext();
ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
TypeConverter converter = camelContext.getTypeConverter();
if (converter != null) {
return converter.mandatoryConvertTo(type, exchange, value);
}
throw new NoTypeConversionAvailableException(value, type);
}
/**
* Converts the value to the given expected type
*
* @return the converted value
* @throws org.apache.camel.TypeConversionException is thrown if error during type conversion
*/
public static <T> T convertToType(Exchange exchange, Class<T> type, Object value) throws TypeConversionException {
CamelContext camelContext = exchange.getContext();
ObjectHelper.notNull(camelContext, "CamelContext of Exchange");
TypeConverter converter = camelContext.getTypeConverter();
if (converter != null) {
return converter.convertTo(type, exchange, value);
}
return null;
}
/**
* Creates a new instance and copies from the current message exchange so that it can be
* forwarded to another destination as a new instance. Unlike regular copy this operation
* will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
* for async messaging, where the original and copied exchange are independent.
*
* @param exchange original copy of the exchange
* @param handover whether the on completion callbacks should be handed over to the new copy.
*/
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover) {
return createCorrelatedCopy(exchange, handover, false);
}
/**
* Creates a new instance and copies from the current message exchange so that it can be
* forwarded to another destination as a new instance. Unlike regular copy this operation
* will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
* for async messaging, where the original and copied exchange are independent.
*
* @param exchange original copy of the exchange
* @param handover whether the on completion callbacks should be handed over to the new copy.
* @param useSameMessageId whether to use same message id on the copy message.
*/
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
return createCorrelatedCopy(exchange, handover, useSameMessageId, null);
}
/**
* Creates a new instance and copies from the current message exchange so that it can be
* forwarded to another destination as a new instance. Unlike regular copy this operation
* will not share the same {@link org.apache.camel.spi.UnitOfWork} so its should be used
* for async messaging, where the original and copied exchange are independent.
*
* @param exchange original copy of the exchange
* @param handover whether the on completion callbacks should be handed over to the new copy.
* @param useSameMessageId whether to use same message id on the copy message.
* @param filter whether to handover the on completion
*/
public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId, Predicate<Synchronization> filter) {
String id = exchange.getExchangeId();
// make sure to do a safe copy as the correlated copy can be routed independently of the source.
Exchange copy = exchange.copy();
// do not reuse message id on copy
if (!useSameMessageId) {
if (copy.hasOut()) {
copy.getOut().setMessageId(null);
}
copy.getIn().setMessageId(null);
}
// do not share the unit of work
copy.setUnitOfWork(null);
// do not reuse the message id
// hand over on completion to the copy if we got any
UnitOfWork uow = exchange.getUnitOfWork();
if (handover && uow != null) {
uow.handoverSynchronization(copy, filter);
}
// set a correlation id so we can track back the original exchange
copy.setProperty(Exchange.CORRELATION_ID, id);
return copy;
}
/**
* Creates a new instance and copies from the current message exchange so that it can be
* forwarded to another destination as a new instance.
*
* @param exchange original copy of the exchange
* @param preserveExchangeId whether or not the exchange id should be preserved
* @return the copy
*/
public static Exchange createCopy(Exchange exchange, boolean preserveExchangeId) {
Exchange copy = exchange.copy();
if (preserveExchangeId) {
// must preserve exchange id
copy.setExchangeId(exchange.getExchangeId());
}
return copy;
}
/**
* Copies the results of a message exchange from the source exchange to the result exchange
* which will copy the message contents, exchange properties and the exception.
* Notice the {@link ExchangePattern} is <b>not</b> copied/altered.
*
* @param target the target exchange which will have the output and error state added (result)
* @param source the source exchange which is not modified
*/
public static void copyResults(Exchange target, Exchange source) {
doCopyResults(target, source, false);
}
/**
* Copies the <code>source</code> exchange to <code>target</code> exchange
* preserving the {@link ExchangePattern} of <code>target</code>.
*
* @param target the target exchange which will have the output and error state added (result)
* @param source source exchange.
*/
public static void copyResultsPreservePattern(Exchange target, Exchange source) {
doCopyResults(target, source, true);
}
private static void doCopyResults(Exchange result, Exchange source, boolean preserverPattern) {
if (result == source) {
// we just need to ensure MEP is as expected (eg copy result to OUT if out capable)
// and the result is not failed
if (result.getPattern() == ExchangePattern.InOptionalOut) {
// keep as is
} else if (result.getPattern().isOutCapable() && !result.hasOut() && !result.isFailed()) {
// copy IN to OUT as we expect a OUT response
result.getOut().copyFrom(source.getIn());
}
return;
}
if (source.hasOut()) {
if (preserverPattern) {
// exchange pattern sensitive
Message resultMessage = getResultMessage(result);
resultMessage.copyFrom(source.getOut());
} else {
result.getOut().copyFrom(source.getOut());
}
} else if (result.getPattern() == ExchangePattern.InOptionalOut) {
// special case where the result is InOptionalOut and with no OUT response
// so we should return null to indicate this fact
result.setOut(null);
} else {
// no results so lets copy the last input
// as the final processor on a pipeline might not
// have created any OUT; such as a mock:endpoint
// so lets assume the last IN is the OUT
if (!preserverPattern && result.getPattern().isOutCapable()) {
// only set OUT if its OUT capable
result.getOut().copyFrom(source.getIn());
} else {
// if not replace IN instead to keep the MEP
result.getIn().copyFrom(source.getIn());
// clear any existing OUT as the result is on the IN
if (result.hasOut()) {
result.setOut(null);
}
}
}
if (source.hasProperties()) {
result.getProperties().putAll(source.getProperties());
}
result.setException(source.getException());
}
/**
* Returns the message where to write results in an
* exchange-pattern-sensitive way.
*
* @param exchange message exchange.
* @return result message.
*/
public static Message getResultMessage(Exchange exchange) {
if (exchange.getPattern().isOutCapable()) {
return exchange.getOut();
} else {
return exchange.getIn();
}
}
/**
* Returns true if the given exchange pattern (if defined) can support OUT messages
*
* @param exchange the exchange to interrogate
* @return true if the exchange is defined as an {@link ExchangePattern} which supports
* OUT messages
*/
public static boolean isOutCapable(Exchange exchange) {
ExchangePattern pattern = exchange.getPattern();
return pattern != null && pattern.isOutCapable();
}
/**
* Creates a new instance of the given type from the injector
*
* @param exchange the exchange
* @param type the given type
* @return the created instance of the given type
*/
public static <T> T newInstance(Exchange exchange, Class<T> type) {
return exchange.getContext().getInjector().newInstance(type);
}
/**
* Creates a Map of the variables which are made available to a script or template
*
* @param exchange the exchange to make available
* @return a Map populated with the require variables
*/
public static Map<String, Object> createVariableMap(Exchange exchange) {
Map<String, Object> answer = new HashMap<>();
populateVariableMap(exchange, answer);
return answer;
}
/**
* Populates the Map with the variables which are made available to a script or template
*
* @param exchange the exchange to make available
* @param map the map to populate
*/
public static void populateVariableMap(Exchange exchange, Map<String, Object> map) {
map.put("exchange", exchange);
Message in = exchange.getIn();
map.put("in", in);
map.put("request", in);
map.put("headers", in.getHeaders());
map.put("body", in.getBody());
if (isOutCapable(exchange)) {
// if we are out capable then set out and response as well
// however only grab OUT if it exists, otherwise reuse IN
// this prevents side effects to alter the Exchange if we force creating an OUT message
Message msg = exchange.getMessage();
map.put("out", msg);
map.put("response", msg);
}
map.put("camelContext", exchange.getContext());
}
/**
* Returns the MIME content type on the input message or null if one is not defined
*
* @param exchange the exchange
* @return the MIME content type
*/
public static String getContentType(Exchange exchange) {
return MessageHelper.getContentType(exchange.getIn());
}
/**
* Returns the MIME content encoding on the input message or null if one is not defined
*
* @param exchange the exchange
* @return the MIME content encoding
*/
public static String getContentEncoding(Exchange exchange) {
return MessageHelper.getContentEncoding(exchange.getIn());
}
/**
* Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
*
* @param exchange the exchange
* @param name the bean name
* @return the bean
* @throws NoSuchBeanException if no bean could be found in the registry
*/
public static Object lookupMandatoryBean(Exchange exchange, String name) throws NoSuchBeanException {
Object value = lookupBean(exchange, name);
if (value == null) {
throw new NoSuchBeanException(name);
}
return value;
}
/**
* Performs a lookup in the registry of the mandatory bean name and throws an exception if it could not be found
*
* @param exchange the exchange
* @param name the bean name
* @param type the expected bean type
* @return the bean
* @throws NoSuchBeanException if no bean could be found in the registry
*/
public static <T> T lookupMandatoryBean(Exchange exchange, String name, Class<T> type) {
T value = lookupBean(exchange, name, type);
if (value == null) {
throw new NoSuchBeanException(name);
}
return value;
}
/**
* Performs a lookup in the registry of the bean name
*
* @param exchange the exchange
* @param name the bean name
* @return the bean, or <tt>null</tt> if no bean could be found
*/
public static Object lookupBean(Exchange exchange, String name) {
return exchange.getContext().getRegistry().lookupByName(name);
}
/**
* Performs a lookup in the registry of the bean name and type
*
* @param exchange the exchange
* @param name the bean name
* @param type the expected bean type
* @return the bean, or <tt>null</tt> if no bean could be found
*/
public static <T> T lookupBean(Exchange exchange, String name, Class<T> type) {
return exchange.getContext().getRegistry().lookupByNameAndType(name, type);
}
/**
* Returns the first exchange in the given collection of exchanges which has the same exchange ID as the one given
* or null if none could be found
*
* @param exchanges the exchanges
* @param exchangeId the exchangeId to find
* @return matching exchange, or <tt>null</tt> if none found
*/
public static Exchange getExchangeById(Iterable<Exchange> exchanges, String exchangeId) {
for (Exchange exchange : exchanges) {
String id = exchange.getExchangeId();
if (id != null && id.equals(exchangeId)) {
return exchange;
}
}
return null;
}
/**
* Prepares the exchanges for aggregation.
* <p/>
* This implementation will copy the OUT body to the IN body so when you do
* aggregation the body is <b>only</b> in the IN body to avoid confusing end users.
*
* @param oldExchange the old exchange
* @param newExchange the new exchange
*/
public static void prepareAggregation(Exchange oldExchange, Exchange newExchange) {
// move body/header from OUT to IN
if (oldExchange != null) {
if (oldExchange.hasOut()) {
oldExchange.setIn(oldExchange.getOut());
oldExchange.setOut(null);
}
}
if (newExchange != null) {
if (newExchange.hasOut()) {
newExchange.setIn(newExchange.getOut());
newExchange.setOut(null);
}
}
}
/**
* Checks whether the exchange has been failure handed
*
* @param exchange the exchange
* @return <tt>true</tt> if failure handled, <tt>false</tt> otherwise
*/
public static boolean isFailureHandled(Exchange exchange) {
return exchange.getProperty(Exchange.FAILURE_HANDLED, false, Boolean.class);
}
/**
* Checks whether the exchange {@link UnitOfWork} is exhausted
*
* @param exchange the exchange
* @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
*/
public static boolean isUnitOfWorkExhausted(Exchange exchange) {
return exchange.getProperty(Exchange.UNIT_OF_WORK_EXHAUSTED, false, Boolean.class);
}
/**
* Sets the exchange to be failure handled.
*
* @param exchange the exchange
*/
public static void setFailureHandled(Exchange exchange) {
exchange.setProperty(Exchange.FAILURE_HANDLED, Boolean.TRUE);
// clear exception since its failure handled
exchange.setException(null);
}
/**
* Checks whether the exchange is redelivery exhausted
*
* @param exchange the exchange
* @return <tt>true</tt> if exhausted, <tt>false</tt> otherwise
*/
public static boolean isRedeliveryExhausted(Exchange exchange) {
return exchange.getProperty(Exchange.REDELIVERY_EXHAUSTED, false, Boolean.class);
}
/**
* Checks whether the exchange {@link UnitOfWork} is redelivered
*
* @param exchange the exchange
* @return <tt>true</tt> if redelivered, <tt>false</tt> otherwise
*/
public static boolean isRedelivered(Exchange exchange) {
return exchange.getIn().hasHeaders() && exchange.getIn().getHeader(Exchange.REDELIVERED, false, Boolean.class);
}
/**
* Checks whether the exchange {@link UnitOfWork} has been interrupted during processing
*
* @param exchange the exchange
* @return <tt>true</tt> if interrupted, <tt>false</tt> otherwise
*/
public static boolean isInterrupted(Exchange exchange) {
Object value = exchange.getProperty(Exchange.INTERRUPTED);
return value != null && Boolean.TRUE == value;
}
/**
* Check whether or not stream caching is enabled for the given route or globally.
*
* @param exchange the exchange
* @return <tt>true</tt> if enabled, <tt>false</tt> otherwise
*/
public static boolean isStreamCachingEnabled(final Exchange exchange) {
Route route = exchange.getContext().getRoute(exchange.getFromRouteId());
if (route != null) {
return route.getRouteContext().isStreamCaching();
} else {
return exchange.getContext().getStreamCachingStrategy().isEnabled();
}
}
/**
* Extracts the body from the given exchange.
* <p/>
* If the exchange pattern is provided it will try to honor it and retrieve the body
* from either IN or OUT according to the pattern.
*
* @param exchange the exchange
* @param pattern exchange pattern if given, can be <tt>null</tt>
* @return the result body, can be <tt>null</tt>.
* @throws CamelExecutionException is thrown if the processing of the exchange failed
*/
public static Object extractResultBody(Exchange exchange, ExchangePattern pattern) {
Object answer = null;
if (exchange != null) {
// rethrow if there was an exception during execution
if (exchange.getException() != null) {
throw CamelExecutionException.wrapCamelExecutionException(exchange, exchange.getException());
}
// okay no fault then return the response according to the pattern
// try to honor pattern if provided
boolean notOut = pattern != null && !pattern.isOutCapable();
boolean hasOut = exchange.hasOut();
if (hasOut && !notOut) {
// we have a response in out and the pattern is out capable
answer = exchange.getOut().getBody();
} else if (!hasOut && exchange.getPattern() == ExchangePattern.InOptionalOut) {
// special case where the result is InOptionalOut and with no OUT response
// so we should return null to indicate this fact
answer = null;
} else {
// use IN as the response
answer = exchange.getIn().getBody();
}
// in a very seldom situation then getBody can cause an exception to be set on the exchange
// rethrow if there was an exception during execution
if (exchange.getException() != null) {
throw CamelExecutionException.wrapCamelExecutionException(exchange, exchange.getException());
}
}
return answer;
}
/**
* Tests whether the exchange has already been handled by the error handler
*
* @param exchange the exchange
* @return <tt>true</tt> if handled already by error handler, <tt>false</tt> otherwise
*/
public static boolean hasExceptionBeenHandledByErrorHandler(Exchange exchange) {
return Boolean.TRUE.equals(exchange.getProperty(Exchange.ERRORHANDLER_HANDLED));
}
/**
* Extracts the body from the given future, that represents a handle to an asynchronous exchange.
* <p/>
* Will wait until the future task is complete.
*
* @param context the camel context
* @param future the future handle
* @param type the expected body response type
* @return the result body, can be <tt>null</tt>.
* @throws CamelExecutionException is thrown if the processing of the exchange failed
*/
public static <T> T extractFutureBody(CamelContext context, Future<?> future, Class<T> type) {
try {
return doExtractFutureBody(context, future.get(), type);
} catch (InterruptedException e) {
throw RuntimeCamelException.wrapRuntimeCamelException(e);
} catch (ExecutionException e) {
// execution failed due to an exception so rethrow the cause
throw CamelExecutionException.wrapCamelExecutionException(null, e.getCause());
} finally {
// its harmless to cancel if task is already completed
// and in any case we do not want to get hold of the task a 2nd time
// and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
future.cancel(true);
}
}
/**
* Extracts the body from the given future, that represents a handle to an asynchronous exchange.
* <p/>
* Will wait for the future task to complete, but waiting at most the timeout value.
*
* @param context the camel context
* @param future the future handle
* @param timeout timeout value
* @param unit timeout unit
* @param type the expected body response type
* @return the result body, can be <tt>null</tt>.
* @throws CamelExecutionException is thrown if the processing of the exchange failed
* @throws java.util.concurrent.TimeoutException is thrown if a timeout triggered
*/
public static <T> T extractFutureBody(CamelContext context, Future<?> future, long timeout, TimeUnit unit, Class<T> type) throws TimeoutException {
try {
if (timeout > 0) {
return doExtractFutureBody(context, future.get(timeout, unit), type);
} else {
return doExtractFutureBody(context, future.get(), type);
}
} catch (InterruptedException e) {
// execution failed due interruption so rethrow the cause
throw CamelExecutionException.wrapCamelExecutionException(null, e);
} catch (ExecutionException e) {
// execution failed due to an exception so rethrow the cause
throw CamelExecutionException.wrapCamelExecutionException(null, e.getCause());
} finally {
// its harmless to cancel if task is already completed
// and in any case we do not want to get hold of the task a 2nd time
// and its recommended to cancel according to Brian Goetz in his Java Concurrency in Practice book
future.cancel(true);
}
}
private static <T> T doExtractFutureBody(CamelContext context, Object result, Class<T> type) {
if (result == null) {
return null;
}
if (type.isAssignableFrom(result.getClass())) {
return type.cast(result);
}
if (result instanceof Exchange) {
Exchange exchange = (Exchange) result;
Object answer = ExchangeHelper.extractResultBody(exchange, exchange.getPattern());
return context.getTypeConverter().convertTo(type, exchange, answer);
}
return context.getTypeConverter().convertTo(type, result);
}
/**
* Strategy to prepare results before next iterator or when we are complete,
* which is done by copying OUT to IN, so there is only an IN as input
* for the next iteration.
*
* @param exchange the exchange to prepare
*/
public static void prepareOutToIn(Exchange exchange) {
// we are routing using pipes and filters so we need to manually copy OUT to IN
if (exchange.hasOut()) {
exchange.setIn(exchange.getOut());
exchange.setOut(null);
}
}
/**
* Gets both the messageId and exchangeId to be used for logging purposes.
* <p/>
* Logging both ids, can help to correlate exchanges which may be redelivered messages
* from for example a JMS broker.
*
* @param exchange the exchange
* @return a log message with both the messageId and exchangeId
*/
public static String logIds(Exchange exchange) {
String msgId = exchange.getMessage().getMessageId();
return "(MessageId: " + msgId + " on ExchangeId: " + exchange.getExchangeId() + ")";
}
/**
* Copies the exchange but the copy will be tied to the given context
*
* @param exchange the source exchange
* @param context the camel context
* @return a copy with the given camel context
*/
public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context) {
return copyExchangeAndSetCamelContext(exchange, context, true);
}
/**
* Copies the exchange but the copy will be tied to the given context
*
* @param exchange the source exchange
* @param context the camel context
* @param handover whether to handover on completions from the source to the copy
* @return a copy with the given camel context
*/
public static Exchange copyExchangeAndSetCamelContext(Exchange exchange, CamelContext context, boolean handover) {
DefaultExchange answer = new DefaultExchange(context, exchange.getPattern());
if (exchange.hasProperties()) {
answer.setProperties(safeCopyProperties(exchange.getProperties()));
}
if (handover) {
// Need to hand over the completion for async invocation
exchange.handoverCompletions(answer);
}
answer.setIn(exchange.getIn().copy());
if (exchange.hasOut()) {
answer.setOut(exchange.getOut().copy());
}
answer.setException(exchange.getException());
return answer;
}
/**
* Replaces the existing message with the new message
*
* @param exchange the exchange
* @param newMessage the new message
* @param outOnly whether to replace the message as OUT message
*/
public static void replaceMessage(Exchange exchange, Message newMessage, boolean outOnly) {
Message old = exchange.getMessage();
if (outOnly || exchange.hasOut()) {
exchange.setOut(newMessage);
} else {
exchange.setIn(newMessage);
}
// need to de-reference old from the exchange so it can be GC
if (old instanceof MessageSupport) {
((MessageSupport) old).setExchange(null);
}
}
/**
* Gets the original IN {@link Message} this Unit of Work was started with.
* <p/>
* The original message is only returned if the option {@link org.apache.camel.RuntimeConfiguration#isAllowUseOriginalMessage()}
* is enabled. If its disabled, then <tt>null</tt> is returned.
*
* @return the original IN {@link Message}, or <tt>null</tt> if using original message is disabled.
*/
public static Message getOriginalInMessage(Exchange exchange) {
Message answer = null;
// try parent first
UnitOfWork uow = exchange.getProperty(Exchange.PARENT_UNIT_OF_WORK, UnitOfWork.class);
if (uow != null) {
answer = uow.getOriginalInMessage();
}
// fallback to the current exchange
if (answer == null) {
uow = exchange.getUnitOfWork();
if (uow != null) {
answer = uow.getOriginalInMessage();
}
}
return answer;
}
/**
* Resolve the component scheme (aka name) from the given endpoint uri
*
* @param uri the endpoint uri
* @return the component scheme (name), or <tt>null</tt> if not possible to resolve
*/
public static String resolveScheme(String uri) {
String scheme = null;
if (uri != null) {
// Use the URI prefix to find the component.
String[] splitURI = StringHelper.splitOnCharacter(uri, ":", 2);
if (splitURI[1] != null) {
scheme = splitURI[0];
}
}
return scheme;
}
@SuppressWarnings("unchecked")
private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
if (properties == null) {
return null;
}
Map<String, Object> answer = new HashMap<>(properties);
// safe copy message history using a defensive copy
List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
if (history != null) {
answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
}
return answer;
}
/**
* @see #getCharsetName(Exchange, boolean)
*/
public static String getCharsetName(Exchange exchange) {
return getCharsetName(exchange, true);
}
/**
* Gets the charset name if set as header or property {@link Exchange#CHARSET_NAME}.
* <b>Notice:</b> The lookup from the header has priority over the property.
*
* @param exchange the exchange
* @param useDefault should we fallback and use JVM default charset if no property existed?
* @return the charset, or <tt>null</tt> if no found
*/
public static String getCharsetName(Exchange exchange, boolean useDefault) {
if (exchange != null) {
// header takes precedence
String charsetName = exchange.getIn().getHeader(Exchange.CHARSET_NAME, String.class);
if (charsetName == null) {
charsetName = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
}
if (charsetName != null) {
return IOHelper.normalizeCharset(charsetName);
}
}
if (useDefault) {
return getDefaultCharsetName();
} else {
return null;
}
}
private static String getDefaultCharsetName() {
return ObjectHelper.getSystemProperty(Exchange.DEFAULT_CHARSET_PROPERTY, "UTF-8");
}
/**
* Creates a {@link Scanner} for scanning the given value.
*
* @param exchange the current exchange
* @param value the value, typically the message IN body
* @param delimiter the delimiter pattern to use
* @return the scanner, is newer <tt>null</tt>
*/
public static Scanner getScanner(Exchange exchange, Object value, String delimiter) {
if (value instanceof WrappedFile) {
WrappedFile<?> gf = (WrappedFile<?>) value;
Object body = gf.getBody();
if (body != null) {
// we have loaded the file content into the body so use that
value = body;
} else {
// generic file is just a wrapper for the real file so call again with the real file
return getScanner(exchange, gf.getFile(), delimiter);
}
}
Scanner scanner;
if (value instanceof Readable) {
scanner = new Scanner((Readable) value, delimiter);
} else if (value instanceof String) {
scanner = new Scanner((String) value, delimiter);
} else {
String charset = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
if (value instanceof File) {
try {
scanner = new Scanner((File) value, charset, delimiter);
} catch (IOException e) {
throw new RuntimeCamelException(e);
}
} else if (value instanceof InputStream) {
scanner = new Scanner((InputStream) value, charset, delimiter);
} else if (value instanceof ReadableByteChannel) {
scanner = new Scanner((ReadableByteChannel) value, charset, delimiter);
} else {
// value is not a suitable type, try to convert value to a string
String text = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value);
scanner = new Scanner(text, delimiter);
}
}
return scanner;
}
}