blob: 2ee6840625501082a2d9d9109a644c7e2507bc25 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.memdao;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.CorrelationSetDAO;
import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.FaultDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.dao.XmlDataDAO;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.utils.QNameUtils;
import org.w3c.dom.Element;
import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* A very simple, in-memory implementation of the {@link ProcessInstanceDAO}
* interface.
*/
public class ProcessInstanceDaoImpl extends DaoBaseImpl implements ProcessInstanceDAO {
private static final Collection<ScopeDAO> EMPTY_SCOPE_DAOS = Collections.emptyList();
private short _previousState;
private short _state;
private Long _instanceId;
private ProcessDaoImpl _processDao;
private Object _soup;
private Map<Long, ScopeDAO> _scopes = new HashMap<Long, ScopeDAO>();
private Map<String, List<ScopeDAO>> _scopesByName = new HashMap<String, List<ScopeDAO>>();
private Map<String, byte[]> _messageExchanges = new HashMap<String, byte[]>();
private ScopeDAO _rootScope;
private FaultDAO _fault;
private CorrelatorDAO _instantiatingCorrelator;
private BpelDAOConnection _conn;
private int _failureCount;
private Date _failureDateTime;
private Map<String, ActivityRecoveryDAO> _activityRecoveries = new HashMap<String, ActivityRecoveryDAO>();
// TODO: Remove this, we should be using the main event store...
private List<ProcessInstanceEvent> _events = new ArrayList<ProcessInstanceEvent>();
private Date _lastActive;
private int _seq;
ProcessInstanceDaoImpl(BpelDAOConnection conn, ProcessDaoImpl processDao, CorrelatorDAO correlator) {
_state = 0;
_processDao = processDao;
_instantiatingCorrelator = correlator;
_soup = null;
_instanceId = IdGen.newProcessId();
_conn = conn;
}
public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
ArrayList<XmlDataDAO> res = new ArrayList<XmlDataDAO>();
for (ScopeDAO scope : _scopes.values()) {
if (scope.getModelId() == scopeModelId) {
XmlDataDAO xmld = scope.getVariable(variableName);
if (xmld != null)
res.add(xmld);
}
}
return res.toArray(new XmlDataDAO[res.size()]);
}
public Set<CorrelationSetDAO> getCorrelationSets() {
HashSet<CorrelationSetDAO> res = new HashSet<CorrelationSetDAO>();
for (ScopeDAO scopeDAO : _scopes.values()) {
res.addAll(scopeDAO.getCorrelationSets());
}
return res;
}
public CorrelationSetDAO getCorrelationSet(String name) {
for (ScopeDAO scopeDAO : _scopes.values()) {
if (scopeDAO.getCorrelationSet(name) != null)
return scopeDAO.getCorrelationSet(name);
}
return null;
}
public void setFault(QName name, String explanation, int lineNo, int activityId, Element faultData) {
_fault = new FaultDaoImpl(QNameUtils.fromQName(name), explanation, faultData, lineNo, activityId);
}
public void setFault(FaultDAO fault) {
_fault = fault;
}
public FaultDAO getFault() {
return _fault;
}
/**
* @see ProcessInstanceDAO#getExecutionState()
*/
public byte[] getExecutionState() {
throw new IllegalStateException("In-memory instances are never serialized");
}
public void setExecutionState(byte[] bytes) {
throw new IllegalStateException("In-memory instances are never serialized");
}
public Object getSoup() {
return _soup;
}
public void setSoup(Object soup) {
_soup = soup;
}
public byte[] getMessageExchange(String identifier) {
byte[] mex = _messageExchanges.get(identifier);
assert (mex != null);
return mex;
}
/**
* @see ProcessInstanceDAO#getProcess()
*/
public ProcessDAO getProcess() {
return _processDao;
}
/**
* @see ProcessInstanceDAO#getRootScope()
*/
public ScopeDAO getRootScope() {
return _rootScope;
}
/**
* @see ProcessInstanceDAO#setState(short)
*/
public void setState(short state) {
_previousState = _state;
_state = state;
if (state == ProcessState.STATE_TERMINATED) {
for (CorrelatorDAO correlatorDAO : _processDao.getCorrelators()) {
correlatorDAO.removeRoutes(null, this);
}
}
}
/**
* @see ProcessInstanceDAO#getState()
*/
public short getState() {
return _state;
}
public void addMessageExchange(String identifier, byte[] data) {
assert (!_messageExchanges.containsKey(identifier));
_messageExchanges.put(identifier, data);
}
public ScopeDAO createScope(ScopeDAO parentScope, String scopeType, int scopeModelId) {
ScopeDaoImpl newScope = new ScopeDaoImpl(this, parentScope, scopeType, scopeModelId);
_scopes.put(newScope.getScopeInstanceId(), newScope);
List<ScopeDAO> namedScopes = _scopesByName.get(scopeType);
if (namedScopes == null) {
namedScopes = new LinkedList<ScopeDAO>();
_scopesByName.put(scopeType, namedScopes);
}
namedScopes.add(newScope);
if (parentScope == null) {
assert _rootScope == null;
_rootScope = newScope;
}
return newScope;
}
public Long getInstanceId() {
return _instanceId;
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScope(java.lang.Long)
*/
public ScopeDAO getScope(Long scopeInstanceId) {
return _scopes.get(scopeInstanceId);
}
public List<ProcessInstanceEvent> getEvents(int idx, int count) {
int sidx = Math.max(idx, 0);
sidx = Math.min(sidx, _events.size() - 1);
int eidx = Math.min(sidx + count, _events.size());
return _events.subList(sidx, eidx);
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#insertBpelEvent(org.apache.ode.bpel.evt.ProcessInstanceEvent)
*/
public void insertBpelEvent(ProcessInstanceEvent event) {
_events.add(event);
}
public int getEventCount() {
return _events.size();
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getInstantiatingCorrelator()
*/
public CorrelatorDAO getInstantiatingCorrelator() {
return _instantiatingCorrelator;
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getScopes(java.lang.String)
*/
public Collection<ScopeDAO> getScopes(String scopeName) {
List<ScopeDAO> scopes = _scopesByName.get(scopeName);
return (scopes == null ? EMPTY_SCOPE_DAOS : scopes);
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getPreviousState()
*/
public short getPreviousState() {
return _previousState;
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#getLastActiveTime()
*/
public Date getLastActiveTime() {
return _lastActive;
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#setLastActiveTime(java.util.Date)
*/
public void setLastActiveTime(Date dt) {
_lastActive = dt;
}
/**
* @see org.apache.ode.bpel.dao.ProcessInstanceDAO#finishCompletion()
*/
public void finishCompletion() {
// make sure we have completed.
assert (ProcessState.isFinished(this.getState()));
// let our process know that we've done our work.
this.getProcess().instanceCompleted(this);
}
public void delete(Set<CLEANUP_CATEGORY> cleanupCategories) {
_processDao._instances.remove(_instanceId);
}
public Collection<ScopeDAO> getScopes() {
return _scopes.values();
}
public EventsFirstLastCountTuple getEventsFirstLastCount() {
EventsFirstLastCountTuple ret = new EventsFirstLastCountTuple();
ret.count = _events.size();
Date first = new Date();
Date last = new Date(0);
for (ProcessInstanceEvent event : _events) {
if (event.getTimestamp().before(first))
first = event.getTimestamp();
if (event.getTimestamp().after(last))
last = event.getTimestamp();
}
ret.first = first;
ret.last = last;
return ret;
}
public int getActivityFailureCount() {
return _failureCount;
}
public Date getActivityFailureDateTime() {
return _failureDateTime;
}
public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
return _activityRecoveries.values();
}
public void createActivityRecovery(String channel, long activityId, String reason, Date dateTime, Element data,
String[] actions, int retries) {
_activityRecoveries
.put(channel, new ActivityRecoveryDAOImpl(channel, activityId, reason, dateTime, data, actions, retries));
_failureCount = _activityRecoveries.size();
_failureDateTime = dateTime;
}
public void deleteActivityRecovery(String channel) {
_activityRecoveries.remove(channel);
_failureCount = _activityRecoveries.size();
}
public synchronized long genMonotonic() {
return ++_seq;
}
static class ActivityRecoveryDAOImpl implements ActivityRecoveryDAO {
private long _activityId;
private String _channel;
private String _reason;
private Element _details;
private Date _dateTime;
private String _actions;
private int _retries;
ActivityRecoveryDAOImpl(String channel, long activityId, String reason, Date dateTime, Element details, String[] actions,
int retries) {
_activityId = activityId;
_channel = channel;
_reason = reason;
_details = details;
_dateTime = dateTime;
_actions = actions[0];
for (int i = 1; i < actions.length; ++i)
_actions += " " + actions[i];
_retries = retries;
}
public long getActivityId() {
return _activityId;
}
public String getChannel() {
return _channel;
}
public String getReason() {
return _reason;
}
public Element getDetails() {
return _details;
}
public Date getDateTime() {
return _dateTime;
}
public String getActions() {
return _actions;
}
public String[] getActionsList() {
return _actions.split(" ");
}
public int getRetries() {
return _retries;
}
}
void removeRoutes(String routeGroupId) {
for (CorrelatorDaoImpl correlator : _processDao._correlators.values())
correlator._removeRoutes(routeGroupId, this);
}
public BpelDAOConnection getConnection() {
return _conn;
}
public String toString() {
return "mem.instance(type=" + _processDao.getType() + " iid=" + _instanceId + ")";
}
}