blob: d25c981427c2ed4769feb3571d2d08ea27378abb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.servicemix.common;
import java.lang.reflect.Method;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jbi.JBIException;
import javax.jbi.component.ComponentContext;
import javax.jbi.component.ComponentLifeCycle;
import javax.jbi.management.LifeCycleMBean;
import javax.jbi.messaging.DeliveryChannel;
import javax.jbi.messaging.ExchangeStatus;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessageExchange.Role;
import javax.jbi.messaging.MessagingException;
import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.MBeanServer;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.xml.namespace.QName;
import org.apache.servicemix.common.management.MBeanServerHelper;
import org.apache.servicemix.executors.Executor;
import org.apache.servicemix.executors.ExecutorFactory;
import org.apache.servicemix.executors.impl.ExecutorFactoryImpl;
import org.slf4j.Logger;
/**
* <p>
* Base class for life cycle management of components. This class may be used as
* is.
* </p>
*
* @author Guillaume Nodet
* @version $Revision: 399873 $
* @since 3.0
*/
public class AsyncBaseLifeCycle implements ComponentLifeCycle {
public static final String INITIALIZED = "Initialized";
protected transient Logger logger;
protected ServiceMixComponent component;
protected ComponentContext context;
protected ObjectName mbeanName;
protected ExecutorFactory executorFactory;
protected Executor consumerExecutor;
protected Executor providerExecutor;
protected AtomicBoolean running;
protected DeliveryChannel channel;
protected Thread poller;
protected AtomicBoolean polling;
protected TransactionManager transactionManager;
protected boolean workManagerCreated;
protected ThreadLocal<String> correlationId;
protected String currentState = LifeCycleMBean.UNKNOWN;
protected Container container;
protected Map<String, Set<String>> knownExchanges;
public AsyncBaseLifeCycle() {
this.running = new AtomicBoolean(false);
this.polling = new AtomicBoolean(false);
this.correlationId = new ThreadLocal<String>();
this.knownExchanges = new ConcurrentHashMap<String, Set<String>>();
}
public AsyncBaseLifeCycle(ServiceMixComponent component) {
this();
setComponent(component);
}
public Container getContainer() {
return container;
}
/**
* @org.apache.xbean.Property hidden="true"
*/
protected void setComponent(ServiceMixComponent component) {
this.component = component;
this.logger = component.getLogger();
}
/*
* (non-Javadoc)
*
* @see javax.jbi.component.ComponentLifeCycle#getExtensionMBeanName()
*/
public ObjectName getExtensionMBeanName() {
return mbeanName;
}
protected Object getExtensionMBean() throws Exception {
return null;
}
protected ObjectName createExtensionMBeanName() throws Exception {
return this.context.getMBeanNames().createCustomComponentMBeanName("Configuration");
}
public QName getEPRServiceName() {
return null;
}
public String getCurrentState() {
return currentState;
}
/**
* @org.apache.xbean.Property hidden="true"
*/
protected void setCurrentState(String currentState) {
this.currentState = currentState;
}
public boolean isStarted() {
return currentState != null && currentState.equals(LifeCycleMBean.STARTED);
}
/**
* @return true if the object is stopped
*/
public boolean isStopped() {
return currentState != null && currentState.equals(LifeCycleMBean.STOPPED);
}
/**
* @return true if the object is shutDown
*/
public boolean isShutDown() {
return currentState != null && currentState.equals(LifeCycleMBean.SHUTDOWN);
}
/**
* @return true if the object is shutDown
*/
public boolean isInitialized() {
return currentState != null && currentState.equals(INITIALIZED);
}
/**
* @return true if the object is shutDown
*/
public boolean isUnknown() {
return currentState == null || currentState.equals(LifeCycleMBean.UNKNOWN);
}
/*
* (non-Javadoc)
*
* @see javax.jbi.component.ComponentLifeCycle#init(javax.jbi.component.ComponentContext)
*/
public void init(ComponentContext context) throws JBIException {
try {
logger.debug("Initializing component");
Thread.currentThread().setContextClassLoader(component.getClass().getClassLoader());
this.context = context;
this.channel = context.getDeliveryChannel();
try {
this.transactionManager = (TransactionManager) context.getTransactionManager();
} catch (Throwable e) {
// Ignore, this is just a safeguard against non compliant
// JBI implementation which throws an exception instead of
// return null
}
container = Container.detect(context);
doInit();
setCurrentState(INITIALIZED);
logger.debug("Component initialized");
} catch (JBIException e) {
throw e;
} catch (Exception e) {
throw new JBIException("Error calling init", e);
}
}
protected void doInit() throws Exception {
// Register extension mbean
MBeanServer server = this.context.getMBeanServer();
Object mbean = getExtensionMBean();
if (server != null && mbean != null) {
MBeanServerHelper.register(server, createExtensionMBeanName(), mbean);
}
// Obtain or create the work manager
// When using the WorkManager from ServiceMix,
// some class loader problems can appear when
// trying to uninstall the components.
// Some threads owned by the work manager have a
// security context referencing the component class loader
// so that every loaded classes are locked
// this.workManager = findWorkManager();
if (this.executorFactory == null) {
this.executorFactory = findExecutorFactory();
}
if (this.executorFactory == null) {
this.executorFactory = createExecutorFactory();
}
this.consumerExecutor = this.executorFactory.createExecutor("component." + getContext().getComponentName() + ".consumer");
this.providerExecutor = this.executorFactory.createExecutor("component." + getContext().getComponentName() + ".provider");
}
/*
* (non-Javadoc)
*
* @see javax.jbi.component.ComponentLifeCycle#shutDown()
*/
public void shutDown() throws JBIException {
try {
logger.debug("Shutting down component");
Thread.currentThread().setContextClassLoader(component.getClass().getClassLoader());
doShutDown();
setCurrentState(LifeCycleMBean.SHUTDOWN);
this.context = null;
logger.debug("Component shut down");
} catch (JBIException e) {
throw e;
} catch (Exception e) {
throw new JBIException("Error calling shutdown", e);
}
}
protected void doShutDown() throws Exception {
// Unregister mbean
if (this.mbeanName != null) {
MBeanServer server = this.context.getMBeanServer();
if (server == null) {
throw new JBIException("null mBeanServer");
}
if (server.isRegistered(this.mbeanName)) {
server.unregisterMBean(this.mbeanName);
}
}
// Destroy excutor
consumerExecutor.shutdown();
providerExecutor.shutdown();
consumerExecutor = null;
providerExecutor = null;
}
/*
* (non-Javadoc)
*
* @see javax.jbi.component.ComponentLifeCycle#start()
*/
public void start() throws JBIException {
try {
logger.debug("Starting component");
Thread.currentThread().setContextClassLoader(component.getClass().getClassLoader());
if (this.running.compareAndSet(false, true)) {
doStart();
setCurrentState(LifeCycleMBean.STARTED);
}
logger.debug("Component started");
} catch (JBIException e) {
throw e;
} catch (Exception e) {
throw new JBIException("Error calling start", e);
}
}
protected void doStart() throws Exception {
boolean doPoll = false;
if (container.getType() != Container.Type.ServiceMix3) {
doPoll = true;
} else {
Object smx3container = ((Container.Smx3Container)container).getSmx3Container();
try {
Method m = smx3container.getClass().getMethod("isOptimizedDelivery");
doPoll = false == (Boolean)m.invoke(smx3container, null);
} catch (NoSuchMethodException nsmex) {
doPoll = false;
}
}
if (doPoll) {
synchronized (this.polling) {
consumerExecutor.execute(new Runnable() {
public void run() {
poller = Thread.currentThread();
pollDeliveryChannel();
}
});
polling.wait();
}
}
}
protected void pollDeliveryChannel() {
synchronized (polling) {
polling.set(true);
polling.notify();
}
Executor executor = null;
while (running.get()) {
try {
final MessageExchange exchange = channel.accept(1000L);
if (exchange != null) {
executor = exchange.getRole().equals(Role.CONSUMER) ? consumerExecutor : providerExecutor;
final Transaction tx = (Transaction) exchange
.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
if (tx != null && container.handleTransactions()) {
if (transactionManager == null) {
throw new IllegalStateException(
"Exchange is enlisted in a transaction, but no transaction manager is available");
}
transactionManager.suspend();
}
executor.execute(new Runnable() {
public void run() {
processExchangeInTx(exchange, tx);
}
});
}
} catch (Throwable t) {
if (running.get() == false) {
// Should have been interrupted, discard the throwable
logger.debug("Polling thread will stop");
} else {
logger.error("Error polling delivery channel", t);
}
}
}
synchronized (polling) {
polling.set(false);
polling.notify();
}
}
/*
* (non-Javadoc)
*
* @see javax.jbi.component.ComponentLifeCycle#stop()
*/
public void stop() throws JBIException {
try {
logger.debug("Stopping component");
Thread.currentThread().setContextClassLoader(component.getClass().getClassLoader());
if (this.running.compareAndSet(true, false)) {
doStop();
setCurrentState(LifeCycleMBean.STOPPED);
}
logger.debug("Component stopped");
} catch (JBIException e) {
throw e;
} catch (Exception e) {
throw new JBIException("Error calling stop", e);
}
}
protected void doStop() throws Exception {
// Interrupt the polling thread and await termination
try {
synchronized (polling) {
if (polling.get()) {
poller.interrupt();
polling.wait();
}
}
} finally {
poller = null;
}
}
/**
* @return Returns the context.
*/
public ComponentContext getContext() {
return context;
}
public Executor getExecutor(Role role) {
if (role != null && role.equals(Role.CONSUMER)) {
return this.consumerExecutor;
} else if (role != null && role.equals(Role.PROVIDER)) {
return this.providerExecutor;
}
return null;
}
/**
* The executor to use for various tasks that need to be performed by the component.
* If none is provided, one will be created from the executorFactory.
*
* @param executor
* @see #setExecutorFactory(ExecutorFactory)
*/
public void setExecutor(Role role, Executor executor) {
if (role != null && role.equals(Role.CONSUMER)) {
this.consumerExecutor = executor;
} else {
this.providerExecutor = executor;
}
}
public ExecutorFactory getExecutorFactory() {
return executorFactory;
}
/**
* The executor factory to use to create the executor.
* If none is provided, one will be retrieved from the JBI container when the component
* is deployed into ServiceMix 3.x, or a default implementation will be used.
*
* @param executorFactory
* @see #setExecutor(Role, Executor)
*/
public void setExecutorFactory(ExecutorFactory executorFactory) {
this.executorFactory = executorFactory;
}
protected ExecutorFactory createExecutorFactory() {
// grab the default factory from the container - it's always there...if not then it's a junit test, so create one
return this.container != null ? this.container.getExecutorFactory() : new ExecutorFactoryImpl();
}
public Object getSmx3Container() {
if (container instanceof Container.Smx3Container) {
return ((Container.Smx3Container) container).getSmx3Container();
}
return null;
}
protected ExecutorFactory findExecutorFactory() {
// TODO: should look in jndi for an existing ExecutorFactory
return container.getExecutorFactory();
}
protected void processExchangeInTx(MessageExchange exchange, Transaction tx) {
ExchangeStatus oldStatus = exchange.getStatus();
try {
if (tx != null) {
transactionManager.resume(tx);
}
processExchange(exchange);
} catch (Throwable t) {
logger.error("Error processing exchange {}", exchange, t);
try {
// If we are transacted, check if this exception should
// rollback the transaction
if (transactionManager != null && transactionManager.getStatus() == Status.STATUS_ACTIVE) {
if (exceptionShouldRollbackTx(t)) {
transactionManager.setRollbackOnly();
}
if (!container.handleTransactions()) {
transactionManager.suspend();
}
}
if (oldStatus == ExchangeStatus.ACTIVE) {
exchange.setStatus(ExchangeStatus.ERROR);
exchange.setError(t instanceof Exception ? (Exception) t : new Exception(t));
channel.send(exchange);
}
} catch (Exception inner) {
logger.error("Error setting exchange status to ERROR", inner);
}
} finally {
try {
// Check transaction status
if (tx != null) {
int status = transactionManager.getStatus();
// We use pull delivery, so the transaction should already
// have been transfered to another thread because the
// component
// must have answered.
if (status != Status.STATUS_NO_TRANSACTION) {
logger.error("Transaction is still active after exchange processing. Trying to rollback transaction.");
try {
transactionManager.rollback();
} catch (Throwable t) {
logger.error("Error trying to rollback transaction.", t);
}
}
}
} catch (Throwable t) {
logger.error("Error checking transaction status.", t);
}
}
}
protected boolean exceptionShouldRollbackTx(Throwable t) {
return false;
}
public void onMessageExchange(MessageExchange exchange) {
if (!container.handleTransactions()) {
final Transaction tx = (Transaction) exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
processExchangeInTx(exchange, tx);
return;
}
ExchangeStatus oldStatus = exchange.getStatus();
try {
processExchange(exchange);
} catch (Throwable t) {
logger.error("Error processing exchange {}", exchange, t);
try {
// If we are transacted and this is a runtime exception
// try to mark transaction as rollback
if (transactionManager != null
&& transactionManager.getStatus() == Status.STATUS_ACTIVE
&& exceptionShouldRollbackTx(t)) {
transactionManager.setRollbackOnly();
if (!container.handleTransactions()) {
transactionManager.suspend();
}
}
if (oldStatus == ExchangeStatus.ACTIVE) {
exchange.setError(t instanceof Exception ? (Exception) t : new Exception(t));
channel.send(exchange);
}
} catch (Exception inner) {
logger.error("Error setting exchange status to ERROR", inner);
}
}
}
protected void processExchange(MessageExchange exchange) throws Exception {
logger.debug("Received exchange: status: {}, role: {}", exchange.getStatus(), (exchange.getRole() == Role.CONSUMER ? "consumer" : "provider"));
if (exchange.getRole() == Role.PROVIDER) {
boolean dynamic = false;
ServiceEndpoint endpoint = exchange.getEndpoint();
String key = EndpointSupport.getKey(exchange.getEndpoint());
Endpoint ep = this.component.getRegistry().getEndpoint(key);
if (ep == null) {
if (endpoint.getServiceName().equals(getEPRServiceName())) {
ep = getResolvedEPR(exchange.getEndpoint());
ep.activate();
ep.start();
dynamic = true;
}
if (ep == null) {
throw new IllegalStateException("Endpoint not found: " + key);
}
}
try {
doProcess(ep, exchange);
} finally {
// If the endpoint is dynamic, deactivate it
if (dynamic) {
ep.stop();
ep.deactivate();
}
}
} else {
Endpoint ep = null;
if (exchange.getProperty(JbiConstants.SENDER_ENDPOINT) != null) {
String key = exchange.getProperty(JbiConstants.SENDER_ENDPOINT).toString();
ep = this.component.getRegistry().getEndpoint(key);
}
if (ep == null) {
throw new IllegalStateException("Endpoint not found for: " + exchange.getExchangeId());
}
doProcess(ep, exchange);
}
}
/**
* Thin wrapper around the call to the processor to ensure that the Endpoints
* classloader is used where available
*/
private void doProcess(Endpoint ep, MessageExchange exchange) throws Exception {
ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
boolean processed = false;
try {
ClassLoader cl = (ep != null) ? ep.getServiceUnit().getConfigurationClassLoader() : null;
if (cl != null) {
Thread.currentThread().setContextClassLoader(cl);
}
// Read the correlation id from the exchange and set it in the correlation id property
String correlationID = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
if (correlationID != null) {
// Set the id in threadlocal variable
correlationId.set(correlationID);
}
logger.debug("Retrieved correlation id: {}", correlationID);
EndpointDeliveryChannel.setEndpoint(ep);
handleExchange(ep, exchange, exchange.getStatus() == ExchangeStatus.ACTIVE);
ep.process(exchange);
processed = true;
} finally {
if (!processed) {
handleExchange(ep, exchange, false);
}
EndpointDeliveryChannel.setEndpoint(null);
Thread.currentThread().setContextClassLoader(oldCl);
// Clean the threadlocal variable
correlationId.set(null);
}
}
public void prepareExchange(MessageExchange exchange, Endpoint endpoint) throws MessagingException {
if (exchange.getRole() == Role.CONSUMER) {
// Check if a correlation id is already set on the exchange, otherwise create it
String correlationIDValue = (String) exchange.getProperty(JbiConstants.CORRELATION_ID);
if (correlationIDValue == null) {
// Retrieve correlation id from thread local variable, if exist
correlationIDValue = correlationId.get();
if (correlationIDValue == null) {
// Set a correlation id property that have to be propagated in all components
// to trace the process instance
correlationIDValue = exchange.getExchangeId();
exchange.setProperty(JbiConstants.CORRELATION_ID, exchange.getExchangeId());
logger.debug("Created correlation id: {}", correlationIDValue);
} else {
// Use correlation id retrieved from previous message exchange
exchange.setProperty(JbiConstants.CORRELATION_ID, correlationIDValue);
logger.debug("Correlation id retrieved from ThreadLocal: {}", correlationIDValue);
}
}
// Set the sender endpoint property
exchange.setProperty(JbiConstants.SENDER_ENDPOINT, endpoint.getKey());
}
// Handle transaction
if (!container.handleTransactions()) {
try {
if ((exchange.getRole() == Role.CONSUMER && exchange.getStatus() == ExchangeStatus.ACTIVE) || exchange.getRole() == Role.PROVIDER) {
if (transactionManager != null) {
exchange.setProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME, transactionManager.suspend());
}
}
} catch (SystemException e) {
throw new MessagingException("Error handling transaction", e);
}
}
}
/**
* Prepare an endpoint for shutdown by waiting until all its pending exchanges have been finished.
*
* @param endpoint the endpoint that is about to be shut down
* @throws InterruptedException
*/
public void prepareShutdown(Endpoint endpoint) throws InterruptedException {
prepareShutdown(endpoint, 0);
}
/**
* Prepare an endpoint for shutdown by waiting until all its pending exchanges have been finished.
* This method will wait no longer than the timeout specified (in milliseconds)
*
* @param endpoint the endpoint that is about to be shut down
* @param timeout the maximum amount of time (in milliseconds) to wait
* @throws InterruptedException
*/
public void prepareShutdown(Endpoint endpoint, long timeout) throws InterruptedException {
Set<String> exchanges = getKnownExchanges(endpoint);
long start = System.currentTimeMillis();
long interval = timeout / 3; // if a timeout has been set, we'll check 3 times within the timeout period
synchronized (exchanges) {
while (!exchanges.isEmpty()) {
for (String id : exchanges) {
logger.debug("Waiting for exchange {} in {}", id, endpoint);
}
exchanges.wait(interval);
// if a timeout has been set, this would be a good time to check that
long delta = System.currentTimeMillis() - start;
if (timeout != 0 && delta >= timeout) {
logger.debug(String.format("Gave up waiting for %s exchanges in %s after %s ms",
exchanges.size(), endpoint, delta));
break;
}
}
}
}
protected Set<String> getKnownExchanges(Endpoint endpoint) {
Set<String> exchanges = knownExchanges.get(endpoint.getKey());
if (exchanges == null) {
synchronized (knownExchanges) {
exchanges = knownExchanges.get(endpoint.getKey());
if (exchanges == null) {
exchanges = new HashSet<String>();
knownExchanges.put(endpoint.getKey(), exchanges);
}
}
}
return exchanges;
}
public void handleExchange(Endpoint endpoint, MessageExchange exchange, boolean add) {
Set<String> exchanges = getKnownExchanges(endpoint);
synchronized (exchanges) {
if (add) {
exchanges.add(exchange.getExchangeId());
} else {
exchanges.remove(exchange.getExchangeId());
}
exchanges.notifyAll();
}
}
/**
* Handle an exchange sent to an EPR resolved by this component
*
* @param ep the service endpoint
* @return an endpoint to use for handling the exchange
* @throws Exception
*/
protected Endpoint getResolvedEPR(ServiceEndpoint ep) throws Exception {
throw new UnsupportedOperationException("Component does not handle EPR exchanges");
}
}