| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.ode.bpel.memdao; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| |
| import javax.xml.namespace.QName; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.ode.bpel.common.CorrelationKey; |
| import org.apache.ode.bpel.common.ProcessState; |
| import org.apache.ode.bpel.dao.CorrelationSetDAO; |
| import org.apache.ode.bpel.dao.CorrelatorDAO; |
| import org.apache.ode.bpel.dao.PartnerLinkDAO; |
| import org.apache.ode.bpel.dao.ProcessDAO; |
| import org.apache.ode.bpel.dao.ProcessInstanceDAO; |
| |
| /** |
| * A very simple, in-memory implementation of the {@link ProcessDAO} interface. |
| */ |
| class ProcessDaoImpl extends DaoBaseImpl implements ProcessDAO { |
| private static final Logger __log = LoggerFactory.getLogger(ProcessDaoImpl.class); |
| |
| private QName _processId; |
| private QName _type; |
| private long _version; |
| final Map<String, CorrelatorDaoImpl> _correlators = new ConcurrentHashMap<String, CorrelatorDaoImpl>(); |
| protected final Map<Long, ProcessInstanceDAO> _instances = new ConcurrentHashMap<Long, ProcessInstanceDAO>(); |
| protected final Map<Long, Long> _instancesAge = new ConcurrentHashMap<Long, Long>(); |
| protected final Map<Integer, PartnerLinkDAO> _plinks = new ConcurrentHashMap<Integer, PartnerLinkDAO>(); |
| private Map<QName, ProcessDaoImpl> _store; |
| private BpelDAOConnectionImpl _conn; |
| private int _executionCount = 0; |
| private Collection<Long> _instancesToRemove = new ConcurrentLinkedQueue<Long>(); |
| private volatile long _lastRemoval = 0; |
| |
| private String _guid; |
| |
| public ProcessDaoImpl(BpelDAOConnectionImpl conn, Map<QName, ProcessDaoImpl> store, |
| QName processId, QName type, String guid, long version) { |
| if (__log.isDebugEnabled()) { |
| __log.debug("Creating ProcessDao object for process \"" + processId + "\"."); |
| } |
| |
| _guid = guid; |
| _conn = conn; |
| _store = store; |
| _processId = processId; |
| _type = type; |
| _version = version; |
| } |
| |
| public Serializable getId() { |
| return _guid; |
| } |
| |
| public QName getProcessId() { |
| return _processId; |
| } |
| |
| public CorrelatorDAO getCorrelator(String cid) { |
| CorrelatorDAO ret = _correlators.get(cid); |
| if (ret == null) { |
| throw new IllegalArgumentException("no such correlator: " + cid); |
| } |
| return ret; |
| } |
| |
| public Collection<CorrelatorDAO> getCorrelators() { |
| // Note: _correlators.values() is a Collection<CorrealatorDaoImpl>. We can't just return this object |
| // since Collection<CorrelatorDAO> is /not/ assignment compatible with Collection<CorrelatorDaoImpl>. |
| // However, a immutable Collection<CorrelationDAO> is assignment compatible with Collection<CorrelatorDaoImpl>, |
| // but.... we need to introduce some ambiguity into the type hierarchy so that Java will infer the correct type. |
| |
| // Make an ambiguous collection. |
| Collection<? extends CorrelatorDAO> foo = _correlators.values(); |
| |
| // In order to get a collection of the super-type from a sub-type we must make the collection read-only. |
| return Collections.unmodifiableCollection(foo); |
| } |
| |
| public void removeRoutes(String routeId, ProcessInstanceDAO target) { |
| for (CorrelatorDAO correlatorDAO : _correlators.values()) { |
| correlatorDAO.removeRoutes(routeId, target); |
| } |
| } |
| |
| public ProcessInstanceDAO createInstance(CorrelatorDAO correlator) { |
| final ProcessInstanceDaoImpl newInstance = new ProcessInstanceDaoImpl(_conn, this, correlator); |
| _conn.defer(new Runnable() { |
| public void run() { |
| _instances.put(newInstance.getInstanceId(), newInstance); |
| _instancesAge.put(newInstance.getInstanceId(), System.currentTimeMillis()); |
| } |
| }); |
| |
| discardOldInstances(); |
| |
| // Removing right away on rollback |
| final Long iid = newInstance.getInstanceId(); |
| _conn.onRollback(new Runnable() { |
| public void run() { |
| _instances.remove(iid); |
| _instancesAge.remove(iid); |
| } |
| }); |
| |
| _executionCount++; |
| return newInstance; |
| } |
| |
| public ProcessInstanceDAO getInstance(Long instanceId) { |
| return _instances.get(instanceId); |
| } |
| |
| public Collection<ProcessInstanceDAO> findInstance(CorrelationKey key) { |
| ArrayList<ProcessInstanceDAO> result = new ArrayList<ProcessInstanceDAO>(); |
| for (ProcessInstanceDAO instance : _instances.values()) { |
| for (CorrelationSetDAO corrSet : instance.getCorrelationSets()) { |
| if (corrSet.getValue().equals(key)) result.add(instance); |
| } |
| } |
| return result; |
| } |
| |
| public void instanceCompleted(ProcessInstanceDAO instance) { |
| // Cleaning up |
| if (__log.isDebugEnabled()) |
| __log.debug("Removing completed process instance " + instance.getInstanceId() + " from in-memory store."); |
| _instancesAge.remove(instance.getInstanceId()); |
| ProcessInstanceDAO removed = _instances.remove(instance.getInstanceId()); |
| if (removed == null) { |
| // Checking for leftover instances that should be removed |
| ArrayList<Long> removals = new ArrayList<Long>(_instancesToRemove); |
| for (Long iid : removals) { |
| _instances.remove(iid); |
| } |
| _instancesToRemove.removeAll(removals); |
| |
| // The instance can't be found probably because the transaction isn't committed yet and |
| // it doesn't exist. Saving its id for later cleanup. |
| _instancesToRemove.add(instance.getInstanceId()); |
| } |
| } |
| |
| public void deleteProcessAndRoutes() { |
| _store.remove(_processId); |
| } |
| |
| public long getVersion() { |
| return _version; |
| } |
| |
| public String getDeployer() { |
| return "nobody"; |
| } |
| |
| public QName getType() { |
| return _type; |
| } |
| |
| public CorrelatorDAO addCorrelator(String correlator) { |
| CorrelatorDaoImpl corr = new CorrelatorDaoImpl(correlator, _conn); |
| _correlators.put(corr.getCorrelatorId(), corr); |
| return corr; |
| } |
| |
| /** |
| * Nothing to do. |
| */ |
| public void update() { |
| //TODO Check requirement for persisting. |
| } |
| |
| public int getNumInstances() { |
| // Instances are removed after execution, using a counter instead |
| return _executionCount; |
| } |
| |
| public ProcessInstanceDAO getInstanceWithLock(Long iid) { |
| return getInstance(iid); |
| } |
| |
| public int getActivityFailureCount() { |
| return 0; |
| } |
| |
| public Date getActivityFailureDateTime() { |
| return null; |
| } |
| |
| public String getGuid() { |
| return _guid; |
| } |
| |
| public void setGuid(String guid) { |
| _guid = guid; |
| } |
| |
| public Collection<ProcessInstanceDAO> getActiveInstances() { |
| ArrayList<ProcessInstanceDAO> pis = new ArrayList<ProcessInstanceDAO>(); |
| for (ProcessInstanceDAO processInstanceDAO : _instances.values()) { |
| if (processInstanceDAO.getState() == ProcessState.STATE_ACTIVE) |
| pis.add(processInstanceDAO); |
| } |
| return pis; |
| } |
| |
| /** |
| * Discard in-memory instances that exceeded their time-to-live to prevent memory leaks |
| */ |
| void discardOldInstances() { |
| long now = System.currentTimeMillis(); |
| if (now > _lastRemoval + (_conn._mexTtl/10)) { |
| _lastRemoval = now; |
| Object[] oldInstances = _instancesAge.keySet().toArray(); |
| for (int i=oldInstances.length-1; i>=0; i--) { |
| Long id = (Long) oldInstances[i]; |
| Long age = _instancesAge.get(id); |
| if (age != null && now-age > _conn._mexTtl) { |
| if (_instances.get(id) != null) { |
| __log.warn("Discarding in-memory instance "+id+" because it exceeded its time-to-live: "+_instances.get(id)); |
| } |
| _instances.remove(id); |
| _instancesAge.remove(id); |
| } |
| } |
| } |
| } |
| } |