blob: 5858d93182e602b6a56aa188eb70b356788e3eeb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import java.io.File;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.wsdl.Fault;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.agents.memory.SizingAgent;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.engine.extvar.ExternalVariableConf;
import org.apache.ode.bpel.engine.extvar.ExternalVariableManager;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.explang.ConfigurationException;
import org.apache.ode.bpel.explang.EvaluationException;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.PartnerRoleChannel;
import org.apache.ode.bpel.iapi.PartnerRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.intercept.InstanceCountThrottler;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.obj.OElementVarType;
import org.apache.ode.bpel.obj.OExpressionLanguage;
import org.apache.ode.bpel.obj.OMessageVarType;
import org.apache.ode.bpel.obj.OPartnerLink;
import org.apache.ode.bpel.obj.OProcess;
import org.apache.ode.bpel.obj.serde.DeSerializer;
import org.apache.ode.bpel.runtime.BpelRuntimeContext;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.bpel.runtime.InvalidProcessException;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.bpel.runtime.PropertyAliasEvaluationContext;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.jacob.soup.ReplacementMap;
import org.apache.ode.utils.ObjectPrinter;
import org.apache.ode.utils.Properties;
import org.apache.ode.utils.msg.MessageBundle;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
/**
* Entry point into the runtime of a BPEL process.
*
* @author mszefler
* @author Matthieu Riou <mriou at apache dot org>
*/
public class BpelProcess {
static final Logger __log = LoggerFactory.getLogger(BpelProcess.class);
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
private volatile Map<OPartnerLink, PartnerLinkPartnerRoleImpl> _partnerRoles;
private volatile Map<OPartnerLink, PartnerLinkMyRoleImpl> _myRoles;
/** Mapping from a myrole to a {"Service Name" (QNAME) / port}. It's actually more a tuple than a map as
* it's important to note that the same process with the same endpoint can have 2 different myroles. */
private volatile Map<PartnerLinkMyRoleImpl, Endpoint> _endpointToMyRoleMap;
/** Mapping from a potentially shared endpoint to its EPR */
private SharedEndpoints _sharedEps;
// Backup hashmaps to keep initial endpoints handy after dehydration
private Map<Endpoint, EndpointReference> _myEprs = new HashMap<Endpoint, EndpointReference>();
private Map<Endpoint, EndpointReference> _partnerEprs = new HashMap<Endpoint, EndpointReference>();
private Map<Endpoint, PartnerRoleChannel> _partnerChannels = new HashMap<Endpoint, PartnerRoleChannel>();
final QName _pid;
private volatile OProcess _oprocess;
// Has the process already been hydrated before?
private boolean _hydratedOnce = false;
/** Last time the process was used. */
private volatile long _lastUsed;
BpelEngineImpl _engine;
ClassLoader _classLoader = getClass().getClassLoader();
DebuggerSupport _debugger;
ExpressionLanguageRuntimeRegistry _expLangRuntimeRegistry;
private ReplacementMap _replacementMap;
final ProcessConf _pconf;
/** {@link MessageExchangeInterceptor}s registered for this process. */
private final List<MessageExchangeInterceptor> _mexInterceptors = new ArrayList<MessageExchangeInterceptor>();
/** Latch-like thing to control hydration/dehydration. */
private HydrationLatch _hydrationLatch;
/** Deploy-time configuraton for external variables. */
private ExternalVariableConf _extVarConf;
private ExternalVariableManager _evm;
public static final QName PROP_PATH = new QName("PATH");
public static final QName PROP_SVG = new QName("SVG");
public static final QName PROP_LAZY_HYDRATE = new QName("process.hydration.lazy");
public static final QName PROP_MAX_INSTANCES = new QName("process.instance.throttled.maximum.count");
// The ratio of in-memory vs serialized size of compiled bpel object.
private static final int PROCESS_MEMORY_TO_SERIALIZED_SIZE_RATIO = 5;
public BpelProcess(ProcessConf conf) {
_pid = conf.getProcessId();
_pconf = conf;
_hydrationLatch = new HydrationLatch();
}
/**
* Retrives the base URI to use for local resource resolution.
*
* @return URI - instance representing the absolute file path to the physical location of the process definition folder.
*/
public URI getBaseResourceURI() {
return this._pconf.getBaseURI();
}
/**
* Intiialize the external variable configuration/engine manager. This is called from hydration logic, so it
* is possible to change the external variable configuration at runtime.
*
*/
void initExternalVariables() {
List<Element> conf = _pconf.getExtensionElement(ExternalVariableConf.EXTVARCONF_ELEMENT);
_extVarConf = new ExternalVariableConf(conf);
_evm = new ExternalVariableManager(_pid, _extVarConf, _engine._contexts.externalVariableEngines, _oprocess);
}
public String toString() {
return "BpelProcess[" + _pid + "]";
}
public ExternalVariableManager getEVM() {
return _evm;
}
public void recoverActivity(ProcessInstanceDAO instanceDAO, String channel, long activityId, String action, FaultData fault) {
if (__log.isDebugEnabled())
__log.debug("Recovering activity in process " + instanceDAO.getInstanceId() + " with action " + action);
markused();
BpelRuntimeContextImpl processInstance = createRuntimeContext(instanceDAO, null, null);
processInstance.recoverActivity(channel, activityId, action, fault);
}
protected DebuggerSupport createDebuggerSupport() {
return new DebuggerSupport(this);
}
protected DebuggerSupport getDebuggerSupport() {
return _debugger;
}
static String generateMessageExchangeIdentifier(String partnerlinkName, String operationName) {
StringBuffer sb = new StringBuffer(partnerlinkName);
sb.append('.');
sb.append(operationName);
return sb.toString();
}
public interface InvokeHandler {
boolean invoke(PartnerLinkMyRoleImpl target, PartnerLinkMyRoleImpl.RoutingInfo routingInfo, boolean createInstance);
}
public boolean invokeProcess(MyRoleMessageExchangeImpl mex, InvokeHandler invokeHandler, boolean enqueue) {
boolean routed = false;
try {
_hydrationLatch.latch(1);
List<PartnerLinkMyRoleImpl> targets = getMyRolesForService(mex.getServiceName());
if (targets.isEmpty()) {
// String errmsg = __msgs.msgMyRoleRoutingFailure(mex.getMessageExchangeId());
// __log.error(errmsg);
// mex.setFailure(MessageExchange.FailureType.UNKNOWN_ENDPOINT, errmsg, null);
return false;
}
mex.getDAO().setProcess(getProcessDAO());
if (!processInterceptors(mex, InterceptorInvoker.__onProcessInvoked)) {
if (__log.isDebugEnabled()) {
__log.debug("Aborting processing of mex " + mex + " due to interceptors.");
}
return false;
}
markused();
// Ideally, if Java supported closure, the routing code would return null or the appropriate
// closure to handle the route.
List<PartnerLinkMyRoleImpl.RoutingInfo> routings = null;
for (PartnerLinkMyRoleImpl target : targets) {
routings = target.findRoute(mex);
boolean createInstance = target.isCreateInstance(mex);
if (mex.getStatus() != MessageExchange.Status.FAILURE && routings!=null) {
for (PartnerLinkMyRoleImpl.RoutingInfo routing : routings) {
routed = routed || invokeHandler.invoke(target, routing, createInstance);
}
}
if (routed) {
break;
}
}
// Nothing found, saving for later
if (!routed && enqueue) {
// TODO this is kind of hackish when no match and more than one myrole is selected.
// we save the routing on the last myrole
// actually the message queue should be attached to the instance instead of the correlator
targets.get(targets.size()-1).noRoutingMatch(mex, routings);
} else {
// Now we have to update our message exchange status. If the <reply> was not hit during the
// invocation, then we will be in the "REQUEST" phase which means that either this was a one-way
// or a two-way that needs to delivery the reply asynchronously.
if (mex.getStatus() == MessageExchange.Status.REQUEST) {
mex.setStatus(MessageExchange.Status.ASYNC);
}
markused();
}
} finally {
_hydrationLatch.release(1);
}
// For a one way, once the engine is done, the mex can be safely released.
// Sean: not really, if route is not found, we cannot delete the mex yet
// Rafal: We don't do cleanup here
// if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY) && routed && getCleanupCategories(false).contains(CLEANUP_CATEGORY.MESSAGES)) {
// mex.release();
// }
return routed;
}
private boolean isActive() {
return _pconf.getState() == org.apache.ode.bpel.iapi.ProcessState.ACTIVE;
}
/**
* Entry point for message exchanges aimed at the my role.
*
* @param mex
*/
boolean invokeProcess(final MyRoleMessageExchangeImpl mex, boolean enqueue) {
return invokeProcess(mex, new InvokeHandler() {
public boolean invoke(PartnerLinkMyRoleImpl target, PartnerLinkMyRoleImpl.RoutingInfo routing, boolean createInstance) {
if (routing.messageRoute == null && createInstance && isActive()) {
// No route but we can create a new instance
target.invokeNewInstance(mex, routing);
return true;
} else if (routing.messageRoute != null) {
// Found a route, hitting it
_engine.acquireInstanceLock(routing.messageRoute.getTargetInstance().getInstanceId());
target.invokeInstance(mex, routing);
return true;
}
return false;
}
}, enqueue);
}
/** Several myroles can use the same service in a given process */
private List<PartnerLinkMyRoleImpl> getMyRolesForService(QName serviceName) {
List<PartnerLinkMyRoleImpl> myRoles = new ArrayList<PartnerLinkMyRoleImpl>(5);
for (Map.Entry<PartnerLinkMyRoleImpl,Endpoint> e : getEndpointToMyRoleMap().entrySet()) {
if (e.getValue().serviceName.equals(serviceName))
myRoles.add(e.getKey());
}
return myRoles;
}
void initMyRoleMex(MyRoleMessageExchangeImpl mex) {
markused();
PartnerLinkMyRoleImpl target = null;
for (Map.Entry<PartnerLinkMyRoleImpl,Endpoint> e : getEndpointToMyRoleMap().entrySet()) {
if (e.getValue().serviceName.equals(mex.getServiceName())) {
// First one is fine as we're only interested in the portType and operation here and
// even if a process has 2 myrole partner links
target = e.getKey();
break;
}
}
if (target != null) {
mex.setPortOp(target._plinkDef.getMyRolePortType(), target._plinkDef.getMyRoleOperation(mex.getOperationName()));
} else {
__log.warn("Couldn't find endpoint from service " + mex.getServiceName() + " when initializing a myRole mex.");
}
}
/**
* Extract the value of a BPEL property from a BPEL messsage variable.
*
* @param msgData
* message variable data
* @param alias
* alias to apply
* @param target
* description of the data (for error logging only)
* @return value of the property
* @throws FaultException
*/
String extractProperty(Element msgData, Map<String, Node> headerParts, OProcess.OPropertyAlias alias, String target) throws FaultException {
markused();
PropertyAliasEvaluationContext ectx = new PropertyAliasEvaluationContext(msgData, headerParts, alias);
Node lValue = ectx.getRootNode();
if (alias.getLocation() != null) {
try {
lValue = _expLangRuntimeRegistry.evaluateNode(alias.getLocation(), ectx);
} catch (EvaluationException ec) {
throw new FaultException(getOProcess().getConstants().getQnSelectionFailure(), alias.getDescription());
}
}
if (lValue == null) {
String errmsg = __msgs.msgPropertyAliasReturnedNullSet(alias.getDescription(), target);
if (__log.isErrorEnabled()) {
__log.error(errmsg);
}
throw new FaultException(getOProcess().getConstants().getQnSelectionFailure(), errmsg);
}
if (lValue.getNodeType() == Node.ELEMENT_NODE) {
// This is a bit hokey, we concatenate all the children's values; we
// really should be checking to make sure that we are only dealing
// with text and attribute nodes.
StringBuffer val = new StringBuffer();
NodeList nl = lValue.getChildNodes();
for (int i = 0; i < nl.getLength(); ++i) {
Node n = nl.item(i);
val.append(n.getNodeValue());
}
return val.toString();
} else if (lValue.getNodeType() == Node.TEXT_NODE) {
return ((Text) lValue).getWholeText();
} else
return null;
}
/**
* Get the element name for a given WSDL part. If the part is an <em>element</em> part, the name of that element is returned.
* If the part is an XML schema typed part, then the name of the part is returned in the null namespace.
*
* @param part
* WSDL {@link javax.wsdl.Part}
* @return name of element containing said part
*/
static QName getElementNameForPart(OMessageVarType.Part part) {
return (part.getType() instanceof OElementVarType) ? ((OElementVarType) part.getType()).getElementType() : new QName(null, part.getName());
}
/**
* Process the message-exchange interceptors.
*
* @param mex
* message exchange
* @return <code>true</code> if execution should continue, <code>false</code> otherwise
*/
public boolean processInterceptors(MyRoleMessageExchangeImpl mex, InterceptorInvoker invoker) {
InterceptorContextImpl ictx = new InterceptorContextImpl(_engine._contexts.dao.getConnection(), getProcessDAO(), _pconf, _engine, this);
for (MessageExchangeInterceptor i : _mexInterceptors)
if (!mex.processInterceptor(i, mex, ictx, invoker))
return false;
for (MessageExchangeInterceptor i : getEngine().getGlobalInterceptors())
if (!mex.processInterceptor(i, mex, ictx, invoker))
return false;
return true;
}
/**
* @see org.apache.ode.bpel.engine.BpelProcess#handleJobDetails(java.util.Map<java.lang.String,java.lang.Object>)
*/
public boolean handleJobDetails(JobDetails jobData) {
boolean routed = true;
try {
_hydrationLatch.latch(1);
markused();
if (__log.isDebugEnabled()) {
__log.debug(ObjectPrinter.stringifyMethodEnter("handleJobDetails", new Object[] { "jobData", jobData }));
}
JobDetails we = jobData;
// Process level events
if (we.getType() == JobType.INVOKE_INTERNAL || we.getType() == JobType.MEX_MATCHER) {
if (__log.isDebugEnabled()) {
__log.debug(we.getType() + " event for mexid " + we.getMexId());
}
MyRoleMessageExchangeImpl mex = (MyRoleMessageExchangeImpl) _engine.getMessageExchange(we.getMexId());
if (we.getType() == JobType.MEX_MATCHER && !mex.getDAO().lockPremieMessages()) {
//Skip if already processed
return true;
}
routed = invokeProcess(mex, (Boolean) jobData.detailsExt.get("enqueue"));
if (we.getType() == JobType.MEX_MATCHER && routed) {
mex.getDAO().releasePremieMessages();
}
} else {
// Instance level events
ProcessInstanceDAO procInstance = getProcessDAO().getInstance(we.getInstanceId());
if (procInstance == null) {
if (__log.isDebugEnabled()) {
__log.debug("handleJobDetails: no ProcessInstance found with iid " + we.getInstanceId() + "; ignoring.");
}
return true;
}
BpelRuntimeContextImpl processInstance = createRuntimeContext(procInstance, null, null);
switch (we.getType()) {
case TIMER:
if (__log.isDebugEnabled()) {
__log.debug("handleJobDetails: TimerWork event for process instance " + processInstance);
}
processInstance.timerEvent(we.getChannel());
break;
case RESUME:
if (__log.isDebugEnabled()) {
__log.debug("handleJobDetails: ResumeWork event for iid " + we.getInstanceId());
}
processInstance.execute();
break;
case INVOKE_RESPONSE:
if (__log.isDebugEnabled()) {
__log.debug("InvokeResponse event for iid " + we.getInstanceId());
}
processInstance.invocationResponse(we.getMexId(), we.getChannel());
processInstance.execute();
break;
case MATCHER:
if (__log.isDebugEnabled()) {
__log.debug("Matcher event for iid " + we.getInstanceId());
}
if( procInstance.getState() == ProcessState.STATE_COMPLETED_OK
|| procInstance.getState() == ProcessState.STATE_COMPLETED_WITH_FAULT ) {
__log.debug("A matcher event was aborted. The process is already completed.");
return true;
}
processInstance.matcherEvent(we.getCorrelatorId(), we.getCorrelationKeySet());
}
}
} finally {
_hydrationLatch.release(1);
}
return routed;
}
private void setRoles(OProcess oprocess) {
_partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
_myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
_endpointToMyRoleMap = new HashMap<PartnerLinkMyRoleImpl, Endpoint>();
// Create myRole endpoint name mapping (from deployment descriptor)
HashMap<OPartnerLink, Endpoint> myRoleEndpoints = new HashMap<OPartnerLink, Endpoint>();
for (Map.Entry<String, Endpoint> provide : _pconf.getProvideEndpoints().entrySet()) {
OPartnerLink plink = oprocess.getPartnerLink(provide.getKey());
if (plink == null) {
String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
+ provide.getKey();
__log.error(errmsg);
throw new BpelEngineException(errmsg);
}
myRoleEndpoints.put(plink, provide.getValue());
}
// Create partnerRole initial value mapping
for (Map.Entry<String, Endpoint> invoke : _pconf.getInvokeEndpoints().entrySet()) {
OPartnerLink plink = oprocess.getPartnerLink(invoke.getKey());
if (plink == null) {
String errmsg = "Error in deployment descriptor for process " + _pid + "; reference to unknown partner link "
+ invoke.getKey();
__log.error(errmsg);
throw new BpelEngineException(errmsg);
}
if (__log.isDebugEnabled()) {
__log.debug("Processing <invoke> element for process " + _pid + ": partnerlink " + invoke.getKey() + " --> "
+ invoke.getValue());
}
}
for (OPartnerLink pl : oprocess.getAllPartnerLinks()) {
if (pl.hasMyRole()) {
Endpoint endpoint = myRoleEndpoints.get(pl);
if (endpoint == null)
throw new IllegalArgumentException("No service name for myRole plink " + pl.getName());
PartnerLinkMyRoleImpl myRole = new PartnerLinkMyRoleImpl(this, pl, endpoint);
_myRoles.put(pl, myRole);
_endpointToMyRoleMap.put(myRole, endpoint);
}
if (pl.hasPartnerRole()) {
Endpoint endpoint = _pconf.getInvokeEndpoints().get(pl.getName());
if (endpoint == null && pl.isInitializePartnerRole())
throw new IllegalArgumentException(pl.getName() + " must be bound to an endpoint in deploy.xml");
PartnerLinkPartnerRoleImpl partnerRole = new PartnerLinkPartnerRoleImpl(this, pl, endpoint);
_partnerRoles.put(pl, partnerRole);
}
}
}
public ProcessDAO getProcessDAO() {
return _pconf.isTransient() ? _engine._contexts.inMemDao.getConnection().getProcess(_pid) : getEngine()._contexts.dao
.getConnection().getProcess(_pid);
}
static String genCorrelatorId(OPartnerLink plink, String opName) {
return plink.getName() + "." + opName;
}
/**
* De-serialize the compiled process representation from a stream.
*
* @param is
* input stream
* @return process information from configuration database
*/
private OProcess deserializeCompiledProcess(File file) throws Exception {
OProcess compiledProcess;
DeSerializer deserializer = new DeSerializer(file);
compiledProcess = deserializer.deserialize();
return compiledProcess;
}
/**
* Get all the services that are implemented by this process.
*
* @return list of qualified names corresponding to the myroles.
*/
public Set<Endpoint> getServiceNames() {
Set<Endpoint> endpoints = new HashSet<Endpoint>();
for (Endpoint provide : _pconf.getProvideEndpoints().values()) {
endpoints.add(provide);
}
return endpoints;
}
void activate(BpelEngineImpl engine) {
_engine = engine;
_sharedEps = _engine.getSharedEndpoints();
_debugger = createDebuggerSupport();
if (getInstanceMaximumCount() < Integer.MAX_VALUE)
registerMessageExchangeInterceptor(new InstanceCountThrottler());
if (__log.isDebugEnabled()) {
__log.debug("Activating " + _pid);
}
// Activate all the my-role endpoints.
for (Map.Entry<String, Endpoint> entry : _pconf.getProvideEndpoints().entrySet()) {
Endpoint endpoint = entry.getValue();
EndpointReference initialEPR = null;
if (isShareable(endpoint)) {
// Check if the EPR already exists for the given endpoint
initialEPR = _sharedEps.getEndpointReference(endpoint);
if (initialEPR == null) {
// Create an EPR by physically activating the endpoint
initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, endpoint);
_sharedEps.addEndpoint(endpoint, initialEPR);
if (__log.isDebugEnabled()) {
__log.debug("Activated " + _pid + " myrole " + entry.getKey() + ": EPR is " + initialEPR);
}
}
// Increment the reference count on the endpoint
_sharedEps.incrementReferenceCount(endpoint);
} else {
// Create an EPR by physically activating the endpoint
initialEPR = _engine._contexts.bindingContext.activateMyRoleEndpoint(_pid, endpoint);
if (__log.isDebugEnabled()) {
__log.debug("Activated " + _pid + " myrole " + entry.getKey() + ": EPR is " + initialEPR);
}
}
_myEprs.put(endpoint, initialEPR);
}
if (__log.isDebugEnabled()) {
__log.debug("Activated " + _pid);
}
markused();
}
void deactivate() {
// Deactivate all the my-role endpoints.
for (Endpoint endpoint : _myEprs.keySet()) {
// Deactivate the EPR only if there are no more references
// to this endpoint from any (active) BPEL process.
if (isShareable(endpoint)) {
if(__log.isDebugEnabled()) __log.debug("deactivating shared endpoint " + endpoint+ " for pid "+ _pid);
if (!_sharedEps.decrementReferenceCount(endpoint)) {
_engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
_sharedEps.removeEndpoint(endpoint);
}
} else {
if(__log.isDebugEnabled()) __log.debug("deactivating non-shared endpoint " + endpoint + " for pid "+ _pid);
_engine._contexts.bindingContext.deactivateMyRoleEndpoint(endpoint);
}
}
// TODO Deactivate all the partner-role channels
}
private boolean isShareable(Endpoint endpoint) {
return _pconf.isSharedService(endpoint.serviceName);
}
protected EndpointReference getInitialPartnerRoleEPR(OPartnerLink link) {
try {
_hydrationLatch.latch(1);
PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link);
if (prole == null)
throw new IllegalStateException("Unknown partner link " + link);
return prole.getInitialEPR();
} finally {
_hydrationLatch.release(1);
}
}
protected Endpoint getInitialPartnerRoleEndpoint(OPartnerLink link) {
try {
_hydrationLatch.latch(1);
PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(link);
if (prole == null)
throw new IllegalStateException("Unknown partner link " + link);
return prole._initialPartner;
} finally {
_hydrationLatch.release(1);
}
}
protected EndpointReference getInitialMyRoleEPR(OPartnerLink link) {
try {
_hydrationLatch.latch(1);
PartnerLinkMyRoleImpl myRole = _myRoles.get(link);
if (myRole == null)
throw new IllegalStateException("Unknown partner link " + link);
return myRole.getInitialEPR();
} finally {
_hydrationLatch.release(1);
}
}
public QName getPID() {
return _pid;
}
protected PartnerRoleChannel getPartnerRoleChannel(OPartnerLink partnerLink) {
try {
_hydrationLatch.latch(1);
PartnerLinkPartnerRoleImpl prole = _partnerRoles.get(partnerLink);
if (prole == null)
throw new IllegalStateException("Unknown partner link " + partnerLink);
return prole._channel;
} finally {
_hydrationLatch.release(1);
}
}
public void saveEvent(ProcessInstanceEvent event, ProcessInstanceDAO instanceDao) {
saveEvent(event, instanceDao, null);
}
public void saveEvent(ProcessInstanceEvent event, ProcessInstanceDAO instanceDao, List<String> scopeNames) {
markused();
if (_pconf.isEventEnabled(scopeNames, event.getType())) {
// notify the listeners
_engine.fireEvent(event);
if (instanceDao != null && !event.isEventPersistingCancelled())
instanceDao.insertBpelEvent(event);
else
__log.debug("Couldn't find instance to save event, no event generated!");
}
}
/**
* Ask the process to dehydrate.
*/
void dehydrate() {
try {
_hydrationLatch.latch(0);
// We don't actually need to do anything, the latch will run the doDehydrate method
// when necessary..
} finally {
_hydrationLatch.release(0);
}
}
void hydrate() {
try {
_hydrationLatch.latch(1);
// We don't actually need to do anything, the latch will run the doHydrate method
// when necessary..
} finally {
_hydrationLatch.release(1);
}
}
public OProcess getOProcess() {
try {
_hydrationLatch.latch(1);
return _oprocess;
} finally {
_hydrationLatch.release(1);
}
}
private Map<PartnerLinkMyRoleImpl,Endpoint> getEndpointToMyRoleMap() {
try {
_hydrationLatch.latch(1);
return _endpointToMyRoleMap;
} finally {
_hydrationLatch.release(1);
}
}
public ReplacementMap getReplacementMap(QName processName) {
try {
_hydrationLatch.latch(1);
if (processName.equals(_pid)) return _replacementMap;
else
try {
// We're asked for an older version of this process, fetching it
OProcess oprocess = _engine.getOProcess(processName);
if (oprocess == null) {
String errmsg = "The process " + _pid + " is not available anymore.";
__log.error(errmsg);
throw new InvalidProcessException(errmsg, InvalidProcessException.RETIRED_CAUSE_CODE);
}
// Older versions may ventually need more expression languages
registerExprLang(oprocess);
return new ReplacementMapImpl(oprocess);
} catch (Exception e) {
String errmsg = "The process " + _pid + " is not available anymore.";
__log.error(errmsg, e);
throw new InvalidProcessException(errmsg, InvalidProcessException.RETIRED_CAUSE_CODE);
}
} finally {
_hydrationLatch.release(1);
}
}
protected BpelEngineImpl getEngine() {
return _engine;
}
public boolean isInMemory() {
return _pconf.isTransient();
}
public long getLastUsed() {
return _lastUsed;
}
QName getProcessType() {
return _pconf.getType();
}
/**
* Get a hint as to whether this process is hydrated. Note this is only a hint, since things could change.
*/
public boolean hintIsHydrated() {
return _oprocess != null;
}
/** Keep track of the time the process was last used. */
private final void markused() {
_lastUsed = System.currentTimeMillis();
}
/** Create a version-appropriate runtime context. */
protected BpelRuntimeContextImpl createRuntimeContext(ProcessInstanceDAO dao, PROCESS template,
MyRoleMessageExchangeImpl instantiatingMessageExchange) {
return new BpelRuntimeContextImpl(this, dao, template, instantiatingMessageExchange);
}
private class HydrationLatch extends NStateLatch {
HydrationLatch() {
super(new Runnable[2]);
_transitions[0] = new Runnable() {
public void run() {
doDehydrate();
}
};
_transitions[1] = new Runnable() {
public void run() {
doHydrate();
}
};
}
private void doDehydrate() {
if (_oprocess != null) {
_oprocess.dehydrate();
_oprocess = null;
}
if (_myRoles != null) {
_myRoles.clear();
}
if (_endpointToMyRoleMap != null) {
_endpointToMyRoleMap.clear();
}
if (_partnerRoles != null) {
_partnerRoles.clear();
}
// Don't clear stuff you can't re-populate
// if (_myEprs != null) {
// _myEprs.clear();
// }
// if (_partnerChannels != null) {
// _partnerChannels.clear();
// }
// if (_partnerEprs != null) {
// _partnerEprs.clear();
// }
_replacementMap = null;
_expLangRuntimeRegistry = null;
}
private void doHydrate() {
markused();
if (__log.isDebugEnabled()) {
__log.debug("Rehydrating process " + _pconf.getProcessId());
}
try {
File file = _pconf.getCBPFile();
_oprocess = deserializeCompiledProcess(file);
} catch (Exception e) {
String errmsg = "The process " + _pid + " is no longer available.";
__log.error(errmsg, e);
throw new BpelEngineException(errmsg, e);
}
if (_partnerRoles == null) {
_partnerRoles = new HashMap<OPartnerLink, PartnerLinkPartnerRoleImpl>();
}
if (_myRoles == null) {
_myRoles = new HashMap<OPartnerLink, PartnerLinkMyRoleImpl>();
}
if (_endpointToMyRoleMap == null) {
_endpointToMyRoleMap = new HashMap<PartnerLinkMyRoleImpl, Endpoint>();
}
if (_myEprs == null) {
_myEprs = new HashMap<Endpoint, EndpointReference>();
}
if (_partnerChannels == null) {
_partnerChannels = new HashMap<Endpoint, PartnerRoleChannel>();
}
if (_partnerEprs == null) {
_partnerEprs = new HashMap<Endpoint, EndpointReference>();
}
_replacementMap = new ReplacementMapImpl(_oprocess);
// Create an expression language registry for this process
_expLangRuntimeRegistry = new ExpressionLanguageRuntimeRegistry();
registerExprLang(_oprocess);
setRoles(_oprocess);
initExternalVariables();
if (!_hydratedOnce) {
for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
// Null for initializePartnerRole = false
if (prole._initialPartner != null) {
PartnerRoleChannel channel = _engine._contexts.bindingContext.createPartnerRoleChannel(_pid,
prole._plinkDef.getPartnerRolePortType(), prole._initialPartner);
prole._channel = channel;
_partnerChannels.put(prole._initialPartner, prole._channel);
EndpointReference epr = channel.getInitialEndpointReference();
if (epr != null) {
prole._initialEPR = epr;
_partnerEprs.put(prole._initialPartner, epr);
}
if (__log.isDebugEnabled()) {
__log.debug("Activated " + _pid + " partnerrole " + prole.getPartnerLinkName() + ": EPR is "
+ prole._initialEPR);
}
}
}
_engine.setProcessSize(_pid, true);
_hydratedOnce = true;
}
for (PartnerLinkMyRoleImpl myrole : _myRoles.values()) {
myrole._initialEPR = _myEprs.get(myrole._endpoint);
}
for (PartnerLinkPartnerRoleImpl prole : _partnerRoles.values()) {
prole._channel = _partnerChannels.get(prole._initialPartner);
if (_partnerEprs.get(prole._initialPartner) != null) {
prole._initialEPR = _partnerEprs.get(prole._initialPartner);
}
}
/*
* If necessary, create an object in the data store to represent the process. We'll re-use an existing object if it already
* exists and matches the GUID.
*/
if (isInMemory()) {
createProcessDAO(_engine._contexts.inMemDao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
} else if (_engine._contexts.scheduler.isTransacted()) {
// If we have a transaction, we do this in the current transaction
if(__log.isDebugEnabled()) __log.debug("Creating new process DAO for " + _pid + " (guid=" + _oprocess.getGuid() + ")...");
createProcessDAO(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
if(__log.isDebugEnabled()) __log.debug("Created new process DAO for " + _pid + " (guid=" + _oprocess.getGuid() + ").");
} else {
try {
_engine._contexts.scheduler.execTransaction(new Callable<Object>() {
public Object call() throws Exception {
bounceProcessDAOInDB(_engine._contexts.dao.getConnection(), _pid, _pconf.getVersion(), _oprocess);
return null;
}
});
} catch( RuntimeException re ) {
throw re;
} catch(Exception e) {
throw new RuntimeException(e);
}
}
}
}
private void bounceProcessDAOInMemory(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
if(__log.isInfoEnabled()) __log.info("Creating new process DAO[mem] for " + pid + " (guid=" + oprocess.getGuid() + ").");
createProcessDAO(conn, pid, version, oprocess);
}
private void bounceProcessDAOInDB(final BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
if(__log.isDebugEnabled()) __log.debug("Creating new process DAO for " + pid + " (guid=" + oprocess.getGuid() + ")...");
createProcessDAO(conn, pid, version, oprocess);
if(__log.isDebugEnabled()) __log.debug("Created new process DAO for " + pid + " (guid=" + oprocess.getGuid() + ").");
}
public int getInstanceInUseCount() {
return hintIsHydrated() ? _hydrationLatch.getDepth(1) : 0;
}
private void createProcessDAO(BpelDAOConnection conn, final QName pid, final long version, final OProcess oprocess) {
try {
boolean create = true;
ProcessDAO old = conn.getProcess(pid);
if (old != null) {
if (__log.isDebugEnabled()) {
__log.debug("Found ProcessDAO for " + pid + " with GUID " + old.getGuid());
}
if (oprocess.getGuid() == null) {
// No guid, old version assume its good
create = false;
} else {
if (old.getGuid().equals(oprocess.getGuid())) {
// Guids match, no need to create
create = false;
}
}
}
if (create) {
if(__log.isDebugEnabled()) __log.debug("Creating process DAO for " + pid + " (guid=" + oprocess.getGuid() + ")");
ProcessDAO newDao = conn.createProcess(pid, oprocess.getQName(), oprocess.getGuid(), (int) version);
for (String correlator : oprocess.getCorrelators()) {
newDao.addCorrelator(correlator);
}
if(__log.isDebugEnabled()) __log.debug("Created new process DAO for " + pid + " (guid=" + oprocess.getGuid() + ")");
}
} catch (BpelEngineException ex) {
throw ex;
} catch (Exception dce) {
__log.error("DbError", dce);
throw new BpelEngineException("DbError", dce);
}
}
private void registerExprLang(OProcess oprocess) {
for (OExpressionLanguage elang : oprocess.getExpressionLanguages()) {
try {
_expLangRuntimeRegistry.registerRuntime(elang);
} catch (ConfigurationException e) {
String msg = __msgs.msgExpLangRegistrationError(elang.getExpressionLanguageUri(), elang.getProperties());
__log.error(msg, e);
throw new BpelEngineException(msg, e);
}
}
}
public boolean isCleanupCategoryEnabled(boolean instanceSucceeded, CLEANUP_CATEGORY category) {
return _pconf.isCleanupCategoryEnabled(instanceSucceeded, category);
}
public Set<CLEANUP_CATEGORY> getCleanupCategories(boolean instanceSucceeded) {
return _pconf.getCleanupCategories(instanceSucceeded);
}
public Node getProcessProperty(QName propertyName) {
Node value = getEngine()._contexts.customProcessProperties.getProperty(propertyName);
if (value != null) {
return value;
}
Map<QName, Node> properties = _pconf.getProcessProperties();
if (properties != null) {
return properties.get(propertyName);
}
return null;
}
public ProcessConf getConf() {
return _pconf;
}
public boolean hasActiveInstances() {
try {
_hydrationLatch.latch(1);
if (isInMemory() || _engine._contexts.scheduler.isTransacted()) {
return hasActiveInstances(getProcessDAO());
} else {
// If we do not have a transaction we need to create one.
try {
return (Boolean) _engine._contexts.scheduler.execTransaction(new Callable<Object>() {
public Object call() throws Exception {
return hasActiveInstances(getProcessDAO());
}
});
} catch (Exception e) {
String errmsg = "DbError";
__log.error(errmsg, e);
return false;
}
}
} finally {
_hydrationLatch.release(1);
}
}
private boolean hasActiveInstances(ProcessDAO processDAO) {
// Select count of instances instead of all active instances
// Collection<ProcessInstanceDAO> activeInstances = processDAO.getActiveInstances();
// return (activeInstances != null && activeInstances.size() > 0);
return processDAO.getNumInstances() > 0;
}
public void registerMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
_mexInterceptors.add(interceptor);
}
public void unregisterMessageExchangeInterceptor(MessageExchangeInterceptor interceptor) {
_mexInterceptors.remove(interceptor);
}
public long sizeOf() {
// try to get actual size from sizing agent, if enabled
long footprint = SizingAgent.deepSizeOf(this);
// if unsuccessful, estimate size (this is a inaccurate guess)
if (footprint == 0) {
footprint = getEstimatedHydratedSize();
}
// add the sizes of all the services this process provides
for (EndpointReference myEpr : _myEprs.values()) {
footprint += _engine._contexts.bindingContext.calculateSizeofService(myEpr);
}
// return the total footprint
return footprint;
}
public String getProcessProperty(QName property, String defaultValue) {
Text text = (Text) getProcessProperty(property);
if (text == null) {
return defaultValue;
}
String value = text.getWholeText();
return (value == null) ? defaultValue : value;
}
public boolean isHydrationLazy() {
return Boolean.valueOf(getProcessProperty(PROP_LAZY_HYDRATE, "true"));
}
public boolean isHydrationLazySet() {
return getProcessProperty(PROP_LAZY_HYDRATE) != null;
}
public int getInstanceMaximumCount() {
return Integer.valueOf(getProcessProperty(PROP_MAX_INSTANCES, Integer.toString(_engine.getInstanceThrottledMaximumCount())));
}
public long getEstimatedHydratedSize() {
return _pconf.getCBPFileSize() *
PROCESS_MEMORY_TO_SERIALIZED_SIZE_RATIO;
}
public long getTimeout(OPartnerLink partnerLink, boolean p2p) {
// OPartnerLink, PartnerLinkPartnerRoleImpl
final PartnerLinkPartnerRoleImpl linkPartnerRole = _partnerRoles.get(partnerLink);
long timeout = Properties.DEFAULT_MEX_TIMEOUT;
if (linkPartnerRole._initialEPR != null) {
String property = null;
String value = null;
Map<String, String> props = _pconf.getEndpointProperties(linkPartnerRole._initialEPR);
if (p2p) {
property = Properties.PROP_P2P_MEX_TIMEOUT;
value = props.get(property);
}
if (value == null) {
property = Properties.PROP_MEX_TIMEOUT;
value = props.get(property);
}
if (value != null) {
try {
timeout = Long.parseLong(value);
} catch (NumberFormatException e) {
if (__log.isWarnEnabled())
__log.warn("Mal-formatted Property: [" + property + "=" + value + "] Default value (" + timeout + ") will be used");
}
}
}
return timeout;
}
public long getVersion() {
return Long.parseLong(_pid.getLocalPart().substring(_pid.getLocalPart().lastIndexOf('-') + 1));
}
public void doAsyncReply(MyRoleMessageExchangeImpl m, BpelRuntimeContext context) {
MessageExchangeDAO mex = m.getDAO();
PartnerRoleMessageExchange pmex = null;
if (mex.getPipedMessageExchangeId() != null) {
pmex = (PartnerRoleMessageExchange) getEngine().getMessageExchange(mex.getPipedMessageExchangeId());
}
if (pmex != null) {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("Replying to a p2p mex, myrole " + m + " - partnerole " + pmex);
}
if (pmex.getStatus() == Status.ASYNC || pmex.getStatus() == Status.REQUEST) {
try {
switch (m.getStatus()) {
case FAILURE:
// We can't seem to get the failure out of the myrole mex?
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "operation failed", null);
break;
case FAULT:
Fault fault = pmex.getOperation().getFault(m.getFault().getLocalPart());
if (fault == null) {
__log.error("process " + this + " instance " + (context != null ? context.getPid() : null) + " thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure");
pmex.replyWithFailure(MessageExchange.FailureType.OTHER, "process thrown unmapped fault in p2p communication " + m.getFault() + " " + m.getFaultExplanation() + " - converted to failure", m.getFaultResponse().getMessage());
} else {
Message faultRes = pmex.createMessage(pmex.getOperation().getFault(m.getFault().getLocalPart())
.getMessage().getQName());
faultRes.setMessage(m.getResponse().getMessage());
pmex.replyWithFault(m.getFault(), faultRes);
}
break;
case RESPONSE:
Message response = pmex.createMessage(pmex.getOperation().getOutput().getMessage().getQName());
response.setMessage(m.getResponse().getMessage());
pmex.reply(response);
break;
default:
__log.warn("Unexpected state: " + m.getStatus());
break;
}
} finally {
mex.release(this.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
}
} else {
__log.warn("Can't send response to a p2p mex: " + mex + " partner mex: " + pmex);
}
} else {
if (context != null) context.checkInvokeExternalPermission();
this._engine._contexts.mexContext.onAsyncReply(m);
//mex.release(_bpelProcess.isCleanupCategoryEnabled(m.getStatus() == MessageExchange.Status.RESPONSE, CLEANUP_CATEGORY.MESSAGES));
}
}
}