blob: f083ec64733bb150464eed4488d41e2f135604ae [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import java.io.File;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.BpelEngine;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.o.OPartnerLink;
import org.apache.ode.bpel.o.OProcess;
import org.apache.ode.utils.msg.MessageBundle;
/**
* Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
* transaction.
*
* @author mszefler
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelEngineImpl implements BpelEngine {
private static final Log __log = LogFactory.getLog(BpelEngineImpl.class);
/** RNG, for delays */
private Random _random = new Random(System.currentTimeMillis());
private static double _delayMean = 0;
static {
try {
String delay = System.getenv("ODE_DEBUG_TX_DELAY");
if (delay != null && delay.length() > 0) {
_delayMean = Double.valueOf(delay);
__log.info("Stochastic debugging delay activated. Delay (Mean)=" + _delayMean + "ms.");
}
} catch (Throwable t) {
if (__log.isDebugEnabled()) {
__log.debug("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay", t);
} else {
__log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay");
}
}
}
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Maximum number of tries for async jobs. */
private static final int MAX_RETRIES = 3;
/** Active processes, keyed by process id. */
final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
/** Mapping from myrole endpoint name to active process. */
private final HashMap<Endpoint, List<BpelProcess>> _serviceMap = new HashMap<Endpoint, List<BpelProcess>>();
/** Mapping from a potentially shared endpoint to its EPR */
private SharedEndpoints _sharedEps;
/** Manage instance-level locks. */
private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
final Contexts _contexts;
public BpelEngineImpl(Contexts contexts) {
_contexts = contexts;
_sharedEps = new SharedEndpoints();
_sharedEps.init();
}
public SharedEndpoints getSharedEndpoints() {
return _sharedEps;
}
public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService,
String operation, String pipedMexId)
throws BpelEngineException {
List<BpelProcess> targets = route(targetService, null);
if (targets == null || targets.size() == 0)
throw new BpelEngineException("NoSuchService: " + targetService);
if (targets.size() == 1) {
// If the number of targets is one, create and return a simple MEX
BpelProcess target = targets.get(0);
return createNewMyRoleMex(target, clientKey, targetService, operation, pipedMexId);
} else {
// If the number of targets is greater than one, create and return
// a brokered MEX that embeds the simple MEXs for each of the targets
BpelProcess template = targets.get(0);
ArrayList<MyRoleMessageExchange> meps = new ArrayList<MyRoleMessageExchange>();
for (BpelProcess target : targets) {
meps.add(createNewMyRoleMex(target, clientKey, targetService, operation, pipedMexId));
}
return createNewMyRoleMex(template, meps);
}
}
private MyRoleMessageExchange createNewMyRoleMex(BpelProcess target, String clientKey, QName targetService, String operation, String pipedMexId) {
MessageExchangeDAO dao;
if (target == null || target.isInMemory()) {
dao = _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
} else {
dao = _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);
}
dao.setCorrelationId(clientKey);
dao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
dao.setPattern(MessageExchangePattern.UNKNOWN.toString());
dao.setCallee(targetService);
dao.setStatus(Status.NEW.toString());
dao.setOperation(operation);
dao.setPipedMessageExchangeId(pipedMexId);
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(target, this, dao);
if (target != null) {
target.initMyRoleMex(mex);
}
return mex;
}
/**
* Return a brokered MEX that delegates invocations to each of the embedded
* MEXs contained in the <code>meps</code> list, using the appropriate style.
*
* @param target
* @param meps
* @return
* @throws BpelEngineException
*/
private MyRoleMessageExchange createNewMyRoleMex(BpelProcess target, List<MyRoleMessageExchange> meps)
throws BpelEngineException {
MyRoleMessageExchangeImpl templateMex = (MyRoleMessageExchangeImpl) meps.get(0);
MessageExchangeDAO templateMexDao = templateMex.getDAO();
return new BrokeredMyRoleMessageExchangeImpl(target, this, meps, templateMexDao, templateMex);
}
public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) {
return createMessageExchange(clientKey, targetService, operation, null);
}
public MessageExchange getMessageExchange(String mexId) throws BpelEngineException {
MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
if (mexdao == null)
return null;
ProcessDAO pdao = mexdao.getProcess();
BpelProcess process = pdao == null ? null : _activeProcesses.get(pdao.getProcessId());
MessageExchangeImpl mex;
switch (mexdao.getDirection()) {
case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE:
if (process == null) {
String errmsg = __msgs.msgProcessNotActive(pdao.getProcessId());
__log.error(errmsg);
// TODO: Perhaps we should define a checked exception for this
// condition.
throw new BpelEngineException(errmsg);
}
{
OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.partnerRolePortType;
Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
// TODO: recover Partner's EPR
mex = createPartnerRoleMessageExchangeImpl(mexdao, ptype, op, plink, process);
}
break;
case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE:
mex = new MyRoleMessageExchangeImpl(process, this, mexdao);
if (process != null) {
OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
PortType ptype = plink.myRolePortType;
Operation op = plink.getMyRoleOperation(mexdao.getOperation());
mex.setPortOp(ptype, op);
}
break;
default:
String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
__log.fatal(errmsg);
throw new BpelEngineException(errmsg);
}
return mex;
}
// enable extensibility
protected PartnerRoleMessageExchangeImpl createPartnerRoleMessageExchangeImpl(
MessageExchangeDAO mexdao, PortType ptype, Operation op, OPartnerLink plink, BpelProcess process) {
return new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null, plink.hasMyRole() ? process
.getInitialMyRoleEPR(plink) : null, process.getPartnerRoleChannel(plink));
}
BpelProcess unregisterProcess(QName process) {
BpelProcess p = _activeProcesses.remove(process);
if (p != null) {
if (__log.isDebugEnabled())
__log.debug("Deactivating process " + p.getPID());
Iterator<Map.Entry<Endpoint,List<BpelProcess>>> serviceIter = _serviceMap.entrySet().iterator();
while (serviceIter.hasNext()) {
Map.Entry<Endpoint,List<BpelProcess>> processEntry = serviceIter.next();
Iterator<BpelProcess> entryProcesses = processEntry.getValue().iterator();
while (entryProcesses.hasNext()) {
BpelProcess entryProcess = entryProcesses.next();
if (entryProcess.getPID().equals(process)) {
entryProcesses.remove();
}
}
}
p.deactivate();
}
return p;
}
boolean isProcessRegistered(QName pid) {
return _activeProcesses.containsKey(pid);
}
/**
* Register a process with the engine.
* @param process the process to register
*/
void registerProcess(BpelProcess process) {
_activeProcesses.put(process.getPID(), process);
for (Endpoint e : process.getServiceNames()) {
__log.debug("Register process: serviceId=" + e + ", process=" + process);
List<BpelProcess> processes = _serviceMap.get(e);
if (processes == null) {
processes = new ArrayList<BpelProcess>();
_serviceMap.put(e, processes);
}
// Remove any older version of the process from the list
Iterator<BpelProcess> processesIter = processes.iterator();
while (processesIter.hasNext()) {
BpelProcess cachedVersion = processesIter.next();
__log.debug("cached version " + cachedVersion.getPID() + " vs registering version " + process.getPID());
if (cachedVersion.getProcessType().equals(process.getProcessType())) {
processesIter.remove();
cachedVersion.deactivate();
}
}
processes.add(process);
}
process.activate(this);
}
/**
* Route to a process using the service id. Note, that we do not need the endpoint name here, we are assuming that two processes
* would not be registered under the same service qname but different endpoint.
*
* @param service
* target service id
* @param request
* request message
* @return process corresponding to the targetted service, or <code>null</code> if service identifier is not recognized.
*/
List<BpelProcess> route(QName service, Message request) {
// TODO: use the message to route to the correct service if more than
// one service is listening on the same endpoint.
List<BpelProcess> routed = null;
for (Endpoint endpoint : _serviceMap.keySet()) {
if (endpoint.serviceName.equals(service))
routed = _serviceMap.get(endpoint);
}
if (__log.isDebugEnabled())
__log.debug("Routed: svcQname " + service + " --> " + routed);
return routed;
}
OProcess getOProcess(QName processId) {
BpelProcess process = _activeProcesses.get(processId);
if (process == null) return null;
return process.getOProcess();
}
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
final WorkEvent we = new WorkEvent(jobInfo.jobDetail);
// We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
// Note that we don't want to wait too long here to get our lock, since we are likely holding
// on to scheduler's locks of various sorts.
try {
_instanceLockManager.lock(we.getIID(), 1, TimeUnit.MICROSECONDS);
_contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
_instanceLockManager.unlock(we.getIID());
}
public void beforeCompletion() { }
});
} catch (InterruptedException e) {
// Retry later.
__log.debug("Thread interrupted, job will be rescheduled: " + jobInfo);
throw new Scheduler.JobProcessorException(true);
} catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
__log.debug("Instance " + we.getIID() + " is busy, rescheduling job.");
// TODO: This should really be more of something like the exponential backoff algorithm in ethernet.
_contexts.scheduler.schedulePersistedJob(jobInfo.jobDetail, new Date(System.currentTimeMillis()
+ Math.min(randomExp(1000), 10000)));
return;
}
// DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle
// all types of failure here, the scheduler is not going to know how to handle our errors,
// ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
// to a grinding halt.
try {
BpelProcess process;
if (we.getProcessId() != null) {
process = _activeProcesses.get(we.getProcessId());
} else {
ProcessInstanceDAO instance;
if (we.isInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getIID());
else instance = _contexts.dao.getConnection().getInstance(we.getIID());
if (instance == null) {
__log.debug(__msgs.msgScheduledJobReferencesUnknownInstance(we.getIID()));
// nothing we can do, this instance is not in the database, it will always fail, not
// exactly an error since can occur in normal course of events.
return;
}
ProcessDAO processDao = instance.getProcess();
process = _activeProcesses.get(processDao.getProcessId());
}
if (process == null) {
// The process is not active, there's nothing we can do with this job
__log.debug("Process " + we.getProcessId() + " can't be found, job abandoned.");
return;
}
if (we.getType().equals(WorkEvent.Type.INVOKE_CHECK)) {
if (__log.isDebugEnabled()) __log.debug("handleWorkEvent: InvokeCheck event for mexid " + we.getMexId());
PartnerRoleMessageExchange mex = (PartnerRoleMessageExchange) getMessageExchange(we.getMexId());
if (mex.getStatus() == MessageExchange.Status.ASYNC || mex.getStatus() == MessageExchange.Status.REQUEST) {
String msg = "Dangling invocation (mexId=" + we.getMexId() + "), forcing it into a failed state.";
if (__log.isDebugEnabled()) __log.debug(msg);
mex.replyWithFailure(MessageExchange.FailureType.COMMUNICATION_ERROR, msg, null);
}
return;
}
process.handleWorkEvent(jobInfo.jobDetail);
debuggingDelay();
} catch (BpelEngineException bee) {
__log.error(__msgs.msgScheduledJobFailed(we.getDetail()), bee);
throw new Scheduler.JobProcessorException(bee, checkRetry(jobInfo, bee));
} catch (ContextException ce) {
__log.error(__msgs.msgScheduledJobFailed(we.getDetail()), ce);
throw new Scheduler.JobProcessorException(ce, checkRetry(jobInfo, ce));
} catch (RuntimeException rte) {
__log.error(__msgs.msgScheduledJobFailed(we.getDetail()), rte);
throw new Scheduler.JobProcessorException(rte, checkRetry(jobInfo, rte));
} catch (Throwable t) {
__log.error(__msgs.msgScheduledJobFailed(we.getDetail()), t);
throw new Scheduler.JobProcessorException(false);
}
}
private boolean checkRetry(final JobInfo jobInfo, Throwable t) {
__log.error("Job could not be completed after " + jobInfo.retryCount + " retries: " + jobInfo, t);
return jobInfo.jobDetail.get("inmem") == null;
}
/**
* Block the thread for random amount of time. Used for testing for races and the like. The delay generated is exponentially
* distributed with the mean obtained from the <code>ODE_DEBUG_TX_DELAY</code> environment variable.
*/
private void debuggingDelay() {
// Do a delay for debugging purposes.
if (_delayMean != 0)
try {
long delay = randomExp(_delayMean);
// distribution
// with mean
// _delayMean
__log.warn("Debugging delay has been activated; delaying transaction for " + delay + "ms.");
Thread.sleep(delay);
} catch (InterruptedException e) {
; // ignore
}
}
private long randomExp(double mean) {
double u = _random.nextDouble(); // Uniform
long delay = (long) (-Math.log(u) * mean); // Exponential
return delay;
}
void fireEvent(BpelEvent event) {
// Note that the eventListeners list is a copy-on-write array, so need
// to mess with synchronization.
for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
l.onEvent(event);
}
}
/**
* Get the list of globally-registered message-exchange interceptors.
*
* @return list
*/
List<MessageExchangeInterceptor> getGlobalInterceptors() {
return _contexts.globalIntereceptors;
}
}