blob: 7ef3ecb5b2fe36d33d7665d728ba8a94e6358a42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.BindingContext;
import org.apache.ode.bpel.iapi.BpelEngine;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelEventListener;
import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.EndpointReferenceContext;
import org.apache.ode.bpel.iapi.MessageExchangeContext;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
import org.apache.ode.bpel.iapi.Scheduler.Synchronizer;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.utils.msg.MessageBundle;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.MemberOfFunction;
import org.apache.ode.bpel.evar.ExternalVariableModule;
import org.apache.ode.bpel.engine.migration.MigrationHandler;
/**
* <p>
* The BPEL server implementation.
* </p>
*
* <p>
* This implementation is intended to be thread safe. The key concurrency
* mechanism is a "management" read/write lock that synchronizes all management
* operations (they require "write" access) and prevents concurrent management
* operations and processing (processing requires "read" access). Write access
* to the lock is scoped to the method, while read access is scoped to a
* transaction.
* </p>
*
* @author Maciej Szefler <mszefler at gmail dot com>
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelServerImpl implements BpelServer, Scheduler.JobProcessor {
private static final Log __log = LogFactory.getLog(BpelServerImpl.class);
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Maximum age of a process before it is quiesced */
private static Long __processMaxAge;
/**
* Set of processes that are registered with the server. Includes hydrated and dehydrated processes.
* Guarded by _mngmtLock.writeLock().
*/
private final Set<BpelProcess> _registeredProcesses = new HashSet<BpelProcess>();
private State _state = State.SHUTDOWN;
private Contexts _contexts = new Contexts();
private DehydrationPolicy _dehydrationPolicy;
private Properties _configProperties;
BpelEngineImpl _engine;
protected BpelDatabase _db;
/**
* Management lock for synchronizing management operations and preventing
* processing (transactions) from occuring while management operations are
* in progress.
*/
private ReadWriteLock _mngmtLock = new ReentrantReadWriteLock();
static {
// TODO Clean this up and factorize engine configuration
try {
String processMaxAge = System.getProperty("ode.process.maxage");
if (processMaxAge != null && processMaxAge.length() > 0) {
__processMaxAge = Long.valueOf(processMaxAge);
__log.info("Process definition max age adjusted. Max age = " + __processMaxAge + "ms.");
}
} catch (Throwable t) {
if (__log.isDebugEnabled()) {
__log.debug("Could not parse ode.process.maxage environment variable.", t);
} else {
__log.info("Could not parse ode.process.maxage environment variable; reaping disabled.");
}
}
}
private enum State {
SHUTDOWN, INIT, RUNNING
}
public BpelServerImpl() {
}
public void start() {
_mngmtLock.writeLock().lock();
try {
if (!checkState(State.INIT, State.RUNNING)) {
__log.debug("start() ignored -- already started");
return;
}
__log.debug("BPEL SERVER starting.");
// Eventually running some migrations before starting
new MigrationHandler(_contexts).migrate(_registeredProcesses);
_state = State.RUNNING;
__log.info(__msgs.msgServerStarted());
if (_dehydrationPolicy != null) new Thread(new ProcessDefReaper()).start();
} finally {
_mngmtLock.writeLock().unlock();
}
}
public void registerExternalVariableEngine(ExternalVariableModule eve) {
_contexts.externalVariableEngines.put(eve.getName(), eve);
}
/**
* Register a global listener to receive {@link BpelEvent}s froom all
* processes.
* @param listener
*/
public void registerBpelEventListener(BpelEventListener listener) {
// Do not synchronize, eventListeners is copy-on-write array.
listener.startup(_configProperties);
_contexts.eventListeners.add(listener);
}
/**
* Unregister a global listener from receive {@link BpelEvent}s from all
* processes.
* @param listener
*/
public void unregisterBpelEventListener(BpelEventListener listener) {
// Do not synchronize, eventListeners is copy-on-write array.
try {
listener.shutdown();
} catch (Exception e) {
__log.warn("Stopping BPEL event listener " + listener.getClass().getName() + " failed, nevertheless it has been unregistered.", e);
} finally {
_contexts.eventListeners.remove(listener);
}
}
private void unregisterBpelEventListeners() {
for (BpelEventListener l : _contexts.eventListeners) {
unregisterBpelEventListener(l);
}
}
public void stop() {
_mngmtLock.writeLock().lock();
try {
if (!checkState(State.RUNNING, State.INIT)) {
__log.debug("stop() ignored -- already stopped");
return;
}
__log.debug("BPEL SERVER STOPPING");
_contexts.scheduler.stop();
_engine = null;
_state = State.INIT;
__log.info(__msgs.msgServerStopped());
} finally {
_mngmtLock.writeLock().unlock();
}
}
public void init() throws BpelEngineException {
_mngmtLock.writeLock().lock();
try {
if (!checkState(State.SHUTDOWN, State.INIT))
return;
__log.debug("BPEL SERVER initializing ");
_db = new BpelDatabase(_contexts.dao, _contexts.scheduler);
_state = State.INIT;
_engine = createBpelEngineImpl(_contexts);
} finally {
_mngmtLock.writeLock().unlock();
}
}
// enable extensibility
protected BpelEngineImpl createBpelEngineImpl(Contexts contexts) {
return new BpelEngineImpl(contexts);
}
public void shutdown() throws BpelEngineException {
_mngmtLock.writeLock().lock();
try {
stop();
unregisterBpelEventListeners();
_db = null;
_engine = null;
_state = State.SHUTDOWN;
} finally {
_mngmtLock.writeLock().unlock();
}
}
public BpelEngine getEngine() {
boolean registered = false;
_mngmtLock.readLock().lock();
try {
_contexts.scheduler.registerSynchronizer(new Synchronizer() {
public void afterCompletion(boolean success) {
_mngmtLock.readLock().unlock();
}
public void beforeCompletion() {
}
});
registered = true;
} finally {
// If we failed to register the synchro,then there was an ex/throwable; we need to unlock now.
if (!registered)
_mngmtLock.readLock().unlock();
}
return _engine;
}
public void register(ProcessConf conf) {
if (conf == null)
throw new NullPointerException("must specify non-null process configuration.");
__log.debug("register: " + conf.getProcessId());
// Ok, IO out of the way, we will mod the server state, so need to get a
// lock.
try {
_mngmtLock.writeLock().lockInterruptibly();
} catch (InterruptedException ie) {
__log.debug("register(...) interrupted.", ie);
throw new BpelEngineException(__msgs.msgOperationInterrupted());
}
try {
// If the process is already active, do nothing.
if (_engine.isProcessRegistered(conf.getProcessId())) {
__log.debug("skipping doRegister" + conf.getProcessId() + ") -- process is already registered");
return;
}
__log.debug("Registering process " + conf.getProcessId() + " with server.");
BpelProcess process = createBpelProcess(conf);
_engine.registerProcess(process);
_registeredProcesses.add(process);
if (_dehydrationPolicy == null) process.hydrate();
__log.info(__msgs.msgProcessRegistered(conf.getProcessId()));
} finally {
_mngmtLock.writeLock().unlock();
}
}
// enable extensibility
protected BpelProcess createBpelProcess(ProcessConf conf) {
return new BpelProcess(conf);
}
public void unregister(QName pid) throws BpelEngineException {
if (__log.isTraceEnabled())
__log.trace("unregister: " + pid);
try {
_mngmtLock.writeLock().lockInterruptibly();
} catch (InterruptedException ie) {
__log.debug("unregister() interrupted.", ie);
throw new BpelEngineException(__msgs.msgOperationInterrupted());
}
try {
BpelProcess p = null;
if (_engine != null) {
p = _engine.unregisterProcess(pid);
if (p != null)
{
_registeredProcesses.remove(p);
__log.info(__msgs.msgProcessUnregistered(pid));
}
}
} catch (Exception ex) {
__log.error(__msgs.msgProcessUnregisterFailed(pid), ex);
throw new BpelEngineException(ex);
} finally {
_mngmtLock.writeLock().unlock();
}
}
public void cleanupProcess(QName pid) throws BpelEngineException {
deleteProcessDAO(pid);
}
/**
* Register a global message exchange interceptor.
* @param interceptor message-exchange interceptor
*/
public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
// NOTE: do not synchronize, globalInterceptors is copy-on-write.
_contexts.globalIntereceptors.add(interceptor);
}
/**
* Unregister a global message exchange interceptor.
* @param interceptor message-exchange interceptor
*/
public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
// NOTE: do not synchronize, globalInterceptors is copy-on-write.
_contexts.globalIntereceptors.remove(interceptor);
}
/**
* Check a state transition from state "i" to state "j".
*/
private boolean checkState(State i, State j) {
if (_state == i)
return true;
if (_state == j)
return false;
return false;
}
/* TODO: We need to have a method of cleaning up old deployment data. */
private boolean deleteProcessDAO(final QName pid) {
try {
// Delete it from the database.
return _db.exec(new BpelDatabase.Callable<Boolean>() {
public Boolean run(BpelDAOConnection conn) throws Exception {
ProcessDAO proc = conn.getProcess(pid);
if (proc != null) {
proc.delete();
return true;
}
return false;
}
});
} catch (Exception ex) {
String errmsg = "DbError";
__log.error(errmsg, ex);
throw new BpelEngineException(errmsg, ex);
}
}
public void onScheduledJob(JobInfo jobInfo) throws JobProcessorException {
getEngine().onScheduledJob(jobInfo);
}
private class ProcessDefReaper implements Runnable {
public void run() {
__log.debug("Starting process definition reaper thread.");
long pollingTime = 10000;
try {
while (true) {
Thread.sleep(pollingTime);
_mngmtLock.writeLock().lockInterruptibly();
try {
__log.debug("Kicking reaper, OProcess instances: " + OProcess.instanceCount);
// Copying the runnning process list to avoid synchronization
// problems and a potential mess if a policy modifies the list
List<BpelProcess> candidates = new ArrayList<BpelProcess>(_registeredProcesses);
CollectionsX.remove_if(candidates, new MemberOfFunction<BpelProcess>() {
public boolean isMember(BpelProcess o) {
return !o.hintIsHydrated();
}
});
// And the happy winners are...
List<BpelProcess> ripped = _dehydrationPolicy.markForDehydration(candidates);
// Bye bye
for (BpelProcess process : ripped) {
__log.debug("Dehydrating process " + process.getPID());
process.dehydrate();
}
} finally {
_mngmtLock.writeLock().unlock();
}
}
} catch (InterruptedException e) {
__log.info(e);
}
}
}
public void setDehydrationPolicy(DehydrationPolicy dehydrationPolicy) {
_dehydrationPolicy = dehydrationPolicy;
}
public void setConfigProperties(Properties configProperties) {
_configProperties = configProperties;
}
public void setMessageExchangeContext(MessageExchangeContext mexContext) throws BpelEngineException {
_contexts.mexContext = mexContext;
}
public void setScheduler(Scheduler scheduler) throws BpelEngineException {
_contexts.scheduler = scheduler;
}
public void setEndpointReferenceContext(EndpointReferenceContext eprContext) throws BpelEngineException {
_contexts.eprContext = eprContext;
}
/**
* Set the DAO connection factory. The DAO is used by the BPEL engine to
* persist information about active processes.
*
* @param daoCF
* {@link BpelDAOConnectionFactory} implementation.
*/
public void setDaoConnectionFactory(BpelDAOConnectionFactory daoCF) throws BpelEngineException {
_contexts.dao = daoCF;
}
public void setInMemDaoConnectionFactory(BpelDAOConnectionFactory daoCF) {
_contexts.inMemDao = daoCF;
}
public void setBindingContext(BindingContext bc) {
_contexts.bindingContext = bc;
}
}