blob: e1a7bd9d08a5ff4c5f36ea81b6c3ceb09f2e632d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.xml.namespace.QName;
import org.apache.commons.collections.comparators.ComparatorChain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.BpelEventFilter;
import org.apache.ode.bpel.common.Filter;
import org.apache.ode.bpel.common.InstanceFilter;
import org.apache.ode.bpel.common.ProcessFilter;
import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.CorrelationSetDAO;
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.dao.XmlDataDAO;
import org.apache.ode.bpel.dao.ProcessManagementDAO.FailedSummaryValue;
import org.apache.ode.bpel.dao.ProcessManagementDAO.InstanceSummaryKey;
import org.apache.ode.bpel.engine.replayer.Replayer;
import org.apache.ode.bpel.evt.ActivityEvent;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.evt.CorrelationEvent;
import org.apache.ode.bpel.evt.CorrelationMatchEvent;
import org.apache.ode.bpel.evt.CorrelationSetEvent;
import org.apache.ode.bpel.evt.CorrelationSetWriteEvent;
import org.apache.ode.bpel.evt.ExpressionEvaluationEvent;
import org.apache.ode.bpel.evt.ExpressionEvaluationFailedEvent;
import org.apache.ode.bpel.evt.NewProcessInstanceEvent;
import org.apache.ode.bpel.evt.PartnerLinkEvent;
import org.apache.ode.bpel.evt.ProcessCompletionEvent;
import org.apache.ode.bpel.evt.ProcessEvent;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.evt.ProcessInstanceStartedEvent;
import org.apache.ode.bpel.evt.ProcessInstanceStateChangeEvent;
import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent;
import org.apache.ode.bpel.evt.ScopeCompletionEvent;
import org.apache.ode.bpel.evt.ScopeEvent;
import org.apache.ode.bpel.evt.ScopeFaultEvent;
import org.apache.ode.bpel.evt.VariableEvent;
import org.apache.ode.bpel.evt.VariableModificationEvent;
import org.apache.ode.bpel.evtproc.ActivityStateDocumentBuilder;
import org.apache.ode.bpel.iapi.BpelEngineException;
import org.apache.ode.bpel.iapi.BpelServer;
import org.apache.ode.bpel.iapi.EndpointReference;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessState;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.obj.OBase;
import org.apache.ode.bpel.obj.OPartnerLink;
import org.apache.ode.bpel.obj.OProcess;
import org.apache.ode.bpel.pmapi.ActivityExtInfoListDocument;
import org.apache.ode.bpel.pmapi.ActivityInfoDocument;
import org.apache.ode.bpel.pmapi.EventInfoListDocument;
import org.apache.ode.bpel.pmapi.GetCommunication;
import org.apache.ode.bpel.pmapi.GetCommunicationResponseDocument;
import org.apache.ode.bpel.pmapi.InstanceInfoDocument;
import org.apache.ode.bpel.pmapi.InstanceInfoListDocument;
import org.apache.ode.bpel.pmapi.InstanceManagement;
import org.apache.ode.bpel.pmapi.InstanceNotFoundException;
import org.apache.ode.bpel.pmapi.InvalidRequestException;
import org.apache.ode.bpel.pmapi.ManagementException;
import org.apache.ode.bpel.pmapi.ProcessInfoCustomizer;
import org.apache.ode.bpel.pmapi.ProcessInfoDocument;
import org.apache.ode.bpel.pmapi.ProcessInfoListDocument;
import org.apache.ode.bpel.pmapi.ProcessManagement;
import org.apache.ode.bpel.pmapi.ProcessNotFoundException;
import org.apache.ode.bpel.pmapi.ProcessingException;
import org.apache.ode.bpel.pmapi.Replay;
import org.apache.ode.bpel.pmapi.ReplayResponse;
import org.apache.ode.bpel.pmapi.ReplayResponseDocument;
import org.apache.ode.bpel.pmapi.ScopeInfoDocument;
import org.apache.ode.bpel.pmapi.TActivityExtInfo;
import org.apache.ode.bpel.pmapi.TActivityStatus;
import org.apache.ode.bpel.pmapi.TActivitytExtInfoList;
import org.apache.ode.bpel.pmapi.TCorrelationProperty;
import org.apache.ode.bpel.pmapi.TDefinitionInfo;
import org.apache.ode.bpel.pmapi.TDeploymentInfo;
import org.apache.ode.bpel.pmapi.TDocumentInfo;
import org.apache.ode.bpel.pmapi.TEndpointReferences;
import org.apache.ode.bpel.pmapi.TEventInfo;
import org.apache.ode.bpel.pmapi.TEventInfoList;
import org.apache.ode.bpel.pmapi.TFailureInfo;
import org.apache.ode.bpel.pmapi.TFailuresInfo;
import org.apache.ode.bpel.pmapi.TFaultInfo;
import org.apache.ode.bpel.pmapi.TInstanceInfo;
import org.apache.ode.bpel.pmapi.TInstanceInfoList;
import org.apache.ode.bpel.pmapi.TInstanceStatus;
import org.apache.ode.bpel.pmapi.TInstanceSummary;
import org.apache.ode.bpel.pmapi.TProcessInfo;
import org.apache.ode.bpel.pmapi.TProcessInfoList;
import org.apache.ode.bpel.pmapi.TProcessProperties;
import org.apache.ode.bpel.pmapi.TProcessStatus;
import org.apache.ode.bpel.pmapi.TScopeInfo;
import org.apache.ode.bpel.pmapi.TScopeRef;
import org.apache.ode.bpel.pmapi.TVariableInfo;
import org.apache.ode.bpel.pmapi.TVariableRef;
import org.apache.ode.bpel.pmapi.VariableInfoDocument;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.ISO8601DateParser;
import org.apache.ode.utils.msg.MessageBundle;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.stl.MemberOfFunction;
import org.apache.ode.utils.stl.UnaryFunction;
import org.apache.xmlbeans.XmlObject;
import org.apache.xmlbeans.XmlOptions;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
/**
* Implementation of the Process and InstanceManagement APIs.
*
* @todo Move this out of the engine, it no longer belongs here.
*/
public class ProcessAndInstanceManagementImpl implements InstanceManagement, ProcessManagement {
protected static final Messages __msgs = MessageBundle.getMessages(Messages.class);
protected static Logger __log = LoggerFactory.getLogger(BpelManagementFacadeImpl.class);
protected static final ProcessStatusConverter __psc = new ProcessStatusConverter();
protected BpelDatabase _db;
protected ProcessStore _store;
// Calendar can be expensive to initialize so we cache and clone it
protected Calendar _calendar = Calendar.getInstance();
protected BpelServerImpl _server;
public ProcessAndInstanceManagementImpl(BpelServer server, ProcessStore store) {
_server = (BpelServerImpl) server;
_db = _server._db;
_store = store;
}
public ProcessInfoListDocument listProcessesCustom(String filter, String orderKeys,
final ProcessInfoCustomizer custom) {
ProcessInfoListDocument ret = ProcessInfoListDocument.Factory.newInstance();
final TProcessInfoList procInfoList = ret.addNewProcessInfoList();
final ProcessFilter processFilter = new ProcessFilter(filter, orderKeys);
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) throws Exception {
Collection<ProcessConf> pconfs = processQuery(processFilter);
for (ProcessConf pconf : pconfs) {
try {
fillProcessInfo(procInfoList.addNewProcessInfo(), pconf, custom);
} catch (Exception e) {
failIfSQLException(e);
__log.error("Exception when querying process " + pconf.getProcessId(), e);
}
}
try {
fillProcessInfoSummary(conn, procInfoList.getProcessInfoArray(), custom);
} catch (Exception e) {
failIfSQLException(e);
__log.error("Exception fetching instances summary", e);
}
return null;
}
});
} catch (Exception e) {
__log.error("Exception while listing processes", e);
throw new ProcessingException("Exception while listing processes: " + e.toString());
}
return ret;
}
public ProcessInfoListDocument listProcesses(String filter, String orderKeys) {
return listProcessesCustom(filter, orderKeys, ProcessInfoCustomizer.ALL);
}
public ProcessInfoListDocument listAllProcesses() {
return listProcessesCustom(null, null, ProcessInfoCustomizer.ALL);
}
public ProcessInfoListDocument listProcessesSummaryOnly() {
return listProcessesCustom(null, null, ProcessInfoCustomizer.SUMMARYONLY);
}
public ProcessInfoListDocument listProcessesSummaryAndProperties() {
return listProcessesCustom(null, null, ProcessInfoCustomizer.SUMMARY_PROPERTIES);
}
public ProcessInfoDocument getProcessInfoCustom(final QName pid, final ProcessInfoCustomizer custom) {
try {
return _db.exec(new BpelDatabase.Callable<ProcessInfoDocument>() {
public ProcessInfoDocument run(BpelDAOConnection conn) {
return genProcessInfoDocument(conn, pid, custom);
}
});
} catch (Exception ex) {
__log.error("Exception in getProcessInfoCustom()", ex);
throw new ManagementException("Exception in getProcessInfoCustom(): " + ex.toString());
}
}
public ProcessInfoDocument getProcessInfo(QName pid) {
return getProcessInfoCustom(pid, ProcessInfoCustomizer.ALL);
}
public ProcessInfoDocument activate(QName pid) {
try {
_store.setState(pid, org.apache.ode.bpel.iapi.ProcessState.ACTIVE);
} catch (Exception ex) {
__log.error("Exception while setting process state", ex);
throw new ManagementException("Error setting process state: " + ex.toString());
}
return getProcessInfoCustom(pid, ProcessInfoCustomizer.NONE);
}
public ProcessInfoDocument setRetired(QName pid, boolean retired) throws ManagementException {
try {
_store.setState(pid, retired ? ProcessState.RETIRED : ProcessState.ACTIVE);
} catch (BpelEngineException e) {
__log.error("Exception while setting process as retired", e);
throw new ProcessNotFoundException("ProcessNotFound:" + pid);
}
return getProcessInfoCustom(pid, ProcessInfoCustomizer.NONE);
}
public void setPackageRetired(String packageName, boolean retired)
throws ManagementException {
try {
_store.setRetiredPackage(packageName, retired);
} catch (BpelEngineException e) {
__log.error("Exception while setting process as retired", e);
throw new ProcessNotFoundException("PackageNotFound:" + packageName);
}
}
public ProcessInfoDocument setProcessPropertyNode(final QName pid, final QName propertyName, final Node value)
throws ManagementException {
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) throws Exception {
try {
_store.setProperty(pid, propertyName, value);
} catch (Exception ex) {
failIfSQLException(ex);
// Likely the process no longer exists in the store.
__log.debug("Error setting property value for " + pid + "; " + propertyName, ex);
}
// We have to do this after we set the property, since the
// ProcessConf object
// is immutable.
ProcessConf proc = _store.getProcessConfiguration(pid);
if (proc == null)
throw new ProcessNotFoundException("ProcessNotFound:" + pid);
fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
return null;
}
});
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
__log.error("Exception while setting process property", e);
throw new ProcessingException("Exception while setting process property: " + e.toString());
}
return ret;
}
public ProcessInfoDocument setProcessProperty(final QName pid, final QName propertyName, final String value)
throws ManagementException {
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
final TProcessInfo pi = ret.addNewProcessInfo();
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) throws Exception {
try {
_store.setProperty(pid, propertyName, value);
} catch (Exception ex) {
failIfSQLException(ex);
// Likely the process no longer exists in the store.
__log.debug("Error setting property value for " + pid + "; " + propertyName, ex);
}
// We have to do this after we set the property, since the
// ProcessConf object is immutable.
ProcessConf proc = _store.getProcessConfiguration(pid);
if (proc == null)
throw new ProcessNotFoundException("ProcessNotFound:" + pid);
fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES));
return null;
}
});
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
__log.error("Exception while setting process property", e);
throw new ProcessingException("Exception while setting process property" + e.toString());
}
return ret;
}
private boolean garbage(String filter) {
if(filter == null) {
return false;
}
Matcher expressionMatcher = Filter.__comparatorPattern.matcher(filter);
if(!filter.trim().equals("") && !expressionMatcher.find()) {
return true;
}
return false;
}
public InstanceInfoListDocument listInstances(String filter, String order, int limit) {
InstanceInfoListDocument ret = InstanceInfoListDocument.Factory.newInstance();
final TInstanceInfoList infolist = ret.addNewInstanceInfoList();
if(garbage(filter)) {
return ret;
}
final InstanceFilter instanceFilter = new InstanceFilter(filter, order, limit);
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) {
Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter);
for (ProcessInstanceDAO instance : instances) {
fillInstanceInfo(infolist.addNewInstanceInfo(), instance);
}
return null;
}
});
} catch (Exception e) {
__log.error("Exception while listing instances", e);
throw new ProcessingException("Exception while listing instances: " + e.toString());
}
return ret;
}
public InstanceInfoListDocument listInstancesSummary(String filter, String order, int limit) {
InstanceInfoListDocument ret = InstanceInfoListDocument.Factory.newInstance();
final TInstanceInfoList infolist = ret.addNewInstanceInfoList();
if(garbage(filter)) {
return ret;
}
final InstanceFilter instanceFilter = new InstanceFilter(filter, order, limit);
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) {
Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter);
Map<Long, Collection<CorrelationSetDAO>> icsets = conn.getCorrelationSets(instances);
for (ProcessInstanceDAO instance : instances) {
TInstanceInfo info = infolist.addNewInstanceInfo();
fillInstanceSummary(info, instance);
Collection<CorrelationSetDAO> csets = icsets.get(instance.getInstanceId());
if (csets != null) {
for (CorrelationSetDAO cset: csets) {
Map<QName, String> props = cset.getProperties();
fillProperties(info, instance, props);
}
}
}
return null;
}
});
} catch (Exception e) {
__log.error("Exception while listing instances", e);
throw new ProcessingException("Exception while listing instances: " + e.toString());
}
return ret;
}
public InstanceInfoListDocument listAllInstances() {
return listInstancesSummary(null, null, Integer.MAX_VALUE);
}
public InstanceInfoListDocument listAllInstancesWithLimit(int limit) {
return listInstancesSummary(null, null, limit);
}
public InstanceInfoDocument getInstanceInfo(final Long iid) throws InstanceNotFoundException {
try {
return _db.exec(new BpelDatabase.Callable<InstanceInfoDocument>() {
public InstanceInfoDocument run(BpelDAOConnection conn) {
return genInstanceInfoDocument(conn, iid);
}
});
} catch (Exception e) {
__log.error("Exception while retrieving instance info", e);
throw new ProcessingException("Exception while retrieving instance info: " + e.toString());
}
}
public ScopeInfoDocument getScopeInfo(String siid) {
return getScopeInfoWithActivity(siid, false);
}
public ScopeInfoDocument getScopeInfoWithActivity(final String siid, final boolean includeActivityInfo) {
try {
return _db.exec(new BpelDatabase.Callable<ScopeInfoDocument>() {
public ScopeInfoDocument run(BpelDAOConnection conn) throws Exception {
return genScopeInfoDocument(conn, siid, includeActivityInfo);
}
});
} catch (Exception e) {
__log.error("Exception while retrieving scope info", e);
throw new ProcessingException("Exception while retrieving scope info: " + e.toString());
}
}
public VariableInfoDocument getVariableInfo(final String scopeId, final String varName) throws ManagementException {
VariableInfoDocument ret = VariableInfoDocument.Factory.newInstance();
final TVariableInfo vinf = ret.addNewVariableInfo();
final TVariableRef sref = vinf.addNewSelf();
dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
ScopeDAO scope = session.getScope(new Long(scopeId));
if (scope == null) {
throw new InvalidRequestException("ScopeNotFound:" + scopeId);
}
sref.setSiid(scopeId);
sref.setIid(scope.getProcessInstance().getInstanceId().toString());
sref.setName(varName);
XmlDataDAO var = scope.getVariable(varName);
if (var == null) {
throw new InvalidRequestException("VarNotFound:" + varName);
}
Node nval = var.get();
if (nval != null) {
TVariableInfo.Value val = vinf.addNewValue();
val.getDomNode().appendChild(val.getDomNode().getOwnerDocument().importNode(nval, true));
}
return null;
}
});
return ret;
}
public VariableInfoDocument setVariable(final String scopeId, final String varName, final XmlObject value) throws ManagementException {
VariableInfoDocument ret = VariableInfoDocument.Factory.newInstance();
final TVariableInfo vinf = ret.addNewVariableInfo();
final TVariableRef sref = vinf.addNewSelf();
dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
ScopeDAO scope = session.getScope(new Long(scopeId));
if (scope == null) {
throw new InvalidRequestException("ScopeNotFound:" + scopeId);
}
sref.setSiid(scopeId);
sref.setIid(scope.getProcessInstance().getInstanceId().toString());
sref.setName(varName);
XmlDataDAO var = scope.getVariable(varName);
if (var == null) {
throw new InvalidRequestException("VarNotFound:" + varName);
}
{
ByteArrayOutputStream out = new ByteArrayOutputStream();
value.save(out, new XmlOptions().setSaveOuter());
Node value2 = DOMUtils.getFirstChildElement(DOMUtils.stringToDOM(out.toString()));
var.set(value2);
}
Node nval = var.get();
if (nval != null) {
TVariableInfo.Value val = vinf.addNewValue();
val.getDomNode().appendChild(val.getDomNode().getOwnerDocument().importNode(nval, true));
}
return null;
}
});
return ret;
}
//
// INSTANCE ACTIONS
//
public InstanceInfoDocument fault(Long iid, QName faultname, Element faultData) {
// TODO: Implement
return getInstanceInfo(iid);
}
public InstanceInfoDocument resume(Long iid) {
// We need debugger support in order to resume (since we have to force
// a reduction. If one is not available the getDebugger() method should
// throw a ProcessingException
getDebugger(iid).resume(iid);
return getInstanceInfo(iid);
}
public InstanceInfoDocument suspend(Long iid) throws ManagementException {
DebuggerSupport debugSupport = getDebugger(iid);
assert debugSupport != null : "getDebugger(Long) returned NULL!";
debugSupport.suspend(iid);
return getInstanceInfo(iid);
}
public InstanceInfoDocument terminate(Long iid) throws ManagementException {
DebuggerSupport debugSupport = getDebugger(iid);
assert debugSupport != null : "getDebugger(Long) returned NULL!";
debugSupport.terminate(iid);
return getInstanceInfo(iid);
}
public InstanceInfoDocument recoverActivity(final Long iid, final Long aid, final String action) {
try {
_db.exec(new BpelDatabase.Callable<QName>() {
public QName run(BpelDAOConnection conn) throws Exception {
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
return null;
for (ActivityRecoveryDAO recovery : instance.getActivityRecoveries()) {
if (recovery.getActivityId() == aid) {
BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess().getProcessId());
if (process != null) {
process.recoverActivity(instance, recovery.getChannel(), aid, action, null);
break;
}
}
}
return instance.getProcess().getProcessId();
}
});
} catch (Exception e) {
__log.error("Exception during activity recovery", e);
throw new ProcessingException("Exception during activity recovery" + e.toString());
}
return getInstanceInfo(iid);
}
public Collection<Long> delete(String filter) {
final InstanceFilter instanceFilter = new InstanceFilter(filter);
final List<Long> ret = new LinkedList<Long>();
if(garbage(filter)) {
return ret;
}
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) {
Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter);
for (ProcessInstanceDAO instance : instances) {
ProcessConf proc = _store.getProcessConfiguration(instance.getProcess().getProcessId());
if (proc == null)
throw new ProcessNotFoundException("ProcessNotFound:" + instance.getProcess().getProcessId());
// delete the instance and all related data
instance.delete(EnumSet.allOf(CLEANUP_CATEGORY.class));
ret.add(instance.getInstanceId());
}
return null;
}
});
} catch (Exception e) {
__log.error("Exception during instance deletion", e);
throw new ProcessingException("Exception during instance deletion: " + e.toString());
}
return ret;
}
//
// EVENT RETRIEVAL
//
public List<String> getEventTimeline(String instanceFilter, String eventFilter) {
final InstanceFilter ifilter = new InstanceFilter(instanceFilter, null, 0);
final BpelEventFilter efilter = new BpelEventFilter(eventFilter, 0);
List<Date> tline = dbexec(new BpelDatabase.Callable<List<Date>>() {
public List<Date> run(BpelDAOConnection session) throws Exception {
return session.bpelEventTimelineQuery(ifilter, efilter);
}
});
ArrayList<String> ret = new ArrayList<String>(tline.size());
CollectionsX.transform(ret, tline, new UnaryFunction<Date, String>() {
public String apply(Date x) {
return ISO8601DateParser.format(x);
}
});
return ret;
}
public EventInfoListDocument listEvents(String instanceFilter, String eventFilter, int maxCount) {
final InstanceFilter ifilter = new InstanceFilter(instanceFilter, null, 0);
final BpelEventFilter efilter = new BpelEventFilter(eventFilter, maxCount);
EventInfoListDocument eid = EventInfoListDocument.Factory.newInstance();
final TEventInfoList eil = eid.addNewEventInfoList();
dbexec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection session) throws Exception {
List<BpelEvent> events = session.bpelEventQuery(ifilter, efilter);
for (BpelEvent event : events) {
TEventInfo tei = eil.addNewEventInfo();
fillEventInfo(tei, event);
}
return null;
}
});
return eid;
}
public ActivityExtInfoListDocument getExtensibilityElements(QName pid, Integer[] aids) {
ActivityExtInfoListDocument aeild = ActivityExtInfoListDocument.Factory.newInstance();
TActivitytExtInfoList taeil = aeild.addNewActivityExtInfoList();
OProcess oprocess = _server._engine.getOProcess(pid);
if (oprocess == null)
throw new ProcessNotFoundException("The process \"" + pid + "\" does not exist.");
for (int aid : aids) {
OBase obase = oprocess.getChild(aid);
if (obase != null && obase.getDebugInfo() != null && obase.getDebugInfo().getExtensibilityElements() != null) {
for (Map.Entry<QName, Object> entry : obase.getDebugInfo().getExtensibilityElements().entrySet()) {
TActivityExtInfo taei = taeil.addNewActivityExtInfo();
taei.setAiid("" + aid);
Object extValue = entry.getValue();
if (extValue instanceof Element)
taei.getDomNode().appendChild(
taei.getDomNode().getOwnerDocument().importNode((Element) extValue, true));
else if (extValue instanceof String) {
Element valueElmt = taei.getDomNode().getOwnerDocument().createElementNS(
entry.getKey().getNamespaceURI(), entry.getKey().getLocalPart());
valueElmt.appendChild(taei.getDomNode().getOwnerDocument().createTextNode((String) extValue));
taei.getDomNode().appendChild(valueElmt);
}
}
}
}
return aeild;
}
/**
* Get the {@link DebuggerSupport} object for the given process identifier.
* Debugger support is required for operations that resume execution in some
* way or manipulate the breakpoints.
*
* @param procid
* process identifier
* @return associated debugger support object
* @throws ManagementException
*/
protected final DebuggerSupport getDebugger(QName procid) throws ManagementException {
BpelProcess process = _server._engine._activeProcesses.get(procid);
if (process == null)
throw new ProcessNotFoundException("The process \"" + procid + "\" does not exist.");
return process._debugger;
}
/**
* Get the {@link DebuggerSupport} object for the given instance identifier.
* Debugger support is required for operations that resume execution in some
* way or manipulate the breakpoints.
*
* @param iid
* instance identifier
* @return associated debugger support object
* @throws ManagementException
*/
protected final DebuggerSupport getDebugger(final Long iid) {
QName processId;
try {
processId = _db.exec(new BpelDatabase.Callable<QName>() {
public QName run(BpelDAOConnection conn) throws Exception {
ProcessInstanceDAO instance = conn.getInstance(iid);
return instance == null ? null : instance.getProcess().getProcessId();
}
});
} catch (Exception e) {
__log.error("Exception during instance retrieval", e);
throw new ProcessingException("Exception during instance retrieval: " + e.toString());
}
return getDebugger(processId);
}
/**
* Execute a database transaction, unwrapping nested
* {@link ManagementException}s.
*
* @param runnable
* action to run
* @return
* @throws ManagementException
*/
protected <T> T dbexec(BpelProcessDatabase.Callable<T> runnable) throws ManagementException {
try {
return runnable.exec();
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
__log.error("Exception during database operation", e);
throw new ManagementException("Exception during database operation: " + e.toString());
}
}
/**
* Execute a database transaction, unwrapping nested
* {@link ManagementException}s.
*
* @param callable
* action to run
* @return
* @throws ManagementException
*/
protected <T> T dbexec(BpelDatabase.Callable<T> callable) throws ManagementException {
try {
return _db.exec(callable);
} catch (ManagementException me) {
// Passthrough.
throw me;
} catch (Exception ex) {
__log.error("Exception during database operation", ex);
throw new ManagementException("Exception during database operation" + ex.toString());
}
}
private ProcessInfoDocument genProcessInfoDocument(BpelDAOConnection conn, QName procid, ProcessInfoCustomizer custom)
throws ManagementException {
if (procid == null) {
throw new InvalidRequestException("Valid QName as process id expected.");
}
ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance();
TProcessInfo pi = ret.addNewProcessInfo();
try {
ProcessConf pconf = _store.getProcessConfiguration(procid);
if (pconf == null)
throw new ProcessNotFoundException("ProcessNotFound:" + procid);
fillProcessInfo(pi, pconf, custom);
fillProcessInfoSummary(conn, new TProcessInfo[]{pi}, custom);
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
__log.error("Exception while retrieving process information", e);
throw new ProcessingException("Exception while retrieving process information: " + e.toString());
}
return ret;
}
/**
* Generate a {@link InstanceInfoDocument} for a given instance. This
* document contains general information about the instance.
*
* @param iid
* instance identifier
* @return generated document
*/
private InstanceInfoDocument genInstanceInfoDocument(BpelDAOConnection conn, Long iid) {
if (iid == null)
throw new InvalidRequestException("Must specifiy instance id.");
InstanceInfoDocument ret = InstanceInfoDocument.Factory.newInstance();
TInstanceInfo ii = ret.addNewInstanceInfo();
ii.setIid(iid.toString());
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
throw new InstanceNotFoundException("InstanceNotFoundException " + iid);
// TODO: deal with "ERROR" state information.
fillInstanceInfo(ii, instance);
Map<Long, Collection<CorrelationSetDAO>> icsets = conn.getCorrelationSets(Arrays.asList(new ProcessInstanceDAO[] { instance }));
Collection<CorrelationSetDAO> csets = icsets.get(instance.getInstanceId());
if (csets != null) {
for (CorrelationSetDAO cset: csets) {
Map<QName, String> props = cset.getProperties();
fillProperties(ii, instance, props);
}
}
return ret;
}
/**
* Generate a {@link ScopeInfoDocument} for a given scope instance.
*
* @param siid
* scope instance identifier
* @param includeActivityInfo
* @return generated document
* @throws Exception
*/
private ScopeInfoDocument genScopeInfoDocument(BpelDAOConnection conn, String siid, boolean includeActivityInfo) throws Exception {
if (siid == null)
throw new InvalidRequestException("Must specifiy scope instance id.");
Long siidl;
try {
siidl = new Long(siid);
} catch (NumberFormatException nfe) {
throw new InvalidRequestException("Invalid scope instance id.");
}
ScopeInfoDocument ret = ScopeInfoDocument.Factory.newInstance();
TScopeInfo ii = ret.addNewScopeInfo();
ii.setSiid(siid);
try {
ScopeDAO instance = conn.getScope(siidl);
if (instance == null)
throw new InvalidRequestException("Scope not found: " + siidl);
// TODO: deal with "ERROR" state information.
fillScopeInfo(ii, instance, includeActivityInfo);
} catch (Exception e) {
failIfSQLException(e);
__log.error("Exception while retrieving scope information", e);
}
return ret;
}
private void fillProcessInfoSummary(BpelDAOConnection conn, TProcessInfo[] infos, ProcessInfoCustomizer custom) {
if (custom.includeInstanceSummary()) {
Set<String> pids = new HashSet<String>();
for (TProcessInfo i : infos) {
pids.add(i.getPid());
}
Map<InstanceSummaryKey, Long> m = conn.getProcessManagement().countInstancesSummary(pids);
Map<String, FailedSummaryValue> f = conn.getProcessManagement().findFailedCountAndLastFailedDateForProcessIds(pids);
for (TProcessInfo info : infos) {
TInstanceSummary isum = info.addNewInstanceSummary();
genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.ACTIVE, info.getPid(), m);
genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.COMPLETED, info.getPid(), m);
genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.ERROR, info.getPid(), m);
genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.FAILED, info.getPid(), m);
genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.SUSPENDED, info.getPid(), m);
genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.TERMINATED, info.getPid(), m);
getInstanceSummaryActivityFailure(isum, f, info.getPid());
}
}
}
/**
* Fill in the <code>process-info</code> element of the transfer object.
*
* @param info
* destination XMLBean
* @param pconf
* process configuration object (from store)
* @param proc
* source DAO object
* @param custom
* used to customize the quantity of information produced in the
* info
*/
private void fillProcessInfo(TProcessInfo info, ProcessConf pconf, ProcessInfoCustomizer custom) {
if (pconf == null)
throw new IllegalArgumentException("Null pconf.");
if( __log.isDebugEnabled() ) __log.debug("Filling process info for " + pconf.getProcessId());
info.setPid(pconf.getProcessId().toString());
// TODO: ACTIVE and RETIRED should be used separately.
// Active process may be retired at the same time
if (pconf.getState() == ProcessState.RETIRED) {
info.setStatus(TProcessStatus.RETIRED);
} else if (pconf.getState() == ProcessState.DISABLED) {
info.setStatus(TProcessStatus.DISABLED);
} else {
info.setStatus(TProcessStatus.ACTIVE);
}
info.setVersion(pconf.getVersion());
TDefinitionInfo definfo = info.addNewDefinitionInfo();
definfo.setProcessName(pconf.getType());
TDeploymentInfo depinfo = info.addNewDeploymentInfo();
depinfo.setPackage(pconf.getPackage());
if( __log.isDebugEnabled() ) __log.debug(" package name: " + depinfo.getPackage());
depinfo.setDocument(pconf.getBpelDocument());
depinfo.setDeployDate(toCalendar(pconf.getDeployDate()));
depinfo.setDeployer(pconf.getDeployer());
if (custom.includeDocumentLists()) {
TProcessInfo.Documents docinfo = info.addNewDocuments();
List<File> files = pconf.getFiles();
if (files != null)
genDocumentInfo(docinfo, files.toArray(new File[files.size()]), true);
else if (__log.isDebugEnabled())
__log.debug("fillProcessInfo: No files for " + pconf.getProcessId());
}
TProcessProperties properties = info.addNewProperties();
if (custom.includeProcessProperties()) {
for (Map.Entry<QName, Node> propEntry : pconf.getProcessProperties().entrySet()) {
TProcessProperties.Property tprocProp = properties.addNewProperty();
tprocProp.setName(new QName(propEntry.getKey().getNamespaceURI(), propEntry.getKey().getLocalPart()));
Node propNode = tprocProp.getDomNode();
Document processInfoDoc = propNode.getOwnerDocument();
Node node2append = processInfoDoc.importNode(propEntry.getValue(), true);
propNode.appendChild(node2append);
}
}
TEndpointReferences eprs = info.addNewEndpoints();
OProcess oprocess = _server._engine.getOProcess(pconf.getProcessId());
if (custom.includeEndpoints() && oprocess != null) {
for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) {
if (oplink.hasPartnerRole() && oplink.isInitializePartnerRole()) {
// TODO: this is very uncool.
EndpointReference pepr = _server._engine._activeProcesses.get(pconf.getProcessId())
.getInitialPartnerRoleEPR(oplink);
if (pepr != null) {
TEndpointReferences.EndpointRef epr = eprs.addNewEndpointRef();
Document eprNodeDoc = epr.getDomNode().getOwnerDocument();
epr.getDomNode().appendChild(eprNodeDoc.importNode(pepr.toXML().getDocumentElement(), true));
}
}
}
}
// TODO: add documents to the above data structure.
}
String findVersionStringFromNodeToken(String packageName) {
int i = packageName.length() - 1;
while( i > 0 && Character.isDigit(packageName.charAt(i)) ) {
i--;
}
if( i < packageName.length() - 1 && packageName.charAt(i) == '-') {
return packageName.substring(i + 1);
}
return null;
}
/**
* Generate document information elements for a set of files.
*
* @param docinfo
* target element
* @param files
* files
* @param recurse
* recurse down directories?
*/
private void genDocumentInfo(TProcessInfo.Documents docinfo, File[] files, boolean recurse) {
if (files == null)
return;
for (File f : files) {
if (f.isHidden())
continue;
if (f.isDirectory()) {
if (recurse)
genDocumentInfo(docinfo, f.listFiles(), true);
} else if (f.isFile()) {
genDocumentInfo(docinfo, f);
}
}
}
private void genDocumentInfo(TProcessInfo.Documents docinfo, File f) {
DocumentInfoGenerator dig = new DocumentInfoGenerator(f);
if (dig.isRecognized() && dig.isVisible()) {
TDocumentInfo doc = docinfo.addNewDocument();
doc.setName(dig.getName());
doc.setSource(dig.getURL());
doc.setType(dig.getType());
}
}
private void genInstanceSummaryEntry(TInstanceSummary.Instances instances,
TInstanceStatus.Enum state, String pid, Map<InstanceSummaryKey, Long> summary)
{
instances.setState(state);
Long count = summary.get(new InstanceSummaryKey(pid, state.toString()));
instances.setCount(count == null ? 0 : count.intValue());
}
private void getInstanceSummaryActivityFailure(TInstanceSummary summary, Map<String, FailedSummaryValue> f, String pid) {
FailedSummaryValue v = f.get(pid);
if (v != null) {
TFailuresInfo failures = summary.addNewFailures();
failures.setDtFailure(toCalendar(v.lastFailed));
failures.setCount(v.count.intValue());
}
}
private void fillProperties(TInstanceInfo info, ProcessInstanceDAO instance, Map<QName, String> props) {
TInstanceInfo.CorrelationProperties corrProperties = info.addNewCorrelationProperties();
for (Map.Entry<QName, String> property : props.entrySet()) {
TCorrelationProperty tproperty = corrProperties.addNewCorrelationProperty();
// not setting correlation-set id here -- too inconvenient for performance
// tproperty.setCsetid("" + cset.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
}
private void fillInstanceSummary(TInstanceInfo info, ProcessInstanceDAO instance) {
info.setIid("" + instance.getInstanceId());
ProcessDAO processDAO = instance.getProcess();
info.setPid(processDAO.getProcessId().toString());
info.setProcessName(processDAO.getType());
info.setDtStarted(toCalendar(instance.getCreateTime()));
info.setDtLastActive(toCalendar(instance.getLastActiveTime()));
info.setStatus(__psc.cvtInstanceStatus(instance.getState()));
if (instance.getFault() != null) {
TFaultInfo faultInfo = info.addNewFaultInfo();
faultInfo.setName(instance.getFault().getName());
faultInfo.setExplanation(instance.getFault().getExplanation());
faultInfo.setAiid(instance.getFault().getActivityId());
faultInfo.setLineNumber(instance.getFault().getLineNo());
}
if (instance.getActivityFailureCount() > 0) {
TFailuresInfo failures = info.addNewFailures();
failures.setDtFailure(toCalendar(instance.getActivityFailureDateTime()));
failures.setCount(instance.getActivityFailureCount());
}
}
private void fillInstanceInfo(TInstanceInfo info, ProcessInstanceDAO instance) {
fillInstanceSummary(info, instance);
if (instance.getRootScope() != null)
info.setRootScope(genScopeRef(instance.getRootScope()));
ProcessInstanceDAO.EventsFirstLastCountTuple flc = instance.getEventsFirstLastCount();
TInstanceInfo.EventInfo eventInfo = info.addNewEventInfo();
if (flc != null) {
eventInfo.setFirstDtime(toCalendar(flc.first));
eventInfo.setLastDtime(toCalendar(flc.last));
eventInfo.setCount(flc.count);
}
// Setting valued correlation properties
if (!instance.getCorrelationSets().isEmpty()) {
TInstanceInfo.CorrelationProperties corrProperties = info.addNewCorrelationProperties();
for (CorrelationSetDAO correlationSetDAO : instance.getCorrelationSets()) {
for (Map.Entry<QName, String> property : correlationSetDAO.getProperties().entrySet()) {
TCorrelationProperty tproperty = corrProperties.addNewCorrelationProperty();
tproperty.setCsetid("" + correlationSetDAO.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
}
}
}
private void fillScopeInfo(TScopeInfo scopeInfo, ScopeDAO scope, boolean includeActivityInfo) {
scopeInfo.setSiid("" + scope.getScopeInstanceId());
scopeInfo.setName(scope.getName());
if (scope.getParentScope() != null)
scopeInfo.setParentScopeRef(genScopeRef(scope.getParentScope()));
scopeInfo.setStatus(__psc.cvtScopeStatus(scope.getState()));
TScopeInfo.Children children = scopeInfo.addNewChildren();
for (ScopeDAO i : scope.getChildScopes())
fillScopeRef(children.addNewChildRef(), i);
TScopeInfo.Variables vars = scopeInfo.addNewVariables();
for (XmlDataDAO i : scope.getVariables())
fillVariableRef(vars.addNewVariableRef(), i);
// Setting correlations and their valued properties
if (!scope.getCorrelationSets().isEmpty()) {
TScopeInfo.CorrelationSets correlationSets = scopeInfo.addNewCorrelationSets();
for (CorrelationSetDAO correlationSetDAO : scope.getCorrelationSets()) {
TScopeInfo.CorrelationSets.CorrelationSet correlationSet = correlationSets.addNewCorrelationSet();
correlationSet.setCsetid("" + correlationSetDAO.getCorrelationSetId());
correlationSet.setName(correlationSetDAO.getName());
for (Map.Entry<QName, String> property : correlationSetDAO.getProperties().entrySet()) {
TCorrelationProperty tproperty = correlationSet.addNewCorrelationProperty();
tproperty.setCsetid("" + correlationSetDAO.getCorrelationSetId());
tproperty.setPropertyName(property.getKey());
tproperty.setStringValue(property.getValue());
}
}
}
if (includeActivityInfo) {
Collection<ActivityRecoveryDAO> recoveries = scope.getProcessInstance().getActivityRecoveries();
TScopeInfo.Activities activities = scopeInfo.addNewActivities();
List<BpelEvent> events = scope.listEvents();
// if event generation was enabled
if(events!=null && events.size() >0) {
ActivityStateDocumentBuilder b = new ActivityStateDocumentBuilder();
for (BpelEvent e : events)
b.onEvent(e);
for (ActivityInfoDocument ai : b.getActivities()) {
for (ActivityRecoveryDAO recovery : recoveries) {
if (String.valueOf(recovery.getActivityId()).equals(ai.getActivityInfo().getAiid())) {
TFailureInfo failure = ai.getActivityInfo().addNewFailure();
failure.setReason(recovery.getReason());
failure.setDtFailure(toCalendar(recovery.getDateTime()));
failure.setActions(recovery.getActions());
failure.setRetries(recovery.getRetries());
ai.getActivityInfo().setStatus(TActivityStatus.FAILURE);
}
}
activities.addNewActivityInfo().set(ai.getActivityInfo());
}
}
// otherwise at least try to get the information about failed activities
// TODO: we are losing information about which scope does failed activities belong to
// as failure table does not have scope id, we would attach every failed activity to process scope
else {
if(scope.getParentScope() == null) {
for (ActivityRecoveryDAO recovery : recoveries) {
ActivityInfoDocument ai = ActivityInfoDocument.Factory.newInstance();
ai.addNewActivityInfo().setAiid(String.valueOf(recovery.getActivityId()));
ai.getActivityInfo().setType("OActivity");
ai.getActivityInfo().setScope(TScopeRef.Factory.newInstance());
TFailureInfo failure = ai.getActivityInfo().addNewFailure();
failure.setReason(recovery.getReason());
failure.setDtFailure(toCalendar(recovery.getDateTime()));
failure.setActions(recovery.getActions());
failure.setRetries(recovery.getRetries());
ai.getActivityInfo().setStatus(TActivityStatus.FAILURE);
activities.addNewActivityInfo().set(ai.getActivityInfo());
}
}
}
}
Collection<PartnerLinkDAO> plinks = scope.getPartnerLinks();
if (plinks.size() > 0) {
TEndpointReferences refs = scopeInfo.addNewEndpoints();
for (PartnerLinkDAO plink : plinks) {
if (plink.getPartnerRoleName() != null && plink.getPartnerRoleName().length() > 0) {
TEndpointReferences.EndpointRef ref = refs.addNewEndpointRef();
ref.setPartnerLink(plink.getPartnerLinkName());
ref.setPartnerRole(plink.getPartnerRoleName());
if (plink.getPartnerEPR() != null) {
Document eprNodeDoc = ref.getDomNode().getOwnerDocument();
ref.getDomNode().appendChild(eprNodeDoc.importNode(plink.getPartnerEPR(), true));
}
}
}
}
}
private void fillVariableRef(TVariableRef ref, XmlDataDAO i) {
ref.setIid(i.getScopeDAO().getProcessInstance().getInstanceId().toString());
ref.setSiid(i.getScopeDAO().getScopeInstanceId().toString());
ref.setName(i.getName());
}
private TScopeRef genScopeRef(ScopeDAO scope) {
TScopeRef tref = TScopeRef.Factory.newInstance();
fillScopeRef(tref, scope);
return tref;
}
private void fillScopeRef(TScopeRef tref, ScopeDAO scope) {
tref.setSiid(scope.getScopeInstanceId().toString());
tref.setStatus(__psc.cvtScopeStatus(scope.getState()));
tref.setName(scope.getName());
tref.setModelId("" + scope.getModelId());
}
private void fillEventInfo(TEventInfo info, BpelEvent event) {
info.setName(BpelEvent.eventName(event));
info.setType(event.getType().toString());
info.setLineNumber(event.getLineNo());
info.setTimestamp(toCalendar(event.getTimestamp()));
if (event instanceof ActivityEvent) {
info.setActivityName(((ActivityEvent) event).getActivityName());
info.setActivityId(((ActivityEvent) event).getActivityId());
info.setActivityType(((ActivityEvent) event).getActivityType());
info.setActivityDefinitionId(((ActivityEvent) event).getActivityDeclarationId());
}
if (event instanceof CorrelationEvent) {
info.setPortType(((CorrelationEvent) event).getPortType());
info.setOperation(((CorrelationEvent) event).getOperation());
info.setMexId(((CorrelationEvent) event).getMessageExchangeId());
}
if (event instanceof CorrelationMatchEvent) {
info.setPortType(((CorrelationMatchEvent) event).getPortType());
}
if (event instanceof CorrelationSetEvent) {
info.setCorrelationSet(((CorrelationSetEvent) event).getCorrelationSetName());
}
if (event instanceof CorrelationSetWriteEvent) {
info.setCorrelationKey(((CorrelationSetWriteEvent) event).getCorrelationSetName());
}
if (event instanceof ExpressionEvaluationEvent) {
info.setExpression(((ExpressionEvaluationEvent) event).getExpression());
}
if (event instanceof ExpressionEvaluationFailedEvent) {
info.setFault(((ExpressionEvaluationFailedEvent) event).getFault());
}
if (event instanceof NewProcessInstanceEvent) {
if ((((NewProcessInstanceEvent) event).getRootScopeId()) != null)
info.setRootScopeId(((NewProcessInstanceEvent) event).getRootScopeId());
info.setScopeDefinitionId(((NewProcessInstanceEvent) event).getScopeDeclarationId());
}
if (event instanceof PartnerLinkEvent) {
info.setPartnerLinkName(((PartnerLinkEvent) event).getpLinkName());
}
if (event instanceof ProcessCompletionEvent) {
info.setFault(((ProcessCompletionEvent) event).getFault());
}
if (event instanceof ProcessEvent) {
info.setProcessId(((ProcessEvent) event).getProcessId());
info.setProcessType(((ProcessEvent) event).getProcessName());
}
if (event instanceof ProcessInstanceEvent) {
info.setInstanceId(((ProcessInstanceEvent) event).getProcessInstanceId());
}
if (event instanceof ProcessInstanceStartedEvent) {
info.setRootScopeId(((ProcessInstanceStartedEvent) event).getRootScopeId());
info.setRootScopeDeclarationId(((ProcessInstanceStartedEvent) event).getScopeDeclarationId());
}
if (event instanceof ProcessInstanceStateChangeEvent) {
info.setOldState(((ProcessInstanceStateChangeEvent) event).getOldState());
info.setNewState(((ProcessInstanceStateChangeEvent) event).getNewState());
}
if (event instanceof ProcessMessageExchangeEvent) {
info.setPortType(((ProcessMessageExchangeEvent) event).getPortType());
info.setOperation(((ProcessMessageExchangeEvent) event).getOperation());
info.setMexId(((ProcessMessageExchangeEvent) event).getMessageExchangeId());
}
if (event instanceof ScopeCompletionEvent) {
info.setSuccess(true);
}
if (event instanceof ScopeEvent) {
info.setScopeId(((ScopeEvent) event).getScopeId());
if (((ScopeEvent) event).getParentScopeId() != null)
info.setParentScopeId(((ScopeEvent) event).getParentScopeId());
if (((ScopeEvent) event).getScopeName() != null)
info.setScopeName(((ScopeEvent) event).getScopeName());
info.setScopeDefinitionId(((ScopeEvent) event).getScopeDeclarationId());
}
if (event instanceof ScopeFaultEvent) {
info.setFault(((ScopeFaultEvent) event).getFaultType());
info.setFaultLineNumber(((ScopeFaultEvent) event).getFaultLineNo());
info.setExplanation(((ScopeFaultEvent) event).getExplanation());
}
if (event instanceof VariableEvent) {
info.setVariableName(((VariableEvent) event).getVarName());
}
if(event instanceof VariableModificationEvent) {
if(((VariableModificationEvent) event).getNewValue()!=null)
info.setNewValue(DOMUtils.domToString(((VariableModificationEvent) event).getNewValue()));
}
}
/**
* Convert a {@link Date} to a {@link Calendar}.
*
* @param dtime
* a {@link Date}
* @return a {@link Calendar}
*/
private Calendar toCalendar(Date dtime) {
if (dtime == null)
return null;
Calendar c = (Calendar) _calendar.clone();
c.setTime(dtime);
return c;
}
/**
* @see org.apache.ode.bpel.pmapi.InstanceManagement#queryInstances(java.lang.String)
*/
public InstanceInfoListDocument queryInstances(final String query) {
InstanceInfoListDocument ret = InstanceInfoListDocument.Factory.newInstance();
final TInstanceInfoList infolist = ret.addNewInstanceInfoList();
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) {
Collection<ProcessInstanceDAO> instances = conn.instanceQuery(query);
for (ProcessInstanceDAO instance : instances) {
fillInstanceInfo(infolist.addNewInstanceInfo(), instance);
}
return null;
}
});
} catch (Exception e) {
__log.error("Exception while querying instances", e);
throw new ProcessingException("Exception while querying instances: " + e.toString());
}
return ret;
}
/**
* Query processes based on a {@link ProcessFilter} criteria. This is
* implemented in memory rather than via database calls since the processes
* are managed by the {@link ProcessStore} object and we don't want to make
* this needlessly complicated.
*
* @param filter
* @return
*/
@SuppressWarnings("unchecked")
Collection<ProcessConf> processQuery(ProcessFilter filter) {
List<QName> pids = _store.getProcesses();
// Name filter can be implemented using only the PIDs.
if (filter != null && filter.getNameFilter() != null) {
final Pattern pattern = Pattern.compile(filter.getNameFilter().replace("*",".*") + "(-\\d*)?");
CollectionsX.remove_if(pids, new MemberOfFunction<QName>() {
@Override
public boolean isMember(QName o) {
return !pattern.matcher(o.getLocalPart()).matches();
}
});
}
if (filter != null && filter.getNamespaceFilter() != null) {
final Pattern pattern = Pattern.compile(filter.getNamespaceFilter().replace("*",".*"));
CollectionsX.remove_if(pids, new MemberOfFunction<QName>() {
@Override
public boolean isMember(QName o) {
String ns = o.getNamespaceURI() == null ? "" : o.getNamespaceURI();
return !pattern.matcher(ns).matches();
}
});
}
// Now we need the process conf objects, we need to be
// careful since someone could have deleted them by now
List<ProcessConf> confs = new LinkedList<ProcessConf>();
for (QName pid : pids) {
ProcessConf pconf = _store.getProcessConfiguration(pid);
confs.add(pconf);
}
if (filter != null) {
// TODO Implement process status filtering when status will exist
// Specific filter for deployment date.
if (filter.getDeployedDateFilter() != null) {
for (final String ddf : filter.getDeployedDateFilter()) {
final Date dd;
try {
dd = ISO8601DateParser.parse(Filter.getDateWithoutOp(ddf));
} catch (ParseException e) {
// Should never happen.
__log.error("Exception while parsing date", e);
throw new RuntimeException(e.toString());
}
CollectionsX.remove_if(confs, new MemberOfFunction<ProcessConf>() {
@Override
public boolean isMember(ProcessConf o) {
if (ddf.startsWith("="))
return !o.getDeployDate().equals(dd);
if (ddf.startsWith("<="))
return o.getDeployDate().getTime() > dd.getTime();
if (ddf.startsWith(">="))
return o.getDeployDate().getTime() < dd.getTime();
if (ddf.startsWith("<"))
return o.getDeployDate().getTime() >= dd.getTime();
if (ddf.startsWith(">"))
return o.getDeployDate().getTime() <= dd.getTime();
return false;
}
});
}
}
// Ordering
if (filter.getOrders() != null) {
ComparatorChain cchain = new ComparatorChain();
for (String key : filter.getOrders()) {
boolean ascending = true;
String orderKey = key;
if (key.startsWith("+") || key.startsWith("-")) {
orderKey = key.substring(1, key.length());
if (key.startsWith("-"))
ascending = false;
}
Comparator c;
if ("name".equals(orderKey))
c = new Comparator<ProcessConf>() {
public int compare(ProcessConf o1, ProcessConf o2) {
return o1.getProcessId().getLocalPart().compareTo(o2.getProcessId().getLocalPart());
}
};
else if ("namespace".equals(orderKey))
c = new Comparator<ProcessConf>() {
public int compare(ProcessConf o1, ProcessConf o2) {
String ns1 = o1.getProcessId().getNamespaceURI() == null ? "" : o1.getProcessId()
.getNamespaceURI();
String ns2 = o2.getProcessId().getNamespaceURI() == null ? "" : o2.getProcessId()
.getNamespaceURI();
return ns1.compareTo(ns2);
}
};
else if ("version".equals(orderKey))
c = new Comparator<ProcessConf>() {
public int compare(ProcessConf o1, ProcessConf o2) {
// TODO: implement version comparisons.
return 0;
}
};
else if ("deployed".equals(orderKey))
c = new Comparator<ProcessConf>() {
public int compare(ProcessConf o1, ProcessConf o2) {
return o1.getDeployDate().compareTo(o2.getDeployDate());
}
};
else {
// unrecognized
__log.debug("unrecognized order key" + orderKey);
continue;
}
cchain.addComparator(c, !ascending);
}
Collections.sort(confs, cchain);
}
}
return confs;
}
public ReplayResponseDocument replay(final Replay request) throws ManagementException {
final Throwable[] e = new Throwable[1];
try {
ReplayResponseDocument response = _db.exec(new BpelDatabase.Callable<ReplayResponseDocument>() {
public ReplayResponseDocument run(BpelDAOConnection conn) throws Exception {
try {
ReplayResponse response = ReplayResponse.Factory.newInstance();
Replayer replayer = new Replayer();
List<Long> iids = replayer.replayInstances(request, _server.getEngine(), conn);
for (Long iid : iids) {
response.addRestoredIID(iid);
}
ReplayResponseDocument responseDoc = ReplayResponseDocument.Factory.newInstance();
responseDoc.setReplayResponse(response);
return responseDoc;
} catch (Throwable t) {
e[0] = t;
throw new Exception("", t);
}
}
});
if (e[0] != null) {
__log.debug("throwing pending exception");
throw e[0];
}
return response;
} catch (Throwable e2) {
throw new ManagementException("", e2);
}
}
public GetCommunicationResponseDocument getCommunication(final GetCommunication request) throws ManagementException {
final Throwable[] e = new Throwable[1];
try {
GetCommunicationResponseDocument response = _db.exec(new BpelDatabase.Callable<GetCommunicationResponseDocument>() {
public GetCommunicationResponseDocument run(BpelDAOConnection conn) throws Exception {
try {
Replayer replayer = new Replayer();
GetCommunicationResponseDocument responseDoc = GetCommunicationResponseDocument.Factory.newInstance();
responseDoc.setGetCommunicationResponse(replayer.getCommunication(request, conn));
return responseDoc;
} catch (Throwable e2) {
e[0] = e2;
throw new Exception("", e2);
}
}
});
if (e[0] != null) {
__log.debug("throwing pending exception");
throw e[0];
}
return response;
} catch (Throwable e2) {
throw new ManagementException("", e2);
}
}
private static void failIfSQLException(Exception e) throws Exception {
String[] fatal = { "org.hibernate", "org.apache.openjpa", "java.sql", "javax.sql" };
for (int i=0; i<fatal.length; i++) {
if (e.getClass().getName().startsWith(fatal[i])) throw e;
}
}
}