blob: da4f86c925f70436ccfcc5f704127e9f182cd2d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.iapi.BpelEngine;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.OdeGlobalConfig;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.intercept.ProcessCountThrottler;
import org.apache.ode.bpel.intercept.ProcessSizeThrottler;
import org.apache.ode.bpel.obj.OConstants;
import org.apache.ode.bpel.obj.OPartnerLink;
import org.apache.ode.bpel.obj.OProcess;
import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import javax.wsdl.Operation;
import javax.wsdl.PortType;
import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
* Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
* transaction.
*
* @author mszefler
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelEngineImpl implements BpelEngine {
/** Class-wide logger. */
private static final Logger __log = LoggerFactory.getLogger(BpelEngineImpl.class);
/** Random number generator used to draw the artificial debugging delay; seeded with the wall-clock time. */
private Random _random = new Random(System.currentTimeMillis());
/** Mean of the artificial debugging delay in milliseconds; 0 leaves the delay switched off. */
private static double _delayMean = 0;
// Read the optional mean delay from the ODE_DEBUG_TX_DELAY environment variable once, at class-load time.
static {
try {
String delay = System.getenv("ODE_DEBUG_TX_DELAY");
if (delay != null && delay.length() > 0) {
_delayMean = Double.valueOf(delay);
__log.info("Stochastic debugging delay activated. Delay (Mean)=" + _delayMean + "ms.");
}
} catch (Throwable t) {
// Any failure (e.g. an unparsable value) leaves _delayMean at 0; the stack trace is only logged at debug level.
if (__log.isDebugEnabled()) {
__log.debug("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay", t);
} else {
__log.info("Could not read ODE_DEBUG_TX_DELAY environment variable; assuming 0 (mean) delay");
}
}
}
/** Message bundle used to build log and error messages. */
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
/** Multiplier applied to the summed process footprint when estimating hydrated memory use. */
private static final double PROCESS_OVERHEAD_MEMORY_FACTOR = 1.2;
/** Active processes, keyed by process id. */
public final HashMap<QName, BpelProcess> _activeProcesses = new HashMap<QName, BpelProcess>();
/** Mapping from myrole service name to the processes registered under that service. */
private final HashMap<QName, List<BpelProcess>> _serviceMap = new HashMap<QName, List<BpelProcess>>();
/** Mapping from a potentially shared endpoint to its EPR */
private SharedEndpoints _sharedEps;
/** Manage instance-level locks. */
private final ClusterLock<Long> _instanceLockManager;
/** Engine collaborators (DAO connections, scheduler, cluster manager, interceptors, event listeners). */
final Contexts _contexts;
/** Process sizes recorded by setProcessSize() with hydratedOnce == true, keyed by process id. */
private final Map<QName, Long> _hydratedSizes = new HashMap<QName, Long>();
/** Process sizes recorded by setProcessSize() with hydratedOnce == false, keyed by process id. */
private final Map<QName, Long> _unhydratedSizes = new HashMap<QName, Long>();
public BpelEngineImpl(Contexts contexts) {
_contexts = contexts;
if(_contexts.clusterManager != null) {
_instanceLockManager = _contexts.clusterManager.getInstanceLock();
}
else _instanceLockManager = new InstanceLockManager();
_sharedEps = new SharedEndpoints();
_sharedEps.init();
}
/**
 * @return the shared-endpoint registry created and initialized by the constructor
 */
public SharedEndpoints getSharedEndpoints() {
return _sharedEps;
}
/**
 * Creates a my-role message exchange for the given target service.
 *
 * <p>If exactly one process is registered for the service, or exactly one of the
 * registered processes is ACTIVE, a simple exchange bound to that process is returned.
 * Otherwise a brokered exchange is returned that embeds one simple exchange per
 * candidate process. Candidates are the ACTIVE processes, or every registered process
 * when none of them is ACTIVE.
 *
 * @param clientKey     correlation id stored on the exchange
 * @param targetService service the exchange is addressed to
 * @param operation     operation name stored on the exchange
 * @param pipedMexId    id of the piped message exchange, may be <code>null</code>
 * @return the newly created exchange
 * @throws BpelEngineException if no process is registered for <code>targetService</code>
 */
public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService,
        String operation, String pipedMexId)
        throws BpelEngineException {
    List<BpelProcess> targets = route(targetService, null);
    // Validate before iterating: route() returns null for an unknown service.
    if (targets == null || targets.isEmpty()) {
        throw new BpelEngineException("NoSuchService: " + targetService);
    }

    List<BpelProcess> activeTargets = new ArrayList<BpelProcess>();
    for (BpelProcess target : targets) {
        if (target.getConf().getState() == ProcessState.ACTIVE) {
            activeTargets.add(target);
        }
    }

    if (targets.size() == 1 || activeTargets.size() == 1) {
        // A single candidate: create and return a simple MEX, preferring the active process.
        BpelProcess target = activeTargets.size() == 1 ? activeTargets.get(0) : targets.get(0);
        return createNewMyRoleMex(target, clientKey, targetService, operation, pipedMexId);
    }

    // Several candidates: create and return a brokered MEX that embeds the simple MEXs
    // for each of them. With no ACTIVE process at all, fall back to every registered
    // process instead of failing on an empty list.
    List<BpelProcess> candidates = activeTargets.isEmpty() ? targets : activeTargets;
    BpelProcess template = candidates.get(0);
    List<MyRoleMessageExchange> meps = new ArrayList<MyRoleMessageExchange>();
    for (BpelProcess target : candidates) {
        meps.add(createNewMyRoleMex(target, clientKey, targetService, operation, pipedMexId));
    }
    return createNewMyRoleMex(template, meps);
}
/**
 * Creates a simple my-role exchange backed by a freshly created DAO.
 * The in-memory store is used when there is no target or the target is an in-memory process.
 *
 * @param target        process the exchange is bound to, may be <code>null</code>
 * @param clientKey     correlation id
 * @param targetService callee service name
 * @param operation     operation name
 * @param pipedMexId    piped message exchange id, may be <code>null</code>
 * @return the new exchange, initialized by the target process when one is given
 */
private MyRoleMessageExchange createNewMyRoleMex(BpelProcess target, String clientKey, QName targetService, String operation, String pipedMexId) {
    final boolean useInMemoryStore = (target == null) || target.isInMemory();
    final MessageExchangeDAO mexDao = useInMemoryStore
            ? _contexts.inMemDao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE)
            : _contexts.dao.getConnection().createMessageExchange(MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE);

    // Populate the initial state of the exchange.
    mexDao.setCorrelationId(clientKey);
    mexDao.setCorrelationStatus(CorrelationStatus.UKNOWN_ENDPOINT.toString());
    mexDao.setPattern(MessageExchangePattern.UNKNOWN.toString());
    mexDao.setCallee(targetService);
    mexDao.setStatus(Status.NEW.toString());
    mexDao.setOperation(operation);
    mexDao.setPipedMessageExchangeId(pipedMexId);

    final MyRoleMessageExchangeImpl created = new MyRoleMessageExchangeImpl(target, this, mexDao);
    if (target == null) {
        return created;
    }
    target.initMyRoleMex(created);
    return created;
}
/**
 * Builds a brokered MEX wrapping the embedded MEXs in <code>meps</code>.
 * The first embedded exchange serves as the template: both it and its DAO are
 * handed to the brokered exchange.
 *
 * @param target process the brokered exchange is bound to
 * @param meps   embedded exchanges; must contain at least one element
 * @return the brokered exchange
 * @throws BpelEngineException declared for callers; not thrown directly here
 */
private MyRoleMessageExchange createNewMyRoleMex(BpelProcess target, List<MyRoleMessageExchange> meps)
        throws BpelEngineException {
    final MyRoleMessageExchangeImpl first = (MyRoleMessageExchangeImpl) meps.get(0);
    final MessageExchangeDAO firstDao = first.getDAO();
    return new BrokeredMyRoleMessageExchangeImpl(target, this, meps, firstDao, first);
}
/**
 * Convenience overload that creates a my-role message exchange without a piped
 * message exchange id (delegates with <code>pipedMexId == null</code>).
 */
public MyRoleMessageExchange createMessageExchange(String clientKey, QName targetService, String operation) {
return createMessageExchange(clientKey, targetService, operation, null);
}
/**
 * Associates the message exchange identified by <code>mexId</code> with a process DAO.
 * The in-memory store is consulted first, then the persistent one; an unknown id is
 * silently ignored.
 */
private void setMessageExchangeProcess(String mexId, ProcessDAO processDao) {
    MessageExchangeDAO found = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
    if (found == null) {
        found = _contexts.dao.getConnection().getMessageExchange(mexId);
        if (found == null) {
            return;
        }
    }
    found.setProcess(processDao);
}
/**
 * Looks up a message exchange by id and wraps its DAO in the matching implementation.
 * The in-memory store is consulted first, then the persistent one.
 *
 * @param mexId message exchange id
 * @return the exchange, or <code>null</code> if no DAO exists for <code>mexId</code>
 * @throws BpelEngineException if a partner-role exchange refers to a process that is not
 *         active (or to no process at all), or if the stored direction is invalid
 */
public MessageExchange getMessageExchange(String mexId) {
    MessageExchangeDAO mexdao = _contexts.inMemDao.getConnection().getMessageExchange(mexId);
    if (mexdao == null) mexdao = _contexts.dao.getConnection().getMessageExchange(mexId);
    if (mexdao == null)
        return null;

    ProcessDAO pdao = mexdao.getProcess();
    BpelProcess process = pdao == null ? null : _activeProcesses.get(pdao.getProcessId());
    MessageExchangeImpl mex;
    switch (mexdao.getDirection()) {
    case MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE: {
        if (process == null) {
            // pdao itself may be null here, so never dereference it unconditionally.
            String errmsg = __msgs.msgProcessNotActive(pdao == null ? null : pdao.getProcessId());
            __log.error(errmsg);
            // TODO: Perhaps we should define a checked exception for this
            // condition.
            throw new BpelEngineException(errmsg);
        }
        OPartnerLink plink = (OPartnerLink) process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
        PortType ptype = plink.getPartnerRolePortType();
        Operation op = plink.getPartnerRoleOperation(mexdao.getOperation());
        // TODO: recover Partner's EPR
        mex = createPartnerRoleMessageExchangeImpl(mexdao, ptype, op, plink, process);
        break;
    }
    case MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE: {
        mex = new MyRoleMessageExchangeImpl(process, this, mexdao);
        if (process != null) {
            Object child = process.getOProcess().getChild(mexdao.getPartnerLinkModelId());
            // The partner link might not be resolvable; instanceof also rejects null.
            if (child instanceof OPartnerLink) {
                OPartnerLink plink = (OPartnerLink) child;
                PortType ptype = plink.getMyRolePortType();
                Operation op = plink.getMyRoleOperation(mexdao.getOperation());
                mex.setPortOp(ptype, op);
            }
        }
        break;
    }
    default: {
        String errmsg = "BpelEngineImpl: internal error, invalid MexDAO direction: " + mexId;
        __log.error(errmsg);
        throw new BpelEngineException(errmsg);
    }
    }
    return mex;
}
/**
 * Factory hook for partner-role exchanges; protected so that subclasses can supply
 * their own implementation (enable extensibility).
 *
 * The my-role EPR argument is taken from the process only when the partner link has
 * a my-role, otherwise <code>null</code> is passed; the partner-role channel always
 * comes from the process.
 */
protected PartnerRoleMessageExchangeImpl createPartnerRoleMessageExchangeImpl(
MessageExchangeDAO mexdao, PortType ptype, Operation op, OPartnerLink plink, BpelProcess process) {
return new PartnerRoleMessageExchangeImpl(this, mexdao, ptype, op, null, plink.hasMyRole() ? process
.getInitialMyRoleEPR(plink) : null, process.getPartnerRoleChannel(plink));
}
/**
 * Removes a process from the engine.
 *
 * <p>The process is dropped from the active-process map and from the per-service
 * process lists. Its services are deactivated only if no other ACTIVE or RETIRED
 * process shares one of its service names. The process is always dehydrated and its
 * hydrated-size record is discarded.
 *
 * @param process id of the process to unregister
 * @return the unregistered process, or <code>null</code> if it was not registered
 */
BpelProcess unregisterProcess(QName process) {
    BpelProcess p = _activeProcesses.remove(process);
    if (__log.isDebugEnabled()) {
        __log.debug("Unregister process: serviceId=" + process + ", process=" + p);
    }
    if (p == null) {
        return null;
    }

    if (__log.isDebugEnabled())
        __log.debug("Deactivating process " + p.getPID());

    boolean deactivate = true;
    for (Endpoint endPoint : p.getServiceNames()) {
        List<BpelProcess> processList = _serviceMap.get(endPoint.serviceName);
        if (processList == null) {
            // Nothing was ever registered under this service name (e.g. registration
            // failed part-way), so there is nothing to clean up for it.
            continue;
        }
        Iterator<BpelProcess> processListItr = processList.iterator();
        while (processListItr.hasNext()) {
            BpelProcess entryProcess = processListItr.next();
            ProcessState state = entryProcess.getConf().getState();
            // Don't deactivate process services if there is another process in Active/Retired state and
            // associated with the same service name
            boolean otherLiveProcess =
                    (ProcessState.ACTIVE.equals(state) || ProcessState.RETIRED.equals(state))
                    && !entryProcess.getPID().equals(p.getPID());
            if (otherLiveProcess) {
                deactivate = false;
            } else if (entryProcess.getPID().equals(process)) {
                processListItr.remove();
            }
        }
    }

    if (deactivate) {
        // unregister the services provided by the process
        p.deactivate();
    }
    // release the resources held by this process
    p.dehydrate();
    // update the process footprints list
    _hydratedSizes.remove(p.getPID());
    return p;
}
/**
 * @param pid process id
 * @return <code>true</code> if a process with this id is in the active-process map
 */
boolean isProcessRegistered(QName pid) {
return _activeProcesses.containsKey(pid);
}
/**
 * @param pid process id
 * @return the registered process with this id, or <code>null</code> if there is none
 */
public BpelProcess getProcess(QName pid) {
return _activeProcesses.get(pid);
}
/**
 * Register a process with the engine.
 *
 * The process is added to the active-process map and, for each of its service names,
 * to that service's process list. A process of the same type already present in a
 * list is replaced unless it has a higher version, in which case the process being
 * registered is activated and immediately deactivated and the method returns early.
 *
 * @param process the process to register
 * @throws ContextException if both versions are ACTIVE and auto-retirement is disabled
 */
void registerProcess(BpelProcess process) {
_activeProcesses.put(process.getPID(), process);
for (Endpoint e : process.getServiceNames()) {
if (__log.isDebugEnabled()) {
__log.debug("Register process: serviceId=" + e + ", process=" + process);
}
// Lazily create the per-service process list.
List<BpelProcess> processes = _serviceMap.get(e.serviceName);
if (processes == null) {
processes = new ArrayList<BpelProcess>();
_serviceMap.put(e.serviceName, processes);
}
// Remove any older version of the process from the list
Iterator<BpelProcess> processesIter = processes.iterator();
while (processesIter.hasNext()) {
BpelProcess cachedVersion = processesIter.next();
if (__log.isDebugEnabled()) {
__log.debug("cached version " + cachedVersion.getPID() + " vs registering version " + process.getPID());
}
if (cachedVersion.getProcessType().equals(process.getProcessType())) {
if (cachedVersion.getConf().getState() == ProcessState.ACTIVE
&& process.getConf().getState() == ProcessState.ACTIVE
&& !OdeGlobalConfig.autoRetireProcess()) {
throw new ContextException("Can't activate two processes of the same name: " + process.getConf().getPackage() + ", " + cachedVersion.getConf().getPackage() + ", name: " + process.getProcessType());
}
//Check for versions to retain newer one
if (cachedVersion.getVersion() > process.getVersion()) {
// The cached version is newer: keep it and leave the registering process out of the service list.
// NOTE(review): the process stays in _activeProcesses, and any service lists it was
// added to in earlier loop iterations are not rolled back -- confirm this is intended.
__log.debug("removing current version");
process.activate(this);
process.deactivate();
return;
} else {
__log.debug("removing cached older version");
processesIter.remove();
cachedVersion.deactivate();
}
}
}
processes.add(process);
}
process.activate(this);
}
/**
 * Route to the processes registered under a service id. Note that the endpoint name is not needed here; we assume that
 * two processes would not be registered under the same service qname but different endpoints.
 *
 * @param service
 * target service id
 * @param request
 * request message (currently not used for routing)
 * @return the list of processes registered for the targeted service, or <code>null</code> if the service identifier is
 * not recognized. The returned list is the engine's internal list, not a copy.
 */
List<BpelProcess> route(QName service, Message request) {
// TODO: use the message to route to the correct service if more than
// one service is listening on the same endpoint.
List<BpelProcess> routed = _serviceMap.get(service);
if (__log.isDebugEnabled())
__log.debug("Routed: svcQname " + service + " --> " + routed);
return routed;
}
/**
 * @param processId process id
 * @return the compiled process model of the registered process, or <code>null</code>
 *         if no process with this id is registered
 */
OProcess getOProcess(QName processId) {
    final BpelProcess registered = _activeProcesses.get(processId);
    return (registered != null) ? registered.getOProcess() : null;
}
/**
 * Collects every registered process whose id, rendered as a string, starts with the
 * given id stripped of its last "-suffix" (when the id contains a dash past index 0).
 *
 * @param processId process id used to derive the prefix
 * @return matching processes, possibly empty
 */
private List<BpelProcess> getAllProcesses(QName processId) {
    String prefix = processId.toString();
    final int lastDash = prefix.lastIndexOf("-");
    if (lastDash > 0) {
        prefix = prefix.substring(0, lastDash);
    }

    final List<BpelProcess> matches = new ArrayList<BpelProcess>();
    for (Map.Entry<QName, BpelProcess> entry : _activeProcesses.entrySet()) {
        if (entry.getKey().toString().startsWith(prefix)) {
            matches.add(entry.getValue());
        }
    }
    return matches;
}
/**
 * Acquires the lock for a process instance and arranges for it to be released when
 * the current scheduler transaction completes.
 *
 * <p>If the lock is acquired but the release hook cannot be registered, the lock is
 * released immediately so that the instance does not stay locked.
 *
 * @param iid instance id to lock
 * @throws Scheduler.JobProcessorException (with retry requested) if the thread is
 *         interrupted or the lock cannot be obtained within the timeout
 */
public void acquireInstanceLock(final Long iid) {
    // We lock the instance to prevent concurrent transactions and prevent unnecessary rollbacks,
    // Note that we don't want to wait too long here to get our lock, since we are likely holding
    // on to scheduler's locks of various sorts.
    boolean lockHeld = false;
    boolean releaseDelegated = false;
    try {
        _instanceLockManager.lock(iid, 1, TimeUnit.MICROSECONDS);
        lockHeld = true;
        _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
            public void afterCompletion(boolean success) {
                _instanceLockManager.unlock(iid);
            }
            public void beforeCompletion() { }
        });
        // From here on the synchronizer owns the responsibility of unlocking.
        releaseDelegated = true;
    } catch (InterruptedException e) {
        // Retry later.
        __log.debug("Thread interrupted, job will be rescheduled");
        throw new Scheduler.JobProcessorException(true);
    } catch (org.apache.ode.bpel.engine.InstanceLockManager.TimeoutException e) {
        __log.debug("Instance " + iid + " is busy, rescheduling job.");
        throw new Scheduler.JobProcessorException(true);
    } finally {
        if (lockHeld && !releaseDelegated) {
            // Registering the synchronizer failed: nobody else will release the lock.
            _instanceLockManager.unlock(iid);
        }
    }
}
/**
 * Entry point for jobs delivered by the scheduler.
 *
 * Acquires the instance lock, resolves the target process (from the job's process id,
 * or from the instance DAO when the job carries none), then dispatches on the job type
 * with the process's class loader installed as the thread context class loader.
 * Jobs for unknown instances or inactive processes are dropped without error.
 *
 * @param jobInfo the scheduled job
 * @throws Scheduler.JobProcessorException on failure; its retry flag comes from
 *         checkRetry(), except for exceptions of this type raised below, which are rethrown as-is
 */
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws Scheduler.JobProcessorException {
final JobDetails we = jobInfo.jobDetail;
if( __log.isTraceEnabled() ) __log.trace("[JOB] onScheduledJob " + jobInfo + "" + we.getInstanceId());
acquireInstanceLock(we.getInstanceId());
// DONT PUT CODE HERE-need this method real tight in a try/catch block, we need to handle
// all types of failure here, the scheduler is not going to know how to handle our errors,
// ALSO we have to release the lock obtained above (IMPORTANT), lest the whole system come
// to a grinding halt.
BpelProcess process = null;
try {
if (we.getProcessId() != null) {
process = _activeProcesses.get(we.getProcessId());
} else {
// No process id on the job: resolve the process through the instance DAO.
ProcessInstanceDAO instance;
if (we.getInMem()) instance = _contexts.inMemDao.getConnection().getInstance(we.getInstanceId());
else instance = _contexts.dao.getConnection().getInstance(we.getInstanceId());
if (instance == null) {
__log.debug(__msgs.msgScheduledJobReferencesUnknownInstance(we.getInstanceId()));
// nothing we can do, this instance is not in the database, it will always fail, not
// exactly an error since can occur in normal course of events.
return;
}
ProcessDAO processDao = instance.getProcess();
process = _activeProcesses.get(processDao.getProcessId());
}
if (process == null) {
// The process is not active, there's nothing we can do with this job
// (the logged id is null when the process was looked up through the instance DAO)
if (__log.isDebugEnabled()) {
__log.debug("Process " + we.getProcessId() + " can't be found, job abandoned.");
}
return;
}
// Run the job with the process's class loader; the previous one is restored in the finally block.
ClassLoader cl = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(process._classLoader);
if (we.getType().equals(JobType.INVOKE_CHECK)) {
if (__log.isDebugEnabled()) __log.debug("handleJobDetails: InvokeCheck event for mexid " + we.getMexId());
sendPartnerRoleFailure(we, MessageExchange.FailureType.COMMUNICATION_ERROR);
return;
} else if (we.getType().equals(JobType.INVOKE_INTERNAL)) {
if (__log.isDebugEnabled()) __log.debug("handleJobDetails: InvokeInternal event for mexid " + we.getMexId());
setMessageExchangeProcess(we.getMexId(), process.getProcessDAO());
MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) getMessageExchange(we.getMexId());
// Interceptors may veto the job: two-way exchanges get a fault, one-way exchanges are retried.
if (!process.processInterceptors(mex, InterceptorInvoker.__onJobScheduled)) {
boolean isTwoWay = Boolean.valueOf(mex.getProperty("isTwoWay"));
if (isTwoWay) {
String causeCodeValue = mex.getProperty("causeCode");
mex.getDAO().setProcess(process.getProcessDAO());
sendMyRoleFault(process, we, causeCodeValue != null ?
Integer.valueOf(causeCodeValue) : InvalidProcessException.DEFAULT_CAUSE_CODE);
return;
} else {
throw new Scheduler.JobProcessorException(checkRetry(we));
}
}
}
if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
// NOTE(review): getAllProcesses() dereferences its argument; this assumes jobs of
// these types always carry a process id -- confirm.
List<BpelProcess> processes = getAllProcesses(we.getProcessId());
boolean routed = false;
jobInfo.jobDetail.detailsExt.put("enqueue", false);
// Short-circuit: once one process has handled the job, the remaining ones are not asked.
for(BpelProcess proc : processes) {
routed = routed || proc.handleJobDetails(jobInfo.jobDetail);
}
// Nobody took an internal invoke: hand it to the resolved process with "enqueue" set.
if(!routed && we.getType() == JobType.INVOKE_INTERNAL) {
jobInfo.jobDetail.detailsExt.put("enqueue", true);
process.handleJobDetails(jobInfo.jobDetail);
}
}
else {
process.handleJobDetails(jobInfo.jobDetail);
}
debuggingDelay();
} finally {
Thread.currentThread().setContextClassLoader(cl);
}
} catch (Scheduler.JobProcessorException e) {
throw e;
} catch (BpelEngineException bee) {
__log.error(__msgs.msgScheduledJobFailed(we), bee);
throw new Scheduler.JobProcessorException(bee, checkRetry(we));
} catch (ContextException ce) {
__log.error(__msgs.msgScheduledJobFailed(we), ce);
throw new Scheduler.JobProcessorException(ce, checkRetry(we));
} catch (InvalidProcessException ipe) {
// An invalid-process condition is reported to the caller as a fault instead of failing the job.
__log.error(__msgs.msgScheduledJobFailed(we), ipe);
sendMyRoleFault(process, we, ipe.getCauseCode());
} catch (RuntimeException rte) {
__log.error(__msgs.msgScheduledJobFailed(we), rte);
throw new Scheduler.JobProcessorException(rte, checkRetry(we));
} catch (Throwable t) {
__log.error(__msgs.msgScheduledJobFailed(we), t);
throw new Scheduler.JobProcessorException(t, checkRetry(we));
}
}
/**
 * Decides whether a failed job should be flagged for retry.
 *
 * @param we job details
 * @return <code>true</code> unless the job is an in-memory job
 */
private boolean checkRetry(JobDetails we) {
// Only retry if the job is NOT in memory. Note that this does not guarantee that a retry will be scheduled.
// Events are not retried if they are not persisted, and the scheduler might choose to discard an event that has been retried too many times.
return !we.getInMem();
}
/**
 * Pauses the current thread for a random period; a testing aid for provoking races.
 * The pause length is exponentially distributed with the mean taken from the
 * <code>ODE_DEBUG_TX_DELAY</code> environment variable. Does nothing when that mean is 0.
 */
private void debuggingDelay() {
    if (_delayMean == 0) {
        return; // debugging delay not configured
    }
    try {
        // Draw from an exponential distribution with mean _delayMean.
        final long pauseMillis = randomExp(_delayMean);
        __log.warn("Debugging delay has been activated; delaying transaction for " + pauseMillis + "ms.");
        Thread.sleep(pauseMillis);
    } catch (InterruptedException e) {
        // ignore
    }
}
/**
 * Draws a value from an exponential distribution with the given mean.
 *
 * @param mean mean of the distribution
 * @return the sampled value, truncated to a long
 */
private long randomExp(double mean) {
    // nextDouble() is uniform on [0, 1), so it can return exactly 0.0; -log(0) would be
    // +Infinity and the cast would yield Long.MAX_VALUE. Using 1 - u moves the sample to
    // (0, 1] without changing the distribution and keeps the logarithm finite.
    double u = 1.0 - _random.nextDouble();
    return (long) (-Math.log(u) * mean); // Exponential
}
/**
 * Delivers an event to every registered event listener, in list order.
 *
 * @param event the event to deliver
 */
void fireEvent(BpelEvent event) {
// Note that the eventListeners list is a copy-on-write array, so there is no need
// to mess with synchronization.
for (org.apache.ode.bpel.iapi.BpelEventListener l : _contexts.eventListeners) {
l.onEvent(event);
}
}
/**
 * Get the list of globally-registered message-exchange interceptors.
 *
 * @return the live interceptor list held by the engine contexts (not a copy)
 */
List<MessageExchangeInterceptor> getGlobalInterceptors() {
return _contexts.globalInterceptors;
}
/**
 * Appends an interceptor to the global interceptor list. No duplicate check is made.
 *
 * @param interceptor interceptor to register
 */
public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
_contexts.globalInterceptors.add(interceptor);
}
/**
 * Removes the given interceptor from the global interceptor list, if present.
 *
 * @param interceptor interceptor to unregister
 */
public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
_contexts.globalInterceptors.remove(interceptor);
}
/**
 * Removes the first globally registered interceptor that is an instance of the given
 * class (the class itself or a subclass). Does nothing when no interceptor matches.
 *
 * @param interceptorClass class of the interceptor to unregister
 */
public void unregisterMessageExchangeInterceptor(Class interceptorClass) {
    MessageExchangeInterceptor candidate = null;
    for (MessageExchangeInterceptor interceptor : _contexts.globalInterceptors) {
        // Match on "interceptor is an instance of interceptorClass". The reverse test
        // would match instances of superclasses of the requested class and miss
        // instances of its subclasses.
        if (interceptorClass.isInstance(interceptor)) {
            candidate = interceptor;
            break;
        }
    }
    if (candidate != null) {
        _contexts.globalInterceptors.remove(candidate);
    }
}
/**
 * Sums the recorded sizes of all registered processes. For each process the hydrated
 * size is preferred, falling back to the unhydrated size; missing or non-positive
 * sizes are skipped.
 *
 * @return total recorded footprint
 */
public long getTotalBpelFootprint() {
    long total = 0;
    for (BpelProcess registered : _activeProcesses.values()) {
        Long recorded = _hydratedSizes.get(registered.getPID());
        if (recorded == null) {
            recorded = _unhydratedSizes.get(registered.getPID());
        }
        if (recorded == null || recorded.longValue() <= 0) {
            continue;
        }
        total += recorded;
    }
    return total;
}
/**
 * Sums the recorded sizes of the registered processes that report themselves as
 * hydrated. The hydrated size is preferred, falling back to the unhydrated size;
 * missing or non-positive sizes are skipped.
 *
 * @return recorded footprint of hydrated processes
 */
public long getHydratedFootprint() {
    long total = 0;
    for (BpelProcess registered : _activeProcesses.values()) {
        if (registered.hintIsHydrated()) {
            Long recorded = _hydratedSizes.get(registered.getPID());
            if (recorded == null) {
                recorded = _unhydratedSizes.get(registered.getPID());
            }
            if (recorded != null && recorded.longValue() > 0) {
                total += recorded;
            }
        }
    }
    return total;
}
/**
 * Estimates the hydrated memory use that would result if the named process were hydrated.
 *
 * NOTE(review): the process is looked up in the active-process map and passed on
 * without a null check; an unregistered name leads to a NullPointerException in the
 * overload taking a BpelProcess.
 *
 * @param processName id of a registered process
 * @return estimated hydrated size including the overhead factor
 */
public long getHydratedProcessSize(QName processName) {
return getHydratedProcessSize(_activeProcesses.get(processName));
}
/**
 * Estimates hydrated memory use assuming the given process is hydrated. If the process
 * is not currently hydrated, its recorded size (hydrated size preferred, unhydrated as
 * fallback, positive values only) is counted as potential growth.
 *
 * @param process process to account for
 * @return estimated hydrated size including the overhead factor
 */
private long getHydratedProcessSize(BpelProcess process) {
    long growth = 0;
    if (process.hintIsHydrated()) {
        return getHydratedProcessSize(growth);
    }
    Long recorded = _hydratedSizes.get(process.getPID());
    if (recorded == null) {
        recorded = _unhydratedSizes.get(process.getPID());
    }
    if (recorded != null && recorded.longValue() > 0) {
        growth = recorded.longValue();
    }
    return getHydratedProcessSize(growth);
}
/**
 * Applies the process overhead factor to the current hydrated footprint plus the
 * given potential growth.
 *
 * @param potentialGrowth additional size to account for
 * @return the scaled total, truncated to a long
 */
private long getHydratedProcessSize(long potentialGrowth) {
    final long rawBytes = getHydratedFootprint() + potentialGrowth;
    return (long) (rawBytes * PROCESS_OVERHEAD_MEMORY_FACTOR);
}
/**
 * Counts the registered processes that are hydrated, additionally counting the named
 * process even when it is not hydrated.
 *
 * @param processName id of the process to count unconditionally
 * @return number of matching processes
 */
public int getHydratedProcessCount(QName processName) {
    int count = 0;
    for (BpelProcess registered : _activeProcesses.values()) {
        if (!registered.hintIsHydrated() && !registered.getPID().equals(processName)) {
            continue;
        }
        count++;
    }
    return count;
}
/** Configured maximum process size; Long.MAX_VALUE is treated by the setter as "no size throttling". */
private long _processThrottledMaximumSize = Long.MAX_VALUE;
/** Configured maximum process count; Integer.MAX_VALUE is treated by the setter as "no count throttling". */
private int _processThrottledMaximumCount = Integer.MAX_VALUE;
/** Configured maximum instance count; only stored and returned by this class. */
private int _instanceThrottledMaximumCount = Integer.MAX_VALUE;
// NOTE(review): not read or written anywhere else in this class.
private boolean _hydrationThrottled = false;
/**
 * Stores the configured maximum instance count.
 *
 * @param instanceThrottledMaximumCount new maximum
 */
public void setInstanceThrottledMaximumCount(
int instanceThrottledMaximumCount) {
this._instanceThrottledMaximumCount = instanceThrottledMaximumCount;
}
/**
 * @return the configured maximum instance count (Integer.MAX_VALUE unless it has been set)
 */
public int getInstanceThrottledMaximumCount() {
return _instanceThrottledMaximumCount;
}
/**
 * Sets the maximum process count and switches count throttling on or off.
 * A value below Integer.MAX_VALUE makes sure one ProcessCountThrottler is
 * registered as a global interceptor; Integer.MAX_VALUE unregisters it.
 *
 * @param hydrationThrottledMaximumCount new maximum
 */
public void setProcessThrottledMaximumCount(
        int hydrationThrottledMaximumCount) {
    this._processThrottledMaximumCount = hydrationThrottledMaximumCount;
    if (hydrationThrottledMaximumCount < Integer.MAX_VALUE) {
        // Register at most one throttler: repeated calls must not stack duplicates,
        // because disabling throttling removes only a single instance.
        boolean alreadyRegistered = false;
        for (MessageExchangeInterceptor interceptor : _contexts.globalInterceptors) {
            if (interceptor instanceof ProcessCountThrottler) {
                alreadyRegistered = true;
                break;
            }
        }
        if (!alreadyRegistered) {
            registerMessageExchangeInterceptor(new ProcessCountThrottler());
        }
    } else {
        unregisterMessageExchangeInterceptor(ProcessCountThrottler.class);
    }
}
/**
 * @return the configured maximum process count (Integer.MAX_VALUE unless it has been set)
 */
public int getProcessThrottledMaximumCount() {
return _processThrottledMaximumCount;
}
/**
 * Sets the maximum process size and switches size throttling on or off.
 * A value below Long.MAX_VALUE makes sure one ProcessSizeThrottler is
 * registered as a global interceptor; Long.MAX_VALUE unregisters it.
 *
 * @param hydrationThrottledMaximumSize new maximum
 */
public void setProcessThrottledMaximumSize(
        long hydrationThrottledMaximumSize) {
    this._processThrottledMaximumSize = hydrationThrottledMaximumSize;
    if (hydrationThrottledMaximumSize < Long.MAX_VALUE) {
        // Register at most one throttler: repeated calls must not stack duplicates,
        // because disabling throttling removes only a single instance.
        boolean alreadyRegistered = false;
        for (MessageExchangeInterceptor interceptor : _contexts.globalInterceptors) {
            if (interceptor instanceof ProcessSizeThrottler) {
                alreadyRegistered = true;
                break;
            }
        }
        if (!alreadyRegistered) {
            registerMessageExchangeInterceptor(new ProcessSizeThrottler());
        }
    } else {
        unregisterMessageExchangeInterceptor(ProcessSizeThrottler.class);
    }
}
/**
 * @return the configured maximum process size (Long.MAX_VALUE unless it has been set)
 */
public long getProcessThrottledMaximumSize() {
return _processThrottledMaximumSize;
}
/**
 * Records the current size of a registered process. The size goes into the hydrated
 * map when <code>hydratedOnce</code> is true and into the unhydrated map otherwise;
 * the entry in the other map is removed.
 *
 * @param processId    id of a registered process
 * @param hydratedOnce selects which size map receives the measurement
 */
public void setProcessSize(QName processId, boolean hydratedOnce) {
    final BpelProcess measuredProcess = _activeProcesses.get(processId);
    final long measuredSize = measuredProcess.sizeOf();
    if (!hydratedOnce) {
        _hydratedSizes.remove(measuredProcess.getPID());
        _unhydratedSizes.put(measuredProcess.getPID(), Long.valueOf(measuredSize));
        return;
    }
    _hydratedSizes.put(measuredProcess.getPID(), Long.valueOf(measuredSize));
    _unhydratedSizes.remove(measuredProcess.getPID());
}
/**
 * Dehydrates the hydrated process with the smallest last-used value among those that
 * have no instance in use.
 *
 * @return <code>true</code> if such a process was found and dehydrated,
 *         <code>false</code> otherwise
 */
public boolean dehydrateLastUnusedProcess() {
    BpelProcess victim = null;
    long oldestUse = Long.MAX_VALUE;
    for (BpelProcess candidate : _activeProcesses.values()) {
        if (!candidate.hintIsHydrated()) {
            continue;
        }
        if (!(candidate.getLastUsed() < oldestUse)) {
            continue;
        }
        if (candidate.getInstanceInUseCount() != 0) {
            continue;
        }
        oldestUse = candidate.getLastUsed();
        victim = candidate;
    }
    if (victim == null) {
        return false;
    }
    victim.dehydrate();
    return true;
}
/**
 * Marks the my-role message exchange of a job as faulted, choosing the fault name and
 * detail text from the cause code. Does nothing if the job's exchange is not a
 * my-role exchange or if the process model has no constants.
 *
 * For the RETIRED cause code, an ACTIVE process of the same type is looked up first;
 * if one exists the job and exchange are retargeted to it and no fault is set.
 *
 * @param process   process the job was addressed to; dereferenced without a null check
 * @param we        job details carrying the message exchange id
 * @param causeCode one of the InvalidProcessException cause codes; unknown codes map to the unknown-fault name
 */
public void sendMyRoleFault(BpelProcess process, JobDetails we, int causeCode) {
MessageExchange mex = (MessageExchange) getMessageExchange(we.getMexId());
if (!(mex instanceof MyRoleMessageExchange)) {
return;
}
QName faultQName = null;
OConstants constants = process.getOProcess().getConstants();
if (constants != null) {
// Build the fault payload: a SOAP Fault element wrapping an ODE extension "fault" detail element.
Document document = DOMUtils.newDocument();
Element faultElement = document.createElementNS(Namespaces.SOAP_ENV_NS, "Fault");
Element faultDetail = document.createElementNS(Namespaces.ODE_EXTENSION_NS, "fault");
faultElement.appendChild(faultDetail);
switch (causeCode) {
case InvalidProcessException.TOO_MANY_PROCESSES_CAUSE_CODE:
faultQName = constants.getQnTooManyProcesses();
faultDetail.setTextContent("The total number of processes in use is over the limit.");
break;
case InvalidProcessException.TOO_HUGE_PROCESSES_CAUSE_CODE:
faultQName = constants.getQnTooHugeProcesses();
faultDetail.setTextContent("The total size of processes in use is over the limit");
break;
case InvalidProcessException.TOO_MANY_INSTANCES_CAUSE_CODE:
faultQName = constants.getQnTooManyInstances();
faultDetail.setTextContent("No more instances of the process allowed at start at this time.");
break;
case InvalidProcessException.RETIRED_CAUSE_CODE:
// we're invoking a target process, trying to see if we can retarget the message
// to the current version (only applies when it's a new process creation)
for (BpelProcess activeProcess : _activeProcesses.values()) {
if (activeProcess.getConf().getState().equals(org.apache.ode.bpel.iapi.ProcessState.ACTIVE)
&& activeProcess.getConf().getType().equals(process.getConf().getType())) {
we.setProcessId(activeProcess._pid);
((MyRoleMessageExchangeImpl) mex)._process = activeProcess;
// NOTE(review): the job is handed to the original (retired) process rather than
// to activeProcess, although both the job and the exchange were just retargeted -- confirm this is intended.
process.handleJobDetails(we);
return;
}
}
faultQName = constants.getQnRetiredProcess();
faultDetail.setTextContent("The process you're trying to instantiate has been retired.");
break;
case InvalidProcessException.DEFAULT_CAUSE_CODE:
default:
// No detail text is set for the default/unknown cause.
faultQName = constants.getQnUnknownFault();
break;
}
MexDaoUtil.setFaulted((MessageExchangeImpl) mex, faultQName, faultElement);
}
}
/**
 * Forces a partner-role exchange that is still waiting for a response (status ASYNC
 * or REQUEST) into a failed state. Exchanges of any other kind or status are left
 * untouched.
 *
 * @param we          job details carrying the message exchange id
 * @param failureType failure type to record on the exchange
 */
private void sendPartnerRoleFailure(JobDetails we, FailureType failureType) {
    final MessageExchange pending = getMessageExchange(we.getMexId());
    if (!(pending instanceof PartnerRoleMessageExchange)) {
        return;
    }
    if (pending.getStatus() != MessageExchange.Status.ASYNC
            && pending.getStatus() != MessageExchange.Status.REQUEST) {
        return;
    }
    final String msg = "No response received for invoke (mexId=" + we.getMexId() + "), forcing it into a failed state.";
    if (__log.isDebugEnabled()) {
        __log.debug(msg);
    }
    MexDaoUtil.setFailure((PartnerRoleMessageExchangeImpl) pending, failureType, msg, null);
}
/**
 * Finds the registered process of the given type with the highest version.
 *
 * @param processType process type to look for
 * @return the newest matching process, or <code>null</code> if none is registered
 *         (or none has a version above -1)
 */
public BpelProcess getNewestProcessByType(QName processType) {
    BpelProcess newest = null;
    long newestVersion = -1;
    for (BpelProcess candidate : _activeProcesses.values()) {
        if (!candidate.getProcessType().equals(processType)) {
            continue;
        }
        if (newestVersion < candidate.getVersion()) {
            newestVersion = candidate.getVersion();
            newest = candidate;
        }
    }
    return newest;
}
}