// blob: 0ab3d673d92b9889fda9a5c83133317a78a4b25d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.dao.jpa;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.ActivityRecoveryDAO;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.CorrelationSetDAO;
import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.FaultDAO;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
import org.apache.ode.bpel.dao.ScopeDAO;
import org.apache.ode.bpel.dao.ScopeStateEnum;
import org.apache.ode.bpel.dao.XmlDataDAO;
import org.apache.ode.bpel.evt.ProcessInstanceEvent;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.w3c.dom.Element;
import javax.persistence.Basic;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.FetchType;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.ManyToOne;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.OneToMany;
import javax.persistence.OneToOne;
import javax.persistence.Table;
import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
 * JPA entity for a process instance, stored in table ODE_PROCESS_INSTANCE,
 * and implementation of {@link ProcessInstanceDAO}.
 *
 * The named queries below are keyed by the String constants declared in this class.
 *
 * NOTE(review): COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS hard-codes
 * "i._state = 20" whereas the single-process variant and
 * COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES take a ":states" parameter --
 * confirm that the difference is intended.
 */
@Entity
@Table(name="ODE_PROCESS_INSTANCE")
@NamedQueries({
@NamedQuery(name=ProcessInstanceDAOImpl.DELETE_INSTANCES_BY_PROCESS, query="delete from ProcessInstanceDAOImpl as i where i._process = :process"),
@NamedQuery(name=ProcessInstanceDAOImpl.SELECT_INSTANCE_IDS_BY_PROCESS, query="select i._instanceId from ProcessInstanceDAOImpl as i where i._process = :process"),
@NamedQuery(name=ProcessInstanceDAOImpl.COUNT_INSTANCE_IDS_BY_PROCESS, query="select count(i._instanceId) from ProcessInstanceDAOImpl as i where i._process = :process"),
@NamedQuery(name=ProcessInstanceDAOImpl.SELECT_FAULT_IDS_BY_PROCESS, query="select i._faultId from ProcessInstanceDAOImpl as i where i._process = :process and i._faultId is not null"),
@NamedQuery(name=ProcessInstanceDAOImpl.COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID,
query="select count(i._instanceId), max(i._lastRecovery) from ProcessInstanceDAOImpl as i where i._process._processId = :processId and i._state in(:states) and exists(select r from ActivityRecoveryDAOImpl r where i = r._instance)"),
@NamedQuery(name=ProcessInstanceDAOImpl.COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS,
query="select i._process._processId, count(i._instanceId), max(i._lastRecovery) from ProcessInstanceDAOImpl as i where i._process._processId in (:processIds) and i._state = 20 and exists(select r from ActivityRecoveryDAOImpl r where i = r._instance) group by i._process._processId"),
@NamedQuery(name=ProcessInstanceDAOImpl.COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES,
query="select i._process._processId, count(i._instanceId) from ProcessInstanceDAOImpl as i where i._process._processId in (:processIds) and i._state in(:states) group by i._process._processId")
})
public class ProcessInstanceDAOImpl extends OpenJPADAO implements ProcessInstanceDAO {
/** Class logger. */
private static final Logger __log = LoggerFactory.getLogger(ProcessInstanceDAOImpl.class);
// Names under which the named queries of this entity are registered.
public final static String DELETE_INSTANCES_BY_PROCESS = "DELETE_INSTANCES_BY_PROCESS";
public final static String SELECT_INSTANCE_IDS_BY_PROCESS = "SELECT_INSTANCE_IDS_BY_PROCESS";
public final static String COUNT_INSTANCE_IDS_BY_PROCESS = "COUNT_INSTANCE_IDS_BY_PROCESS";
public final static String SELECT_FAULT_IDS_BY_PROCESS = "SELECT_FAULT_IDS_BY_PROCESS";
public final static String COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID = "COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_ID";
public final static String COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS = "COUNT_FAILED_INSTANCES_BY_STATUS_AND_PROCESS_IDS";
public static final String COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES = "COUNT_INSTANCES_BY_PROCESSES_IDS_AND_STATES";
/** Primary key (column ID), generated with GenerationType.AUTO. */
@Id @Column(name="ID")
@GeneratedValue(strategy=GenerationType.AUTO)
private Long _instanceId;
/** Date passed to the most recent createActivityRecovery call (column LAST_RECOVERY_DATE). */
@Basic @Column(name="LAST_RECOVERY_DATE")
private Date _lastRecovery;
/** Last-active timestamp, written by setLastActiveTime (column LAST_ACTIVE_TIME). */
@Basic @Column(name="LAST_ACTIVE_TIME")
private Date _lastActive;
/** Current instance state, written by setState (column INSTANCE_STATE). */
@Basic @Column(name="INSTANCE_STATE")
private short _state;
/** State held before the latest setState call (column PREVIOUS_STATE). */
@Basic @Column(name="PREVIOUS_STATE")
private short _previousState;
/** Opaque execution state bytes stored as a LOB (column EXECUTION_STATE). */
@Lob @Column(name="EXECUTION_STATE")
private byte[] _executionState;
/** Counter handed out and incremented by genMonotonic (column SEQUENCE). */
@Basic @Column(name="SEQUENCE")
private long _sequence;
/** Creation timestamp; initialised to the time this object is constructed (column DATE_CREATED). */
@Basic @Column(name="DATE_CREATED")
private Date _dateCreated = new Date();
// NOTE(review): the relationship fields below carry @Column rather than @JoinColumn --
// confirm that the persistence provider honours the column name on relations.
/** Root scope; set by createScope when it is called with a null parent scope. */
@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH}) @Column(name="ROOT_SCOPE_ID")
private ScopeDAOImpl _rootScope;
/** All scopes created through createScope; inverse side of ScopeDAOImpl._processInstance. */
@OneToMany(targetEntity=ScopeDAOImpl.class,mappedBy="_processInstance",fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH})
private Collection<ScopeDAO> _scopes = new ArrayList<ScopeDAO>();
/** Activity recovery records of this instance; every operation cascades (CascadeType.ALL). */
@OneToMany(targetEntity=ActivityRecoveryDAOImpl.class,mappedBy="_instance",fetch=FetchType.LAZY,cascade={CascadeType.ALL})
private Collection<ActivityRecoveryDAO> _recoveries = new ArrayList<ActivityRecoveryDAO>();
// Read-only (insertable=false, updatable=false) view of the FAULT_ID column; referenced
// by the SELECT_FAULT_IDS_BY_PROCESS query.
// NOTE(review): the column is declared nullable but the field is a primitive long -- confirm
// how the provider materialises a NULL value here.
@SuppressWarnings("unused")
@Basic @Column(name="FAULT_ID", insertable=false, updatable=false, nullable=true)
private long _faultId;
/** Fault attached to this instance, if any; shares column FAULT_ID with _faultId. */
@OneToOne(fetch=FetchType.LAZY,cascade={CascadeType.MERGE, CascadeType.PERSIST, CascadeType.REFRESH}) @Column(name="FAULT_ID")
private FaultDAOImpl _fault;
/** Owning process (column PROCESS_ID). */
@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="PROCESS_ID")
private ProcessDAOImpl _process;
/** Correlator supplied to the two-argument constructor (column INSTANTIATING_CORRELATOR_ID). */
@ManyToOne(fetch=FetchType.LAZY,cascade={CascadeType.PERSIST}) @Column(name="INSTANTIATING_CORRELATOR_ID")
private CorrelatorDAOImpl _instantiatingCorrelator;
/** Message exchanges referring to this instance; inverse side of MessageExchangeDAOImpl._processInst. */
@OneToMany(targetEntity=MessageExchangeDAOImpl.class,mappedBy="_processInst",fetch=FetchType.LAZY)
@SuppressWarnings("unused")
private Collection<MessageExchangeDAO> _messageExchanges = new ArrayList<MessageExchangeDAO>();
// Non-persistent cache of the activity failure count; -1 means "not computed yet"
// (see getActivityFailureCount / setActivityFailureCount).
private transient int _activityFailureCount = -1;
/** No-argument constructor; every field keeps its declared initial value. */
public ProcessInstanceDAOImpl() {
}
/**
 * Creates an instance tied to a process and to the correlator that instantiated it.
 *
 * @param correlator stored as the instantiating correlator
 * @param process stored as the owning process
 */
public ProcessInstanceDAOImpl(CorrelatorDAOImpl correlator, ProcessDAOImpl process) {
    this._process = process;
    this._instantiatingCorrelator = correlator;
}
/**
 * Registers a new activity recovery record on this instance.
 *
 * The record is added to the recoveries collection, linked back to this
 * instance, and {@code dateTime} becomes the last recovery date.
 *
 * @param channel recovery channel, forwarded to the new record
 * @param activityId id of the failed activity, forwarded to the new record
 * @param reason failure reason, forwarded to the new record
 * @param dateTime failure time; also stored as the last recovery date
 * @param data failure details, forwarded to the new record
 * @param actions recovery actions, forwarded to the new record
 * @param retries retry count, forwarded to the new record
 */
public void createActivityRecovery(String channel, long activityId,
        String reason, Date dateTime, Element data, String[] actions,
        int retries) {
    ActivityRecoveryDAOImpl recovery =
            new ActivityRecoveryDAOImpl(channel, activityId, reason, dateTime, data, actions, retries);
    _recoveries.add(recovery);
    recovery.setInstance(this);
    _lastRecovery = dateTime;
    // The failure count may already have been cached by getActivityFailureCount();
    // reset it so the next read reflects the record that was just added.
    _activityFailureCount = -1;
}
/**
 * Creates an ACTIVE scope belonging to this instance and persists it.
 *
 * When {@code parentScope} is null the new scope becomes the root scope;
 * otherwise the root scope is left as it was.
 *
 * @param parentScope parent of the new scope, or null
 * @param name scope name
 * @param scopeModelId model id of the scope
 * @return the newly created, persisted scope
 */
public ScopeDAO createScope(ScopeDAO parentScope, String name, int scopeModelId) {
    ScopeDAOImpl created = new ScopeDAOImpl((ScopeDAOImpl) parentScope, name, scopeModelId, this);
    created.setState(ScopeStateEnum.ACTIVE);
    _scopes.add(created);
    ScopeDAOImpl root = (parentScope != null) ? _rootScope : created;
    _rootScope = root;
    // Persisting here is what gives the scope its generated ID.
    getEM().persist(created);
    return created;
}
/**
 * Runs the SELECT_CORRELATION_SETS_BY_INSTANCES named query.
 *
 * @param instances bound to the query parameter "instances"
 * @return the query's result list
 */
@SuppressWarnings("unchecked")
public Collection<CorrelationSetDAO> selectCorrelationSets(Collection<ProcessInstanceDAO> instances) {
    Collection<CorrelationSetDAO> found = getEM()
            .createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SETS_BY_INSTANCES)
            .setParameter("instances", instances)
            .getResultList();
    return found;
}
/**
 * Cleans up this instance for the given categories; shorthand for
 * {@code delete(cleanupCategories, true)}.
 *
 * @param cleanupCategories categories of data to remove
 */
public void delete(Set<CLEANUP_CATEGORY> cleanupCategories) {
    final boolean deleteMyRoleMex = true;
    this.delete(cleanupCategories, deleteMyRoleMex);
}
/**
 * Cleans up the data of this instance for each requested category.
 *
 * The execution state is always cleared. If an entity manager is available,
 * the selected categories are then deleted in the order EVENTS, CORRELATIONS,
 * MESSAGES, VARIABLES, INSTANCE, followed by a flush.
 *
 * @param cleanupCategories categories of data to remove; must not be null
 * @param deleteMyRoleMex not read by this implementation
 */
public void delete(Set<CLEANUP_CATEGORY> cleanupCategories, boolean deleteMyRoleMex) {
    // Parameterized logging: the message is only formatted when debug is enabled.
    __log.debug("Cleaning up instance Data with {}", cleanupCategories);
    // remove jacob state
    setExecutionState(null);
    if (getEM() == null) {
        return;
    }
    if (!cleanupCategories.isEmpty()) {
        // by default, we do not flush before select; flush it, so we can delete no matter if an entity is loaded up
        // or not; more importantly, OpenJPA will secretly load from the entire table if some entities reside only
        // in memory
        getEM().flush();
    }
    if (cleanupCategories.contains(CLEANUP_CATEGORY.EVENTS)) {
        deleteEvents();
    }
    if (cleanupCategories.contains(CLEANUP_CATEGORY.CORRELATIONS)) {
        deleteCorrelations();
    }
    if (cleanupCategories.contains(CLEANUP_CATEGORY.MESSAGES)) {
        deleteMessageRoutes();
    }
    if (cleanupCategories.contains(CLEANUP_CATEGORY.VARIABLES)) {
        deleteVariables();
    }
    if (cleanupCategories.contains(CLEANUP_CATEGORY.INSTANCE)) {
        deleteInstance();
    }
    getEM().flush();
}
/** Removes the attached fault, if there is one, and then this instance itself. */
private void deleteInstance() {
    FaultDAOImpl attachedFault = _fault;
    if (attachedFault != null) {
        getEM().remove(attachedFault);
    }
    // The recoveries relation cascades ALL, so the ActivityRecoveryDAO records go with it.
    getEM().remove(this);
}
/**
 * Deletes variable-related data of this instance through named queries, in this
 * order: XML data properties (by XML data id), XML data, partner links and
 * finally scopes (the last three by scope id).
 *
 * The id lists are fed to the inherited batchUpdateByIds helper, which
 * presumably executes each delete in batches of ids.
 */
@SuppressWarnings("unchecked")
private void deleteVariables() {
    Collection variableIds = getEM()
            .createNamedQuery(XmlDataDAOImpl.SELECT_XMLDATA_IDS_BY_INSTANCE)
            .setParameter("instance", this)
            .getResultList();
    batchUpdateByIds(variableIds.iterator(),
            getEM().createNamedQuery(XmlDataProperty.DELETE_XML_DATA_PROPERTIES_BY_XML_DATA_IDS),
            "xmlDataIds");

    Collection instanceScopeIds = getEM()
            .createNamedQuery(ScopeDAOImpl.SELECT_SCOPE_IDS_BY_INSTANCE)
            .setParameter("instance", this)
            .getResultList();
    batchUpdateByIds(instanceScopeIds.iterator(),
            getEM().createNamedQuery(XmlDataDAOImpl.DELETE_XMLDATA_BY_SCOPE_IDS),
            "scopeIds");
    batchUpdateByIds(instanceScopeIds.iterator(),
            getEM().createNamedQuery(PartnerLinkDAOImpl.DELETE_PARTNER_LINKS_BY_SCOPE_IDS),
            "scopeIds");
    batchUpdateByIds(instanceScopeIds.iterator(),
            getEM().createNamedQuery(ScopeDAOImpl.DELETE_SCOPES_BY_SCOPE_IDS),
            "ids");
}
/** Executes the DELETE_MESSAGE_ROUTES_BY_INSTANCE named query for this instance. */
private void deleteMessageRoutes() {
    getEM()
            .createNamedQuery(MessageRouteDAOImpl.DELETE_MESSAGE_ROUTES_BY_INSTANCE)
            .setParameter("instance", this)
            .executeUpdate();
}
/**
 * Deletes the correlation sets of this instance: first their properties,
 * then the correlation sets themselves, both keyed by correlation set id.
 */
@SuppressWarnings("unchecked")
private void deleteCorrelations() {
    Collection correlationSetIds = getEM()
            .createNamedQuery(CorrelationSetDAOImpl.SELECT_CORRELATION_SET_IDS_BY_INSTANCE)
            .setParameter("instance", this)
            .getResultList();
    batchUpdateByIds(correlationSetIds.iterator(),
            getEM().createNamedQuery(CorrSetProperty.DELETE_CORSET_PROPERTIES_BY_PROPERTY_IDS),
            "corrSetIds");
    batchUpdateByIds(correlationSetIds.iterator(),
            getEM().createNamedQuery(CorrelationSetDAOImpl.DELETE_CORRELATION_SETS_BY_IDS),
            "ids");
}
/** Executes the DELETE_EVENTS_BY_INSTANCE named query for this instance. */
private void deleteEvents() {
    getEM()
            .createNamedQuery(EventDAOImpl.DELETE_EVENTS_BY_INSTANCE)
            .setParameter("instance", this)
            .executeUpdate();
}
/**
 * Removes the first activity recovery record whose channel equals {@code channel}.
 * Does nothing when no record matches.
 *
 * @param channel channel of the recovery record to remove
 */
public void deleteActivityRecovery(String channel) {
    ActivityRecoveryDAOImpl match = null;
    for (ActivityRecoveryDAO candidate : _recoveries) {
        ActivityRecoveryDAOImpl impl = (ActivityRecoveryDAOImpl) candidate;
        if (impl.getChannel().equals(channel)) {
            match = impl;
            break;
        }
    }
    if (match == null) {
        return;
    }
    getEM().remove(match);
    _recoveries.remove(match);
    // The failure count may already have been cached by getActivityFailureCount();
    // reset it so the next read reflects the removal.
    _activityFailureCount = -1;
}
/**
 * Completion hook. When assertions are enabled it checks that the current
 * state is a finished one; it performs no other work.
 */
public void finishCompletion() {
    // Sanity check only: the instance must already be in a finished state.
    assert ProcessState.isFinished(getState());
}
/**
 * Hands out the current sequence value and advances the sequence by one.
 *
 * @return the sequence value before the increment
 */
public long genMonotonic() {
    long issued = _sequence;
    _sequence = issued + 1;
    return issued;
}
/**
 * Returns the activity failure count. While the cached value is -1 it is
 * computed from the size of the recoveries collection and cached.
 *
 * @return the cached or freshly computed failure count
 */
public int getActivityFailureCount() {
    if (_activityFailureCount != -1) {
        return _activityFailureCount;
    }
    _activityFailureCount = _recoveries.size();
    return _activityFailureCount;
}
/**
 * Overrides the cached activity failure count.
 *
 * @param activityFailureCount value later returned by getActivityFailureCount
 *        (unless it is -1, which triggers a recount)
 */
public void setActivityFailureCount(int activityFailureCount) {
    this._activityFailureCount = activityFailureCount;
}
/** @return the last recovery date, as recorded by createActivityRecovery */
public Date getActivityFailureDateTime() {
    return this._lastRecovery;
}
/** @return the live collection of activity recovery records (not a copy) */
public Collection<ActivityRecoveryDAO> getActivityRecoveries() {
    return this._recoveries;
}
/**
 * Not supported by this implementation.
 *
 * @param name ignored
 * @return never returns normally
 * @throws UnsupportedOperationException always
 */
public CorrelationSetDAO getCorrelationSet(String name) {
    // TODO: deprecation candidate? It is unclear where an instance-level
    // correlation set would be used or populated.
    throw new UnsupportedOperationException();
}
/** @return a new, empty set on every call */
public Set<CorrelationSetDAO> getCorrelationSets() {
    // TODO: deprecation candidate? It is unclear where instance-level
    // correlation sets would be used or populated.
    Set<CorrelationSetDAO> none = new HashSet<CorrelationSetDAO>();
    return none;
}
/** @return the creation date of this instance */
public Date getCreateTime() {
    return this._dateCreated;
}
/**
 * Not implemented yet.
 *
 * @return always null
 */
public EventsFirstLastCountTuple getEventsFirstLastCount() {
    // TODO: provide a real implementation.
    EventsFirstLastCountTuple notAvailable = null;
    return notAvailable;
}
/** @return the stored execution state bytes (the internal array, not a copy); may be null */
public byte[] getExecutionState() {
    return this._executionState;
}
/** @return the fault attached to this instance, or null when none was set */
public FaultDAO getFault() {
    return this._fault;
}
/** @return the generated primary key of this instance */
public Long getInstanceId() {
    return this._instanceId;
}
/** @return the instantiating correlator of this instance */
public CorrelatorDAO getInstantiatingCorrelator() {
    return this._instantiatingCorrelator;
}
/** @return the value last passed to setLastActiveTime */
public Date getLastActiveTime() {
    return this._lastActive;
}
/** @return the state this instance had before the most recent setState call */
public short getPreviousState() {
    return this._previousState;
}
/** @return the process this instance belongs to */
public ProcessDAO getProcess() {
    return this._process;
}
/** @return the root scope of this instance */
public ScopeDAO getRootScope() {
    return this._rootScope;
}
/**
 * Looks a scope up by primary key through the entity manager.
 *
 * @param scopeInstanceId primary key of the scope
 * @return whatever the entity manager's find returns for that key
 */
public ScopeDAO getScope(Long scopeInstanceId) {
    ScopeDAOImpl scope = getEM().find(ScopeDAOImpl.class, scopeInstanceId);
    return scope;
}
/**
 * Collects the scopes of this instance whose name equals {@code scopeName}.
 *
 * @param scopeName name to match
 * @return a new collection with the matching scopes, possibly empty
 */
public Collection<ScopeDAO> getScopes(String scopeName) {
    Collection<ScopeDAO> matching = new ArrayList<ScopeDAO>();
    for (ScopeDAO candidate : _scopes) {
        if (!candidate.getName().equals(scopeName)) {
            continue;
        }
        matching.add(candidate);
    }
    return matching;
}
/** @return the live collection of all scopes of this instance (not a copy) */
public Collection<ScopeDAO> getScopes() {
    return this._scopes;
}
/** @return the current state of this instance */
public short getState() {
    return this._state;
}
/**
 * Gathers the variable named {@code variableName} from every scope of this
 * instance whose model id equals {@code scopeModelId}; scopes that return no
 * such variable are skipped.
 *
 * @param variableName name of the variable to look up in each matching scope
 * @param scopeModelId model id that selects the scopes to inspect
 * @return the variables found, possibly an empty array
 */
public XmlDataDAO[] getVariables(String variableName, int scopeModelId) {
    // TODO: apparently unused; deprecation candidate.
    List<XmlDataDAO> found = new ArrayList<XmlDataDAO>();
    for (ScopeDAO scope : _scopes) {
        if (scope.getModelId() != scopeModelId) {
            continue;
        }
        XmlDataDAO variable = scope.getVariable(variableName);
        if (variable == null) {
            continue;
        }
        found.add(variable);
    }
    return found.toArray(new XmlDataDAO[found.size()]);
}
/**
 * Hands the event to the DAO connection together with this instance and its process.
 *
 * @param event the event to insert
 */
public void insertBpelEvent(ProcessInstanceEvent event) {
    getConn().insertBpelEvent(
            event,
            this.getProcess(),
            this);
}
/**
 * Stores the execution state bytes; the array is kept by reference.
 *
 * @param execState new execution state, or null to clear it
 */
public void setExecutionState(byte[] execState) {
    this._executionState = execState;
}
/**
 * Attaches a fault to this instance.
 *
 * @param fault the fault to attach; it is cast to FaultDAOImpl
 */
public void setFault(FaultDAO fault) {
    FaultDAOImpl impl = (FaultDAOImpl) fault;
    this._fault = impl;
}
/**
 * Builds a new FaultDAOImpl from the given details and attaches it to this instance.
 *
 * @param faultName qualified name of the fault
 * @param explanation explanation text
 * @param faultLineNo line number associated with the fault
 * @param activityId id of the activity associated with the fault
 * @param faultMessage fault message element
 */
public void setFault(QName faultName, String explanation, int faultLineNo,
        int activityId, Element faultMessage) {
    FaultDAOImpl created =
            new FaultDAOImpl(faultName, explanation, faultLineNo, activityId, faultMessage);
    this._fault = created;
}
/**
 * Records the last-active timestamp.
 *
 * @param dt the new last-active time
 */
public void setLastActiveTime(Date dt) {
    this._lastActive = dt;
}
/**
 * Moves this instance to a new state, remembering the current one as the
 * previous state. Entering STATE_TERMINATED also deletes the message routes
 * of this instance.
 *
 * @param state the new state
 */
public void setState(short state) {
    _previousState = _state;
    _state = state;
    if (state != ProcessState.STATE_TERMINATED) {
        return;
    }
    deleteMessageRoutes();
}
/**
 * Delegates removal of the routes of a route group to the owning process,
 * passing this instance along.
 *
 * @param routeGroupId id of the route group
 */
void removeRoutes(String routeGroupId) {
    this._process.removeRoutes(routeGroupId, this);
}
/** @return a new BPELDAOConnectionImpl wrapping this DAO's entity manager, created on every call */
public BpelDAOConnection getConnection() {
    BpelDAOConnection connection = new BPELDAOConnectionImpl(getEM());
    return connection;
}
/** @return a new set holding the id of every message exchange linked to this instance */
public Collection<String> getMessageExchangeIds() {
    Collection<String> exchangeIds = new HashSet<String>();
    for (MessageExchangeDAO exchange : _messageExchanges) {
        String id = exchange.getMessageExchangeId();
        exchangeIds.add(id);
    }
    return exchangeIds;
}
}