blob: e00269596963bf6a162457d295d6b85cb1793e69 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.axis2;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
import org.apache.commons.httpclient.util.IdleConnectionTimeoutThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.axis2.deploy.DeploymentPoller;
import org.apache.ode.axis2.service.DeploymentWebService;
import org.apache.ode.axis2.service.ManagementService;
import org.apache.ode.axis2.util.ClusterUrlTransformer;
import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.clapi.ClusterMemberListener;
import org.apache.ode.bpel.clapi.ClusterProcessStore;
import org.apache.ode.bpel.connector.BpelServerConnector;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.bpel.pmapi.InstanceManagement;
import org.apache.ode.bpel.pmapi.ProcessManagement;
import org.apache.ode.il.config.OdeConfigProperties;
import org.apache.ode.il.dbutil.Database;
import org.apache.ode.scheduler.simple.JdbcDelegate;
import org.apache.ode.scheduler.simple.SimpleScheduler;
import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.sql.DataSource;
import javax.transaction.*;
import javax.transaction.xa.XAResource;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
/**
* Server class called by our Axis hooks to handle all ODE lifecycle management.
*
* @author Matthieu Riou <mriou at apache dot org>
*/
public class ODEServer {
/** Logger named after the runtime class of this server instance. */
protected final Logger __log = LoggerFactory.getLogger(getClass());
/** Logger for the "org.apache.ode.tx" category; written to by the transaction debugging wrappers. */
protected final Logger __logTx = LoggerFactory.getLogger("org.apache.ode.tx");
/** Message catalog used to build log and exception texts. */
private static final Messages __msgs = Messages.getMessages(Messages.class);
/** Application root: the org.apache.ode.rootDir system property if set, otherwise the context path given to init(). */
protected File _appRoot;
/** Working directory: the configured working dir, or _appRoot when none is configured. */
protected File _workRoot;
/** Configuration directory: the org.apache.ode.configDir system property if set, otherwise "conf" under _appRoot. */
protected File _configRoot;
/** BPEL server; created in initBpelServer() and started in init(). */
protected BpelServerImpl _bpelServer;
/** Process store; created in initProcessStore(). */
protected ProcessStoreImpl _store;
/** Configuration, either supplied to init() or loaded from _configRoot. */
protected ODEConfigProperties _odeConfig;
/** Axis2 configuration context handed to init(). */
protected ConfigurationContext _configContext;
/** Transaction manager obtained in initTxMgr(); may be wrapped for transaction debugging. */
protected TransactionManager _txMgr;
/** DAO connection factory created from _db in initDAO(). */
protected BpelDAOConnectionFactory _daoCF;
/** Thread pool created in initBpelServer(); handed to the scheduler, the polled runnable processor and the cron scheduler. */
protected ExecutorService _executorService;
/** Job scheduler built by createScheduler(). */
protected Scheduler _scheduler;
/** Cron scheduler created in initBpelServer(). */
protected CronScheduler _cronScheduler;
/** Database wrapper created and started in initDataSource(). */
protected Database _db;
/** Cluster manager; only assigned by initClustering(), i.e. when clustering is enabled. */
protected ClusterManager _clusterManager;
/** Deployment poller: a custom one from deploy-ext.properties, or the default DeploymentPoller. */
private DeploymentPoller _poller;
/** JCA connector; only created when the configured connector port is not 0. */
private BpelServerConnector _connector;
/** Management service created in init(); backs getInstanceManagement() and getProcessManagement(). */
private ManagementService _mgtService;
/** URL transformer built in initBpelServer() from the cluster.localRoute.* properties. */
protected ClusterUrlTransformer _clusterUrlTransformer;
/** HTTP connection manager created in initHttpConnectionManager(). */
protected MultiThreadedHttpConnectionManager httpConnectionManager;
/** Thread created in initHttpConnectionManager() and registered with httpConnectionManager. */
protected IdleConnectionTimeoutThread idleConnectionTimeoutThread;
/** Optional hook; when non-null, init() runs it right after the transaction manager has been created. */
public Runnable txMgrCreatedCallback;
/** Mirrors _odeConfig.isClusteringEnabled(); assigned in init(). */
private boolean clusteringEnabled = false;
/**
 * Initializes the server, using the real filesystem path of the web application's
 * {@code /WEB-INF} directory as the context path.
 *
 * @param config servlet configuration whose servlet context is asked for the {@code /WEB-INF} path
 * @param configContext Axis2 configuration context, passed on unchanged
 * @throws ServletException propagated from the delegated init
 */
public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException {
init(config.getServletContext().getRealPath("/WEB-INF"), configContext);
}
/**
 * Initializes the server without a pre-built configuration; delegates to the three-argument
 * init with a {@code null} configuration.
 *
 * @param contextPath application root used when the org.apache.ode.rootDir system property is not set
 * @param configContext Axis2 configuration context, passed on unchanged
 * @throws ServletException propagated from the delegated init
 */
public void init(String contextPath, ConfigurationContext configContext) throws ServletException {
init(contextPath, configContext, null);
}
/**
 * Brings up the complete ODE runtime.
 * <p>
 * Steps, in order: resolve the application, configuration and working directories; load the
 * configuration (unless one is supplied); create the transaction manager and run
 * {@code txMgrCreatedCallback}; optionally load the cluster manager; start the database and the
 * DAO layer; create the process store and the BPEL server; set up the HTTP connection manager;
 * register event listeners, message exchange interceptors and external variable modules; load
 * the store; start the BPEL server; enable the management and deployment web services; and
 * finally start the scheduler, the JCA connector and the deployment poller.
 *
 * @param contextPath application root, used only when the org.apache.ode.rootDir system property is not set
 * @param configContext Axis2 configuration context
 * @param config pre-built configuration, or {@code null} to load one from the configuration directory
 * @throws ServletException if the configuration cannot be read, or a component fails to initialize or start
 * @throws IllegalArgumentException if no application root can be determined, or if the application,
 *         configuration or working directory is not an existing directory
 */
public void init(String contextPath, ConfigurationContext configContext, ODEConfigProperties config) throws ServletException {
    _configContext = configContext;

    String rootDir = System.getProperty("org.apache.ode.rootDir");
    if (rootDir != null) {
        _appRoot = new File(rootDir);
    } else if (contextPath != null) {
        _appRoot = new File(contextPath);
    } else {
        // The servlet-based init passes ServletContext.getRealPath(...), which may be null;
        // report that clearly instead of failing with a bare NullPointerException.
        throw new IllegalArgumentException("Cannot determine the application root: the context path is null and the org.apache.ode.rootDir system property is not set");
    }
    if (!_appRoot.isDirectory()) {
        throw new IllegalArgumentException(_appRoot + " does not exist or is not a directory");
    }
    TempFileManager.setWorkingDirectory(_appRoot);

    __log.debug("Loading properties");
    String confDir = System.getProperty("org.apache.ode.configDir");
    _configRoot = confDir == null ? new File(_appRoot, "conf") : new File(confDir);
    if (!_configRoot.isDirectory()) {
        throw new IllegalArgumentException(_configRoot + " does not exist or is not a directory");
    }

    try {
        if (config == null) {
            _odeConfig = new ODEConfigProperties(_configRoot);
            _odeConfig.load();
        } else {
            _odeConfig = config;
        }
    } catch (FileNotFoundException fnf) {
        // A missing configuration file is tolerated: warn and carry on.
        String errmsg = __msgs.msgOdeInstallErrorCfgNotFound(_odeConfig.getFile());
        __log.warn(errmsg);
    } catch (Exception ex) {
        String errmsg = __msgs.msgOdeInstallErrorCfgReadError(_odeConfig.getFile());
        __log.error(errmsg, ex);
        throw new ServletException(errmsg, ex);
    }

    String wdir = _odeConfig.getWorkingDir();
    if (wdir == null) {
        _workRoot = _appRoot;
    } else {
        _workRoot = new File(wdir.trim());
    }
    if (!_workRoot.isDirectory()) {
        throw new IllegalArgumentException(_workRoot + " does not exist or is not a directory");
    }

    __log.debug("Initializing transaction manager");
    initTxMgr();
    if (txMgrCreatedCallback != null) {
        txMgrCreatedCallback.run();
    }

    clusteringEnabled = _odeConfig.isClusteringEnabled();
    if (clusteringEnabled) {
        initClustering();
    } else {
        __log.info(__msgs.msgOdeClusteringNotInitialized());
    }

    __log.debug("Creating data source.");
    initDataSource();
    __log.debug("Starting DAO.");
    initDAO();

    EndpointReferenceContextImpl eprContext = new EndpointReferenceContextImpl(this);
    __log.debug("Initializing BPEL process store.");
    initProcessStore(eprContext);
    __log.debug("Initializing BPEL server.");
    initBpelServer(eprContext);
    __log.debug("Initializing HTTP connection manager");
    initHttpConnectionManager();

    // Register BPEL event listeners configured in axis2.properties file.
    registerEventListeners();
    registerMexInterceptors();
    registerExternalVariableModules();

    _store.loadAll();

    if (_clusterManager != null) {
        // In clustered mode the scheduler and the store are the cluster-aware implementations.
        _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
        _clusterManager.setClusterProcessStore((ClusterProcessStore) _store);
        _clusterManager.init(_configRoot);
        ((SimpleScheduler) _scheduler).setNodeId(_clusterManager.getNodeID());
    }

    try {
        _bpelServer.start();
    } catch (Exception ex) {
        String errmsg = __msgs.msgOdeBpelServerStartFailure();
        __log.error(errmsg, ex);
        throw new ServletException(errmsg, ex);
    }

    // Prefer a poller plugged in through deploy-ext.properties, else use the default one.
    _poller = getDeploymentPollerExt();
    if (_poller == null) {
        _poller = new DeploymentPoller(_store.getDeployDir(), this);
    }

    _mgtService = new ManagementService();
    _mgtService.enableService(_configContext.getAxisConfiguration(), _bpelServer, _store, _appRoot.getAbsolutePath());

    try {
        __log.debug("Initializing Deployment Web Service");
        new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath(), this);
    } catch (Exception e) {
        throw new ServletException(e);
    }

    __log.debug("Starting scheduler");
    _scheduler.start();

    __log.debug("Initializing JCA adapter.");
    initConnector();

    _poller.start();
    __log.info(__msgs.msgPollingStarted(_store.getDeployDir().getAbsolutePath()));
    __log.info(__msgs.msgOdeStarted());
}
/**
 * Looks for a {@code /deploy-ext.properties} class path resource and, when it names a class under
 * {@code deploymentPoller.class}, instantiates that class through its (File, ODEServer) constructor.
 *
 * @return the custom poller, or {@code null} when no resource exists, the property is missing or
 *         the class cannot be loaded/instantiated (the caller then uses the default poller)
 */
@SuppressWarnings("unchecked")
private DeploymentPoller getDeploymentPollerExt() {
    DeploymentPoller customPoller = null;
    InputStream extStream = null;
    try {
        extStream = ODEServer.class.getResourceAsStream("/deploy-ext.properties");
        if (extStream == null) {
            if (__log.isDebugEnabled()) {
                __log.debug("No deploy-ext.properties found.");
            }
        } else {
            __log.info("A deploy-ext.properties found; will use the provided class if applicable.");
            try {
                Properties extProps = new Properties();
                extProps.load(extStream);
                String pollerClassName = extProps.getProperty("deploymentPoller.class");
                if (pollerClassName != null) {
                    Class<?> pollerType = Class.forName(pollerClassName);
                    Object instance = pollerType
                            .getConstructor(File.class, ODEServer.class)
                            .newInstance(_store.getDeployDir(), this);
                    customPoller = (DeploymentPoller) instance;
                    __log.info("A custom deployment poller: " + pollerClassName + " has been plugged in.");
                } else {
                    __log.warn("deploy-ext.properties found in the class path; however, the file does not have 'deploymentPoller.class' as one of the properties!!");
                }
            } catch (Exception e) {
                // Any load/instantiation problem just means the default poller is used.
                __log.warn("Deployment poller extension class is not loadable, falling back to the default DeploymentPoller.", e);
            }
        }
    } finally {
        if (extStream != null) {
            try {
                extStream.close();
            } catch (IOException ignored) {
                // ignore
            }
        }
    }
    return customPoller;
}
/**
 * Creates the database wrapper from the configuration, wires in the transaction manager and the
 * working directory, then starts it.
 *
 * @throws ServletException if the database fails to start
 */
private void initDataSource() throws ServletException {
    Database database = Database.create(_odeConfig);
    _db = database;
    database.setTransactionManager(_txMgr);
    database.setWorkRoot(_workRoot);
    try {
        database.start();
    } catch (Exception startFailure) {
        final String errmsg = __msgs.msgOdeDbConfigError();
        __log.error(errmsg, startFailure);
        throw new ServletException(errmsg, startFailure);
    }
}
/**
 * @return the transaction manager created during init(); {@code null} before init() and after shutDown()
 */
public TransactionManager getTransactionManager() {
return _txMgr;
}
/**
 * Shuts the server down, releasing every component created by init(). Components are stopped one
 * after the other; a failure in one of them is logged and does not prevent the remaining ones
 * from being stopped. Once this method has been called, init() must be called again before the
 * server can be used.
 * <p>
 * The thread's context class loader is switched to this class's loader for the duration of the
 * shutdown and restored afterwards.
 *
 * @throws AxisFault if the engine is unable to shut down.
 */
public void shutDown() throws AxisFault {
    ClassLoader old = Thread.currentThread().getContextClassLoader();
    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
    try {
        if (_poller != null) {
            try {
                __log.debug("shutting down poller");
                _poller.stop();
                _poller = null;
            } catch (Throwable t) {
                __log.debug("Error stopping poller.", t);
            }
        }

        if (_bpelServer != null) {
            try {
                __log.debug("shutting down ODE server.");
                _bpelServer.shutdown();
                _bpelServer = null;
            } catch (Throwable ex) {
                __log.debug("Error stopping services.", ex);
            }
        }

        if (_cronScheduler != null) {
            try {
                __log.debug("shutting down cron scheduler.");
                _cronScheduler.shutdown();
                _cronScheduler = null;
            } catch (Exception ex) {
                __log.debug("Cron scheduler couldn't be shutdown.", ex);
            }
        }

        if (_scheduler != null) {
            try {
                __log.debug("shutting down scheduler.");
                _scheduler.shutdown();
                _scheduler = null;
            } catch (Exception ex) {
                __log.debug("Scheduler couldn't be shutdown.", ex);
            }
        }

        if (_store != null) {
            try {
                _store.shutdown();
                _store = null;
            } catch (Throwable t) {
                __log.debug("Store could not be shutdown.", t);
            }
        }

        if (_daoCF != null) {
            try {
                _daoCF.shutdown();
            } catch (Throwable ex) {
                __log.debug("DOA shutdown failed.", ex);
            } finally {
                _daoCF = null;
            }
        }

        if (_db != null) {
            try {
                _db.shutdown();
            } catch (Throwable ex) {
                __log.debug("DB shutdown failed.", ex);
            } finally {
                _db = null;
            }
        }

        if (_txMgr != null) {
            // Nothing to stop here: the reference is simply dropped.
            __log.debug("shutting down transaction manager.");
            _txMgr = null;
        }

        if (_clusterManager != null) {
            try {
                __log.debug("shutting down cluster manager.");
                _clusterManager.shutdown();
                _clusterManager = null;
            } catch (Exception ex) {
                __log.debug("Cluster manager shutdown failed.", ex);
            }
        }

        if (_connector != null) {
            try {
                __log.debug("shutdown BpelConnector");
                _connector.shutdown();
                _connector = null;
            } catch (Throwable t) {
                // Previously reported as a temp-file cleanup failure (copy-paste slip).
                __log.error("Unable to shut down BpelConnector.", t);
            }
        }

        if (httpConnectionManager != null) {
            __log.debug("shutting down HTTP connection manager.");
            try {
                httpConnectionManager.shutdown();
                httpConnectionManager = null;
            } catch (Throwable t) {
                __log.error("Unable to shut down HTTP connection manager.", t);
            }
        }

        if (idleConnectionTimeoutThread != null) {
            __log.debug("shutting down Idle Connection Timeout Thread.");
            try {
                idleConnectionTimeoutThread.shutdown();
                idleConnectionTimeoutThread = null;
            } catch (Throwable t) {
                __log.error("Unable to shut down Idle Connection Timeout Thread.", t);
            }
        }

        try {
            __log.debug("cleaning up temporary files.");
            TempFileManager.cleanup();
        } catch (Throwable t) {
            __log.error("Unable to cleanup temp files.", t);
        }

        if (_executorService != null) {
            _executorService.shutdownNow();
            _executorService = null;
        }

        __log.info(__msgs.msgOdeShutdownCompleted());
    } finally {
        Thread.currentThread().setContextClassLoader(old);
    }
}
/**
 * Instantiates the configured transaction manager factory reflectively and obtains the
 * transaction manager from its {@code getTransactionManager()} method. When debug logging is
 * enabled for the transaction logger and the {@code ode.debug.tx} system property is set, the
 * manager is wrapped in a logging decorator.
 *
 * @throws ServletException if the database mode is EXTERNAL but the default factory is still
 *         configured, or if the factory cannot be loaded or invoked
 */
@SuppressWarnings("unchecked")
private void initTxMgr() throws ServletException {
    boolean externalDb = _odeConfig.getDbMode().equals(OdeConfigProperties.DatabaseMode.EXTERNAL);
    if (externalDb && _odeConfig.getTxFactoryClass().equals(OdeConfigProperties.DEFAULT_TX_FACTORY_CLASS_NAME)) {
        throw new ServletException("No external transaction manager factory configured. Please use the INTERNAL mode or configure an external transaction manager that is associated with external datasource.");
    }

    final String factoryClassName = _odeConfig.getTxFactoryClass();
    __log.debug("Initializing transaction manager using " + factoryClassName);
    try {
        Class<?> factoryType = this.getClass().getClassLoader().loadClass(factoryClassName);
        Object factory = factoryType.newInstance();
        Object manager = factoryType.getMethod("getTransactionManager", (Class[]) null).invoke(factory);
        _txMgr = (TransactionManager) manager;

        boolean traceTx = __logTx.isDebugEnabled() && System.getProperty("ode.debug.tx") != null;
        if (traceTx) {
            _txMgr = new DebugTxMgr(_txMgr);
        }
    } catch (Exception e) {
        String failure = "Couldn't initialize a transaction manager with factory: " + factoryClassName;
        __log.error(failure, e);
        throw new ServletException(failure, e);
    }
}
/**
 * Starts the JCA connector on the configured port. A port of 0 disables the connector; a start
 * failure is logged and otherwise ignored.
 *
 * @throws ServletException declared for callers; start failures are only logged
 */
private void initConnector() throws ServletException {
    final int port = _odeConfig.getConnectorPort();
    if (port == 0) {
        __log.info("Skipping connector initialization.");
        return;
    }

    BpelServerConnector connector = new BpelServerConnector();
    _connector = connector;
    connector.setBpelServer(_bpelServer);
    connector.setProcessStore(_store);
    connector.setPort(_odeConfig.getConnectorPort());
    connector.setId("jcaServer");
    try {
        connector.start();
    } catch (Exception startFailure) {
        __log.error("Failed to initialize JCA connector.", startFailure);
    }
}
/**
 * @return the clustering flag read from the configuration during init(); {@code false} before init()
 */
public boolean isClusteringEnabled() {
return clusteringEnabled;
}
/**
 * Loads and instantiates the configured cluster manager implementation. Called by init() only
 * when clustering is enabled.
 * <p>
 * A load failure is fatal: the process store and scheduler created later dereference the cluster
 * manager whenever clustering is enabled, so continuing without one would only defer the failure
 * to an uninformative NullPointerException.
 *
 * @throws IllegalStateException if the implementation class cannot be loaded or instantiated
 */
private void initClustering() {
    String clusterImplName = _odeConfig.getClusteringImplClass();
    try {
        Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
        _clusterManager = (ClusterManager) clusterImplClass.newInstance();
    } catch (Exception ex) {
        __log.error("Error while loading class : " + clusterImplName, ex);
        throw new IllegalStateException("Clustering is enabled but the cluster manager " + clusterImplName + " could not be loaded", ex);
    }
}
/**
 * Creates the DAO connection factory from the started database.
 *
 * @throws ServletException if the factory cannot be created
 */
protected void initDAO() throws ServletException {
    __log.info(__msgs.msgOdeUsingDAOImpl(_odeConfig.getDAOConnectionFactory()));
    try {
        _daoCF = _db.createDaoCF();
    } catch (Exception creationFailure) {
        final String errmsg = __msgs.msgDAOInstantiationFailed(_odeConfig.getDAOConnectionFactory());
        __log.error(errmsg, creationFailure);
        throw new ServletException(errmsg, creationFailure);
    }
}
/**
 * Creates the process store, attaches this server's store listener and points the store at its
 * deployment directory (the configured one, or "processes" under the working directory) and at
 * the configuration directory.
 *
 * @param eprContext endpoint reference context handed to the store
 */
protected void initProcessStore(EndpointReferenceContext eprContext) {
    _store = createProcessStore(eprContext, _db.getDataSource());
    _store.registerListener(new ProcessStoreListenerImpl());

    File deployDir;
    if (_odeConfig.getDeployDir() != null) {
        deployDir = new File(_odeConfig.getDeployDir());
    } else {
        deployDir = new File(_workRoot, "processes");
    }
    _store.setDeployDir(deployDir);
    _store.setConfigDir(_configRoot);
}
/**
 * Builds the process store: the cluster-aware implementation when clustering is enabled, the
 * plain one otherwise.
 *
 * @param eprContext endpoint reference context for the store
 * @param ds data source backing the store
 * @return the newly created store
 */
protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) {
    if (!clusteringEnabled) {
        return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false);
    }
    return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, _clusterManager);
}
/**
 * Builds the job scheduler. In clustered mode the node id comes from the cluster manager and the
 * manager is attached to the scheduler; otherwise a fresh GUID serves as node id. In both cases
 * the shared executor service and the transaction manager are wired in.
 *
 * @return the configured scheduler
 */
protected Scheduler createScheduler() {
    final SimpleScheduler simple;
    if (!clusteringEnabled) {
        simple = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
    } else {
        simple = new SimpleScheduler(_clusterManager.getNodeID(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), clusteringEnabled);
        simple.setClusterManager(_clusterManager);
    }
    simple.setExecutorService(_executorService);
    simple.setTransactionManager(_txMgr);
    return simple;
}
/**
 * Creates and wires the BPEL server together with everything it depends on: the worker thread
 * pool, the cluster URL transformer, the job scheduler, the polled runnable processor and the
 * cron scheduler. The server is initialized here but not started; init() starts it later.
 *
 * @param eprContext endpoint reference context to install on the server
 */
private void initBpelServer(EndpointReferenceContextImpl eprContext) {
    if (__log.isDebugEnabled()) {
        __log.debug("ODE initializing");
    }

    ThreadFactory threadFactory = new ThreadFactory() {
        int threadNumber = 0;

        // synchronized: the pool may ask for new threads from several submitting threads at once,
        // and an unguarded increment could hand out the same thread name twice.
        public synchronized Thread newThread(Runnable r) {
            threadNumber += 1;
            Thread t = new Thread(r, "ODEServer-" + threadNumber);
            t.setDaemon(true);
            return t;
        }
    };

    // A configured maximum of 0 selects a cached pool; any other value a fixed-size pool.
    if (_odeConfig.getThreadPoolMaxSize() == 0) {
        _executorService = Executors.newCachedThreadPool(threadFactory);
    } else {
        _executorService = Executors.newFixedThreadPool(_odeConfig.getThreadPoolMaxSize(), threadFactory);
    }

    {
        List<String> targets = new ArrayList<String>();
        Collections.addAll(targets, _odeConfig.getProperty("cluster.localRoute.targets", "").split(","));
        _clusterUrlTransformer = new ClusterUrlTransformer(targets, _odeConfig.getProperty("cluster.localRoute.base", "http://localhost:8080/ode/processes/"));
    }

    _bpelServer = new BpelServerImpl();
    _scheduler = createScheduler();
    _scheduler.setJobProcessor(_bpelServer);

    BpelServerImpl.PolledRunnableProcessor polledRunnableProcessor = new BpelServerImpl.PolledRunnableProcessor();
    polledRunnableProcessor.setPolledRunnableExecutorService(_executorService);
    polledRunnableProcessor.setContexts(_bpelServer.getContexts());
    _scheduler.setPolledRunnableProcesser(polledRunnableProcessor);

    _cronScheduler = new CronScheduler();
    _cronScheduler.setScheduledTaskExec(_executorService);
    _cronScheduler.setContexts(_bpelServer.getContexts());
    _bpelServer.setCronScheduler(_cronScheduler);

    _bpelServer.setDaoConnectionFactory(_daoCF);
    _bpelServer.setClusterManagerImpl(_clusterManager);
    _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl()));
    _bpelServer.setEndpointReferenceContext(eprContext);
    _bpelServer.setMessageExchangeContext(new MessageExchangeContextImpl(this));
    _bpelServer.setBindingContext(new BindingContextImpl(this));
    _bpelServer.setScheduler(_scheduler);
    if (_odeConfig.isDehydrationEnabled()) {
        CountLRUDehydrationPolicy dehy = new CountLRUDehydrationPolicy();
        dehy.setProcessMaxAge(_odeConfig.getDehydrationMaximumAge());
        dehy.setProcessMaxCount(_odeConfig.getDehydrationMaximumCount());
        _bpelServer.setDehydrationPolicy(dehy);
    }
    _bpelServer.setMigrationTransactionTimeout(_odeConfig.getMigrationTransactionTimeout());
    _bpelServer.setConfigProperties(_odeConfig.getProperties());
    _bpelServer.init();

    // These limits are applied after init(), as in the original sequence.
    _bpelServer.setInstanceThrottledMaximumCount(_odeConfig.getInstanceThrottledMaximumCount());
    _bpelServer.setProcessThrottledMaximumCount(_odeConfig.getProcessThrottledMaximumCount());
    _bpelServer.setProcessThrottledMaximumSize(_odeConfig.getProcessThrottledMaximumSize());
    _bpelServer.setHydrationLazy(_odeConfig.isHydrationLazy());
    _bpelServer.setHydrationLazyMinimumSize(_odeConfig.getHydrationLazyMinimumSize());
}
/**
 * Creates the shared HTTP connection manager and the thread that closes its idle connections.
 * Pool limits default to the configured pool max size; the idle timeout and check interval
 * default to 30000.
 *
 * @throws ServletException if a numeric property is malformed or a connection limit is below 1
 */
private void initHttpConnectionManager() throws ServletException {
    httpConnectionManager = new MultiThreadedHttpConnectionManager();
    // settings may be overridden from ode-axis2.properties using the same properties as HttpClient
    // /!\ If the size of the conn pool is smaller than the size of the thread pool, the thread pool might get starved.
    int max_per_host = (int) readNumericProperty(HttpConnectionManagerParams.MAX_HOST_CONNECTIONS, "" + _odeConfig.getPoolMaxSize(), true);
    int max_total = (int) readNumericProperty(HttpConnectionManagerParams.MAX_TOTAL_CONNECTIONS, "" + _odeConfig.getPoolMaxSize(), true);
    if (__log.isDebugEnabled()) {
        __log.debug(HttpConnectionManagerParams.MAX_HOST_CONNECTIONS + "=" + max_per_host);
        __log.debug(HttpConnectionManagerParams.MAX_TOTAL_CONNECTIONS + "=" + max_total);
    }
    if (max_per_host < 1 || max_total < 1) {
        String errmsg = HttpConnectionManagerParams.MAX_HOST_CONNECTIONS + " and " + HttpConnectionManagerParams.MAX_TOTAL_CONNECTIONS + " must be positive integers!";
        __log.error(errmsg);
        throw new ServletException(errmsg);
    }
    httpConnectionManager.getParams().setDefaultMaxConnectionsPerHost(max_per_host);
    httpConnectionManager.getParams().setMaxTotalConnections(max_total);

    // Register the connection manager to a idle check thread
    idleConnectionTimeoutThread = new IdleConnectionTimeoutThread();
    idleConnectionTimeoutThread.setName("Http_Idle_Connection_Timeout_Thread");
    long idleConnectionTimeout = readNumericProperty("http.idle.connection.timeout", "30000", false);
    long idleConnectionCheckInterval = readNumericProperty("http.idle.connection.check.interval", "30000", false);
    if (__log.isDebugEnabled()) {
        __log.debug("http.idle.connection.timeout=" + idleConnectionTimeout);
        __log.debug("http.idle.connection.check.interval=" + idleConnectionCheckInterval);
    }
    idleConnectionTimeoutThread.setConnectionTimeout(idleConnectionTimeout);
    idleConnectionTimeoutThread.setTimeoutInterval(idleConnectionCheckInterval);
    idleConnectionTimeoutThread.addConnectionManager(httpConnectionManager);
    idleConnectionTimeoutThread.start();
}

/**
 * Reads a numeric configuration property, tolerating surrounding whitespace.
 *
 * @param key property name
 * @param fallback value used when the property is not set
 * @param asInt {@code true} to parse with int range, {@code false} for long range
 * @return the parsed value
 * @throws ServletException naming the property when its value is not a valid number
 */
private long readNumericProperty(String key, String fallback, boolean asInt) throws ServletException {
    String raw = _odeConfig.getProperty(key, fallback);
    String trimmed = raw == null ? null : raw.trim();
    try {
        return asInt ? Integer.parseInt(trimmed) : Long.parseLong(trimmed);
    } catch (NumberFormatException nfe) {
        String errmsg = "Invalid value '" + raw + "' for property " + key + ": a number is expected";
        __log.error(errmsg, nfe);
        throw new ServletException(errmsg, nfe);
    }
}
/**
 * @return the process store created during init(); {@code null} before init() and after a successful shutDown()
 */
public ProcessStoreImpl getProcessStore() {
return _store;
}
/**
 * @return the BPEL server created during init(); {@code null} before init() and after a successful shutDown()
 */
public BpelServerImpl getBpelServer() {
return _bpelServer;
}
/**
 * Returns the instance management API of the management service.
 * NOTE: the management service is only assigned in init(); calling this earlier dereferences null.
 *
 * @return the instance management interface
 */
public InstanceManagement getInstanceManagement() {
return _mgtService.getInstanceMgmt();
}
/**
 * Returns the process management API of the management service.
 * NOTE: the management service is only assigned in init(); calling this earlier dereferences null.
 *
 * @return the process management interface
 */
public ProcessManagement getProcessManagement() {
return _mgtService.getProcessMgmt();
}
/**
 * @return the application root resolved in init() (org.apache.ode.rootDir system property, else the context path)
 */
public File getAppRoot() {
return _appRoot;
}
/**
 * @return the configuration directory resolved in init() (org.apache.ode.configDir system property, else "conf" under the application root)
 */
public File getConfigRoot() {
return _configRoot;
}
/**
 * Instantiates and registers the BPEL event listeners named in the configuration. Class names
 * are separated by ',' or ';'; surrounding whitespace is ignored and blank entries are skipped.
 * A listener that cannot be loaded or instantiated is reported and skipped, so one bad entry
 * does not prevent the others from being registered.
 */
private void registerEventListeners() {
    String listenersStr = _odeConfig.getEventListeners();
    if (listenersStr == null) {
        return;
    }
    for (StringTokenizer tokenizer = new StringTokenizer(listenersStr, ",;"); tokenizer.hasMoreTokens();) {
        // Trim so that "a, b" style lists resolve both class names.
        String listenerCN = tokenizer.nextToken().trim();
        if (listenerCN.length() == 0) {
            continue;
        }
        try {
            _bpelServer.registerBpelEventListener((BpelEventListener) Class.forName(listenerCN).newInstance());
            __log.info(__msgs.msgBpelEventListenerRegistered(listenerCN));
        } catch (Exception e) {
            __log.warn("Couldn't register the event listener " + listenerCN + ", the class couldn't be "
                    + "loaded properly: " + e, e);
        }
    }
}
/**
 * Instantiates and registers the message exchange interceptors named in the configuration.
 * Class names are separated by ',' or ';'; surrounding whitespace is ignored and blank entries
 * are skipped. An interceptor that cannot be loaded or instantiated is reported and skipped.
 */
private void registerMexInterceptors() {
    String listenersStr = _odeConfig.getMessageExchangeInterceptors();
    if (listenersStr == null) {
        return;
    }
    for (StringTokenizer tokenizer = new StringTokenizer(listenersStr, ",;"); tokenizer.hasMoreTokens();) {
        // Trim so that "a, b" style lists resolve both class names.
        String interceptorCN = tokenizer.nextToken().trim();
        if (interceptorCN.length() == 0) {
            continue;
        }
        try {
            _bpelServer.registerMessageExchangeInterceptor((MessageExchangeInterceptor) Class.forName(interceptorCN).newInstance());
            __log.info(__msgs.msgMessageExchangeInterceptorRegistered(interceptorCN));
        } catch (Exception e) {
            // The warning used to say "event listener", a leftover from registerEventListeners().
            __log.warn("Couldn't register the message exchange interceptor " + interceptorCN + ", the class couldn't be "
                    + "loaded properly: " + e, e);
        }
    }
}
/**
 * Registers the JDBC external variable engine with the BPEL server, exposing the ODE data
 * source to it under the name "ode".
 */
private void registerExternalVariableModules() {
    JdbcExternalVariableModule jdbcModule = new JdbcExternalVariableModule();
    jdbcModule.registerDataSource("ode", _db.getDataSource());
    _bpelServer.registerExternalVariableEngine(jdbcModule);
}
/**
 * Store listener registered in initProcessStore(); forwards every process store event to the
 * enclosing server's handleEvent().
 */
private class ProcessStoreListenerImpl implements ProcessStoreListener {
public void onProcessStoreEvent(ProcessStoreEvent event) {
handleEvent(event);
}
}
/**
 * Reacts to a process store event by registering, unregistering or cleaning up the affected
 * process on the BPEL server, then brings the process's cron jobs in line with the event:
 * cancelled on undeploy, (re)scheduled for every other event. Cron jobs are only touched when
 * the store still holds a configuration for the process.
 *
 * @param pse the store event to handle
 */
private void handleEvent(ProcessStoreEvent pse) {
    if (__log.isDebugEnabled()) {
        __log.debug("Process store event: " + pse);
    }
    ProcessConf pconf = _store.getProcessConfiguration(pse.pid);
    switch (pse.type) {
        case DEPLOYED:
            if (pconf != null) {
                /*
                 * If and only if an old process exists with the same pid, the old process is cleaned up.
                 * The following line is IMPORTANT and used for the case when the deployment and store
                 * do not have the process while the process itself exists in the BPEL_PROCESS table.
                 * Notice that the new process is actually created on the 'ACTIVATED' event.
                 */
                _bpelServer.cleanupProcess(pconf);
            }
            break;
        case ACTVIATED:
            // bounce the process
            _bpelServer.unregister(pse.pid);
            if (pconf != null) {
                _bpelServer.register(pconf);
            } else {
                __log.debug("slighly odd: recevied event " +
                        pse + " for process not in store!");
            }
            break;
        case RETIRED:
            // are there any instances of this process running?
            boolean instantiated = _bpelServer.hasActiveInstances(pse.pid);
            // remove the process
            _bpelServer.unregister(pse.pid);
            // bounce the process if necessary
            if (instantiated) {
                if (pconf != null) {
                    _bpelServer.register(pconf);
                } else {
                    __log.debug("slighly odd: recevied event " +
                            pse + " for process not in store!");
                }
            } else {
                // we may have potentially created a lot of garbage, so,
                // let's hope the garbage collector is configured properly.
                if (pconf != null) {
                    _bpelServer.cleanupProcess(pconf);
                }
            }
            break;
        case DISABLED:
        case UNDEPLOYED:
            _bpelServer.unregister(pse.pid);
            if (pconf != null) {
                _bpelServer.cleanupProcess(pconf);
            }
            break;
        default:
            __log.debug("Ignoring store event: " + pse);
    }

    if (pconf != null) {
        if (pse.type == ProcessStoreEvent.Type.UNDEPLOYED) {
            __log.debug("Cancelling all cron scheduled jobs on store event: " + pse);
            _bpelServer.getContexts().cronScheduler.cancelProcessCronJobs(pse.pid, true);
        } else {
            // Except for undeploy event, we need to re-schedule process dependent jobs.
            // The log line now sits inside this branch so it no longer appears for undeploys.
            __log.debug("(Re)scheduling cron scheduled jobs on store event: " + pse);
            _bpelServer.getContexts().cronScheduler.scheduleProcessCronJobs(pse.pid, pconf);
        }
    }
}
// Transactional debugging stuff, to track down all these little annoying bugs.
/**
 * Transaction manager decorator that logs every call on the transaction logger before
 * delegating to the wrapped manager. Transactions handed out by getTransaction() are wrapped in
 * DebugTx so that calls made on them are logged as well.
 */
private class DebugTxMgr implements TransactionManager {
    private TransactionManager _tm;

    public DebugTxMgr(TransactionManager tm) {
        _tm = tm;
    }

    public void begin() throws NotSupportedException, SystemException {
        __logTx.debug("Txm begin");
        _tm.begin();
    }

    public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
        // Guard on the logger actually written to; capturing the stack trace is not free.
        if (__logTx.isDebugEnabled()) {
            __logTx.debug("Txm commit");
            for (StackTraceElement traceElement : Thread.currentThread().getStackTrace()) {
                __logTx.debug(traceElement.toString());
            }
        }
        _tm.commit();
    }

    public int getStatus() throws SystemException {
        __logTx.debug("Txm status");
        return _tm.getStatus();
    }

    public Transaction getTransaction() throws SystemException {
        Transaction tx = _tm.getTransaction();
        if (__logTx.isDebugEnabled()) {
            __logTx.debug("Txm get tx " + tx);
        }
        return tx == null ? null : new DebugTx(tx);
    }

    public void resume(Transaction transaction) throws IllegalStateException, InvalidTransactionException, SystemException {
        __logTx.debug("Txm resume");
        // getTransaction() hands out DebugTx wrappers; the wrapped manager must get its own
        // transaction object back, not the wrapper.
        Transaction actual = (transaction instanceof DebugTx) ? ((DebugTx) transaction)._tx : transaction;
        _tm.resume(actual);
    }

    public void rollback() throws IllegalStateException, SecurityException, SystemException {
        __logTx.debug("Txm rollback");
        _tm.rollback();
    }

    public void setRollbackOnly() throws IllegalStateException, SystemException {
        __logTx.debug("Txm set rollback");
        _tm.setRollbackOnly();
    }

    public void setTransactionTimeout(int i) throws SystemException {
        if (__logTx.isDebugEnabled()) {
            __logTx.debug("Txm set tiemout " + i);
        }
        _tm.setTransactionTimeout(i);
    }

    public Transaction suspend() throws SystemException {
        __logTx.debug("Txm suspend");
        return _tm.suspend();
    }
}
/**
 * Transaction decorator that logs commit, rollback, set-rollback-only and synchronization
 * registration on the transaction logger, and delegates every call to the wrapped transaction.
 */
private class DebugTx implements Transaction {
    private Transaction _tx;

    public DebugTx(Transaction tx) {
        _tx = tx;
    }

    public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
        __logTx.debug("Tx commit");
        _tx.commit();
    }

    public boolean delistResource(XAResource xaResource, int i) throws IllegalStateException, SystemException {
        return _tx.delistResource(xaResource, i);
    }

    public boolean enlistResource(XAResource xaResource) throws IllegalStateException, RollbackException, SystemException {
        return _tx.enlistResource(xaResource);
    }

    public int getStatus() throws SystemException {
        return _tx.getStatus();
    }

    public void registerSynchronization(Synchronization synchronization) throws IllegalStateException, RollbackException, SystemException {
        // Guard on the logger actually written to.
        if (__logTx.isDebugEnabled()) {
            __logTx.debug("Synchronization registration on " + synchronization.getClass().getName());
        }
        _tx.registerSynchronization(synchronization);
    }

    public void rollback() throws IllegalStateException, SystemException {
        __logTx.debug("Tx rollback");
        _tx.rollback();
    }

    public void setRollbackOnly() throws IllegalStateException, SystemException {
        __logTx.debug("Tx set rollback");
        _tx.setRollbackOnly();
    }
}
}