| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.support; |
| |
| import java.io.File; |
| import java.io.Serializable; |
| import java.math.BigDecimal; |
| import java.math.BigInteger; |
| import java.util.Date; |
| import java.util.LinkedHashMap; |
| import java.util.Map; |
| |
| import org.apache.camel.Exchange; |
| import org.apache.camel.RuntimeExchangeException; |
| import org.apache.camel.WrappedFile; |
| import org.apache.camel.util.ObjectHelper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Holder object for sending an exchange over a remote wire as a serialized object. |
| * This is usually configured using the <tt>transferExchange=true</tt> option on the endpoint. |
| * <br/> |
| * <b>Note:</b> Message body of type {@link File} or {@link WrappedFile} is <b>not</b> supported and |
| * a {@link RuntimeExchangeException} is thrown. |
| * <br/> |
| * As opposed to normal usage where only the body part of the exchange is transferred over the wire, |
| * this holder object serializes the following fields over the wire: |
| * <ul> |
| * <li>exchangeId</li> |
| * <li>in body</li> |
| * <li>out body</li> |
| * <li>fault body </li> |
| * <li>exception</li> |
| * </ul> |
| * <br/> |
| * The exchange properties are not propagated by default. However you can specify they should be included |
| * by the {@link DefaultExchangeHolder#marshal(Exchange, boolean)} method. |
| * <br/> |
| * And the following headers is transferred if their values are of primitive types, String or Number based. |
| * <ul> |
| * <li>in headers</li> |
| * <li>out headers</li> |
| * <li>fault headers</li> |
| * </ul> |
| * The body is serialized and stored as serialized bytes. The header and exchange properties only include |
| * primitive, String, and Number types (and Exception types for exchange properties). Any other type is skipped. |
| * Any message body object that is not serializable will be skipped and Camel will log this at <tt>WARN</tt> level. |
| * And any message header values that is not a primitive value will be skipped and Camel will log this at <tt>DEBUG</tt> level. |
| */ |
| public class DefaultExchangeHolder implements Serializable { |
| |
| private static final long serialVersionUID = 2L; |
| private static final Logger LOG = LoggerFactory.getLogger(DefaultExchangeHolder.class); |
| |
| private String exchangeId; |
| private Object inBody; |
| private Object outBody; |
| private Map<String, Object> inHeaders; |
| private Map<String, Object> outHeaders; |
| private Map<String, Object> properties; |
| private Exception exception; |
| |
| /** |
| * Creates a payload object with the information from the given exchange. |
| * |
| * @param exchange the exchange, must <b>not</b> be <tt>null</tt> |
| * @return the holder object with information copied form the exchange |
| */ |
| public static DefaultExchangeHolder marshal(Exchange exchange) { |
| return marshal(exchange, true, false); |
| } |
| |
| /** |
| * Creates a payload object with the information from the given exchange. |
| * |
| * @param exchange the exchange, must <b>not</b> be <tt>null</tt> |
| * @param includeProperties whether or not to include exchange properties |
| * @return the holder object with information copied form the exchange |
| */ |
| public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties) { |
| return marshal(exchange, includeProperties, false, true); |
| } |
| |
| /** |
| * Creates a payload object with the information from the given exchange. |
| * |
| * @param exchange the exchange, must <b>not</b> be <tt>null</tt> |
| * @param includeProperties whether or not to include exchange properties |
| * @param allowSerializedHeaders whether or not to include serialized headers |
| * @return the holder object with information copied form the exchange |
| */ |
| public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, boolean allowSerializedHeaders) { |
| return marshal(exchange, includeProperties, allowSerializedHeaders, true); |
| } |
| |
| /** |
| * Creates a payload object with the information from the given exchange. |
| * |
| * @param exchange the exchange, must <b>not</b> be <tt>null</tt> |
| * @param includeProperties whether or not to include exchange properties |
| * @param allowSerializedHeaders whether or not to include serialized headers |
| * @param preserveExchangeId whether to preserve exchange id |
| * @return the holder object with information copied form the exchange |
| */ |
| public static DefaultExchangeHolder marshal(Exchange exchange, boolean includeProperties, |
| boolean allowSerializedHeaders, boolean preserveExchangeId) { |
| ObjectHelper.notNull(exchange, "exchange"); |
| |
| // we do not support files |
| Object body = exchange.getIn().getBody(); |
| if (body instanceof WrappedFile || body instanceof File) { |
| throw new RuntimeExchangeException("Message body of type " + body.getClass().getCanonicalName() + " is not supported by this marshaller.", exchange); |
| } |
| |
| DefaultExchangeHolder payload = new DefaultExchangeHolder(); |
| |
| if (preserveExchangeId) { |
| payload.exchangeId = exchange.getExchangeId(); |
| } |
| payload.inBody = checkSerializableBody("in body", exchange, exchange.getIn().getBody()); |
| payload.safeSetInHeaders(exchange, allowSerializedHeaders); |
| if (exchange.hasOut()) { |
| payload.outBody = checkSerializableBody("out body", exchange, exchange.getOut().getBody()); |
| payload.safeSetOutHeaders(exchange, allowSerializedHeaders); |
| } |
| if (includeProperties) { |
| payload.safeSetProperties(exchange, allowSerializedHeaders); |
| } |
| payload.exception = exchange.getException(); |
| |
| return payload; |
| } |
| |
| /** |
| * Transfers the information from the payload to the exchange. |
| * |
| * @param exchange the exchange to set values from the payload, must <b>not</b> be <tt>null</tt> |
| * @param payload the payload with the values, must <b>not</b> be <tt>null</tt> |
| */ |
| public static void unmarshal(Exchange exchange, DefaultExchangeHolder payload) { |
| ObjectHelper.notNull(exchange, "exchange"); |
| ObjectHelper.notNull(payload, "payload"); |
| |
| if (payload.exchangeId != null) { |
| exchange.setExchangeId(payload.exchangeId); |
| } |
| exchange.getIn().setBody(payload.inBody); |
| if (payload.inHeaders != null) { |
| exchange.getIn().setHeaders(payload.inHeaders); |
| } |
| if (payload.outBody != null) { |
| exchange.getOut().setBody(payload.outBody); |
| if (payload.outHeaders != null) { |
| exchange.getOut().setHeaders(payload.outHeaders); |
| } |
| } |
| if (payload.properties != null) { |
| for (String key : payload.properties.keySet()) { |
| exchange.setProperty(key, payload.properties.get(key)); |
| } |
| } |
| exchange.setException(payload.exception); |
| } |
| |
| /** |
| * Adds a property to the payload. |
| * <p/> |
| * This can be done in special situations where additional information must be added which was not provided |
| * from the source. |
| * |
| * @param payload the serialized payload |
| * @param key the property key to add |
| * @param property the property value to add |
| */ |
| public static void addProperty(DefaultExchangeHolder payload, String key, Serializable property) { |
| if (key == null || property == null) { |
| return; |
| } |
| if (payload.properties == null) { |
| payload.properties = new LinkedHashMap<>(); |
| } |
| payload.properties.put(key, property); |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = new StringBuilder("DefaultExchangeHolder[exchangeId=").append(exchangeId); |
| sb.append("inBody=").append(inBody).append(", outBody=").append(outBody); |
| sb.append(", inHeaders=").append(inHeaders).append(", outHeaders=").append(outHeaders); |
| sb.append(", properties=").append(properties).append(", exception=").append(exception); |
| return sb.append(']').toString(); |
| } |
| |
| private Map<String, Object> safeSetInHeaders(Exchange exchange, boolean allowSerializedHeaders) { |
| if (exchange.getIn().hasHeaders()) { |
| Map<String, Object> map = checkValidHeaderObjects("in headers", exchange, exchange.getIn().getHeaders(), allowSerializedHeaders); |
| if (map != null && !map.isEmpty()) { |
| inHeaders = new LinkedHashMap<>(map); |
| } |
| } |
| return null; |
| } |
| |
| @Deprecated |
| private Map<String, Object> safeSetOutHeaders(Exchange exchange, boolean allowSerializedHeaders) { |
| if (exchange.hasOut() && exchange.getOut().hasHeaders()) { |
| Map<String, Object> map = checkValidHeaderObjects("out headers", exchange, exchange.getOut().getHeaders(), allowSerializedHeaders); |
| if (map != null && !map.isEmpty()) { |
| outHeaders = new LinkedHashMap<>(map); |
| } |
| } |
| return null; |
| } |
| |
| private Map<String, Object> safeSetProperties(Exchange exchange, boolean allowSerializedHeaders) { |
| if (exchange.hasProperties()) { |
| Map<String, Object> map = checkValidExchangePropertyObjects("properties", exchange, exchange.getProperties(), allowSerializedHeaders); |
| if (map != null && !map.isEmpty()) { |
| properties = new LinkedHashMap<>(map); |
| } |
| } |
| return null; |
| } |
| |
| private static Object checkSerializableBody(String type, Exchange exchange, Object object) { |
| if (object == null) { |
| return null; |
| } |
| |
| Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, object); |
| if (converted != null) { |
| return converted; |
| } else { |
| LOG.warn("Exchange {} containing object: {} of type: {} cannot be serialized, it will be excluded by the holder.", type, object, object.getClass().getCanonicalName()); |
| return null; |
| } |
| } |
| |
| private static Map<String, Object> checkValidHeaderObjects(String type, Exchange exchange, Map<String, Object> map, boolean allowSerializedHeaders) { |
| if (map == null) { |
| return null; |
| } |
| |
| Map<String, Object> result = new LinkedHashMap<>(); |
| for (Map.Entry<String, Object> entry : map.entrySet()) { |
| |
| // silently skip any values which is null |
| if (entry.getValue() == null) { |
| continue; |
| } |
| |
| Object value = getValidHeaderValue(entry.getKey(), entry.getValue(), allowSerializedHeaders); |
| if (value != null) { |
| Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value); |
| if (converted != null) { |
| result.put(entry.getKey(), converted); |
| } else { |
| logCannotSerializeObject(type, entry.getKey(), entry.getValue()); |
| } |
| } else { |
| logInvalidHeaderValue(type, entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| return result; |
| } |
| |
| private static Map<String, Object> checkValidExchangePropertyObjects(String type, Exchange exchange, Map<String, Object> map, boolean allowSerializedHeaders) { |
| if (map == null) { |
| return null; |
| } |
| |
| Map<String, Object> result = new LinkedHashMap<>(); |
| for (Map.Entry<String, Object> entry : map.entrySet()) { |
| |
| // silently skip any values which is null |
| if (entry.getValue() == null) { |
| continue; |
| } |
| |
| Object value = getValidExchangePropertyValue(entry.getKey(), entry.getValue(), allowSerializedHeaders); |
| if (value != null) { |
| Serializable converted = exchange.getContext().getTypeConverter().convertTo(Serializable.class, exchange, value); |
| if (converted != null) { |
| result.put(entry.getKey(), converted); |
| } else { |
| logCannotSerializeObject(type, entry.getKey(), entry.getValue()); |
| } |
| } else { |
| logInvalidExchangePropertyValue(type, entry.getKey(), entry.getValue()); |
| } |
| } |
| |
| return result; |
| } |
| |
| /** |
| * We only want to store header values of primitive and String related types. |
| * <p/> |
| * This default implementation will allow: |
| * <ul> |
| * <li>any primitives and their counter Objects (Integer, Double etc.)</li> |
| * <li>String and any other literals, Character, CharSequence</li> |
| * <li>Boolean</li> |
| * <li>Number</li> |
| * <li>java.util.Date</li> |
| * </ul> |
| * |
| * We make possible store serialized headers by the boolean field allowSerializedHeaders |
| * |
| * @param headerName the header name |
| * @param headerValue the header value |
| * @param allowSerializedHeaders the header value |
| * @return the value to use, <tt>null</tt> to ignore this header |
| */ |
| protected static Object getValidHeaderValue(String headerName, Object headerValue, boolean allowSerializedHeaders) { |
| if (headerValue instanceof String) { |
| return headerValue; |
| } else if (headerValue instanceof BigInteger) { |
| return headerValue; |
| } else if (headerValue instanceof BigDecimal) { |
| return headerValue; |
| } else if (headerValue instanceof Number) { |
| return headerValue; |
| } else if (headerValue instanceof Character) { |
| return headerValue; |
| } else if (headerValue instanceof CharSequence) { |
| return headerValue.toString(); |
| } else if (headerValue instanceof Boolean) { |
| return headerValue; |
| } else if (headerValue instanceof Date) { |
| return headerValue; |
| } else if (allowSerializedHeaders) { |
| if (headerValue instanceof Serializable) { |
| return headerValue; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * We only want to store exchange property values of primitive and String related types, and |
| * as well any caught exception that Camel routing engine has caught. |
| * <p/> |
| * This default implementation will allow the same values as {@link #getValidHeaderValue(String, Object, boolean)} |
| * and in addition any value of type {@link Throwable}. |
| * |
| * @param propertyName the property name |
| * @param propertyValue the property value |
| * @return the value to use, <tt>null</tt> to ignore this header |
| */ |
| protected static Object getValidExchangePropertyValue(String propertyName, Object propertyValue, boolean allowSerializedHeaders) { |
| // for exchange properties we also allow exception to be transferred so people can store caught exception |
| if (propertyValue instanceof Throwable) { |
| return propertyValue; |
| } |
| return getValidHeaderValue(propertyName, propertyValue, allowSerializedHeaders); |
| } |
| |
| private static void logCannotSerializeObject(String type, String key, Object value) { |
| if (key.startsWith("Camel")) { |
| // log Camel at DEBUG level |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder.", |
| new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); |
| } |
| } else { |
| // log regular at WARN level |
| LOG.warn("Exchange {} containing key: {} with object: {} of type: {} cannot be serialized, it will be excluded by the holder.", |
| new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); |
| } |
| } |
| |
| private static void logInvalidHeaderValue(String type, String key, Object value) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Exchange {} containing key: {} with object: {} of type: {} is not valid header type, it will be excluded by the holder.", |
| new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); |
| } |
| } |
| |
| private static void logInvalidExchangePropertyValue(String type, String key, Object value) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Exchange {} containing key: {} with object: {} of type: {} is not valid exchange property type, it will be excluded by the holder.", |
| new Object[]{type, key, value, ObjectHelper.classCanonicalName(value)}); |
| } |
| } |
| |
| } |