| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.ode.bpel.engine; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.File; |
| import java.text.ParseException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Calendar; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.Date; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| import javax.xml.namespace.QName; |
| |
| import org.apache.commons.collections.comparators.ComparatorChain; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.ode.bpel.common.BpelEventFilter; |
| import org.apache.ode.bpel.common.Filter; |
| import org.apache.ode.bpel.common.InstanceFilter; |
| import org.apache.ode.bpel.common.ProcessFilter; |
| import org.apache.ode.bpel.dao.ActivityRecoveryDAO; |
| import org.apache.ode.bpel.dao.BpelDAOConnection; |
| import org.apache.ode.bpel.dao.CorrelationSetDAO; |
| import org.apache.ode.bpel.dao.PartnerLinkDAO; |
| import org.apache.ode.bpel.dao.ProcessDAO; |
| import org.apache.ode.bpel.dao.ProcessInstanceDAO; |
| import org.apache.ode.bpel.dao.ScopeDAO; |
| import org.apache.ode.bpel.dao.XmlDataDAO; |
| import org.apache.ode.bpel.dao.ProcessManagementDAO.FailedSummaryValue; |
| import org.apache.ode.bpel.dao.ProcessManagementDAO.InstanceSummaryKey; |
| import org.apache.ode.bpel.engine.replayer.Replayer; |
| import org.apache.ode.bpel.evt.ActivityEvent; |
| import org.apache.ode.bpel.evt.BpelEvent; |
| import org.apache.ode.bpel.evt.CorrelationEvent; |
| import org.apache.ode.bpel.evt.CorrelationMatchEvent; |
| import org.apache.ode.bpel.evt.CorrelationSetEvent; |
| import org.apache.ode.bpel.evt.CorrelationSetWriteEvent; |
| import org.apache.ode.bpel.evt.ExpressionEvaluationEvent; |
| import org.apache.ode.bpel.evt.ExpressionEvaluationFailedEvent; |
| import org.apache.ode.bpel.evt.NewProcessInstanceEvent; |
| import org.apache.ode.bpel.evt.PartnerLinkEvent; |
| import org.apache.ode.bpel.evt.ProcessCompletionEvent; |
| import org.apache.ode.bpel.evt.ProcessEvent; |
| import org.apache.ode.bpel.evt.ProcessInstanceEvent; |
| import org.apache.ode.bpel.evt.ProcessInstanceStartedEvent; |
| import org.apache.ode.bpel.evt.ProcessInstanceStateChangeEvent; |
| import org.apache.ode.bpel.evt.ProcessMessageExchangeEvent; |
| import org.apache.ode.bpel.evt.ScopeCompletionEvent; |
| import org.apache.ode.bpel.evt.ScopeEvent; |
| import org.apache.ode.bpel.evt.ScopeFaultEvent; |
| import org.apache.ode.bpel.evt.VariableEvent; |
| import org.apache.ode.bpel.evt.VariableModificationEvent; |
| import org.apache.ode.bpel.evtproc.ActivityStateDocumentBuilder; |
| import org.apache.ode.bpel.iapi.BpelEngineException; |
| import org.apache.ode.bpel.iapi.BpelServer; |
| import org.apache.ode.bpel.iapi.EndpointReference; |
| import org.apache.ode.bpel.iapi.ProcessConf; |
| import org.apache.ode.bpel.iapi.ProcessState; |
| import org.apache.ode.bpel.iapi.ProcessStore; |
| import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY; |
| import org.apache.ode.bpel.obj.OBase; |
| import org.apache.ode.bpel.obj.OPartnerLink; |
| import org.apache.ode.bpel.obj.OProcess; |
| import org.apache.ode.bpel.pmapi.ActivityExtInfoListDocument; |
| import org.apache.ode.bpel.pmapi.ActivityInfoDocument; |
| import org.apache.ode.bpel.pmapi.EventInfoListDocument; |
| import org.apache.ode.bpel.pmapi.GetCommunication; |
| import org.apache.ode.bpel.pmapi.GetCommunicationResponseDocument; |
| import org.apache.ode.bpel.pmapi.InstanceInfoDocument; |
| import org.apache.ode.bpel.pmapi.InstanceInfoListDocument; |
| import org.apache.ode.bpel.pmapi.InstanceManagement; |
| import org.apache.ode.bpel.pmapi.InstanceNotFoundException; |
| import org.apache.ode.bpel.pmapi.InvalidRequestException; |
| import org.apache.ode.bpel.pmapi.ManagementException; |
| import org.apache.ode.bpel.pmapi.ProcessInfoCustomizer; |
| import org.apache.ode.bpel.pmapi.ProcessInfoDocument; |
| import org.apache.ode.bpel.pmapi.ProcessInfoListDocument; |
| import org.apache.ode.bpel.pmapi.ProcessManagement; |
| import org.apache.ode.bpel.pmapi.ProcessNotFoundException; |
| import org.apache.ode.bpel.pmapi.ProcessingException; |
| import org.apache.ode.bpel.pmapi.Replay; |
| import org.apache.ode.bpel.pmapi.ReplayResponse; |
| import org.apache.ode.bpel.pmapi.ReplayResponseDocument; |
| import org.apache.ode.bpel.pmapi.ScopeInfoDocument; |
| import org.apache.ode.bpel.pmapi.TActivityExtInfo; |
| import org.apache.ode.bpel.pmapi.TActivityStatus; |
| import org.apache.ode.bpel.pmapi.TActivitytExtInfoList; |
| import org.apache.ode.bpel.pmapi.TCorrelationProperty; |
| import org.apache.ode.bpel.pmapi.TDefinitionInfo; |
| import org.apache.ode.bpel.pmapi.TDeploymentInfo; |
| import org.apache.ode.bpel.pmapi.TDocumentInfo; |
| import org.apache.ode.bpel.pmapi.TEndpointReferences; |
| import org.apache.ode.bpel.pmapi.TEventInfo; |
| import org.apache.ode.bpel.pmapi.TEventInfoList; |
| import org.apache.ode.bpel.pmapi.TFailureInfo; |
| import org.apache.ode.bpel.pmapi.TFailuresInfo; |
| import org.apache.ode.bpel.pmapi.TFaultInfo; |
| import org.apache.ode.bpel.pmapi.TInstanceInfo; |
| import org.apache.ode.bpel.pmapi.TInstanceInfoList; |
| import org.apache.ode.bpel.pmapi.TInstanceStatus; |
| import org.apache.ode.bpel.pmapi.TInstanceSummary; |
| import org.apache.ode.bpel.pmapi.TProcessInfo; |
| import org.apache.ode.bpel.pmapi.TProcessInfoList; |
| import org.apache.ode.bpel.pmapi.TProcessProperties; |
| import org.apache.ode.bpel.pmapi.TProcessStatus; |
| import org.apache.ode.bpel.pmapi.TScopeInfo; |
| import org.apache.ode.bpel.pmapi.TScopeRef; |
| import org.apache.ode.bpel.pmapi.TVariableInfo; |
| import org.apache.ode.bpel.pmapi.TVariableRef; |
| import org.apache.ode.bpel.pmapi.VariableInfoDocument; |
| import org.apache.ode.utils.DOMUtils; |
| import org.apache.ode.utils.ISO8601DateParser; |
| import org.apache.ode.utils.msg.MessageBundle; |
| import org.apache.ode.utils.stl.CollectionsX; |
| import org.apache.ode.utils.stl.MemberOfFunction; |
| import org.apache.ode.utils.stl.UnaryFunction; |
| import org.apache.xmlbeans.XmlObject; |
| import org.apache.xmlbeans.XmlOptions; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| import org.w3c.dom.Node; |
| |
| /** |
| * Implementation of the Process and InstanceManagement APIs. |
| * |
| * @todo Move this out of the engine, it no longer belongs here. |
| */ |
| public class ProcessAndInstanceManagementImpl implements InstanceManagement, ProcessManagement { |
| |
| protected static final Messages __msgs = MessageBundle.getMessages(Messages.class); |
| |
| protected static Logger __log = LoggerFactory.getLogger(BpelManagementFacadeImpl.class); |
| |
| protected static final ProcessStatusConverter __psc = new ProcessStatusConverter(); |
| |
| protected BpelDatabase _db; |
| |
| protected ProcessStore _store; |
| |
| // Calendar can be expensive to initialize so we cache and clone it |
| protected Calendar _calendar = Calendar.getInstance(); |
| |
| protected BpelServerImpl _server; |
| |
| public ProcessAndInstanceManagementImpl(BpelServer server, ProcessStore store) { |
| _server = (BpelServerImpl) server; |
| _db = _server._db; |
| _store = store; |
| } |
| |
| public ProcessInfoListDocument listProcessesCustom(String filter, String orderKeys, |
| final ProcessInfoCustomizer custom) { |
| ProcessInfoListDocument ret = ProcessInfoListDocument.Factory.newInstance(); |
| final TProcessInfoList procInfoList = ret.addNewProcessInfoList(); |
| final ProcessFilter processFilter = new ProcessFilter(filter, orderKeys); |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) throws Exception { |
| Collection<ProcessConf> pconfs = processQuery(processFilter); |
| for (ProcessConf pconf : pconfs) { |
| try { |
| fillProcessInfo(procInfoList.addNewProcessInfo(), pconf, custom); |
| } catch (Exception e) { |
| failIfSQLException(e); |
| __log.error("Exception when querying process " + pconf.getProcessId(), e); |
| } |
| } |
| try { |
| fillProcessInfoSummary(conn, procInfoList.getProcessInfoArray(), custom); |
| } catch (Exception e) { |
| failIfSQLException(e); |
| __log.error("Exception fetching instances summary", e); |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception while listing processes", e); |
| throw new ProcessingException("Exception while listing processes: " + e.toString()); |
| } |
| return ret; |
| } |
| |
| public ProcessInfoListDocument listProcesses(String filter, String orderKeys) { |
| return listProcessesCustom(filter, orderKeys, ProcessInfoCustomizer.ALL); |
| } |
| |
| public ProcessInfoListDocument listAllProcesses() { |
| return listProcessesCustom(null, null, ProcessInfoCustomizer.ALL); |
| } |
| |
| public ProcessInfoListDocument listProcessesSummaryOnly() { |
| return listProcessesCustom(null, null, ProcessInfoCustomizer.SUMMARYONLY); |
| } |
| |
| public ProcessInfoListDocument listProcessesSummaryAndProperties() { |
| return listProcessesCustom(null, null, ProcessInfoCustomizer.SUMMARY_PROPERTIES); |
| } |
| |
| public ProcessInfoDocument getProcessInfoCustom(final QName pid, final ProcessInfoCustomizer custom) { |
| try { |
| return _db.exec(new BpelDatabase.Callable<ProcessInfoDocument>() { |
| public ProcessInfoDocument run(BpelDAOConnection conn) { |
| return genProcessInfoDocument(conn, pid, custom); |
| } |
| }); |
| } catch (Exception ex) { |
| __log.error("Exception in getProcessInfoCustom()", ex); |
| throw new ManagementException("Exception in getProcessInfoCustom(): " + ex.toString()); |
| } |
| } |
| |
| public ProcessInfoDocument getProcessInfo(QName pid) { |
| return getProcessInfoCustom(pid, ProcessInfoCustomizer.ALL); |
| } |
| |
| public ProcessInfoDocument activate(QName pid) { |
| try { |
| _store.setState(pid, org.apache.ode.bpel.iapi.ProcessState.ACTIVE); |
| } catch (Exception ex) { |
| __log.error("Exception while setting process state", ex); |
| throw new ManagementException("Error setting process state: " + ex.toString()); |
| } |
| return getProcessInfoCustom(pid, ProcessInfoCustomizer.NONE); |
| } |
| |
| public ProcessInfoDocument setRetired(QName pid, boolean retired) throws ManagementException { |
| try { |
| _store.setState(pid, retired ? ProcessState.RETIRED : ProcessState.ACTIVE); |
| } catch (BpelEngineException e) { |
| __log.error("Exception while setting process as retired", e); |
| throw new ProcessNotFoundException("ProcessNotFound:" + pid); |
| } |
| return getProcessInfoCustom(pid, ProcessInfoCustomizer.NONE); |
| } |
| |
| public void setPackageRetired(String packageName, boolean retired) |
| throws ManagementException { |
| try { |
| _store.setRetiredPackage(packageName, retired); |
| } catch (BpelEngineException e) { |
| __log.error("Exception while setting process as retired", e); |
| throw new ProcessNotFoundException("PackageNotFound:" + packageName); |
| } |
| } |
| |
| public ProcessInfoDocument setProcessPropertyNode(final QName pid, final QName propertyName, final Node value) |
| throws ManagementException { |
| ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance(); |
| final TProcessInfo pi = ret.addNewProcessInfo(); |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) throws Exception { |
| try { |
| _store.setProperty(pid, propertyName, value); |
| } catch (Exception ex) { |
| failIfSQLException(ex); |
| |
| // Likely the process no longer exists in the store. |
| __log.debug("Error setting property value for " + pid + "; " + propertyName, ex); |
| } |
| |
| // We have to do this after we set the property, since the |
| // ProcessConf object |
| // is immutable. |
| ProcessConf proc = _store.getProcessConfiguration(pid); |
| if (proc == null) |
| throw new ProcessNotFoundException("ProcessNotFound:" + pid); |
| |
| fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES)); |
| |
| return null; |
| } |
| }); |
| } catch (ManagementException me) { |
| throw me; |
| } catch (Exception e) { |
| __log.error("Exception while setting process property", e); |
| throw new ProcessingException("Exception while setting process property: " + e.toString()); |
| } |
| return ret; |
| } |
| |
| public ProcessInfoDocument setProcessProperty(final QName pid, final QName propertyName, final String value) |
| throws ManagementException { |
| ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance(); |
| final TProcessInfo pi = ret.addNewProcessInfo(); |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) throws Exception { |
| try { |
| _store.setProperty(pid, propertyName, value); |
| } catch (Exception ex) { |
| failIfSQLException(ex); |
| |
| // Likely the process no longer exists in the store. |
| __log.debug("Error setting property value for " + pid + "; " + propertyName, ex); |
| } |
| |
| // We have to do this after we set the property, since the |
| // ProcessConf object is immutable. |
| ProcessConf proc = _store.getProcessConfiguration(pid); |
| if (proc == null) |
| throw new ProcessNotFoundException("ProcessNotFound:" + pid); |
| |
| fillProcessInfo(pi, proc, new ProcessInfoCustomizer(ProcessInfoCustomizer.Item.PROPERTIES)); |
| |
| return null; |
| } |
| }); |
| } catch (ManagementException me) { |
| throw me; |
| } catch (Exception e) { |
| __log.error("Exception while setting process property", e); |
| throw new ProcessingException("Exception while setting process property" + e.toString()); |
| } |
| return ret; |
| } |
| |
| private boolean garbage(String filter) { |
| if(filter == null) { |
| return false; |
| } |
| Matcher expressionMatcher = Filter.__comparatorPattern.matcher(filter); |
| if(!filter.trim().equals("") && !expressionMatcher.find()) { |
| return true; |
| } |
| return false; |
| } |
| |
| public InstanceInfoListDocument listInstances(String filter, String order, int limit) { |
| InstanceInfoListDocument ret = InstanceInfoListDocument.Factory.newInstance(); |
| final TInstanceInfoList infolist = ret.addNewInstanceInfoList(); |
| if(garbage(filter)) { |
| return ret; |
| } |
| final InstanceFilter instanceFilter = new InstanceFilter(filter, order, limit); |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) { |
| Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter); |
| for (ProcessInstanceDAO instance : instances) { |
| fillInstanceInfo(infolist.addNewInstanceInfo(), instance); |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception while listing instances", e); |
| throw new ProcessingException("Exception while listing instances: " + e.toString()); |
| } |
| return ret; |
| } |
| |
| public InstanceInfoListDocument listInstancesSummary(String filter, String order, int limit) { |
| InstanceInfoListDocument ret = InstanceInfoListDocument.Factory.newInstance(); |
| final TInstanceInfoList infolist = ret.addNewInstanceInfoList(); |
| if(garbage(filter)) { |
| return ret; |
| } |
| final InstanceFilter instanceFilter = new InstanceFilter(filter, order, limit); |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) { |
| Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter); |
| Map<Long, Collection<CorrelationSetDAO>> icsets = conn.getCorrelationSets(instances); |
| for (ProcessInstanceDAO instance : instances) { |
| TInstanceInfo info = infolist.addNewInstanceInfo(); |
| fillInstanceSummary(info, instance); |
| Collection<CorrelationSetDAO> csets = icsets.get(instance.getInstanceId()); |
| if (csets != null) { |
| for (CorrelationSetDAO cset: csets) { |
| Map<QName, String> props = cset.getProperties(); |
| fillProperties(info, instance, props); |
| } |
| } |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception while listing instances", e); |
| throw new ProcessingException("Exception while listing instances: " + e.toString()); |
| } |
| return ret; |
| } |
| |
| public InstanceInfoListDocument listAllInstances() { |
| return listInstancesSummary(null, null, Integer.MAX_VALUE); |
| } |
| |
| public InstanceInfoListDocument listAllInstancesWithLimit(int limit) { |
| return listInstancesSummary(null, null, limit); |
| } |
| |
| public InstanceInfoDocument getInstanceInfo(final Long iid) throws InstanceNotFoundException { |
| try { |
| return _db.exec(new BpelDatabase.Callable<InstanceInfoDocument>() { |
| public InstanceInfoDocument run(BpelDAOConnection conn) { |
| return genInstanceInfoDocument(conn, iid); |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception while retrieving instance info", e); |
| throw new ProcessingException("Exception while retrieving instance info: " + e.toString()); |
| } |
| } |
| |
| public ScopeInfoDocument getScopeInfo(String siid) { |
| return getScopeInfoWithActivity(siid, false); |
| } |
| |
| public ScopeInfoDocument getScopeInfoWithActivity(final String siid, final boolean includeActivityInfo) { |
| try { |
| return _db.exec(new BpelDatabase.Callable<ScopeInfoDocument>() { |
| public ScopeInfoDocument run(BpelDAOConnection conn) throws Exception { |
| return genScopeInfoDocument(conn, siid, includeActivityInfo); |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception while retrieving scope info", e); |
| throw new ProcessingException("Exception while retrieving scope info: " + e.toString()); |
| } |
| } |
| |
| public VariableInfoDocument getVariableInfo(final String scopeId, final String varName) throws ManagementException { |
| VariableInfoDocument ret = VariableInfoDocument.Factory.newInstance(); |
| final TVariableInfo vinf = ret.addNewVariableInfo(); |
| final TVariableRef sref = vinf.addNewSelf(); |
| dbexec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection session) throws Exception { |
| ScopeDAO scope = session.getScope(new Long(scopeId)); |
| if (scope == null) { |
| throw new InvalidRequestException("ScopeNotFound:" + scopeId); |
| } |
| |
| sref.setSiid(scopeId); |
| sref.setIid(scope.getProcessInstance().getInstanceId().toString()); |
| sref.setName(varName); |
| |
| XmlDataDAO var = scope.getVariable(varName); |
| if (var == null) { |
| throw new InvalidRequestException("VarNotFound:" + varName); |
| } |
| |
| Node nval = var.get(); |
| if (nval != null) { |
| TVariableInfo.Value val = vinf.addNewValue(); |
| val.getDomNode().appendChild(val.getDomNode().getOwnerDocument().importNode(nval, true)); |
| } |
| return null; |
| } |
| }); |
| return ret; |
| } |
| |
| public VariableInfoDocument setVariable(final String scopeId, final String varName, final XmlObject value) throws ManagementException { |
| VariableInfoDocument ret = VariableInfoDocument.Factory.newInstance(); |
| final TVariableInfo vinf = ret.addNewVariableInfo(); |
| final TVariableRef sref = vinf.addNewSelf(); |
| dbexec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection session) throws Exception { |
| ScopeDAO scope = session.getScope(new Long(scopeId)); |
| if (scope == null) { |
| throw new InvalidRequestException("ScopeNotFound:" + scopeId); |
| } |
| |
| sref.setSiid(scopeId); |
| sref.setIid(scope.getProcessInstance().getInstanceId().toString()); |
| sref.setName(varName); |
| |
| XmlDataDAO var = scope.getVariable(varName); |
| if (var == null) { |
| throw new InvalidRequestException("VarNotFound:" + varName); |
| } |
| |
| { |
| ByteArrayOutputStream out = new ByteArrayOutputStream(); |
| value.save(out, new XmlOptions().setSaveOuter()); |
| Node value2 = DOMUtils.getFirstChildElement(DOMUtils.stringToDOM(out.toString())); |
| var.set(value2); |
| } |
| |
| Node nval = var.get(); |
| if (nval != null) { |
| TVariableInfo.Value val = vinf.addNewValue(); |
| val.getDomNode().appendChild(val.getDomNode().getOwnerDocument().importNode(nval, true)); |
| } |
| return null; |
| } |
| }); |
| return ret; |
| } |
| |
| // |
| // INSTANCE ACTIONS |
| // |
| public InstanceInfoDocument fault(Long iid, QName faultname, Element faultData) { |
| // TODO: Implement |
| return getInstanceInfo(iid); |
| } |
| |
| public InstanceInfoDocument resume(Long iid) { |
| // We need debugger support in order to resume (since we have to force |
| // a reduction. If one is not available the getDebugger() method should |
| // throw a ProcessingException |
| getDebugger(iid).resume(iid); |
| |
| return getInstanceInfo(iid); |
| } |
| |
| public InstanceInfoDocument suspend(Long iid) throws ManagementException { |
| DebuggerSupport debugSupport = getDebugger(iid); |
| assert debugSupport != null : "getDebugger(Long) returned NULL!"; |
| debugSupport.suspend(iid); |
| |
| return getInstanceInfo(iid); |
| } |
| |
| public InstanceInfoDocument terminate(Long iid) throws ManagementException { |
| DebuggerSupport debugSupport = getDebugger(iid); |
| assert debugSupport != null : "getDebugger(Long) returned NULL!"; |
| debugSupport.terminate(iid); |
| |
| return getInstanceInfo(iid); |
| } |
| |
| public InstanceInfoDocument recoverActivity(final Long iid, final Long aid, final String action) { |
| try { |
| _db.exec(new BpelDatabase.Callable<QName>() { |
| public QName run(BpelDAOConnection conn) throws Exception { |
| ProcessInstanceDAO instance = conn.getInstance(iid); |
| if (instance == null) |
| return null; |
| for (ActivityRecoveryDAO recovery : instance.getActivityRecoveries()) { |
| if (recovery.getActivityId() == aid) { |
| BpelProcess process = _server._engine._activeProcesses.get(instance.getProcess().getProcessId()); |
| if (process != null) { |
| process.recoverActivity(instance, recovery.getChannel(), aid, action, null); |
| break; |
| } |
| } |
| } |
| return instance.getProcess().getProcessId(); |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception during activity recovery", e); |
| throw new ProcessingException("Exception during activity recovery" + e.toString()); |
| } |
| return getInstanceInfo(iid); |
| } |
| |
| public Collection<Long> delete(String filter) { |
| final InstanceFilter instanceFilter = new InstanceFilter(filter); |
| final List<Long> ret = new LinkedList<Long>(); |
| if(garbage(filter)) { |
| return ret; |
| } |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) { |
| Collection<ProcessInstanceDAO> instances = conn.instanceQuery(instanceFilter); |
| for (ProcessInstanceDAO instance : instances) { |
| ProcessConf proc = _store.getProcessConfiguration(instance.getProcess().getProcessId()); |
| if (proc == null) |
| throw new ProcessNotFoundException("ProcessNotFound:" + instance.getProcess().getProcessId()); |
| // delete the instance and all related data |
| instance.delete(EnumSet.allOf(CLEANUP_CATEGORY.class)); |
| ret.add(instance.getInstanceId()); |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception during instance deletion", e); |
| throw new ProcessingException("Exception during instance deletion: " + e.toString()); |
| } |
| return ret; |
| } |
| |
| // |
| // EVENT RETRIEVAL |
| // |
| public List<String> getEventTimeline(String instanceFilter, String eventFilter) { |
| final InstanceFilter ifilter = new InstanceFilter(instanceFilter, null, 0); |
| final BpelEventFilter efilter = new BpelEventFilter(eventFilter, 0); |
| |
| List<Date> tline = dbexec(new BpelDatabase.Callable<List<Date>>() { |
| public List<Date> run(BpelDAOConnection session) throws Exception { |
| return session.bpelEventTimelineQuery(ifilter, efilter); |
| } |
| }); |
| |
| ArrayList<String> ret = new ArrayList<String>(tline.size()); |
| CollectionsX.transform(ret, tline, new UnaryFunction<Date, String>() { |
| public String apply(Date x) { |
| return ISO8601DateParser.format(x); |
| } |
| }); |
| return ret; |
| } |
| |
| public EventInfoListDocument listEvents(String instanceFilter, String eventFilter, int maxCount) { |
| final InstanceFilter ifilter = new InstanceFilter(instanceFilter, null, 0); |
| final BpelEventFilter efilter = new BpelEventFilter(eventFilter, maxCount); |
| EventInfoListDocument eid = EventInfoListDocument.Factory.newInstance(); |
| final TEventInfoList eil = eid.addNewEventInfoList(); |
| dbexec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection session) throws Exception { |
| List<BpelEvent> events = session.bpelEventQuery(ifilter, efilter); |
| for (BpelEvent event : events) { |
| TEventInfo tei = eil.addNewEventInfo(); |
| fillEventInfo(tei, event); |
| } |
| return null; |
| } |
| }); |
| return eid; |
| } |
| |
| public ActivityExtInfoListDocument getExtensibilityElements(QName pid, Integer[] aids) { |
| ActivityExtInfoListDocument aeild = ActivityExtInfoListDocument.Factory.newInstance(); |
| TActivitytExtInfoList taeil = aeild.addNewActivityExtInfoList(); |
| OProcess oprocess = _server._engine.getOProcess(pid); |
| if (oprocess == null) |
| throw new ProcessNotFoundException("The process \"" + pid + "\" does not exist."); |
| |
| for (int aid : aids) { |
| OBase obase = oprocess.getChild(aid); |
| if (obase != null && obase.getDebugInfo() != null && obase.getDebugInfo().getExtensibilityElements() != null) { |
| for (Map.Entry<QName, Object> entry : obase.getDebugInfo().getExtensibilityElements().entrySet()) { |
| TActivityExtInfo taei = taeil.addNewActivityExtInfo(); |
| taei.setAiid("" + aid); |
| Object extValue = entry.getValue(); |
| if (extValue instanceof Element) |
| taei.getDomNode().appendChild( |
| taei.getDomNode().getOwnerDocument().importNode((Element) extValue, true)); |
| else if (extValue instanceof String) { |
| Element valueElmt = taei.getDomNode().getOwnerDocument().createElementNS( |
| entry.getKey().getNamespaceURI(), entry.getKey().getLocalPart()); |
| valueElmt.appendChild(taei.getDomNode().getOwnerDocument().createTextNode((String) extValue)); |
| taei.getDomNode().appendChild(valueElmt); |
| } |
| } |
| } |
| } |
| return aeild; |
| } |
| |
| /** |
| * Get the {@link DebuggerSupport} object for the given process identifier. |
| * Debugger support is required for operations that resume execution in some |
| * way or manipulate the breakpoints. |
| * |
| * @param procid |
| * process identifier |
| * @return associated debugger support object |
| * @throws ManagementException |
| */ |
| protected final DebuggerSupport getDebugger(QName procid) throws ManagementException { |
| BpelProcess process = _server._engine._activeProcesses.get(procid); |
| if (process == null) |
| throw new ProcessNotFoundException("The process \"" + procid + "\" does not exist."); |
| |
| return process._debugger; |
| } |
| |
| /** |
| * Get the {@link DebuggerSupport} object for the given instance identifier. |
| * Debugger support is required for operations that resume execution in some |
| * way or manipulate the breakpoints. |
| * |
| * @param iid |
| * instance identifier |
| * @return associated debugger support object |
| * @throws ManagementException |
| */ |
| protected final DebuggerSupport getDebugger(final Long iid) { |
| QName processId; |
| try { |
| processId = _db.exec(new BpelDatabase.Callable<QName>() { |
| public QName run(BpelDAOConnection conn) throws Exception { |
| ProcessInstanceDAO instance = conn.getInstance(iid); |
| return instance == null ? null : instance.getProcess().getProcessId(); |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception during instance retrieval", e); |
| throw new ProcessingException("Exception during instance retrieval: " + e.toString()); |
| } |
| |
| return getDebugger(processId); |
| } |
| |
| /** |
| * Execute a database transaction, unwrapping nested |
| * {@link ManagementException}s. |
| * |
| * @param runnable |
| * action to run |
| * @return |
| * @throws ManagementException |
| */ |
| protected <T> T dbexec(BpelProcessDatabase.Callable<T> runnable) throws ManagementException { |
| try { |
| return runnable.exec(); |
| } catch (ManagementException me) { |
| throw me; |
| } catch (Exception e) { |
| __log.error("Exception during database operation", e); |
| throw new ManagementException("Exception during database operation: " + e.toString()); |
| } |
| } |
| |
| /** |
| * Execute a database transaction, unwrapping nested |
| * {@link ManagementException}s. |
| * |
| * @param callable |
| * action to run |
| * @return |
| * @throws ManagementException |
| */ |
| protected <T> T dbexec(BpelDatabase.Callable<T> callable) throws ManagementException { |
| try { |
| return _db.exec(callable); |
| } catch (ManagementException me) { |
| // Passthrough. |
| throw me; |
| } catch (Exception ex) { |
| __log.error("Exception during database operation", ex); |
| throw new ManagementException("Exception during database operation" + ex.toString()); |
| } |
| } |
| |
| private ProcessInfoDocument genProcessInfoDocument(BpelDAOConnection conn, QName procid, ProcessInfoCustomizer custom) |
| throws ManagementException { |
| if (procid == null) { |
| throw new InvalidRequestException("Valid QName as process id expected."); |
| } |
| ProcessInfoDocument ret = ProcessInfoDocument.Factory.newInstance(); |
| TProcessInfo pi = ret.addNewProcessInfo(); |
| try { |
| ProcessConf pconf = _store.getProcessConfiguration(procid); |
| if (pconf == null) |
| throw new ProcessNotFoundException("ProcessNotFound:" + procid); |
| fillProcessInfo(pi, pconf, custom); |
| fillProcessInfoSummary(conn, new TProcessInfo[]{pi}, custom); |
| } catch (ManagementException me) { |
| throw me; |
| } catch (Exception e) { |
| __log.error("Exception while retrieving process information", e); |
| throw new ProcessingException("Exception while retrieving process information: " + e.toString()); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Generate a {@link InstanceInfoDocument} for a given instance. This |
| * document contains general information about the instance. |
| * |
| * @param iid |
| * instance identifier |
| * @return generated document |
| */ |
| private InstanceInfoDocument genInstanceInfoDocument(BpelDAOConnection conn, Long iid) { |
| if (iid == null) |
| throw new InvalidRequestException("Must specifiy instance id."); |
| |
| InstanceInfoDocument ret = InstanceInfoDocument.Factory.newInstance(); |
| TInstanceInfo ii = ret.addNewInstanceInfo(); |
| |
| ii.setIid(iid.toString()); |
| ProcessInstanceDAO instance = conn.getInstance(iid); |
| if (instance == null) |
| throw new InstanceNotFoundException("InstanceNotFoundException " + iid); |
| // TODO: deal with "ERROR" state information. |
| fillInstanceInfo(ii, instance); |
| Map<Long, Collection<CorrelationSetDAO>> icsets = conn.getCorrelationSets(Arrays.asList(new ProcessInstanceDAO[] { instance })); |
| Collection<CorrelationSetDAO> csets = icsets.get(instance.getInstanceId()); |
| if (csets != null) { |
| for (CorrelationSetDAO cset: csets) { |
| Map<QName, String> props = cset.getProperties(); |
| fillProperties(ii, instance, props); |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Generate a {@link ScopeInfoDocument} for a given scope instance. |
| * |
| * @param siid |
| * scope instance identifier |
| * @param includeActivityInfo |
| * @return generated document |
| * @throws Exception |
| */ |
| private ScopeInfoDocument genScopeInfoDocument(BpelDAOConnection conn, String siid, boolean includeActivityInfo) throws Exception { |
| if (siid == null) |
| throw new InvalidRequestException("Must specifiy scope instance id."); |
| |
| Long siidl; |
| try { |
| siidl = new Long(siid); |
| } catch (NumberFormatException nfe) { |
| throw new InvalidRequestException("Invalid scope instance id."); |
| } |
| |
| ScopeInfoDocument ret = ScopeInfoDocument.Factory.newInstance(); |
| TScopeInfo ii = ret.addNewScopeInfo(); |
| |
| ii.setSiid(siid); |
| try { |
| ScopeDAO instance = conn.getScope(siidl); |
| if (instance == null) |
| throw new InvalidRequestException("Scope not found: " + siidl); |
| // TODO: deal with "ERROR" state information. |
| fillScopeInfo(ii, instance, includeActivityInfo); |
| } catch (Exception e) { |
| failIfSQLException(e); |
| __log.error("Exception while retrieving scope information", e); |
| } |
| return ret; |
| } |
| |
| private void fillProcessInfoSummary(BpelDAOConnection conn, TProcessInfo[] infos, ProcessInfoCustomizer custom) { |
| if (custom.includeInstanceSummary()) { |
| Set<String> pids = new HashSet<String>(); |
| for (TProcessInfo i : infos) { |
| pids.add(i.getPid()); |
| } |
| |
| Map<InstanceSummaryKey, Long> m = conn.getProcessManagement().countInstancesSummary(pids); |
| Map<String, FailedSummaryValue> f = conn.getProcessManagement().findFailedCountAndLastFailedDateForProcessIds(pids); |
| |
| for (TProcessInfo info : infos) { |
| TInstanceSummary isum = info.addNewInstanceSummary(); |
| genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.ACTIVE, info.getPid(), m); |
| genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.COMPLETED, info.getPid(), m); |
| genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.ERROR, info.getPid(), m); |
| genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.FAILED, info.getPid(), m); |
| genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.SUSPENDED, info.getPid(), m); |
| genInstanceSummaryEntry(isum.addNewInstances(), TInstanceStatus.TERMINATED, info.getPid(), m); |
| getInstanceSummaryActivityFailure(isum, f, info.getPid()); |
| } |
| } |
| } |
| |
| /** |
| * Fill in the <code>process-info</code> element of the transfer object. |
| * |
| * @param info |
| * destination XMLBean |
| * @param pconf |
| * process configuration object (from store) |
| * @param proc |
| * source DAO object |
| * @param custom |
| * used to customize the quantity of information produced in the |
| * info |
| */ |
| private void fillProcessInfo(TProcessInfo info, ProcessConf pconf, ProcessInfoCustomizer custom) { |
| if (pconf == null) |
| throw new IllegalArgumentException("Null pconf."); |
| |
| if( __log.isDebugEnabled() ) __log.debug("Filling process info for " + pconf.getProcessId()); |
| |
| info.setPid(pconf.getProcessId().toString()); |
| // TODO: ACTIVE and RETIRED should be used separately. |
| // Active process may be retired at the same time |
| if (pconf.getState() == ProcessState.RETIRED) { |
| info.setStatus(TProcessStatus.RETIRED); |
| } else if (pconf.getState() == ProcessState.DISABLED) { |
| info.setStatus(TProcessStatus.DISABLED); |
| } else { |
| info.setStatus(TProcessStatus.ACTIVE); |
| } |
| info.setVersion(pconf.getVersion()); |
| |
| TDefinitionInfo definfo = info.addNewDefinitionInfo(); |
| definfo.setProcessName(pconf.getType()); |
| |
| TDeploymentInfo depinfo = info.addNewDeploymentInfo(); |
| depinfo.setPackage(pconf.getPackage()); |
| if( __log.isDebugEnabled() ) __log.debug(" package name: " + depinfo.getPackage()); |
| depinfo.setDocument(pconf.getBpelDocument()); |
| depinfo.setDeployDate(toCalendar(pconf.getDeployDate())); |
| depinfo.setDeployer(pconf.getDeployer()); |
| |
| if (custom.includeDocumentLists()) { |
| TProcessInfo.Documents docinfo = info.addNewDocuments(); |
| List<File> files = pconf.getFiles(); |
| if (files != null) |
| genDocumentInfo(docinfo, files.toArray(new File[files.size()]), true); |
| else if (__log.isDebugEnabled()) |
| __log.debug("fillProcessInfo: No files for " + pconf.getProcessId()); |
| } |
| |
| TProcessProperties properties = info.addNewProperties(); |
| if (custom.includeProcessProperties()) { |
| for (Map.Entry<QName, Node> propEntry : pconf.getProcessProperties().entrySet()) { |
| TProcessProperties.Property tprocProp = properties.addNewProperty(); |
| tprocProp.setName(new QName(propEntry.getKey().getNamespaceURI(), propEntry.getKey().getLocalPart())); |
| Node propNode = tprocProp.getDomNode(); |
| Document processInfoDoc = propNode.getOwnerDocument(); |
| Node node2append = processInfoDoc.importNode(propEntry.getValue(), true); |
| propNode.appendChild(node2append); |
| } |
| } |
| |
| TEndpointReferences eprs = info.addNewEndpoints(); |
| OProcess oprocess = _server._engine.getOProcess(pconf.getProcessId()); |
| if (custom.includeEndpoints() && oprocess != null) { |
| for (OPartnerLink oplink : oprocess.getAllPartnerLinks()) { |
| if (oplink.hasPartnerRole() && oplink.isInitializePartnerRole()) { |
| // TODO: this is very uncool. |
| EndpointReference pepr = _server._engine._activeProcesses.get(pconf.getProcessId()) |
| .getInitialPartnerRoleEPR(oplink); |
| |
| if (pepr != null) { |
| TEndpointReferences.EndpointRef epr = eprs.addNewEndpointRef(); |
| Document eprNodeDoc = epr.getDomNode().getOwnerDocument(); |
| epr.getDomNode().appendChild(eprNodeDoc.importNode(pepr.toXML().getDocumentElement(), true)); |
| } |
| } |
| } |
| } |
| |
| // TODO: add documents to the above data structure. |
| } |
| |
| String findVersionStringFromNodeToken(String packageName) { |
| int i = packageName.length() - 1; |
| while( i > 0 && Character.isDigit(packageName.charAt(i)) ) { |
| i--; |
| } |
| if( i < packageName.length() - 1 && packageName.charAt(i) == '-') { |
| return packageName.substring(i + 1); |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Generate document information elements for a set of files. |
| * |
| * @param docinfo |
| * target element |
| * @param files |
| * files |
| * @param recurse |
| * recurse down directories? |
| */ |
| private void genDocumentInfo(TProcessInfo.Documents docinfo, File[] files, boolean recurse) { |
| if (files == null) |
| return; |
| for (File f : files) { |
| if (f.isHidden()) |
| continue; |
| |
| if (f.isDirectory()) { |
| if (recurse) |
| genDocumentInfo(docinfo, f.listFiles(), true); |
| } else if (f.isFile()) { |
| genDocumentInfo(docinfo, f); |
| } |
| } |
| } |
| |
| private void genDocumentInfo(TProcessInfo.Documents docinfo, File f) { |
| DocumentInfoGenerator dig = new DocumentInfoGenerator(f); |
| if (dig.isRecognized() && dig.isVisible()) { |
| TDocumentInfo doc = docinfo.addNewDocument(); |
| |
| doc.setName(dig.getName()); |
| doc.setSource(dig.getURL()); |
| doc.setType(dig.getType()); |
| } |
| } |
| |
| private void genInstanceSummaryEntry(TInstanceSummary.Instances instances, |
| TInstanceStatus.Enum state, String pid, Map<InstanceSummaryKey, Long> summary) |
| { |
| instances.setState(state); |
| Long count = summary.get(new InstanceSummaryKey(pid, state.toString())); |
| instances.setCount(count == null ? 0 : count.intValue()); |
| } |
| |
| private void getInstanceSummaryActivityFailure(TInstanceSummary summary, Map<String, FailedSummaryValue> f, String pid) { |
| FailedSummaryValue v = f.get(pid); |
| if (v != null) { |
| TFailuresInfo failures = summary.addNewFailures(); |
| failures.setDtFailure(toCalendar(v.lastFailed)); |
| failures.setCount(v.count.intValue()); |
| } |
| } |
| |
| private void fillProperties(TInstanceInfo info, ProcessInstanceDAO instance, Map<QName, String> props) { |
| TInstanceInfo.CorrelationProperties corrProperties = info.addNewCorrelationProperties(); |
| for (Map.Entry<QName, String> property : props.entrySet()) { |
| TCorrelationProperty tproperty = corrProperties.addNewCorrelationProperty(); |
| // not setting correlation-set id here -- too inconvenient for performance |
| // tproperty.setCsetid("" + cset.getCorrelationSetId()); |
| tproperty.setPropertyName(property.getKey()); |
| tproperty.setStringValue(property.getValue()); |
| } |
| } |
| |
| private void fillInstanceSummary(TInstanceInfo info, ProcessInstanceDAO instance) { |
| info.setIid("" + instance.getInstanceId()); |
| ProcessDAO processDAO = instance.getProcess(); |
| info.setPid(processDAO.getProcessId().toString()); |
| info.setProcessName(processDAO.getType()); |
| info.setDtStarted(toCalendar(instance.getCreateTime())); |
| info.setDtLastActive(toCalendar(instance.getLastActiveTime())); |
| info.setStatus(__psc.cvtInstanceStatus(instance.getState())); |
| if (instance.getFault() != null) { |
| TFaultInfo faultInfo = info.addNewFaultInfo(); |
| faultInfo.setName(instance.getFault().getName()); |
| faultInfo.setExplanation(instance.getFault().getExplanation()); |
| faultInfo.setAiid(instance.getFault().getActivityId()); |
| faultInfo.setLineNumber(instance.getFault().getLineNo()); |
| } |
| if (instance.getActivityFailureCount() > 0) { |
| TFailuresInfo failures = info.addNewFailures(); |
| failures.setDtFailure(toCalendar(instance.getActivityFailureDateTime())); |
| failures.setCount(instance.getActivityFailureCount()); |
| } |
| } |
| |
| private void fillInstanceInfo(TInstanceInfo info, ProcessInstanceDAO instance) { |
| fillInstanceSummary(info, instance); |
| |
| if (instance.getRootScope() != null) |
| info.setRootScope(genScopeRef(instance.getRootScope())); |
| |
| ProcessInstanceDAO.EventsFirstLastCountTuple flc = instance.getEventsFirstLastCount(); |
| TInstanceInfo.EventInfo eventInfo = info.addNewEventInfo(); |
| if (flc != null) { |
| eventInfo.setFirstDtime(toCalendar(flc.first)); |
| eventInfo.setLastDtime(toCalendar(flc.last)); |
| eventInfo.setCount(flc.count); |
| } |
| |
| // Setting valued correlation properties |
| if (!instance.getCorrelationSets().isEmpty()) { |
| TInstanceInfo.CorrelationProperties corrProperties = info.addNewCorrelationProperties(); |
| for (CorrelationSetDAO correlationSetDAO : instance.getCorrelationSets()) { |
| for (Map.Entry<QName, String> property : correlationSetDAO.getProperties().entrySet()) { |
| TCorrelationProperty tproperty = corrProperties.addNewCorrelationProperty(); |
| tproperty.setCsetid("" + correlationSetDAO.getCorrelationSetId()); |
| tproperty.setPropertyName(property.getKey()); |
| tproperty.setStringValue(property.getValue()); |
| } |
| } |
| } |
| } |
| |
| private void fillScopeInfo(TScopeInfo scopeInfo, ScopeDAO scope, boolean includeActivityInfo) { |
| scopeInfo.setSiid("" + scope.getScopeInstanceId()); |
| scopeInfo.setName(scope.getName()); |
| if (scope.getParentScope() != null) |
| scopeInfo.setParentScopeRef(genScopeRef(scope.getParentScope())); |
| |
| scopeInfo.setStatus(__psc.cvtScopeStatus(scope.getState())); |
| |
| TScopeInfo.Children children = scopeInfo.addNewChildren(); |
| for (ScopeDAO i : scope.getChildScopes()) |
| fillScopeRef(children.addNewChildRef(), i); |
| |
| TScopeInfo.Variables vars = scopeInfo.addNewVariables(); |
| for (XmlDataDAO i : scope.getVariables()) |
| fillVariableRef(vars.addNewVariableRef(), i); |
| |
| // Setting correlations and their valued properties |
| if (!scope.getCorrelationSets().isEmpty()) { |
| TScopeInfo.CorrelationSets correlationSets = scopeInfo.addNewCorrelationSets(); |
| for (CorrelationSetDAO correlationSetDAO : scope.getCorrelationSets()) { |
| TScopeInfo.CorrelationSets.CorrelationSet correlationSet = correlationSets.addNewCorrelationSet(); |
| correlationSet.setCsetid("" + correlationSetDAO.getCorrelationSetId()); |
| correlationSet.setName(correlationSetDAO.getName()); |
| for (Map.Entry<QName, String> property : correlationSetDAO.getProperties().entrySet()) { |
| TCorrelationProperty tproperty = correlationSet.addNewCorrelationProperty(); |
| tproperty.setCsetid("" + correlationSetDAO.getCorrelationSetId()); |
| tproperty.setPropertyName(property.getKey()); |
| tproperty.setStringValue(property.getValue()); |
| } |
| } |
| |
| } |
| |
| if (includeActivityInfo) { |
| Collection<ActivityRecoveryDAO> recoveries = scope.getProcessInstance().getActivityRecoveries(); |
| |
| TScopeInfo.Activities activities = scopeInfo.addNewActivities(); |
| List<BpelEvent> events = scope.listEvents(); |
| |
| // if event generation was enabled |
| if(events!=null && events.size() >0) { |
| ActivityStateDocumentBuilder b = new ActivityStateDocumentBuilder(); |
| for (BpelEvent e : events) |
| b.onEvent(e); |
| for (ActivityInfoDocument ai : b.getActivities()) { |
| for (ActivityRecoveryDAO recovery : recoveries) { |
| if (String.valueOf(recovery.getActivityId()).equals(ai.getActivityInfo().getAiid())) { |
| TFailureInfo failure = ai.getActivityInfo().addNewFailure(); |
| failure.setReason(recovery.getReason()); |
| failure.setDtFailure(toCalendar(recovery.getDateTime())); |
| failure.setActions(recovery.getActions()); |
| failure.setRetries(recovery.getRetries()); |
| ai.getActivityInfo().setStatus(TActivityStatus.FAILURE); |
| } |
| } |
| activities.addNewActivityInfo().set(ai.getActivityInfo()); |
| } |
| } |
| |
| // otherwise at least try to get the information about failed activities |
| // TODO: we are losing information about which scope does failed activities belong to |
| // as failure table does not have scope id, we would attach every failed activity to process scope |
| else { |
| if(scope.getParentScope() == null) { |
| for (ActivityRecoveryDAO recovery : recoveries) { |
| ActivityInfoDocument ai = ActivityInfoDocument.Factory.newInstance(); |
| ai.addNewActivityInfo().setAiid(String.valueOf(recovery.getActivityId())); |
| ai.getActivityInfo().setType("OActivity"); |
| ai.getActivityInfo().setScope(TScopeRef.Factory.newInstance()); |
| TFailureInfo failure = ai.getActivityInfo().addNewFailure(); |
| failure.setReason(recovery.getReason()); |
| failure.setDtFailure(toCalendar(recovery.getDateTime())); |
| failure.setActions(recovery.getActions()); |
| failure.setRetries(recovery.getRetries()); |
| ai.getActivityInfo().setStatus(TActivityStatus.FAILURE); |
| activities.addNewActivityInfo().set(ai.getActivityInfo()); |
| } |
| } |
| } |
| } |
| |
| Collection<PartnerLinkDAO> plinks = scope.getPartnerLinks(); |
| if (plinks.size() > 0) { |
| TEndpointReferences refs = scopeInfo.addNewEndpoints(); |
| for (PartnerLinkDAO plink : plinks) { |
| if (plink.getPartnerRoleName() != null && plink.getPartnerRoleName().length() > 0) { |
| TEndpointReferences.EndpointRef ref = refs.addNewEndpointRef(); |
| ref.setPartnerLink(plink.getPartnerLinkName()); |
| ref.setPartnerRole(plink.getPartnerRoleName()); |
| if (plink.getPartnerEPR() != null) { |
| Document eprNodeDoc = ref.getDomNode().getOwnerDocument(); |
| ref.getDomNode().appendChild(eprNodeDoc.importNode(plink.getPartnerEPR(), true)); |
| } |
| } |
| } |
| } |
| } |
| |
| private void fillVariableRef(TVariableRef ref, XmlDataDAO i) { |
| ref.setIid(i.getScopeDAO().getProcessInstance().getInstanceId().toString()); |
| ref.setSiid(i.getScopeDAO().getScopeInstanceId().toString()); |
| ref.setName(i.getName()); |
| } |
| |
| private TScopeRef genScopeRef(ScopeDAO scope) { |
| TScopeRef tref = TScopeRef.Factory.newInstance(); |
| fillScopeRef(tref, scope); |
| return tref; |
| } |
| |
| private void fillScopeRef(TScopeRef tref, ScopeDAO scope) { |
| tref.setSiid(scope.getScopeInstanceId().toString()); |
| tref.setStatus(__psc.cvtScopeStatus(scope.getState())); |
| tref.setName(scope.getName()); |
| tref.setModelId("" + scope.getModelId()); |
| } |
| |
| private void fillEventInfo(TEventInfo info, BpelEvent event) { |
| info.setName(BpelEvent.eventName(event)); |
| info.setType(event.getType().toString()); |
| info.setLineNumber(event.getLineNo()); |
| info.setTimestamp(toCalendar(event.getTimestamp())); |
| if (event instanceof ActivityEvent) { |
| info.setActivityName(((ActivityEvent) event).getActivityName()); |
| info.setActivityId(((ActivityEvent) event).getActivityId()); |
| info.setActivityType(((ActivityEvent) event).getActivityType()); |
| info.setActivityDefinitionId(((ActivityEvent) event).getActivityDeclarationId()); |
| } |
| if (event instanceof CorrelationEvent) { |
| info.setPortType(((CorrelationEvent) event).getPortType()); |
| info.setOperation(((CorrelationEvent) event).getOperation()); |
| info.setMexId(((CorrelationEvent) event).getMessageExchangeId()); |
| } |
| if (event instanceof CorrelationMatchEvent) { |
| info.setPortType(((CorrelationMatchEvent) event).getPortType()); |
| } |
| if (event instanceof CorrelationSetEvent) { |
| info.setCorrelationSet(((CorrelationSetEvent) event).getCorrelationSetName()); |
| } |
| if (event instanceof CorrelationSetWriteEvent) { |
| info.setCorrelationKey(((CorrelationSetWriteEvent) event).getCorrelationSetName()); |
| } |
| if (event instanceof ExpressionEvaluationEvent) { |
| info.setExpression(((ExpressionEvaluationEvent) event).getExpression()); |
| } |
| if (event instanceof ExpressionEvaluationFailedEvent) { |
| info.setFault(((ExpressionEvaluationFailedEvent) event).getFault()); |
| } |
| if (event instanceof NewProcessInstanceEvent) { |
| if ((((NewProcessInstanceEvent) event).getRootScopeId()) != null) |
| info.setRootScopeId(((NewProcessInstanceEvent) event).getRootScopeId()); |
| info.setScopeDefinitionId(((NewProcessInstanceEvent) event).getScopeDeclarationId()); |
| } |
| if (event instanceof PartnerLinkEvent) { |
| info.setPartnerLinkName(((PartnerLinkEvent) event).getpLinkName()); |
| } |
| if (event instanceof ProcessCompletionEvent) { |
| info.setFault(((ProcessCompletionEvent) event).getFault()); |
| } |
| if (event instanceof ProcessEvent) { |
| info.setProcessId(((ProcessEvent) event).getProcessId()); |
| info.setProcessType(((ProcessEvent) event).getProcessName()); |
| } |
| if (event instanceof ProcessInstanceEvent) { |
| info.setInstanceId(((ProcessInstanceEvent) event).getProcessInstanceId()); |
| } |
| if (event instanceof ProcessInstanceStartedEvent) { |
| info.setRootScopeId(((ProcessInstanceStartedEvent) event).getRootScopeId()); |
| info.setRootScopeDeclarationId(((ProcessInstanceStartedEvent) event).getScopeDeclarationId()); |
| } |
| if (event instanceof ProcessInstanceStateChangeEvent) { |
| info.setOldState(((ProcessInstanceStateChangeEvent) event).getOldState()); |
| info.setNewState(((ProcessInstanceStateChangeEvent) event).getNewState()); |
| } |
| if (event instanceof ProcessMessageExchangeEvent) { |
| info.setPortType(((ProcessMessageExchangeEvent) event).getPortType()); |
| info.setOperation(((ProcessMessageExchangeEvent) event).getOperation()); |
| info.setMexId(((ProcessMessageExchangeEvent) event).getMessageExchangeId()); |
| } |
| if (event instanceof ScopeCompletionEvent) { |
| info.setSuccess(true); |
| } |
| if (event instanceof ScopeEvent) { |
| info.setScopeId(((ScopeEvent) event).getScopeId()); |
| if (((ScopeEvent) event).getParentScopeId() != null) |
| info.setParentScopeId(((ScopeEvent) event).getParentScopeId()); |
| if (((ScopeEvent) event).getScopeName() != null) |
| info.setScopeName(((ScopeEvent) event).getScopeName()); |
| info.setScopeDefinitionId(((ScopeEvent) event).getScopeDeclarationId()); |
| } |
| if (event instanceof ScopeFaultEvent) { |
| info.setFault(((ScopeFaultEvent) event).getFaultType()); |
| info.setFaultLineNumber(((ScopeFaultEvent) event).getFaultLineNo()); |
| info.setExplanation(((ScopeFaultEvent) event).getExplanation()); |
| } |
| if (event instanceof VariableEvent) { |
| info.setVariableName(((VariableEvent) event).getVarName()); |
| } |
| if(event instanceof VariableModificationEvent) { |
| if(((VariableModificationEvent) event).getNewValue()!=null) |
| info.setNewValue(DOMUtils.domToString(((VariableModificationEvent) event).getNewValue())); |
| } |
| } |
| |
| /** |
| * Convert a {@link Date} to a {@link Calendar}. |
| * |
| * @param dtime |
| * a {@link Date} |
| * @return a {@link Calendar} |
| */ |
| private Calendar toCalendar(Date dtime) { |
| if (dtime == null) |
| return null; |
| |
| Calendar c = (Calendar) _calendar.clone(); |
| c.setTime(dtime); |
| return c; |
| } |
| |
| /** |
| * @see org.apache.ode.bpel.pmapi.InstanceManagement#queryInstances(java.lang.String) |
| */ |
| public InstanceInfoListDocument queryInstances(final String query) { |
| InstanceInfoListDocument ret = InstanceInfoListDocument.Factory.newInstance(); |
| final TInstanceInfoList infolist = ret.addNewInstanceInfoList(); |
| try { |
| _db.exec(new BpelDatabase.Callable<Object>() { |
| public Object run(BpelDAOConnection conn) { |
| Collection<ProcessInstanceDAO> instances = conn.instanceQuery(query); |
| for (ProcessInstanceDAO instance : instances) { |
| fillInstanceInfo(infolist.addNewInstanceInfo(), instance); |
| } |
| return null; |
| } |
| }); |
| } catch (Exception e) { |
| __log.error("Exception while querying instances", e); |
| throw new ProcessingException("Exception while querying instances: " + e.toString()); |
| } |
| return ret; |
| } |
| |
| /** |
| * Query processes based on a {@link ProcessFilter} criteria. This is |
| * implemented in memory rather than via database calls since the processes |
| * are managed by the {@link ProcessStore} object and we don't want to make |
| * this needlessly complicated. |
| * |
| * @param filter |
| * @return |
| */ |
| @SuppressWarnings("unchecked") |
| Collection<ProcessConf> processQuery(ProcessFilter filter) { |
| |
| List<QName> pids = _store.getProcesses(); |
| |
| // Name filter can be implemented using only the PIDs. |
| if (filter != null && filter.getNameFilter() != null) { |
| final Pattern pattern = Pattern.compile(filter.getNameFilter().replace("*",".*") + "(-\\d*)?"); |
| CollectionsX.remove_if(pids, new MemberOfFunction<QName>() { |
| @Override |
| public boolean isMember(QName o) { |
| return !pattern.matcher(o.getLocalPart()).matches(); |
| } |
| }); |
| } |
| |
| if (filter != null && filter.getNamespaceFilter() != null) { |
| final Pattern pattern = Pattern.compile(filter.getNamespaceFilter().replace("*",".*")); |
| CollectionsX.remove_if(pids, new MemberOfFunction<QName>() { |
| @Override |
| public boolean isMember(QName o) { |
| String ns = o.getNamespaceURI() == null ? "" : o.getNamespaceURI(); |
| return !pattern.matcher(ns).matches(); |
| } |
| |
| }); |
| } |
| |
| // Now we need the process conf objects, we need to be |
| // careful since someone could have deleted them by now |
| List<ProcessConf> confs = new LinkedList<ProcessConf>(); |
| for (QName pid : pids) { |
| ProcessConf pconf = _store.getProcessConfiguration(pid); |
| confs.add(pconf); |
| } |
| |
| if (filter != null) { |
| // TODO Implement process status filtering when status will exist |
| // Specific filter for deployment date. |
| if (filter.getDeployedDateFilter() != null) { |
| for (final String ddf : filter.getDeployedDateFilter()) { |
| final Date dd; |
| try { |
| dd = ISO8601DateParser.parse(Filter.getDateWithoutOp(ddf)); |
| } catch (ParseException e) { |
| // Should never happen. |
| __log.error("Exception while parsing date", e); |
| throw new RuntimeException(e.toString()); |
| } |
| |
| CollectionsX.remove_if(confs, new MemberOfFunction<ProcessConf>() { |
| @Override |
| public boolean isMember(ProcessConf o) { |
| |
| if (ddf.startsWith("=")) |
| return !o.getDeployDate().equals(dd); |
| |
| if (ddf.startsWith("<=")) |
| return o.getDeployDate().getTime() > dd.getTime(); |
| |
| if (ddf.startsWith(">=")) |
| return o.getDeployDate().getTime() < dd.getTime(); |
| |
| if (ddf.startsWith("<")) |
| return o.getDeployDate().getTime() >= dd.getTime(); |
| |
| if (ddf.startsWith(">")) |
| return o.getDeployDate().getTime() <= dd.getTime(); |
| |
| return false; |
| } |
| |
| }); |
| |
| } |
| } |
| |
| // Ordering |
| if (filter.getOrders() != null) { |
| ComparatorChain cchain = new ComparatorChain(); |
| for (String key : filter.getOrders()) { |
| boolean ascending = true; |
| String orderKey = key; |
| if (key.startsWith("+") || key.startsWith("-")) { |
| orderKey = key.substring(1, key.length()); |
| if (key.startsWith("-")) |
| ascending = false; |
| } |
| |
| Comparator c; |
| if ("name".equals(orderKey)) |
| c = new Comparator<ProcessConf>() { |
| public int compare(ProcessConf o1, ProcessConf o2) { |
| return o1.getProcessId().getLocalPart().compareTo(o2.getProcessId().getLocalPart()); |
| } |
| }; |
| else if ("namespace".equals(orderKey)) |
| c = new Comparator<ProcessConf>() { |
| public int compare(ProcessConf o1, ProcessConf o2) { |
| String ns1 = o1.getProcessId().getNamespaceURI() == null ? "" : o1.getProcessId() |
| .getNamespaceURI(); |
| String ns2 = o2.getProcessId().getNamespaceURI() == null ? "" : o2.getProcessId() |
| .getNamespaceURI(); |
| return ns1.compareTo(ns2); |
| } |
| }; |
| else if ("version".equals(orderKey)) |
| c = new Comparator<ProcessConf>() { |
| public int compare(ProcessConf o1, ProcessConf o2) { |
| // TODO: implement version comparisons. |
| return 0; |
| } |
| }; |
| else if ("deployed".equals(orderKey)) |
| c = new Comparator<ProcessConf>() { |
| public int compare(ProcessConf o1, ProcessConf o2) { |
| return o1.getDeployDate().compareTo(o2.getDeployDate()); |
| } |
| |
| }; |
| |
| else { |
| // unrecognized |
| __log.debug("unrecognized order key" + orderKey); |
| continue; |
| } |
| |
| cchain.addComparator(c, !ascending); |
| } |
| |
| Collections.sort(confs, cchain); |
| } |
| |
| } |
| |
| return confs; |
| } |
| |
| public ReplayResponseDocument replay(final Replay request) throws ManagementException { |
| final Throwable[] e = new Throwable[1]; |
| try { |
| ReplayResponseDocument response = _db.exec(new BpelDatabase.Callable<ReplayResponseDocument>() { |
| public ReplayResponseDocument run(BpelDAOConnection conn) throws Exception { |
| try { |
| ReplayResponse response = ReplayResponse.Factory.newInstance(); |
| Replayer replayer = new Replayer(); |
| List<Long> iids = replayer.replayInstances(request, _server.getEngine(), conn); |
| for (Long iid : iids) { |
| response.addRestoredIID(iid); |
| } |
| ReplayResponseDocument responseDoc = ReplayResponseDocument.Factory.newInstance(); |
| responseDoc.setReplayResponse(response); |
| return responseDoc; |
| } catch (Throwable t) { |
| e[0] = t; |
| throw new Exception("", t); |
| } |
| } |
| }); |
| |
| if (e[0] != null) { |
| __log.debug("throwing pending exception"); |
| throw e[0]; |
| } |
| return response; |
| } catch (Throwable e2) { |
| throw new ManagementException("", e2); |
| } |
| } |
| |
| public GetCommunicationResponseDocument getCommunication(final GetCommunication request) throws ManagementException { |
| final Throwable[] e = new Throwable[1]; |
| try { |
| GetCommunicationResponseDocument response = _db.exec(new BpelDatabase.Callable<GetCommunicationResponseDocument>() { |
| public GetCommunicationResponseDocument run(BpelDAOConnection conn) throws Exception { |
| try { |
| Replayer replayer = new Replayer(); |
| |
| GetCommunicationResponseDocument responseDoc = GetCommunicationResponseDocument.Factory.newInstance(); |
| responseDoc.setGetCommunicationResponse(replayer.getCommunication(request, conn)); |
| return responseDoc; |
| } catch (Throwable e2) { |
| e[0] = e2; |
| throw new Exception("", e2); |
| } |
| } |
| }); |
| if (e[0] != null) { |
| __log.debug("throwing pending exception"); |
| throw e[0]; |
| } |
| return response; |
| } catch (Throwable e2) { |
| throw new ManagementException("", e2); |
| } |
| } |
| |
| private static void failIfSQLException(Exception e) throws Exception { |
| String[] fatal = { "org.hibernate", "org.apache.openjpa", "java.sql", "javax.sql" }; |
| for (int i=0; i<fatal.length; i++) { |
| if (e.getClass().getName().startsWith(fatal[i])) throw e; |
| } |
| } |
| |
| } |