blob: df6340058c0615306632fbcebb71d36977e05630 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.memdao;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.CorrelationKey;
import org.apache.ode.bpel.common.ProcessState;
import org.apache.ode.bpel.dao.CorrelationSetDAO;
import org.apache.ode.bpel.dao.CorrelatorDAO;
import org.apache.ode.bpel.dao.PartnerLinkDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
/**
* A very simple, in-memory implementation of the {@link ProcessDAO} interface.
*/
class ProcessDaoImpl extends DaoBaseImpl implements ProcessDAO {
private static final Logger __log = LoggerFactory.getLogger(ProcessDaoImpl.class);
private QName _processId;
private QName _type;
private long _version;
final Map<String, CorrelatorDaoImpl> _correlators = new ConcurrentHashMap<String, CorrelatorDaoImpl>();
protected final Map<Long, ProcessInstanceDAO> _instances = new ConcurrentHashMap<Long, ProcessInstanceDAO>();
protected final Map<Long, Long> _instancesAge = new ConcurrentHashMap<Long, Long>();
protected final Map<Integer, PartnerLinkDAO> _plinks = new ConcurrentHashMap<Integer, PartnerLinkDAO>();
private Map<QName, ProcessDaoImpl> _store;
private BpelDAOConnectionImpl _conn;
private int _executionCount = 0;
private Collection<Long> _instancesToRemove = new ConcurrentLinkedQueue<Long>();
private volatile long _lastRemoval = 0;
private String _guid;
public ProcessDaoImpl(BpelDAOConnectionImpl conn, Map<QName, ProcessDaoImpl> store,
QName processId, QName type, String guid, long version) {
if (__log.isDebugEnabled()) {
__log.debug("Creating ProcessDao object for process \"" + processId + "\".");
}
_guid = guid;
_conn = conn;
_store = store;
_processId = processId;
_type = type;
_version = version;
}
public Serializable getId() {
return _guid;
}
public QName getProcessId() {
return _processId;
}
public CorrelatorDAO getCorrelator(String cid) {
CorrelatorDAO ret = _correlators.get(cid);
if (ret == null) {
throw new IllegalArgumentException("no such correlator: " + cid);
}
return ret;
}
public Collection<CorrelatorDAO> getCorrelators() {
// Note: _correlators.values() is a Collection<CorrealatorDaoImpl>. We can't just return this object
// since Collection<CorrelatorDAO> is /not/ assignment compatible with Collection<CorrelatorDaoImpl>.
// However, a immutable Collection<CorrelationDAO> is assignment compatible with Collection<CorrelatorDaoImpl>,
// but.... we need to introduce some ambiguity into the type hierarchy so that Java will infer the correct type.
// Make an ambiguous collection.
Collection<? extends CorrelatorDAO> foo = _correlators.values();
// In order to get a collection of the super-type from a sub-type we must make the collection read-only.
return Collections.unmodifiableCollection(foo);
}
public void removeRoutes(String routeId, ProcessInstanceDAO target) {
for (CorrelatorDAO correlatorDAO : _correlators.values()) {
correlatorDAO.removeRoutes(routeId, target);
}
}
public ProcessInstanceDAO createInstance(CorrelatorDAO correlator) {
final ProcessInstanceDaoImpl newInstance = new ProcessInstanceDaoImpl(_conn, this, correlator);
_conn.defer(new Runnable() {
public void run() {
_instances.put(newInstance.getInstanceId(), newInstance);
_instancesAge.put(newInstance.getInstanceId(), System.currentTimeMillis());
}
});
discardOldInstances();
// Removing right away on rollback
final Long iid = newInstance.getInstanceId();
_conn.onRollback(new Runnable() {
public void run() {
_instances.remove(iid);
_instancesAge.remove(iid);
}
});
_executionCount++;
return newInstance;
}
public ProcessInstanceDAO getInstance(Long instanceId) {
return _instances.get(instanceId);
}
public Collection<ProcessInstanceDAO> findInstance(CorrelationKey key) {
ArrayList<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>();
for (ProcessInstanceDAO instance : _instances.values()) {
for (CorrelationSetDAO corrSet : instance.getCorrelationSets()) {
if (corrSet.getValue().equals(key)) result.add(instance);
}
}
return result;
}
public void instanceCompleted(ProcessInstanceDAO instance) {
// Cleaning up
if (__log.isDebugEnabled())
__log.debug("Removing completed process instance " + instance.getInstanceId() + " from in-memory store.");
_instancesAge.remove(instance.getInstanceId());
ProcessInstanceDAO removed = _instances.remove(instance.getInstanceId());
if (removed == null) {
// Checking for leftover instances that should be removed
ArrayList<Long> removals = new ArrayList<Long>(_instancesToRemove);
for (Long iid : removals) {
_instances.remove(iid);
}
_instancesToRemove.removeAll(removals);
// The instance can't be found probably because the transaction isn't committed yet and
// it doesn't exist. Saving its id for later cleanup.
_instancesToRemove.add(instance.getInstanceId());
}
}
public void deleteProcessAndRoutes() {
_store.remove(_processId);
}
public long getVersion() {
return _version;
}
public String getDeployer() {
return "nobody";
}
public QName getType() {
return _type;
}
public CorrelatorDAO addCorrelator(String correlator) {
CorrelatorDaoImpl corr = new CorrelatorDaoImpl(correlator, _conn);
_correlators.put(corr.getCorrelatorId(), corr);
return corr;
}
/**
* Nothing to do.
*/
public void update() {
//TODO Check requirement for persisting.
}
public int getNumInstances() {
// Instances are removed after execution, using a counter instead
return _executionCount;
}
public ProcessInstanceDAO getInstanceWithLock(Long iid) {
return getInstance(iid);
}
public int getActivityFailureCount() {
return 0;
}
public Date getActivityFailureDateTime() {
return null;
}
public String getGuid() {
return _guid;
}
public void setGuid(String guid) {
_guid = guid;
}
public Collection<ProcessInstanceDAO> getActiveInstances() {
ArrayList<ProcessInstanceDAO> pis = new ArrayList<ProcessInstanceDAO>();
for (ProcessInstanceDAO processInstanceDAO : _instances.values()) {
if (processInstanceDAO.getState() == ProcessState.STATE_ACTIVE)
pis.add(processInstanceDAO);
}
return pis;
}
/**
* Discard in-memory instances that exceeded their time-to-live to prevent memory leaks
*/
void discardOldInstances() {
long now = System.currentTimeMillis();
if (now > _lastRemoval + (_conn._mexTtl/10)) {
_lastRemoval = now;
Object[] oldInstances = _instancesAge.keySet().toArray();
for (int i=oldInstances.length-1; i>=0; i--) {
Long id = (Long) oldInstances[i];
Long age = _instancesAge.get(id);
if (age != null && now-age > _conn._mexTtl) {
if (_instances.get(id) != null) {
__log.warn("Discarding in-memory instance "+id+" because it exceeded its time-to-live: "+_instances.get(id));
}
_instances.remove(id);
_instancesAge.remove(id);
}
}
}
}
}