blob: 31fad07aedbe2b8104199637178f42392f72b364 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.bean;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.InvalidPayloadException;
import org.apache.camel.Producer;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An {@link java.lang.reflect.InvocationHandler} which invokes a
* message exchange on a camel {@link Endpoint}
*
* @version
*/
public class CamelInvocationHandler implements InvocationHandler {
private static final transient Logger LOG = LoggerFactory.getLogger(CamelInvocationHandler.class);
// use a static thread pool to not create a new thread pool for each invocation
private static ExecutorService executorService;
private final Endpoint endpoint;
private final Producer producer;
private final MethodInfoCache methodInfoCache;
public CamelInvocationHandler(Endpoint endpoint, Producer producer, MethodInfoCache methodInfoCache) {
this.endpoint = endpoint;
this.producer = producer;
this.methodInfoCache = methodInfoCache;
}
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
BeanInvocation invocation = new BeanInvocation(method, args);
MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
final ExchangePattern pattern = methodInfo != null ? methodInfo.getPattern() : ExchangePattern.InOut;
final Exchange exchange = new DefaultExchange(endpoint, pattern);
exchange.getIn().setBody(invocation);
// is the return type a future
final boolean isFuture = method.getReturnType() == Future.class;
// create task to execute the proxy and gather the reply
FutureTask task = new FutureTask<Object>(new Callable<Object>() {
public Object call() throws Exception {
// process the exchange
LOG.trace("Proxied method call {} invoking producer: {}", method.getName(), producer);
producer.process(exchange);
Object answer = afterInvoke(method, exchange, pattern, isFuture);
LOG.trace("Proxied method call {} returning: {}", method.getName(), answer);
return answer;
}
});
if (isFuture) {
// submit task and return future
if (LOG.isTraceEnabled()) {
LOG.trace("Submitting task for exchange id {}", exchange.getExchangeId());
}
getExecutorService(exchange.getContext()).submit(task);
return task;
} else {
// execute task now
try {
task.run();
return task.get();
} catch (ExecutionException e) {
// we don't want the wrapped exception from JDK
throw e.getCause();
}
}
}
protected Object afterInvoke(Method method, Exchange exchange, ExchangePattern pattern, boolean isFuture) throws Exception {
// check if we had an exception
Throwable cause = exchange.getException();
if (cause != null) {
Throwable found = findSuitableException(cause, method);
if (found != null) {
if (found instanceof Exception) {
throw (Exception) found;
} else {
// wrap as exception
throw new CamelExchangeException("Error processing exchange", exchange, cause);
}
}
// special for runtime camel exceptions as they can be nested
if (cause instanceof RuntimeCamelException) {
// if the inner cause is a runtime exception we can throw it directly
if (cause.getCause() instanceof RuntimeException) {
throw (RuntimeException) ((RuntimeCamelException) cause).getCause();
}
throw (RuntimeCamelException) cause;
}
// okay just throw the exception as is
if (cause instanceof Exception) {
throw (Exception) cause;
} else {
// wrap as exception
throw new CamelExchangeException("Error processing exchange", exchange, cause);
}
}
// do not return a reply if the method is VOID
Class<?> to = method.getReturnType();
if (to == Void.TYPE) {
return null;
}
// use type converter so we can convert output in the desired type defined by the method
// and let it be mandatory so we know wont return null if we cant convert it to the defined type
Object answer;
if (!isFuture) {
answer = getBody(exchange, to);
} else {
// if its a Future then we need to extract the class from the future type so we know
// which class to return the result as
Class<?> returnTo = getGenericType(exchange.getContext(), method.getGenericReturnType());
answer = getBody(exchange, returnTo);
}
return answer;
}
private static Object getBody(Exchange exchange, Class<?> type) throws InvalidPayloadException {
// get the body from the Exchange from either OUT or IN
if (exchange.hasOut()) {
if (exchange.getOut().getBody() != null) {
return exchange.getOut().getMandatoryBody(type);
} else {
return null;
}
} else {
if (exchange.getIn().getBody() != null) {
return exchange.getIn().getMandatoryBody(type);
} else {
return null;
}
}
}
protected static Class getGenericType(CamelContext context, Type type) throws ClassNotFoundException {
if (type == null) {
// fallback and use object
return Object.class;
}
// unfortunately java dont provide a nice api for getting the generic type of the return type
// due type erasure, so we have to gather it based on a String representation
String name = ObjectHelper.between(type.toString(), "<", ">");
if (name != null) {
if (name.contains("<")) {
// we only need the outer type
name = ObjectHelper.before(name, "<");
}
return context.getClassResolver().resolveMandatoryClass(name);
} else {
// fallback and use object
return Object.class;
}
}
/**
* Tries to find the best suited exception to throw.
* <p/>
* It looks in the exception hierarchy from the caused exception and matches this against the declared exceptions
* being thrown on the method.
*
* @param cause the caused exception
* @param method the method
* @return the exception to throw, or <tt>null</tt> if not possible to find a suitable exception
*/
protected Throwable findSuitableException(Throwable cause, Method method) {
if (method.getExceptionTypes() == null || method.getExceptionTypes().length == 0) {
return null;
}
// see if there is any exception which matches the declared exception on the method
for (Class<?> type : method.getExceptionTypes()) {
Object fault = ObjectHelper.getException(type, cause);
if (fault != null) {
return Throwable.class.cast(fault);
}
}
return null;
}
protected static synchronized ExecutorService getExecutorService(CamelContext context) {
// CamelContext will shutdown thread pool when it shutdown so we can lazy create it on demand
// but in case of hot-deploy or the likes we need to be able to re-create it (its a shared static instance)
if (executorService == null || executorService.isTerminated() || executorService.isShutdown()) {
// try to lookup a pool first based on id/profile
executorService = context.getExecutorServiceStrategy().lookup(CamelInvocationHandler.class, "CamelInvocationHandler", "CamelInvocationHandler");
if (executorService == null) {
executorService = context.getExecutorServiceStrategy().newDefaultThreadPool(CamelInvocationHandler.class, "CamelInvocationHandler");
}
}
return executorService;
}
}