blob: ec64736bb2c07a84e412dc65815026936e7f1276 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.iapi.DebuggerContext;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.bdi.breaks.Breakpoint;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.evt.ActivityExecStartEvent;
import org.apache.ode.bpel.evt.BpelEvent;
import org.apache.ode.bpel.evt.ProcessCompletionEvent;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.evt.ProcessInstanceStateChangeEvent;
import org.apache.ode.bpel.evt.ProcessTerminationEvent;
import org.apache.ode.bpel.evt.ScopeCompletionEvent;
import org.apache.ode.bpel.obj.OProcess;
import org.apache.ode.bpel.pmapi.BpelManagementFacade;
import org.apache.ode.bpel.pmapi.InstanceNotFoundException;
import org.apache.ode.bpel.pmapi.ManagementException;
import org.apache.ode.bpel.pmapi.ProcessingException;
import org.apache.ode.bpel.runtime.breaks.BreakpointImpl;
import org.apache.ode.utils.CollectionUtils;
import org.apache.ode.utils.msg.MessageBundle;
import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
* Class providing functions used to support debugging functionality
* in the BPEL engine. This class serves as the underlying
* implementation of the {@link BpelManagementFacade} interface, and
* the various MBean interfaces.
*
* @todo Need to revisit the whole stepping/suspend/resume mechanism.
*/
public class DebuggerSupport implements DebuggerContext {
private static final Logger __log = LoggerFactory.getLogger(DebuggerSupport.class);
private static final Messages __msgs = MessageBundle.getMessages(Messages.class);
static final Breakpoint[] EMPTY_BP = new Breakpoint[0];
private boolean _enabled = true;
private Breakpoint[] _globalBreakPoints = EMPTY_BP;
private final Set<Long> _step = new HashSet<Long>();
private final Map<Long, Breakpoint[]>_instanceBreakPoints = new HashMap<Long, Breakpoint[]>();
/** BPEL process database */
private BpelProcessDatabase _db;
/** BPEL process. */
private BpelProcess _process;
/**
* Constructor.
* @param db BPEL process database
*/
protected DebuggerSupport(BpelProcess process) {
_process = process;
_db = new BpelProcessDatabase(_process._engine._contexts.dao,
_process._engine._contexts.scheduler,
_process._pid);
}
public void enable(boolean enabled){
_enabled = enabled;
}
public Breakpoint[] getGlobalBreakpoints(){
return _globalBreakPoints;
}
public Breakpoint[] getBreakpoints(Long pid){
Breakpoint[] arr = _instanceBreakPoints.get(pid);
return (arr == null)
? EMPTY_BP
: arr;
}
public void addGlobalBreakpoint(Breakpoint breakpoint){
Collection<Breakpoint> c = CollectionUtils.makeCollection(ArrayList.class, _globalBreakPoints);
c.add(breakpoint);
_globalBreakPoints = c.toArray(new Breakpoint[c.size()]);
}
public void addBreakpoint(Long pid, Breakpoint breakpoint){
Breakpoint[] bpArr = _instanceBreakPoints.get(pid);
if(bpArr == null) {
bpArr = new Breakpoint[]{breakpoint};
}
else{
Collection<Breakpoint> c = CollectionUtils.makeCollection(ArrayList.class, bpArr);
c.add(breakpoint);
bpArr = c.toArray(new Breakpoint[c.size()]);
}
_instanceBreakPoints.put(pid, bpArr);
}
public void removeGlobalBreakpoint(Breakpoint breakpoint){
Collection<Breakpoint> c = CollectionUtils.makeCollection(ArrayList.class, _globalBreakPoints);
c.remove(breakpoint);
_globalBreakPoints = c.toArray(new Breakpoint[c.size()]);
}
public void removeBreakpoint(Long pid, Breakpoint breakpoint){
Breakpoint[] bpArr = _instanceBreakPoints.get(pid);
if(bpArr != null){
Collection<Breakpoint> c = CollectionUtils.makeCollection(ArrayList.class, bpArr);
c.remove(breakpoint);
bpArr = c.toArray(new Breakpoint[c.size()]);
if(bpArr.length == 0) {
_instanceBreakPoints.remove(pid);
}
else {
_instanceBreakPoints.put(pid, bpArr);
}
}
}
public boolean step(final Long iid) {
boolean doit = false;
try {
doit = _db.exec(new BpelDatabase.Callable<Boolean>() {
public Boolean run(BpelDAOConnection conn) throws Exception {
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
throw new InstanceNotFoundException("" + iid);
if(ProcessState.STATE_SUSPENDED == instance.getState()){
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(ProcessState.STATE_SUSPENDED);
short previousState = instance.getPreviousState();
instance.setState(previousState);
evt.setNewState(previousState);
evt.setProcessInstanceId(iid);
evt.setProcessName(instance.getProcess().getType());
evt.setProcessId(_db.getProcessId());
_process.saveEvent(evt, instance);
onEvent(evt);
if (__log.isDebugEnabled()) {
__log.debug("step(" + iid + ") adding step indicator to table.");
}
_step.add(iid);
JobDetails we = new JobDetails();
we.setInstanceId(iid);
we.setType(JobType.RESUME);
_process._engine._contexts.scheduler.schedulePersistedJob(we, null);
return true;
}
return false;
}
});
} catch (InstanceNotFoundException infe) {
throw infe;
} catch (Exception ex) {
__log.error("UnexpectedEx", ex);
throw new RuntimeException(ex);
}
return doit;
}
/**
* Process BPEL events WRT debugging.
* @param event BPEL event
*/
public void onEvent(BpelEvent event) {
if(_enabled && (event instanceof ProcessInstanceEvent) &&
// I have this excluded since we are recursing here when onEvent()
// is called from DebugSupport codepath's which change state
!(event instanceof ProcessInstanceStateChangeEvent)) {
final ProcessInstanceEvent evt = (ProcessInstanceEvent)event;
//
// prevent leaking of memory
//
if(evt instanceof ProcessCompletionEvent ||
evt instanceof ProcessTerminationEvent) {
_step.remove(evt.getProcessInstanceId());
_instanceBreakPoints.remove(evt.getProcessInstanceId());
return;
}
boolean suspend = checkStep(evt);
if (!suspend) {
suspend = checkBreakPoints(evt, _globalBreakPoints);
}
if (!suspend){
Breakpoint[] bp = _instanceBreakPoints.get(evt.getProcessInstanceId());
if(bp != null) {
suspend = checkBreakPoints(evt, bp);
}
}
if(suspend){
_step.remove(evt.getProcessInstanceId());
try {
ProcessDAO process = _db.getProcessDAO();
ProcessInstanceDAO instance = process.getInstance(evt.getProcessInstanceId());
if(ProcessState.canExecute(instance.getState())){
// send event
ProcessInstanceStateChangeEvent changeEvent = new ProcessInstanceStateChangeEvent();
changeEvent.setOldState(instance.getState());
instance.setState(ProcessState.STATE_SUSPENDED);
changeEvent.setNewState(ProcessState.STATE_SUSPENDED);
changeEvent.setProcessInstanceId(instance.getInstanceId());
changeEvent.setProcessName(process.getType());
changeEvent.setProcessId(_db.getProcessId());
_process.saveEvent(changeEvent, instance);
onEvent(changeEvent);
}
} catch (Exception dce) {
__log.error(__msgs.msgDbError(), dce);
}
}
}
}
private boolean checkStep(ProcessInstanceEvent event){
Long pid = event.getProcessInstanceId();
return (_step.contains(pid)
&& (event instanceof ActivityExecStartEvent
|| event instanceof ScopeCompletionEvent));
}
private boolean checkBreakPoints(ProcessInstanceEvent event, Breakpoint[] breakpoints){
boolean suspended = false;
for(int i = 0; i < breakpoints.length; ++i){
if (((BreakpointImpl)breakpoints[i]).checkBreak(event)){
suspended = true;
break;
}
}
return suspended;
}
public boolean resume(final Long iid) {
boolean doit = false;
try {
doit = _db.exec(new BpelDatabase.Callable<Boolean>() {
public Boolean run(BpelDAOConnection conn) throws Exception {
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
throw new InstanceNotFoundException("" + iid);
if(ProcessState.STATE_SUSPENDED == instance.getState()){
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(ProcessState.STATE_SUSPENDED);
short previousState = instance.getPreviousState();
instance.setState(previousState);
evt.setNewState(previousState);
evt.setProcessInstanceId(iid);
evt.setProcessName(instance.getProcess().getType());
evt.setProcessId(_db.getProcessId());
_process.saveEvent(evt, instance);
onEvent(evt);
JobDetails we = new JobDetails();
we.setType(JobType.RESUME);
we.setInstanceId(iid);
_process._engine._contexts.scheduler.schedulePersistedJob(we, null);
return true;
}
return false;
}
});
} catch (InstanceNotFoundException infe) {
throw infe;
} catch (Exception ex) {
__log.error("ProcessingEx", ex);
throw new ProcessingException(ex.getMessage(),ex);
}
return doit;
}
public void suspend(final Long iid) {
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) throws Exception {
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null) {
throw new InstanceNotFoundException("" + iid);
}
if (ProcessState.canExecute(instance.getState())) {
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(instance.getState());
instance.setState(ProcessState.STATE_SUSPENDED);
evt.setNewState(ProcessState.STATE_SUSPENDED);
evt.setProcessInstanceId(iid);
ProcessDAO process = instance.getProcess();
evt.setProcessName(process.getType());
evt.setProcessId(process.getProcessId());
_process.saveEvent(evt, instance);
onEvent(evt);
}
return null;
}
});
} catch (ManagementException me) {
throw me;
} catch (Exception ex) {
__log.error("DbError", ex);
throw new RuntimeException(ex);
}
}
public void terminate(final Long iid) {
try {
_db.exec(new BpelDatabase.Callable<Object>() {
public Object run(BpelDAOConnection conn) throws Exception {
ProcessInstanceDAO instance = conn.getInstance(iid);
if (instance == null)
throw new ManagementException("InstanceNotFound:" + iid);
// send event
ProcessInstanceStateChangeEvent evt = new ProcessInstanceStateChangeEvent();
evt.setOldState(instance.getState());
instance.setState(ProcessState.STATE_TERMINATED);
evt.setNewState(ProcessState.STATE_TERMINATED);
evt.setProcessInstanceId(iid);
ProcessDAO process = instance.getProcess();
QName processName = process.getType();
evt.setProcessName(processName);
QName processId = process.getProcessId();
evt.setProcessId(processId);
_process.saveEvent(evt, instance);
//
// TerminationEvent (peer of ProcessCompletionEvent)
//
ProcessTerminationEvent terminationEvent =
new ProcessTerminationEvent();
terminationEvent.setProcessInstanceId(iid);
terminationEvent.setProcessName(processName);
terminationEvent.setProcessId(processId);
_process.saveEvent(terminationEvent, instance);
onEvent(evt);
onEvent(terminationEvent);
return null;
}
});
} catch (ManagementException me) {
throw me;
} catch (Exception e) {
__log.error("DbError", e);
throw new RuntimeException(e);
}
}
/**
* @return the process model. Currently an {@link OProcess}
* However it is not guaranteed that it will remain an OProcess
* in future versions of ODE or for different types
* of process lanaguage than BPEL.
*/
public Object getProcessModel() {
return _process.getOProcess();
}
}