blob: 97544bdf888f37219f5e131c88113c148f0459c1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import javax.wsdl.Operation;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.CorrelationKeySet;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.CorrelationSetDAO;
import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.MessageDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.MessageRouteDAO;
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.dao.ScopeStateEnum;
import org.apache.ode.bpel.dao.XmlDataDAO;
import org.apache.ode.bpel.evar.ExternalVariableModule.Value;
import org.apache.ode.bpel.evar.ExternalVariableModuleException;
import org.apache.ode.bpel.evt.CorrelationSetWriteEvent;
import org.apache.ode.bpel.evt.ProcessCompletionEvent;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.evt.ProcessInstanceStateChangeEvent;
import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent;
import org.apache.ode.bpel.evt.ProcessTerminationEvent;
import org.apache.ode.bpel.evt.ScopeCompletionEvent;
import org.apache.ode.bpel.evt.ScopeEvent;
import org.apache.ode.bpel.evt.ScopeFaultEvent;
import org.apache.ode.bpel.evt.ScopeStartEvent;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Endpoint;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.iapi.MessageExchange;
import org.apache.ode.bpel.iapi.MessageExchange.FailureType;
import org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.iapi.ProcessConf.PartnerRoleConfig;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.memdao.ProcessInstanceDaoImpl;
import org.apache.ode.bpel.obj.OMessageVarType;
import org.apache.ode.bpel.obj.OPartnerLink;
import org.apache.ode.bpel.obj.OProcess;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.obj.OScope.Variable;
import org.apache.ode.bpel.runtime.BpelJacobRunnable;
import org.apache.ode.bpel.runtime.BpelRuntimeContext;
import org.apache.ode.bpel.runtime.CorrelationSetInstance;
import org.apache.ode.bpel.runtime.ExpressionLanguageRuntimeRegistry;
import org.apache.ode.bpel.runtime.PROCESS;
import org.apache.ode.bpel.runtime.PartnerLinkInstance;
import org.apache.ode.bpel.runtime.Selector;
import org.apache.ode.bpel.runtime.VariableInstance;
import org.apache.ode.bpel.runtime.channels.ActivityRecovery;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.InvokeResponse;
import org.apache.ode.bpel.runtime.channels.PickResponse;
import org.apache.ode.bpel.runtime.channels.TimerResponse;
import org.apache.ode.jacob.JacobRunnable;
import org.apache.ode.jacob.ProcessUtil;
import org.apache.ode.jacob.vpu.ExecutionQueueImpl;
import org.apache.ode.jacob.vpu.JacobVPU;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.ObjectPrinter;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
public class BpelRuntimeContextImpl implements BpelRuntimeContext {
private static final Logger __log = LoggerFactory.getLogger(BpelRuntimeContextImpl.class);
/** Data-access object for process instance. */
protected ProcessInstanceDAO _dao;
/** Process Instance ID */
private final Long _iid;
/** JACOB VPU */
protected JacobVPU _vpu;
/** JACOB ExecutionQueue (state) */
protected ExecutionQueueImpl _soup;
private MyRoleMessageExchangeImpl _instantiatingMessageExchange;
protected OutstandingRequestManager _outstandingRequests;
protected IMAManager2 _imaManager;
protected BpelProcess _bpelProcess;
private Date _currentEventDateTime;
private boolean _forceFlush;
/** Five second maximum for continous execution. */
private long _maxReductionTimeMs = 2000000;
public BpelRuntimeContextImpl(BpelProcess bpelProcess, ProcessInstanceDAO dao, PROCESS PROCESS,
MyRoleMessageExchangeImpl instantiatingMessageExchange) {
_bpelProcess = bpelProcess;
_dao = dao;
_iid = dao.getInstanceId();
_instantiatingMessageExchange = instantiatingMessageExchange;
_vpu = new JacobVPU();
_vpu.registerExtension(BpelRuntimeContext.class, this);
_soup = new ExecutionQueueImpl(null);
_soup.setReplacementMap(_bpelProcess.getReplacementMap(dao.getProcess().getProcessId()));
_outstandingRequests = null;
_imaManager = new IMAManager2();
_vpu.setContext(_soup);
if (bpelProcess.isInMemory()) {
ProcessInstanceDaoImpl inmem = (ProcessInstanceDaoImpl) _dao;
if (inmem.getSoup() != null) {
_soup = (ExecutionQueueImpl) inmem.getSoup();
_imaManager = (IMAManager2) _soup.getGlobalData();
_vpu.setContext(_soup);
}
} else {
byte[] daoState = dao.getExecutionState();
if (daoState != null) {
ByteArrayInputStream iis = new ByteArrayInputStream(daoState);
try {
_soup.read(iis);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
_imaManager = (IMAManager2) _soup.getGlobalData();
}
}
if (PROCESS != null) {
_vpu.inject(PROCESS);
}
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("BpelRuntimeContextImpl created for instance " + _iid + ". INDEXED STATE=" + _soup.getIndex());
}
}
public Long getPid() {
return _iid;
}
public long genId() {
return _dao.genMonotonic();
}
/**
* @see BpelRuntimeContext#isCorrelationInitialized(org.apache.ode.bpel.runtime.CorrelationSetInstance)
*/
public boolean isCorrelationInitialized(CorrelationSetInstance correlationSet) {
ScopeDAO scopeDAO = _dao.getScope(correlationSet.scopeInstance);
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(correlationSet.declaration.getName());
return cs.getValue() != null;
}
/**
* @see BpelRuntimeContext#isVariableInitialized(org.apache.ode.bpel.runtime.VariableInstance)
*/
public boolean isVariableInitialized(VariableInstance var) {
if (var == null) {
return false;
}
ScopeDAO scopeDAO = _dao.getScope(var.scopeInstance);
XmlDataDAO dataDAO = scopeDAO.getVariable(var.declaration.getName());
return !dataDAO.isNull();
}
public boolean isPartnerRoleEndpointInitialized(PartnerLinkInstance pLink) {
PartnerLinkDAO spl = fetchPartnerLinkDAO(pLink);
return spl.getPartnerEPR() != null || _bpelProcess.getInitialPartnerRoleEPR(pLink.partnerLink) != null;
}
/**
* @see BpelRuntimeContext#completedFault(org.apache.ode.bpel.runtime.channels.FaultData)
*/
public void completedFault(FaultData faultData) {
BpelProcess.__log.warn("Instance " + _dao.getInstanceId() + " of " + _bpelProcess.getPID() + " has completed with fault: " + faultData);
_dao.setFault(faultData.getFaultName(), faultData.getExplanation(), faultData.getFaultLineNo(), faultData
.getActivityId(), faultData.getFaultMessage());
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
_dao.setState(ProcessState.STATE_COMPLETED_WITH_FAULT);
evt.setNewState(ProcessState.STATE_COMPLETED_WITH_FAULT);
sendEvent(evt);
sendEvent(new ProcessCompletionEvent(faultData.getFaultName()));
_dao.finishCompletion();
faultOutstandingMessageExchanges(faultData);
_bpelProcess._engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
}
public void beforeCompletion() {
_dao.delete(_bpelProcess.getCleanupCategories(false), false);
}
});
}
/**
* @see BpelRuntimeContext#completedOk()
*/
public void completedOk() {
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("ProcessImpl " + _bpelProcess.getPID() + " completed OK.");
}
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
_dao.setState(ProcessState.STATE_COMPLETED_OK);
evt.setNewState(ProcessState.STATE_COMPLETED_OK);
sendEvent(evt);
sendEvent(new ProcessCompletionEvent(null));
_dao.finishCompletion();
completeOutstandingMessageExchanges();
_bpelProcess._engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
}
public void beforeCompletion() {
_dao.delete(_bpelProcess.getCleanupCategories(true), false);
}
});
}
/**
* @see BpelRuntimeContext#createScopeInstance(Long,
* org.apache.ode.bpel.obj.OScope)
*/
public Long createScopeInstance(Long parentScopeId, OScope scope) {
if (BpelProcess.__log.isTraceEnabled()) {
BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("createScopeInstance", new Object[] {
"parentScopeId", parentScopeId, "scope", scope }));
}
ScopeDAO parent = null;
if (parentScopeId != null) {
parent = _dao.getScope(parentScopeId);
}
ScopeDAO scopeDao = _dao.createScope(parent, scope.getName(), scope.getId());
return scopeDao.getScopeInstanceId();
}
public void initializePartnerLinks(Long parentScopeId, Collection<OPartnerLink> partnerLinks) {
if (BpelProcess.__log.isTraceEnabled()) {
BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("initializeEndpointReferences", new Object[] {
"parentScopeId", parentScopeId, "partnerLinks", partnerLinks }));
}
ScopeDAO parent = _dao.getScope(parentScopeId);
for (OPartnerLink partnerLink : partnerLinks) {
PartnerLinkDAO pdao = parent.createPartnerLink(partnerLink.getId(), partnerLink.getName(),
partnerLink.getMyRoleName(), partnerLink.getPartnerRoleName());
// If there is a myrole on the link, initialize the session id so it
// is always
// available for opaque correlations. The myrole session id should
// never be changed.
if (partnerLink.hasMyRole())
pdao.setMySessionId(new GUID().toString());
}
}
public void select(PickResponse pickResponseChannel, Date timeout, boolean createInstance,
Selector[] selectors) throws FaultException {
if (BpelProcess.__log.isTraceEnabled())
BpelProcess.__log.trace(ObjectPrinter.stringifyMethodEnter("select", new Object[] { "pickResponseChannel",
pickResponseChannel, "timeout", timeout, "createInstance", createInstance,
"selectors", selectors }));
ProcessDAO processDao = _dao.getProcess();
// check if this is first pick
if (_dao.getState() == ProcessState.STATE_NEW) {
assert createInstance;
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(ProcessState.STATE_NEW);
_dao.setState(ProcessState.STATE_READY);
evt.setNewState(ProcessState.STATE_READY);
sendEvent(evt);
}
final String pickResponseChannelStr = ProcessUtil.exportChannel(pickResponseChannel);
List<CorrelatorDAO> correlators = new ArrayList<CorrelatorDAO>(selectors.length);
for (Selector selector : selectors) {
String correlatorId = BpelProcess.genCorrelatorId(selector.plinkInstance.partnerLink, selector.opName);
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": USING CORRELATOR " + correlatorId);
}
correlators.add(processDao.getCorrelator(correlatorId));
}
// Checking conflicts
int conflict = _imaManager.findConflict(selectors);
if (conflict != -1)
throw new FaultException(_bpelProcess.getOProcess().getConstants().getQnConflictingReceive(), selectors[conflict]
.toString());
// Check for ambiguous receive
for (int i = 0; i < selectors.length; ++i) {
CorrelatorDAO correlator = correlators.get(i);
Selector selector = selectors[i];
if (!correlator.checkRoute(selector.correlationKeySet)) {
throw new FaultException(_bpelProcess.getOProcess().getConstants().qnAmbiguousReceive(), selector.toString());
}
}
//Registering
_imaManager.register(pickResponseChannelStr, selectors);
// First check if we match to a new instance.
if (_instantiatingMessageExchange != null && _dao.getState() == ProcessState.STATE_READY) {
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": CHECKING for NEW INSTANCE match");
}
for (int i = 0; i < correlators.size(); ++i) {
CorrelatorDAO ci = correlators.get(i);
if (ci.equals(_dao.getInstantiatingCorrelator())) {
inputMsgMatch(pickResponseChannelStr, i, _instantiatingMessageExchange);
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + pickResponseChannel
+ ": FOUND match for NEW instance mexRef=" + _instantiatingMessageExchange);
}
return;
}
}
}
if (timeout != null) {
registerTimer(pickResponseChannel, timeout);
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + pickResponseChannel + "REGISTERED TIMEOUT for " + timeout);
}
}
for (int i = 0; i < selectors.length; ++i) {
CorrelatorDAO correlator = correlators.get(i);
Selector selector = selectors[i];
correlator.addRoute(ProcessUtil.exportChannel(pickResponseChannel), _dao, i, selector.correlationKeySet, selector.route);
scheduleCorrelatorMatcher(correlator.getCorrelatorId(), selector.correlationKeySet);
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + pickResponseChannel + ": ADDED ROUTE " + correlator.getCorrelatorId() + ": "
+ selector.correlationKeySet + " --> " + _dao.getInstanceId());
}
}
}
/**
* @see BpelRuntimeContext#readCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance)
*/
public CorrelationKey readCorrelation(CorrelationSetInstance cset) {
ScopeDAO scopeDAO = _dao.getScope(cset.scopeInstance);
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.declaration.getName());
return cs.getValue();
}
public Element fetchPartnerRoleEndpointReferenceData(PartnerLinkInstance pLink) throws FaultException {
PartnerLinkDAO pl = fetchPartnerLinkDAO(pLink);
Element epr = pl.getPartnerEPR();
if (epr == null) {
EndpointReference e = _bpelProcess.getInitialPartnerRoleEPR(pLink.partnerLink);
if (e != null)
epr = e.toXML().getDocumentElement();
}
if (epr == null) {
throw new FaultException(_bpelProcess.getOProcess().getConstants().getQnUninitializedPartnerRole());
}
return epr;
}
public Element fetchMyRoleEndpointReferenceData(PartnerLinkInstance pLink) {
return _bpelProcess.getInitialMyRoleEPR(pLink.partnerLink).toXML().getDocumentElement();
}
protected PartnerLinkDAO fetchPartnerLinkDAO(PartnerLinkInstance pLink) {
ScopeDAO scopeDAO = _dao.getScope(pLink.scopeInstanceId);
return scopeDAO.getPartnerLink(pLink.partnerLink.getId());
}
/**
* Evaluate a property alias query expression against a variable, returning
* the normalized {@link String} representation of the property value.
*
* @param variable
* variable to read
* @param property
* property to read
* @return value of property for variable, in String form
* @throws org.apache.ode.bpel.common.FaultException
* in case of selection or other fault
*/
public String readProperty(VariableInstance variable, OProcess.OProperty property) throws FaultException {
Node varData = readVariable(variable.scopeInstance, variable.declaration.getName(), false);
OProcess.OPropertyAlias alias = property.getAlias(variable.declaration.getType());
String val = _bpelProcess.extractProperty((Element) varData, Collections.EMPTY_MAP, alias, variable.declaration.getDescription());
if (BpelProcess.__log.isTraceEnabled()) {
BpelProcess.__log.trace("readPropertyAlias(variable=" + variable + ", alias=" + alias + ") = "
+ val.toString());
}
return val;
}
public void writeEndpointReference(PartnerLinkInstance variable, Element data) throws FaultException {
if (__log.isDebugEnabled()) {
__log.debug("Writing endpoint reference " + variable.partnerLink.getName() + " with value "
+ DOMUtils.domToString(data));
}
PartnerLinkDAO eprDAO = fetchPartnerLinkDAO(variable);
eprDAO.setPartnerEPR(data);
}
public String fetchEndpointSessionId(PartnerLinkInstance pLink, boolean isMyEPR) throws FaultException {
PartnerLinkDAO dao = fetchPartnerLinkDAO(pLink);
return isMyEPR ? dao.getMySessionId() : dao.getPartnerSessionId();
}
public Node convertEndpointReference(Element sourceNode, Node targetNode) {
QName nodeQName;
if (targetNode.getNodeType() == Node.TEXT_NODE) {
nodeQName = new QName(Namespaces.XML_SCHEMA, "string");
} else {
// We have an element
nodeQName = new QName(targetNode.getNamespaceURI(), targetNode.getLocalName());
}
return _bpelProcess._engine._contexts.eprContext.convertEndpoint(nodeQName, sourceNode).toXML();
}
public Node readVariable(Long scopeInstanceId, String varname, boolean forWriting) throws FaultException {
ScopeDAO scopedao = _dao.getScope(scopeInstanceId);
XmlDataDAO var = scopedao.getVariable(varname);
return (var == null || var.isNull()) ? null : var.get();
}
public Node writeVariable(VariableInstance variable, Node changes) {
ScopeDAO scopeDAO = _dao.getScope(variable.scopeInstance);
XmlDataDAO dataDAO = scopeDAO.getVariable(variable.declaration.getName());
dataDAO.set(changes);
writeProperties(variable, changes, dataDAO);
return dataDAO.get();
}
public void cancelOutstandingRequests(String channelId) {
_imaManager.cancel(channelId, false);
}
public void processOutstandingRequest(PartnerLinkInstance partnerLink, String opName, String bpelMexId, String odeMexId) throws FaultException {
String mexRef = _imaManager.processOutstandingRequest(partnerLink, opName, bpelMexId, odeMexId);
if (mexRef != null) {
reply2(partnerLink, opName, bpelMexId, null, _bpelProcess.getOProcess().getConstants().getQnConflictingRequest(), false, mexRef);
throw new FaultException(_bpelProcess.getOProcess().getConstants().getQnConflictingRequest());
}
}
public void reply(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
QName fault) throws FaultException {
String mexRef = _imaManager.release(plinkInstnace, opName, mexId);
reply2(plinkInstnace, opName, mexId, msg, fault, false, mexRef);
}
public void reply2(final PartnerLinkInstance plinkInstnace, final String opName, final String mexId, Element msg,
QName fault, boolean failure, final String mexRef) throws FaultException {
// prepare event
ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
evt.setMexId(mexId);
evt.setOperation(opName);
evt.setPortType(plinkInstnace.partnerLink.getMyRolePortType().getQName());
MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexRef);
MessageDAO message = mex.createMessage(plinkInstnace.partnerLink.getMyRoleOperation(opName).getOutput()
.getMessage().getQName());
buildOutgoingMessage(message, msg);
MyRoleMessageExchangeImpl m = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mex);
_bpelProcess.initMyRoleMex(m);
m.setResponse(new MessageImpl(message));
if (failure) {
mex.setStatus(MessageExchange.Status.FAILURE.toString());
} else if (fault != null) {
mex.setStatus(MessageExchange.Status.FAULT.toString());
mex.setFault(fault);
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_FAULT);
} else {
mex.setStatus(MessageExchange.Status.RESPONSE.toString());
evt.setAspect(ProcessMessageExchangeEvent.PROCESS_OUTPUT);
}
_bpelProcess.doAsyncReply(m, this);
// send event
sendEvent(evt);
}
/**
* @see BpelRuntimeContext#writeCorrelation(org.apache.ode.bpel.runtime.CorrelationSetInstance,
* org.apache.ode.bpel.common.CorrelationKey)
*/
public void writeCorrelation(CorrelationSetInstance cset, CorrelationKey correlation) {
ScopeDAO scopeDAO = _dao.getScope(cset.scopeInstance);
CorrelationSetDAO cs = scopeDAO.getCorrelationSet(cset.declaration.getName());
QName[] propNames = new QName[cset.declaration.getProperties().size()];
for (int m = 0; m < cset.declaration.getProperties().size(); m++) {
OProcess.OProperty oProperty = cset.declaration.getProperties().get(m);
propNames[m] = oProperty.getName();
}
cs.setValue(propNames, correlation);
CorrelationSetWriteEvent cswe = new CorrelationSetWriteEvent(cset.declaration.getName(), correlation);
cswe.setScopeId(cset.scopeInstance);
sendEvent(cswe);
}
/**
* Common functionality to initialize a correlation set based on data
* available in a variable.
*
* @param cset
* the correlation set instance
* @param variable
* variable instance
*
* @throws IllegalStateException
* DOCUMENTME
*/
public void initializeCorrelation(CorrelationSetInstance cset, VariableInstance variable) throws FaultException {
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("Initializing correlation set " + cset.declaration.getName());
}
// if correlation set is already initialized, then skip
if (isCorrelationInitialized(cset)) {
// if already set, we ignore
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("OCorrelation set " + cset + " is already set: ignoring");
}
return;
}
String[] propNames = new String[cset.declaration.getProperties().size()];
String[] propValues = new String[cset.declaration.getProperties().size()];
for (int i = 0; i < cset.declaration.getProperties().size(); ++i) {
OProcess.OProperty property = cset.declaration.getProperties().get(i);
propValues[i] = readProperty(variable, property);
propNames[i] = property.getName().toString();
}
CorrelationKey ckeyVal = new CorrelationKey(cset.declaration.getName(), propValues);
writeCorrelation(cset, ckeyVal);
}
public ExpressionLanguageRuntimeRegistry getExpLangRuntime() {
return _bpelProcess._expLangRuntimeRegistry;
}
/**
* @see BpelRuntimeContext#terminate()
*/
public void terminate() {
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(_dao.getState());
_dao.setState(ProcessState.STATE_TERMINATED);
evt.setNewState(ProcessState.STATE_TERMINATED);
sendEvent(evt);
sendEvent(new ProcessTerminationEvent());
_dao.finishCompletion();
failOutstandingMessageExchanges();
_bpelProcess._engine._contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
}
public void beforeCompletion() {
_dao.delete(_bpelProcess.getCleanupCategories(false), false);
}
});
}
public void registerTimer(TimerResponse timerChannel, Date timeToFire) {
JobDetails we = new JobDetails();
we.setInstanceId(_dao.getInstanceId());
we.setChannel(ProcessUtil.exportChannel(timerChannel));
we.setType(JobType.TIMER);
we.setInMem(_bpelProcess.isInMemory());
if(_bpelProcess.isInMemory()){
_bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we, timeToFire);
}else{
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we, timeToFire);
}
}
private void scheduleCorrelatorMatcher(String correlatorId, CorrelationKeySet keySet) {
JobDetails we = new JobDetails();
we.setInstanceId(_dao.getInstanceId());
we.setType(JobType.MATCHER);
we.setCorrelatorId(correlatorId);
we.setCorrelationKeySet(keySet);
we.setInMem(_bpelProcess.isInMemory());
if(_bpelProcess.isInMemory()){
_bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we);
}else{
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we, null);
}
}
public void checkInvokeExternalPermission() {}
/**
* Called back when the process executes an invokation.
*
* @param activityId The activity id in the process definition (id of OInvoke)
* @param partnerLinkInstance The partner link variable instance
* @param operation The wsdl operation.
* @param outboundMsg The message sent outside as a DOM
* @param invokeResponseChannel Object called back when the response is received.
* @return The instance id of the message exchange.
* @throws FaultException When the response is a fault or when the invoke could not be executed
* in which case it is one of the bpel standard fault.
*/
public String invoke(int aid, PartnerLinkInstance partnerLink, Operation operation, Element outgoingMessage,
InvokeResponse channel) throws FaultException {
PartnerLinkDAO plinkDAO = fetchPartnerLinkDAO(partnerLink);
// The target (partner endpoint) -- if it has not been explicitly
// initialized
// then use the value from bthe deployment descriptor ..
Element partnerEPR = plinkDAO.getPartnerEPR();
EndpointReference partnerEpr;
if (partnerEPR == null) {
partnerEpr = _bpelProcess.getInitialPartnerRoleEPR(partnerLink.partnerLink);
// In this case, the partner link has not been initialized.
if (partnerEpr == null)
throw new FaultException(partnerLink.partnerLink.getOwner().getConstants().getQnUninitializedPartnerRole());
} else {
partnerEpr = _bpelProcess._engine._contexts.eprContext.resolveEndpointReference(partnerEPR);
}
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("INVOKING PARTNER: partnerLink=" + partnerLink +
", op=" + operation.getName() + " channel=" + channel + ")");
}
// prepare event
ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
evt.setOperation(operation.getName());
evt.setPortType(partnerLink.partnerLink.getPartnerRolePortType().getQName());
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_INPUT);
MessageExchangeDAO mexDao = _dao.getConnection().createMessageExchange(
MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE);
mexDao.setCreateTime(getCurrentEventDateTime());
mexDao.setStatus(MessageExchange.Status.NEW.toString());
mexDao.setOperation(operation.getName());
mexDao.setPortType(partnerLink.partnerLink.getPartnerRolePortType().getQName());
mexDao.setPartnerLinkModelId(partnerLink.partnerLink.getId());
mexDao.setPartnerLink(plinkDAO);
mexDao.setProcess(_dao.getProcess());
mexDao.setInstance(_dao);
mexDao.setPattern((operation.getOutput() != null ? MessageExchangePattern.REQUEST_RESPONSE
: MessageExchangePattern.REQUEST_ONLY).toString());
mexDao.setChannel(channel == null ? null : ProcessUtil.exportChannel(channel));
// Properties used by stateful-exchange protocol.
String mySessionId = plinkDAO.getMySessionId();
String partnerSessionId = plinkDAO.getPartnerSessionId();
if ( mySessionId != null )
mexDao.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, mySessionId);
if ( partnerSessionId != null )
mexDao.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, partnerSessionId);
if (__log.isDebugEnabled())
__log.debug("INVOKE PARTNER (SEP): sessionId=" + mySessionId + " partnerSessionId=" + partnerSessionId);
MessageDAO message = null;
if (operation.getInput() != null)
message = mexDao.createMessage(operation.getInput().getMessage().getQName());
else
message = mexDao.createMessage(null);
mexDao.setRequest(message);
if (operation.getInput() != null)
message.setType(operation.getInput().getMessage().getQName());
else
message.setType(null);
buildOutgoingMessage(message, outgoingMessage);
// Get he my-role EPR (if myrole exists) for optional use by partner
// (for callback mechanism).
EndpointReference myRoleEndpoint = partnerLink.partnerLink.hasMyRole() ? _bpelProcess
.getInitialMyRoleEPR(partnerLink.partnerLink) : null;
PartnerRoleMessageExchangeImpl mex =
createPartnerRoleMessageExchangeImpl(mexDao, partnerLink,
operation, partnerEpr, myRoleEndpoint);
mex.setProperty("activityId", ""+aid);
List<BpelProcess> p2pProcesses = null;
Endpoint partnerEndpoint = _bpelProcess.getInitialPartnerRoleEndpoint(partnerLink.partnerLink);
if (getConfigForPartnerLink(partnerLink.partnerLink).usePeer2Peer && partnerEndpoint != null)
p2pProcesses = _bpelProcess.getEngine().route(partnerEndpoint.serviceName, mex.getRequest());
if (p2pProcesses != null && !p2pProcesses.isEmpty()) {
// Creating a my mex using the same message id as partner mex to "pipe" them
MyRoleMessageExchange myRoleMex = _bpelProcess.getEngine().createMessageExchange(
mex.getMessageExchangeId(), partnerEndpoint.serviceName,
operation.getName(), mex.getMessageExchangeId());
if (myRoleMex instanceof BrokeredMyRoleMessageExchangeImpl) {
mex.setSubscriberCount(((BrokeredMyRoleMessageExchangeImpl) myRoleMex).getSubscriberCount());
}
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("Invoking in a p2p interaction, partnerrole " + mex + " - myrole " + myRoleMex);
}
Message odeRequest = myRoleMex.createMessage(operation.getInput().getMessage().getQName());
odeRequest.setMessage(outgoingMessage);
((MessageImpl)odeRequest)._dao.setHeader(message.getHeader());
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("Setting myRoleMex session ids for p2p interaction, mySession "
+ partnerSessionId + " - partnerSess " + mySessionId);
}
if ( partnerSessionId != null )
myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID, partnerSessionId);
if ( mySessionId != null )
myRoleMex.setProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID, mySessionId);
mex.setStatus(MessageExchange.Status.REQUEST);
myRoleMex.invoke(odeRequest);
// Can't expect any sync response
scheduleInvokeCheck(mex, partnerLink.partnerLink, true);
mex.replyAsync();
} else {
// If we couldn't find the endpoint, then there is no sense
// in asking the IL to invoke.
if (partnerEpr != null) {
checkInvokeExternalPermission();
mexDao.setEPR(partnerEpr.toXML().getDocumentElement());
mex.setStatus(MessageExchange.Status.REQUEST);
// Assuming an unreliable protocol, we schedule a task to check if recovery mode will be needed
scheduleInvokeCheck(mex, partnerLink.partnerLink, false);
_bpelProcess._engine._contexts.mexContext.invokePartner(mex);
} else {
__log.error("Couldn't find endpoint for partner EPR " + DOMUtils.domToString(partnerEPR));
mex.setFailure(FailureType.UNKNOWN_ENDPOINT, "UnknownEndpoint", partnerEPR);
}
}
evt.setMexId(mexDao.getMessageExchangeId());
sendEvent(evt);
// MEX pattern is request only, at this point the status can only be a one way
if (mexDao.getPattern().equals(MessageExchangePattern.REQUEST_ONLY.toString())) {
mexDao.setStatus(MessageExchange.Status.ASYNC.toString());
// This mex can now be released
boolean succeeded = mex.getStatus() != MessageExchange.Status.FAILURE && mex.getStatus() != MessageExchange.Status.FAULT;
mexDao.release(_bpelProcess.isCleanupCategoryEnabled(succeeded, CLEANUP_CATEGORY.MESSAGES));
}
// Check if there is a synchronous response, if so, we need to inject the
// message on the response channel.
switch (mex.getStatus()) {
case NEW:
throw new AssertionError("Impossible!");
case ASYNC:
break;
case RESPONSE:
case FAULT:
case FAILURE:
invocationResponse(mex);
break;
default:
__log.error("Partner did not acknowledge message exchange: " + mex);
mex.setFailure(FailureType.NO_RESPONSE, "Partner did not acknowledge.", null);
invocationResponse(mex);
}
return mexDao.getMessageExchangeId();
}
// enable extensibility
protected PartnerRoleMessageExchangeImpl createPartnerRoleMessageExchangeImpl(MessageExchangeDAO mexDao,
PartnerLinkInstance partnerLink, Operation operation, EndpointReference partnerEpr,
EndpointReference myRoleEndpoint) {
return new PartnerRoleMessageExchangeImpl(getBpelProcess().getEngine(), mexDao,
partnerLink.partnerLink.getPartnerRolePortType(), operation, partnerEpr, myRoleEndpoint,
getBpelProcess().getPartnerRoleChannel(partnerLink.partnerLink));
}
protected BpelProcess getBpelProcess() {
return _bpelProcess;
}
private void scheduleInvokeCheck(PartnerRoleMessageExchangeImpl mex, OPartnerLink partnerLink, boolean p2p) {
boolean isTwoWay = mex.getMessageExchangePattern() ==
org.apache.ode.bpel.iapi.MessageExchange.MessageExchangePattern.REQUEST_RESPONSE;
if (!_bpelProcess.isInMemory() && isTwoWay) {
JobDetails event = new JobDetails();
event.setMexId(mex.getMessageExchangeId());
event.setProcessId(_bpelProcess.getPID());
event.setInMem(false);
event.setType(JobType.INVOKE_CHECK);
// use a greater timeout to make sure the check job does not get executed while the service invocation is still waiting for a response
long timeout = getBpelProcess().getTimeout(partnerLink, p2p);
if (__log.isDebugEnabled()) __log.debug("Creating invocation check event in "+timeout+"ms for mexid " + mex.getMessageExchangeId());
Date future = new Date(System.currentTimeMillis() + timeout);
String jobId = _bpelProcess._engine._contexts.scheduler.schedulePersistedJob(event, future);
mex.setProperty("invokeCheckJobId", jobId);
}
}
protected void buildOutgoingMessage(MessageDAO message, Element outgoingElmt) {
if (outgoingElmt == null) return;
Document doc = DOMUtils.newDocument();
Element header = doc.createElement("header");
NodeList parts = outgoingElmt.getChildNodes();
for (int m = 0; m < parts.getLength(); m++) {
if (parts.item(m).getNodeType() == Node.ELEMENT_NODE) {
Element part = (Element) parts.item(m);
if (part.getAttribute("headerPart") != null && part.getAttribute("headerPart").length() > 0) {
header.appendChild(doc.importNode(part, true));
// remove the element from the list AND decrement the index to avoid skipping the next element!!
outgoingElmt.removeChild(part);
m--;
}
}
}
message.setData(outgoingElmt);
message.setHeader(header);
}
public void execute() {
long maxTime = System.currentTimeMillis() + _maxReductionTimeMs;
boolean canReduce = true;
assert _outstandingRequests == null && _imaManager != null;
while (ProcessState.canExecute(_dao.getState()) && System.currentTimeMillis() < maxTime && canReduce && !_forceFlush) {
canReduce = _vpu.execute();
}
_dao.setLastActiveTime(new Date());
if (!ProcessState.isFinished(_dao.getState())) {
if (__log.isDebugEnabled()) __log.debug("Setting execution state on instance " + _iid);
_soup.setGlobalData(_imaManager);
if (_bpelProcess.isInMemory()) {
// don't serialize in-memory processes
((ProcessInstanceDaoImpl) _dao).setSoup(_soup);
} else {
ByteArrayOutputStream bos = new ByteArrayOutputStream(10000);
try {
_soup.write(bos);
bos.close();
} catch (Exception ex) {
throw new RuntimeException(ex);
}
_dao.setExecutionState(bos.toByteArray());
}
if (ProcessState.canExecute(_dao.getState()) && canReduce) {
// Max time exceeded (possibly an infinite loop).
if (__log.isDebugEnabled())
__log.debug("MaxTime exceeded for instance # " + _iid);
try {
JobDetails we = new JobDetails();
we.setInstanceId(_iid);
we.setType(JobType.RESUME);
we.setInMem(_bpelProcess.isInMemory());
if (_bpelProcess.isInMemory())
_bpelProcess._engine._contexts.scheduler.scheduleVolatileJob(true, we);
else
_bpelProcess._engine._contexts.scheduler.schedulePersistedJob(we, new Date());
} catch (ContextException e) {
__log.error("Failed to schedule resume task.", e);
throw new BpelEngineException(e);
}
}
}
}
public void inputMsgMatch(final String responsechannel, final int idx, MyRoleMessageExchangeImpl mex) {
// if we have a message match, this instance should be marked
// active if it isn't already
if (_dao.getState() == ProcessState.STATE_READY) {
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("INPUTMSGMATCH: Changing process instance state from ready to active");
}
_dao.setState(ProcessState.STATE_ACTIVE);
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(ProcessState.STATE_READY);
evt.setNewState(ProcessState.STATE_ACTIVE);
sendEvent(evt);
}
final String mexId = mex.getMessageExchangeId();
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 3168964409165899533L;
public void run() {
PickResponse pickResponse = importChannel(responsechannel, PickResponse.class);
pickResponse.onRequestRcvd(idx, mexId);
}
});
}
protected void timerEvent(final String timerResponseChannel) {
// In case this is a pick event, we remove routes,
// and cancel the outstanding requests.
_dao.getProcess().removeRoutes(timerResponseChannel, _dao);
_imaManager.cancel(timerResponseChannel, true);
// Ignore timer events after the process is finished.
if (ProcessState.isFinished(_dao.getState())) {
return;
}
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = -7767141033611036745L;
public void run() {
TimerResponse responseChannel = importChannel(timerResponseChannel, TimerResponse.class);
responseChannel.onTimeout();
}
});
execute();
}
public void cancel(final TimerResponse timerResponseChannel) {
// In case this is a pick response channel, we need to cancel routes and
// receive/reply association.
final String id = ProcessUtil.exportChannel(timerResponseChannel);
_dao.getProcess().removeRoutes(id, _dao);
_imaManager.cancel(id, true);
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 6157913683737696396L;
public void run() {
TimerResponse timerResponse = importChannel(id, TimerResponse.class);
timerResponse.onCancel();
}
});
}
void invocationResponse(PartnerRoleMessageExchangeImpl mex) {
invocationResponse(mex.getDAO().getMessageExchangeId(), mex.getDAO().getChannel());
}
public void invocationResponse(final String mexid, final String responseChannelId) {
if (responseChannelId == null)
throw new NullPointerException("Null responseChannelId");
if (mexid == null)
throw new NullPointerException("Null mexId");
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("Invoking message response for mexid " + mexid + " and channel " + responseChannelId);
}
_vpu.inject(new BpelJacobRunnable() {
private static final long serialVersionUID = -1095444335740879981L;
public void run() {
((BpelRuntimeContextImpl) getBpelRuntimeContext()).invocationResponse2(
mexid, importChannel(responseChannelId, InvokeResponse.class));
}
});
}
/**
* Continuation of the above.
*
* @param mexid
* @param responseChannel
*/
private void invocationResponse2(String mexid, InvokeResponse responseChannel) {
__log.debug("Triggering response");
MessageExchangeDAO mex = _dao.getConnection().getMessageExchange(mexid);
ProcessMessageExchangeEvent evt = new ProcessMessageExchangeEvent();
evt.setPortType(mex.getPortType());
evt.setMexId(mexid);
evt.setOperation(mex.getOperation());
MessageExchange.Status status = MessageExchange.Status.valueOf(mex.getStatus());
switch (status) {
case FAULT:
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAULT);
responseChannel.onFault();
break;
case RESPONSE:
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_OUTPUT);
responseChannel.onResponse();
break;
case FAILURE:
evt.setAspect(ProcessMessageExchangeEvent.PARTNER_FAILURE);
responseChannel.onFailure();
break;
default:
__log.error("Invalid response state for mex " + mexid + ": " + status);
}
sendEvent(evt);
}
private void saveScopeState(Long scopeId, ScopeStateEnum scopeState) {
ScopeDAO scope = _dao.getScope(scopeId);
scope.setState(scopeState);
}
/**
* @see BpelRuntimeContext#sendEvent(org.apache.ode.bpel.evt.ProcessInstanceEvent)
*/
public void sendEvent(ProcessInstanceEvent event) {
// fill in missing pieces
event.setProcessId(_dao.getProcess().getProcessId());
event.setProcessName(_dao.getProcess().getType());
event.setProcessInstanceId(_dao.getInstanceId());
_bpelProcess._debugger.onEvent(event);
// filter scopes
List<String> scopeNames = null;
if (event instanceof ScopeEvent) {
ScopeEvent sevent = (ScopeEvent) event;
scopeNames = sevent.getParentScopesNames();
if (sevent instanceof ScopeStartEvent) {
saveScopeState(sevent.getScopeId(), ScopeStateEnum.ACTIVE);
} else if (sevent instanceof ScopeCompletionEvent) {
saveScopeState(sevent.getScopeId(), ScopeStateEnum.COMPLETED);
} else if (sevent instanceof ScopeFaultEvent) {
saveScopeState(sevent.getScopeId(), ScopeStateEnum.FAULT);
}
}
// saving
_bpelProcess.saveEvent(event, _dao, scopeNames);
}
public static String debugInfoToString(org.apache.ode.bpel.obj.DebugInfo debugInfo) {
if (debugInfo == null) return "";
else return " at " + debugInfo.getSourceURI() + ":" + debugInfo.getStartLine();
}
/**
* We record all values of properties of a 'MessageType' variable for
* efficient lookup.
*/
private void writeProperties(VariableInstance variable, Node value, XmlDataDAO dao) {
if (variable.declaration.getType() instanceof OMessageVarType) {
for (OProcess.OProperty property : variable.declaration.getOwner().getProperties()) {
OProcess.OPropertyAlias alias = property.getAlias(variable.declaration.getType());
if (alias != null) {
try {
String val = _bpelProcess.extractProperty((Element) value, Collections.EMPTY_MAP, alias, variable.declaration
.getDescription());
if (val != null) {
dao.setProperty(property.getName().toString(), val);
}
} catch (FaultException e) {
// This will fail as we're basically trying to extract properties on all
// received messages for optimization purposes.
if (__log.isWarnEnabled())
__log.warn("Couldn't extract property '" + property.toString()
+ "' and variable " + variable.declaration + debugInfoToString(variable.declaration.getDebugInfo()) + " in property pre-extraction: " + e.toString());
}
}
}
}
}
private void completeOutstandingMessageExchanges() {
String[] mexRefs = _imaManager.releaseAll();
for (String mexId : mexRefs) {
MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
if (mexDao != null) {
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexDao);
switch (mex.getStatus()) {
case ASYNC:
case RESPONSE:
mex.setStatus(MessageExchange.Status.COMPLETED_OK);
break;
case REQUEST:
if (mex.getPattern().equals(MessageExchange.MessageExchangePattern.REQUEST_ONLY)) {
mex.setStatus(MessageExchange.Status.COMPLETED_OK);
break;
}
default:
mex.setFailure(FailureType.OTHER, "No response.", null);
_bpelProcess.doAsyncReply(mex, this);
}
}
}
}
private void faultOutstandingMessageExchanges(FaultData faultData) {
String[] mexRefs = _imaManager.releaseAll();
for (String mexId : mexRefs) {
MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
if (mexDao != null) {
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexDao);
_bpelProcess.initMyRoleMex(mex);
Message message = mex.createMessage(faultData.getFaultName());
if (faultData.getFaultMessage() != null)
message.setMessage(faultData.getFaultMessage());
mex.setResponse(message);
mex.setFault(faultData.getFaultName(), message);
mex.setFaultExplanation(faultData.getExplanation());
_bpelProcess.doAsyncReply(mex, this);
}
}
}
private void failOutstandingMessageExchanges() {
String[] mexRefs = _imaManager.releaseAll();
for (String mexId : mexRefs) {
MessageExchangeDAO mexDao = _dao.getConnection().getMessageExchange(mexId);
if (mexDao != null) {
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexDao);
_bpelProcess.initMyRoleMex(mex);
mex.setFailure(FailureType.OTHER, "No response.", null);
_bpelProcess.doAsyncReply(mex, this);
}
}
}
public Element getPartnerResponse(String mexId) {
return mergeHeaders(_getPartnerResponse(mexId));
}
public Element getMyRequest(String mexId) {
MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
if (dao == null) {
// this should not happen....
String msg = "Engine requested non-existent message exchange: " + mexId;
__log.error(msg);
throw new BpelEngineException(msg);
}
if (dao.getDirection() != MessageExchangeDAO.DIR_PARTNER_INVOKES_MYROLE) {
// this should not happen....
String msg = "Engine requested my-role request for a partner-role mex: " + mexId;
__log.error(msg);
throw new BpelEngineException(msg);
}
MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
switch (status) {
case ASYNC:
case REQUEST:
// In the case of pub-sub, the status may already be OK.
case COMPLETED_OK:
MessageDAO request = dao.getRequest();
if (request == null) {
// this also should not happen
String msg = "Engine requested request for message exchange that did not have one: " + mexId;
__log.error(msg);
throw new BpelEngineException(msg);
}
return mergeHeaders(request);
default:
// We should not be in any other state when requesting this.
String msg = "Engine requested response while the message exchange " + mexId + " was in the state "
+ status;
__log.error(msg);
throw new BpelEngineException(msg);
}
}
private Element mergeHeaders(MessageDAO msg) {
if(msg==null) return null;
// Merging header data, it's all stored in the same variable
Element data = msg.getData();
if (msg.getHeader() != null) {
if (data == null) {
Document doc = DOMUtils.newDocument();
data = doc.createElement("message");
doc.appendChild(data);
}
NodeList headerParts = msg.getHeader().getChildNodes();
for (int m = 0; m < headerParts.getLength(); m++) {
if (headerParts.item(m).getNodeType() == Node.ELEMENT_NODE) {
Element headerPart = (Element) headerParts.item(m);
headerPart.setAttribute("headerPart", "true");
data.appendChild(data.getOwnerDocument().importNode(headerPart, true));
}
}
}
return data;
}
public QName getPartnerFault(String mexId) {
MessageExchangeDAO mex = _getPartnerResponse(mexId).getMessageExchange();
return mex.getFault();
}
public QName getPartnerResponseType(String mexId) {
return _getPartnerResponse(mexId).getType();
}
public String getPartnerFaultExplanation(String mexId) {
MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
return dao != null ? dao.getFaultExplanation() : null;
}
private MessageDAO _getPartnerResponse(String mexId) {
MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
if (dao == null) {
// this should not happen....
String msg = "Engine requested non-existent message exchange: " + mexId;
__log.error(msg);
throw new BpelEngineException(msg);
}
if (dao.getDirection() != MessageExchangeDAO.DIR_BPEL_INVOKES_PARTNERROLE) {
// this should not happen....
String msg = "Engine requested partner response for a my-role mex: " + mexId;
__log.error(msg);
throw new BpelEngineException(msg);
}
MessageDAO response;
MessageExchange.Status status = MessageExchange.Status.valueOf(dao.getStatus());
switch (status) {
case FAULT:
case RESPONSE:
response = dao.getResponse();
if (response == null) {
// this also should not happen
String msg = "Engine requested response for message exchange that did not have one: " + mexId;
__log.error(msg);
throw new BpelEngineException(msg);
}
break;
case FAILURE:
response = dao.getResponse();
break;
default:
// We should not be in any other state when requesting this.
String msg = "Engine requested response while the message exchange " + mexId + " was in the state "
+ status;
__log.error(msg);
throw new BpelEngineException(msg);
}
return response;
}
public void releasePartnerMex(String mexId, boolean instanceSucceeded) {
MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
dao.release(_bpelProcess.isCleanupCategoryEnabled(instanceSucceeded, CLEANUP_CATEGORY.MESSAGES) );
// We used to cancel the invoke check job here but it turns out
// it creates more contention on the ODE_JOB table. It's better
// just to let the job get scheduled and discarded quietly
/*
String jobId = dao.getProperty("invokeCheckJobId");
if (jobId != null)
_bpelProcess._engine._contexts.scheduler.cancelJob(jobId);
*/
}
public Element getSourceEPR(String mexId) {
MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
String epr = dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_EPR);
if (epr == null)
return null;
try {
Element eepr = DOMUtils.stringToDOM(epr);
return eepr;
} catch (Exception ex) {
__log.error("Invalid value for SEP property " + MessageExchange.PROPERTY_SEP_PARTNERROLE_EPR + ": " + epr);
}
return null;
}
public String getSourceSessionId(String mexId) {
MessageExchangeDAO dao = _dao.getConnection().getMessageExchange(mexId);
return dao.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
}
public void registerActivityForRecovery(ActivityRecovery channel, long activityId, String reason,
Date dateTime, Element details, String[] actions, int retries) {
if (reason == null)
reason = "Unspecified";
if (dateTime == null)
dateTime = new Date();
__log.info("ActivityRecovery: Registering activity " + activityId + ", failure reason: " + reason +
" on channel " + ProcessUtil.exportChannel(channel));
_dao.createActivityRecovery(ProcessUtil.exportChannel(channel), (int) activityId, reason, dateTime, details, actions, retries);
}
public void unregisterActivityForRecovery(ActivityRecovery channel) {
_dao.deleteActivityRecovery(ProcessUtil.exportChannel(channel));
}
public void recoverActivity(final String channel, final long activityId, final String action, final FaultData fault) {
_vpu.inject(new JacobRunnable() {
private static final long serialVersionUID = 3168964409165899533L;
public void run() {
ActivityRecovery recovery = importChannel(channel, ActivityRecovery.class);
__log.info("ActivityRecovery: Recovering activity " + activityId + " with action " + action +
" on channel " + recovery);
if (recovery != null) {
if ("cancel".equals(action))
recovery.cancel();
else if ("retry".equals(action))
recovery.retry();
else if ("fault".equals(action))
recovery.fault(fault);
}
}
});
//_dao.deleteActivityRecovery(channel);
execute();
}
/**
* Fetch the session-identifier for the partner link from the database.
*/
public String fetchMySessionId(PartnerLinkInstance pLink) {
String sessionId = fetchPartnerLinkDAO(pLink).getMySessionId();
assert sessionId != null : "Session ID should always be set!";
return sessionId;
}
public String fetchPartnersSessionId(PartnerLinkInstance pLink) {
return fetchPartnerLinkDAO(pLink).getPartnerSessionId();
}
public void initializePartnersSessionId(PartnerLinkInstance pLink, String session) {
if (__log.isDebugEnabled())
__log.debug("initializing partner " + pLink + " sessionId to " + session);
fetchPartnerLinkDAO(pLink).setPartnerSessionId(session);
}
/**
* Attempt to match message exchanges on a correlator.
*
*/
public void matcherEvent(String correlatorId, CorrelationKeySet ckeySet) {
if (BpelProcess.__log.isDebugEnabled()) {
__log.debug("MatcherEvent handling: correlatorId=" + correlatorId + ", ckeySet=" + ckeySet);
}
CorrelatorDAO correlator = _dao.getProcess().getCorrelator(correlatorId);
// Find the route first, this is a SELECT FOR UPDATE on the "selector" row,
// So we want to acquire the lock before we do anthing else.
List<MessageRouteDAO> mroutes = correlator.findRoute(ckeySet);
if (mroutes == null || mroutes.size() == 0) {
// Ok, this means that a message arrived before we did, so nothing to do.
__log.debug("MatcherEvent handling: nothing to do, route no longer in DB");
return;
}
// Now see if there is a message that matches this selector.
MessageExchangeDAO mexdao = correlator.dequeueMessage(ckeySet);
if (mexdao != null) {
__log.debug("MatcherEvent handling: found matching message in DB (i.e. message arrived before <receive>)");
if( MessageExchangePattern.REQUEST_RESPONSE.toString().equals(mexdao.getPattern())) {
__log.warn("A message arrived before a receive is ready for a request/response pattern. This may be processed to success. However, you should consider revising your process since a TCP port and a container thread will be held for a longer time and the process will not scale under heavy load.");
}
for (MessageRouteDAO mroute : mroutes) {
// We have a match, so we can get rid of the routing entries.
correlator.removeRoutes(mroute.getGroupId(), _dao);
}
// Selecting first route to proceed, other matching entries are ignored
MessageRouteDAO mroute = mroutes.get(0);
// Found message matching one of our selectors.
if (BpelProcess.__log.isDebugEnabled()) {
BpelProcess.__log.debug("SELECT: " + mroute.getGroupId() + ": matched to MESSAGE " + mexdao
+ " on CKEYSET " + ckeySet);
}
MyRoleMessageExchangeImpl mex = new MyRoleMessageExchangeImpl(_bpelProcess, _bpelProcess._engine, mexdao);
inputMsgMatch(mroute.getGroupId(), mroute.getIndex(), mex);
execute();
// Do not release yet if the process is suspended, the mex will be used again
if (_dao.getState() != ProcessState.STATE_SUSPENDED)
mexdao.releasePremieMessages();
} else {
__log.debug("MatcherEvent handling: nothing to do, no matching message in DB");
}
}
public Node readExtVar(Variable variable, Node reference) throws ExternalVariableModuleException {
Value val = _bpelProcess.getEVM().read(variable, reference, _iid);
return val.value;
}
public ValueReferencePair writeExtVar(Variable variable, Node reference, Node value) throws ExternalVariableModuleException {
ValueReferencePair vrp = new ValueReferencePair();
Value val = _bpelProcess.getEVM().write(variable, reference, value, _iid);
vrp.reference = val.locator.reference;
vrp.value = val.value;
return vrp;
}
public URI getBaseResourceURI() {
return _bpelProcess.getBaseResourceURI();
}
public Node getProcessProperty(QName propertyName) {
return _bpelProcess.getProcessProperty(propertyName);
}
public QName getProcessQName() {
return _bpelProcess.getProcessType();
}
public Date getCurrentEventDateTime() {
if (_currentEventDateTime == null)
return Calendar.getInstance().getTime();
else
return _currentEventDateTime;
}
public void setCurrentEventDateTime(Date eventDateTime) {
_currentEventDateTime = eventDateTime;
}
public ClassLoader getProcessClassLoader() {
return _bpelProcess._classLoader;
}
public PartnerRoleConfig getConfigForPartnerLink(OPartnerLink pLink) {
PartnerRoleConfig c = _bpelProcess.getConf().getPartnerRoleConfig().get(pLink.getName());
if (c == null) {
return new PartnerRoleConfig(null, true);
} else {
return c;
}
}
public void forceFlush() {
_forceFlush = true;
}
}