/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl;
import java.lang.reflect.Method;
import javax.xml.bind.annotation.XmlTransient;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Consume;
import org.apache.camel.Consumer;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.IsSingleton;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.ProxyInstantiationException;
import org.apache.camel.Service;
import org.apache.camel.component.bean.BeanProcessor;
import org.apache.camel.component.bean.ProxyHelper;
import org.apache.camel.processor.UnitOfWorkProcessor;
import org.apache.camel.processor.UnitOfWorkProducer;
import org.apache.camel.util.CamelContextHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A helper class for Camel based injector or post processing hooks which can be reused by
* the <a href="http://camel.apache.org/spring.html">Spring</a>,
* <a href="http://camel.apache.org/guice.html">Guice</a> and
* <a href="http://camel.apache.org/blueprint.html">Blueprint</a> support.
*
* @version
*/
public class CamelPostProcessorHelper implements CamelContextAware {

    // static fields are never serialized, so no 'transient' modifier is needed here
    private static final Logger LOG = LoggerFactory.getLogger(CamelPostProcessorHelper.class);

    @XmlTransient
    private CamelContext camelContext;

    /**
     * Creates a helper without a {@link CamelContext}; one must be supplied
     * through {@link #setCamelContext(CamelContext)} before the helper is used.
     */
    public CamelPostProcessorHelper() {
    }

    /**
     * Creates a helper bound to the given {@link CamelContext}.
     *
     * @param camelContext the context used to resolve endpoints and register services
     */
    public CamelPostProcessorHelper(CamelContext camelContext) {
        this.setCamelContext(camelContext);
    }

    public CamelContext getCamelContext() {
        return camelContext;
    }

    public void setCamelContext(CamelContext camelContext) {
        this.camelContext = camelContext;
    }

    /**
     * Does the given context match this camel context
     *
     * @param context the context name to compare with; an empty value matches any context
     * @return <tt>true</tt> if the name is empty or equals the name of this helper's
     *         {@link CamelContext}, <tt>false</tt> otherwise
     */
    public boolean matchContext(String context) {
        if (!ObjectHelper.isNotEmpty(context)) {
            // no explicit context requested, so every context matches
            return true;
        }
        return getCamelContext().getName().equals(context);
    }

    /**
     * Subscribes the given method to an endpoint if it is annotated with {@link Consume}
     * and the annotation's context matches this helper's {@link CamelContext}.
     *
     * @param method   the method to inspect for the {@link Consume} annotation
     * @param bean     the bean instance declaring the method
     * @param beanName the name of the bean
     */
    public void consumerInjection(Method method, Object bean, String beanName) {
        Consume consume = method.getAnnotation(Consume.class);
        if (consume != null && matchContext(consume.context())) {
            LOG.info("Creating a consumer for: {}", consume);
            subscribeMethod(method, bean, beanName, consume.uri(), consume.ref());
        }
    }

    /**
     * Creates a consumer on the resolved endpoint which invokes the given method for
     * incoming exchanges, and starts that consumer.
     *
     * @param method       the method to invoke; its name is used as the injection point name
     * @param bean         the bean instance declaring the method
     * @param beanName     the name of the bean
     * @param endpointUri  the uri of the endpoint to consume from
     * @param endpointName the reference name of the endpoint to consume from
     */
    public void subscribeMethod(Method method, Object bean, String beanName, String endpointUri, String endpointName) {
        // lets bind this method to a listener
        String injectionPointName = method.getName();
        Endpoint endpoint = getEndpointInjection(endpointUri, endpointName, injectionPointName, true);
        if (endpoint != null) {
            try {
                Processor processor = createConsumerProcessor(bean, method, endpoint);
                Consumer consumer = endpoint.createConsumer(processor);
                LOG.debug("Created processor: {} for consumer: {}", processor, consumer);
                startService(consumer, bean, beanName);
            } catch (Exception e) {
                throw ObjectHelper.wrapRuntimeCamelException(e);
            }
        }
    }

    /**
     * Starts the given service.
     * <p/>
     * If the owning bean is singleton (see {@link #isSingleton(Object, String)}) the service
     * is added to the {@link CamelContext}; otherwise it is started directly and must be
     * stopped manually by the caller.
     *
     * @param service  the service to start
     * @param bean     the bean the service was created for
     * @param beanName the name of the bean
     * @throws Exception if the service could not be added or started
     */
    protected void startService(Service service, Object bean, String beanName) throws Exception {
        if (isSingleton(bean, beanName)) {
            getCamelContext().addService(service);
        } else {
            LOG.debug("Service is not singleton so you must remember to stop it manually {}", service);
            ServiceHelper.startService(service);
        }
    }

    /**
     * Create a processor which invokes the given method when an incoming
     * message exchange is received
     *
     * @param pojo     the bean instance to invoke
     * @param method   the method to invoke on the bean
     * @param endpoint the endpoint the consumer is created for (not used by this implementation)
     * @return the bean processor wrapped in a {@link UnitOfWorkProcessor}
     */
    protected Processor createConsumerProcessor(final Object pojo, final Method method, final Endpoint endpoint) {
        BeanProcessor answer = new BeanProcessor(pojo, getCamelContext());
        answer.setMethodObject(method);
        // must ensure the consumer is being executed in an unit of work so synchronization callbacks etc is invoked
        return new UnitOfWorkProcessor(answer);
    }

    /**
     * Resolves the endpoint for an injection point by delegating to
     * {@link CamelContextHelper#getEndpointInjection}.
     *
     * @param uri                the endpoint uri
     * @param name               the endpoint reference name
     * @param injectionPointName the name of the field or method being injected
     * @param mandatory          passed through to the delegate; presumably controls whether a
     *                           missing endpoint is an error (confirm against CamelContextHelper)
     * @return the resolved endpoint as returned by the delegate
     */
    protected Endpoint getEndpointInjection(String uri, String name, String injectionPointName, boolean mandatory) {
        return CamelContextHelper.getEndpointInjection(getCamelContext(), uri, name, injectionPointName, mandatory);
    }

    /**
     * Creates the object to be injected for an {@link org.apache.camel.EndpointInject} or {@link org.apache.camel.Produce} injection point
     *
     * @param type               the declared type of the injection point
     * @param endpointUri        the endpoint uri
     * @param endpointRef        the endpoint reference name
     * @param injectionPointName the name of the field or method being injected
     * @param bean               the bean being injected into
     * @param beanName           the name of the bean
     * @return a template, the endpoint itself, a producer, a polling consumer or a proxy,
     *         depending on <tt>type</tt>; <tt>null</tt> if no endpoint was resolved
     * @throws IllegalArgumentException if <tt>type</tt> is not a supported injection type
     */
    public Object getInjectionValue(Class<?> type, String endpointUri, String endpointRef, String injectionPointName,
                                    Object bean, String beanName) {
        // templates are handled first as they do not require a mandatory endpoint
        if (type.isAssignableFrom(ProducerTemplate.class)) {
            return createInjectionProducerTemplate(endpointUri, endpointRef, injectionPointName);
        }
        if (type.isAssignableFrom(ConsumerTemplate.class)) {
            return createInjectionConsumerTemplate(endpointUri, endpointRef, injectionPointName);
        }

        Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, true);
        if (endpoint == null) {
            return null;
        }

        if (type.isInstance(endpoint)) {
            return endpoint;
        }
        if (type.isAssignableFrom(Producer.class)) {
            return createInjectionProducer(endpoint, bean, beanName);
        }
        if (type.isAssignableFrom(PollingConsumer.class)) {
            return createInjectionPollingConsumer(endpoint, bean, beanName);
        }
        if (type.isInterface()) {
            // lets create a proxy
            try {
                return ProxyHelper.createProxy(endpoint, type);
            } catch (Exception e) {
                throw createProxyInstantiationRuntimeException(type, endpoint, e);
            }
        }
        throw new IllegalArgumentException("Invalid type: " + type.getName()
                + " which cannot be injected via @EndpointInject/@Produce for: " + endpoint);
    }

    /**
     * Factory method to create a {@link org.apache.camel.ProducerTemplate} to be injected into a POJO
     *
     * @param endpointUri        the uri of the template's endpoint
     * @param endpointRef        the reference name of the template's endpoint
     * @param injectionPointName the name of the field or method being injected
     * @return a started producer template
     */
    protected ProducerTemplate createInjectionProducerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
        // endpoint is optional for this injection point
        Endpoint endpoint = getEndpointInjection(endpointUri, endpointRef, injectionPointName, false);
        ProducerTemplate answer = new DefaultProducerTemplate(getCamelContext(), endpoint);
        // start the template so its ready to use
        try {
            answer.start();
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
        return answer;
    }

    /**
     * Factory method to create a {@link org.apache.camel.ConsumerTemplate} to be injected into a POJO
     * <p/>
     * The endpoint related parameters are not used by this implementation.
     *
     * @param endpointUri        the endpoint uri (not used)
     * @param endpointRef        the endpoint reference name (not used)
     * @param injectionPointName the name of the field or method being injected (not used)
     * @return a started consumer template
     */
    protected ConsumerTemplate createInjectionConsumerTemplate(String endpointUri, String endpointRef, String injectionPointName) {
        ConsumerTemplate answer = new DefaultConsumerTemplate(getCamelContext());
        // start the template so its ready to use
        try {
            answer.start();
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
        return answer;
    }

    /**
     * Factory method to create a started {@link org.apache.camel.PollingConsumer} to be injected into a POJO
     *
     * @param endpoint the endpoint to create the polling consumer from
     * @param bean     the bean being injected into
     * @param beanName the name of the bean
     * @return the polling consumer, after it was passed to {@link #startService(Service, Object, String)}
     */
    protected PollingConsumer createInjectionPollingConsumer(Endpoint endpoint, Object bean, String beanName) {
        try {
            PollingConsumer pollingConsumer = endpoint.createPollingConsumer();
            startService(pollingConsumer, bean, beanName);
            return pollingConsumer;
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    }

    /**
     * A Factory method to create a started {@link org.apache.camel.Producer} to be injected into a POJO
     *
     * @param endpoint the endpoint to create the producer from
     * @param bean     the bean being injected into
     * @param beanName the name of the bean
     * @return the producer wrapped in a {@link UnitOfWorkProducer}
     */
    protected Producer createInjectionProducer(Endpoint endpoint, Object bean, String beanName) {
        try {
            Producer producer = endpoint.createProducer();
            // the underlying producer is started; the returned wrapper delegates to it
            startService(producer, bean, beanName);
            return new UnitOfWorkProducer(producer);
        } catch (Exception e) {
            throw ObjectHelper.wrapRuntimeCamelException(e);
        }
    }

    /**
     * Creates the exception to throw when a proxy for an injection point could not be created.
     *
     * @param type     the interface type the proxy was requested for
     * @param endpoint the endpoint the proxy should have been bound to
     * @param e        the cause of the failure
     * @return a {@link ProxyInstantiationException} carrying the type, endpoint and cause
     */
    protected RuntimeException createProxyInstantiationRuntimeException(Class<?> type, Endpoint endpoint, Exception e) {
        return new ProxyInstantiationException(type, endpoint, e);
    }

    /**
     * Implementations can override this method to determine if the bean is singleton.
     * <p/>
     * This implementation asks the bean itself if it implements {@link IsSingleton},
     * and otherwise regards the bean as singleton.
     *
     * @param bean     the bean
     * @param beanName the name of the bean (not used by this implementation)
     * @return <tt>true</tt> if its singleton scoped, for prototype scoped <tt>false</tt> is returned.
     */
    protected boolean isSingleton(Object bean, String beanName) {
        if (bean instanceof IsSingleton) {
            IsSingleton singleton = (IsSingleton) bean;
            return singleton.isSingleton();
        }
        return true;
    }
}