/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.ode.store;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.compiler.api.CompilationException;
import org.apache.ode.bpel.dd.DeployDocument;
import org.apache.ode.bpel.dd.TDeployment;
import org.apache.ode.bpel.iapi.*;
import org.apache.ode.il.config.OdeConfigProperties;
import org.apache.ode.store.DeploymentUnitDir.CBPInfo;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.msg.MessageBundle;
import org.h2.jdbcx.JdbcDataSource;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

import javax.sql.DataSource;
import javax.xml.namespace.QName;
import java.io.File;
import java.io.IOException;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * <p>
 * JDBC-based implementation of a process store. Also provides an "in-memory" store by way of H2 database.
 * </p>
 *
 * <p>
 * The philosophy here is to keep things simple. Process store operations are relatively infrequent. Performance of the public
 * methods is not a concern. However, note that the {@link org.apache.ode.bpel.iapi.ProcessConf} objects returned by the class are
 * going to be used from within the engine runtime, and hence their performance needs to be very good. Similarly, these objects
 * should be immutable so as not to confuse the engine.
 *
 * Note the way that the database is used in this class, it is more akin to a recovery log, this is intentional: we want to start
 * up, load stuff from the database and then pretty much forget about it when it comes to reads.
 *
 * @author Maciej Szefler <mszefler at gmail dot com>
 * @author mriou <mriou at apache dot org>
 */
public class ProcessStoreImpl implements ProcessStore {

    private static final Logger __log = LoggerFactory.getLogger(ProcessStoreImpl.class);

    private static final Messages __msgs = MessageBundle.getMessages(Messages.class);

    private final CopyOnWriteArrayList<ProcessStoreListener> _listeners = new CopyOnWriteArrayList<ProcessStoreListener>();

    protected Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>();

    protected Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>();

    /** Guards access to the _processes and _deploymentUnits */
    private final ReadWriteLock _rw = new ReentrantReadWriteLock();

    private ConfStoreConnectionFactory _cf;

    private EndpointReferenceContext eprContext;

    private boolean generateProcessEventsAll;

    protected File _deployDir;

    protected File _configDir;



    /**
     * Executor used to process DB transactions. Allows us to isolate the TX context, and to ensure that only one TX gets executed
     * at a time. We don't really care to parallelize these operations because: i) HSQL does not isolate transactions and we don't
     * want to get confused; ii) we're already serializing all the operations with a read/write lock; iii) we don't care about
     * performance, these are infrequent operations.
     */
    private ExecutorService _executor = Executors.newSingleThreadExecutor(new SimpleThreadFactory());

    /**
     * In-memory DataSource, or <code>null</code> if we are using a real DS. We need this to shutdown the DB.
     */
    private DataSource _inMemDs;

    private static final ThreadLocal<Long> _currentVersion = new ThreadLocal<Long>();

    /**
     * Creates a store without an endpoint reference context and without an external data source.
     * Delegates to the five-argument constructor with an empty persistence type, empty configuration
     * properties and <code>createDatamodel == true</code>; because no data source is supplied, that
     * constructor sets up an internal in-memory H2 database.
     */
    public ProcessStoreImpl() {
        this(null, null, "", new OdeConfigProperties(new Properties(), ""), true);
    }

    /**
     * Creates a process store on top of the given data source or, when none is given, on top of a
     * freshly created in-memory H2 database.
     *
     * @param eprContext
     *            endpoint reference context passed to every {@link ProcessConfImpl} created by this store
     * @param ds
     *            external data source, or <code>null</code> to use an internal in-memory database
     * @param persistenceType
     *            selects the connection factory. With an external data source any value containing
     *            "hib" (case-insensitive) selects Hibernate; with the internal database only the exact
     *            value "hibernate" (case-insensitive) does. Everything else, including
     *            <code>null</code>, selects JPA.
     * @param props
     *            configuration; the "generateProcessEvents" property (default "all") is read here and the
     *            raw properties and TX factory class are handed to the connection factory
     * @param createDatamodel
     *            passed through to the connection factory
     */
    public ProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel) {
        this.eprContext = eprContext;
        this.generateProcessEventsAll = "all".equals(props.getProperty("generateProcessEvents", "all"));
        if (ds != null) {
            // ugly hack: substring match on the persistence type.
            // Locale.ROOT keeps the match independent of the default locale (e.g. "HIBERNATE" lower-cases
            // to a dotless i under a Turkish locale); a null type falls back to JPA instead of failing.
            boolean useHibernate = persistenceType != null
                    && persistenceType.toLowerCase(Locale.ROOT).indexOf("hib") != -1;
            if (useHibernate) {
                _cf = new org.apache.ode.store.hib.DbConfStoreConnectionFactory(ds, props.getProperties(), createDatamodel, props.getTxFactoryClass());
            } else {
                _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(ds, props.getProperties(), createDatamodel, props.getTxFactoryClass());
            }
        } else {
            // If the datasource is not provided, then we create a H2-based
            // in-memory database. Makes testing a bit simpler.
            DataSource h2 = createInternalDS(new GUID().toString());
            if ("hibernate".equalsIgnoreCase(persistenceType)) {
                _cf = new org.apache.ode.store.hib.DbConfStoreConnectionFactory(h2, props.getProperties(), createDatamodel, props.getTxFactoryClass());
            } else {
                _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(h2, props.getProperties(), createDatamodel, props.getTxFactoryClass());
            }
            // remembered so that shutdown() can stop the internal database
            _inMemDs = h2;
        }
    }


    /**
     * Constructor that hardwires OpenJPA on a new in-memory database. Suitable for tests.
     *
     * @param eprContext
     *            endpoint reference context stored on this instance
     * @param inMemDs
     *            not used by this constructor: a new internal H2 data source is always created via
     *            {@link #createInternalDS(String)}
     */
    public ProcessStoreImpl(EndpointReferenceContext eprContext, DataSource inMemDs) {
        this.eprContext = eprContext;
        DataSource h2 = createInternalDS(new GUID().toString());
        //when in memory we always create the model as we are starting from scratch
        _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(h2, true, OdeConfigProperties.DEFAULT_TX_FACTORY_CLASS_NAME);
        // remembered so that shutdown() can stop the internal database
        _inMemDs = h2;
    }

    /**
     * Stops the internal in-memory database (if this store created one) and then the transaction
     * executor. Each resource reference is cleared once it has been stopped, so calling this method
     * again is a no-op for resources already released.
     */
    public void shutdown() {
        final DataSource internalDb = _inMemDs;
        if (internalDb != null) {
            shutdownInternalDB(internalDb);
            _inMemDs = null;
        }

        final ExecutorService txExecutor = _executor;
        if (txExecutor != null) {
            txExecutor.shutdownNow();
            _executor = null;
        }
    }

    /**
     * Last-resort cleanup: attempts a {@link #shutdown()} so that the internal database is stopped.
     * Any failure of that attempt is ignored.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            shutdown();
        } catch (Throwable ignored) {
            // best effort only; there is nothing useful left to do at this point
        }
        super.finalize();
    }

    /**
     * Deploys a process. Convenience overload that delegates to
     * {@link #deploy(File, boolean, String, boolean)} with <code>activate == true</code> and no
     * deployment unit name override.
     *
     * @param deploymentUnitDirectory
     *            directory containing the deployment unit
     * @param autoincrementVersion
     *            forwarded unchanged to the four-argument overload
     * @return the value returned by the four-argument overload
     */
    public Collection<QName> deploy(final File deploymentUnitDirectory, boolean autoincrementVersion) {
        return deploy(deploymentUnitDirectory, true, null, autoincrementVersion);
    }

    /**
     * Deploys a process. Convenience overload that delegates to
     * {@link #deploy(File, boolean, String, boolean)} with <code>activate == true</code>, no
     * deployment unit name override, and the auto-increment setting obtained from
     * <code>OdeGlobalConfig.autoincrementVersion()</code>.
     *
     * @param deploymentUnitDirectory
     *            directory containing the deployment unit
     * @return the value returned by the four-argument overload
     */
    public Collection<QName> deploy(final File deploymentUnitDirectory) {
        return deploy(deploymentUnitDirectory, true, null, OdeGlobalConfig.autoincrementVersion());
    }

    /**
     * Deploys a process.
     *
     * <p>
     * The deployment unit is compiled and scanned outside of the lock. Under the write lock the unit is
     * checked for duplicates, previous versions of the same package are retired and the new process
     * configurations are registered in memory. Afterwards the deployment is recorded in the database
     * and DEPLOYED plus state-change events are fired. If a listener fails, the deployment unit is
     * undeployed again and a {@link ContextException} is thrown.
     * </p>
     *
     * @param deploymentUnitDirectory
     *            directory containing the deployment unit
     * @param activate
     *            not consulted by this method
     * @param duName
     *            name to register the deployment unit under, or <code>null</code> to keep the name
     *            reported by the {@link DeploymentUnitDir}
     * @param autoincrementVersion
     *            if <code>true</code>, or if the unit has no static version (-1), the version is obtained
     *            from {@link #getCurrentVersion()}; otherwise the unit's static version is used
     * @return ids of the processes whose deployment record was persisted
     * @throws ContextException
     *             on compilation errors, duplicate deployment unit or process id, a process missing from
     *             the unit, or a failure while notifying listeners
     */
    public Collection<QName> deploy(final File deploymentUnitDirectory, boolean activate, String duName, boolean autoincrementVersion) {
        __log.info(__msgs.msgDeployStarting(deploymentUnitDirectory));

        final Date deployDate = new Date();

        // Create the DU and compile/scan it before acquiring lock.
        final DeploymentUnitDir du = new DeploymentUnitDir(deploymentUnitDirectory);
        if( duName != null ) {
            // Override the package name if given from the parameter
            du.setName(duName);
        }

        long version;
        if (autoincrementVersion || du.getStaticVersion() == -1) {
            // Process and DU use a monotonically increased single version number by default.
            try {
                version = getCurrentVersion();
            } finally {
                //we need to reset the current version thread local value.
                _currentVersion.set(null);
            }
        } else {
            version = du.getStaticVersion();
        }
        du.setVersion(version);

        try {
            du.compile();
        } catch (CompilationException ce) {
            String errmsg = __msgs.msgDeployFailCompileErrors(ce);
            __log.error(errmsg, ce);
            throw new ContextException(errmsg, ce);
        }

        du.scan();
        final DeployDocument dd = du.getDeploymentDescriptor();
        final ArrayList<ProcessConfImpl> processes = new ArrayList<ProcessConfImpl>();
        Collection<QName> deployed;

        _rw.writeLock().lock();

        try {
            if (_deploymentUnits.containsKey(du.getName())) {
                String errmsg = __msgs.msgDeployFailDuplicateDU(du.getName());
                __log.error(errmsg);
                throw new ContextException(errmsg);
            }

            retirePreviousPackageVersions(du);

            for (TDeployment.Process processDD : dd.getDeploy().getProcessArray()) {
                QName pid = toPid(processDD.getName(), version);

                if (_processes.containsKey(pid)) {
                    String errmsg = __msgs.msgDeployFailDuplicatePID(processDD.getName(), du.getName());
                    __log.error(errmsg);
                    throw new ContextException(errmsg);
                }

                // The process type defaults to the process name when the descriptor does not set one.
                QName type = processDD.getType() != null ? processDD.getType() : processDD.getName();

                CBPInfo cbpInfo = du.getCBPInfo(type);
                if (cbpInfo == null) {
                    String errmsg = __msgs.msgDeployFailedProcessNotFound(processDD.getName(), du.getName());
                    __log.error(errmsg);
                    throw new ContextException(errmsg);
                }

                ProcessConfImpl pconf = new ProcessConfImpl(pid, processDD.getName(), version, du, processDD, deployDate,
                        calcInitialProperties(du.getProperties(), processDD), calcInitialState(processDD), eprContext, _configDir, generateProcessEventsAll);
                processes.add(pconf);
            }

            // Only register anything once every process of the unit passed validation.
            _deploymentUnits.put(du.getName(), du);

            for (ProcessConfImpl process : processes) {
                __log.info(__msgs.msgProcessDeployed(du.getDeployDir(), process.getProcessId()));
                _processes.put(process.getProcessId(), process);
            }

        } finally {
            _rw.writeLock().unlock();
        }

        // Do the deployment in the DB. We need this so that we remember deployments across system shutdowns.
        // Failures to persist an individual process are logged and that process is left out of the result.
        deployed = exec(new Callable<Collection<QName>>() {
            public Collection<QName> call(ConfStoreConnection conn) {
                // Check that this deployment unit is not deployed.
                DeploymentUnitDAO dudao = conn.getDeploymentUnit(du.getName());
                if (dudao != null) {
                    String errmsg = "Database out of synch for DU " + du.getName();
                    __log.warn(errmsg);
                    dudao.delete();
                }

                dudao = conn.createDeploymentUnit(du.getName());
                try {
                    dudao.setDeploymentUnitDir(deploymentUnitDirectory.getCanonicalPath());
                } catch (IOException e1) {
                    String errmsg = "Error getting canonical path for " + du.getName()
                            + "; deployment unit will not be available after restart!";
                    // keep the cause in the log so the I/O problem can be diagnosed
                    __log.error(errmsg, e1);
                }

                ArrayList<QName> deployed = new ArrayList<QName>();
                // Going through each process declared in the dd
                for (ProcessConfImpl pc : processes) {
                    try {
                        ProcessConfDAO newDao = dudao.createProcess(pc.getProcessId(), pc.getType(), pc.getVersion());
                        newDao.setState(pc.getState());
                        for (Map.Entry<QName, Node> prop : pc.getProcessProperties().entrySet()) {
                            newDao.setProperty(prop.getKey(), DOMUtils.domToString(prop.getValue()));
                        }
                        deployed.add(pc.getProcessId());
                    } catch (Throwable e) {
                        String errmsg = "Error persisting deployment record for " + pc.getProcessId()
                                + "; process will not be available after restart!";
                        __log.error(errmsg, e);
                    }
                }
                return deployed;
            }

        });


        _rw.readLock().lock();
        boolean readLockHeld = true;
        try {
            for (ProcessConfImpl process : processes) {
                fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DEPLOYED, process.getProcessId(), process.getDeploymentUnit()
                        .getName()));
                fireStateChange(process.getProcessId(), process.getState(), process.getDeploymentUnit().getName());
            }
        } catch (Exception e) {
            //need to unlock as undeploy operation will need a writeLock
            _rw.readLock().unlock();
            readLockHeld = false;

            // A problem at that point means that engine deployment failed, we don't want the store to keep the du
            __log.warn("Deployment failed within the engine, store undeploying process.", e);
            // Undeploy by the name the unit was registered under: it may have been overridden by duName,
            // in which case the directory name would not identify the unit.
            undeploy(du.getName());
            if (e instanceof ContextException) throw (ContextException) e;
            else throw new ContextException("Deployment failed within the engine. " + e.getMessage(), e);
        } finally {
            if(readLockHeld)
                _rw.readLock().unlock();
        }

        return deployed;
    }

    /**
     * Retires every already deployed version of the package that is about to be deployed.
     * A pattern is derived from the new unit's name in which the version suffix is optional, and each
     * deployed unit name that matches it is retired. For example, when deploying
     * "AbsenceRequest-2/AbsenceRequest.ode" the pattern
     * "AbsenceRequest([-\\.](\d)+)?/AbsenceRequest.ode" is matched against the names of the deployed
     * units and {@link #setRetiredPackage(String, boolean)} is called for each match.
     *
     * @param du
     *            deployment unit being deployed
     */
    private void retirePreviousPackageVersions(DeploymentUnitDir du) {
        final Pattern anyVersionOfPackage = getPreviousPackageVersionPattern(du.getName());

        for (String candidate : _deploymentUnits.keySet()) {
            if (!anyVersionOfPackage.matcher(candidate).matches()) {
                continue;
            }
            setRetiredPackage(candidate, true);
        }
    }

    /**
     * Undeploys the deployment unit whose name equals the last path component of the given directory.
     *
     * @param dir
     *            deployment unit directory; only its name is used
     * @return the value returned by {@link #undeploy(String)}
     */
    public Collection<QName> undeploy(final File dir) {
        return undeploy(dir.getName());
    }

    /**
     * Undeploys a deployment unit by name. The unit's database record is deleted first; a failure to
     * do so is logged but does not prevent the in-memory undeployment.
     *
     * @param duName
     *            deployment unit name
     * @return ids of the processes removed from memory (empty if the unit was not loaded)
     */
    public Collection<QName> undeploy(final String duName) {
        try {
            exec(new Callable<Collection<QName>>() {
                public Collection<QName> call(ConfStoreConnection conn) {
                    DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
                    if (dudao != null)
                        dudao.delete();
                    return null;
                }
            });
        } catch (Exception ex) {
            // pass the exception along so the root cause ends up in the log
            __log.error("Error synchronizing with data store; " + duName + " may be reappear after restart!", ex);
        }

        return undeployProcesses(duName);
    }

    /**
     * Returns a snapshot of the names of all deployment units currently held in memory.
     *
     * @return new list with the deployment unit names
     */
    public Collection<String> getPackages() {
        _rw.readLock().lock();
        try {
            Collection<String> names = new ArrayList<String>(_deploymentUnits.keySet());
            return names;
        } finally {
            _rw.readLock().unlock();
        }
    }

    /**
     * Lists the process ids belonging to a deployment unit.
     *
     * @param packageName
     *            deployment unit name
     * @return process ids derived from the unit's process names and version, or <code>null</code> if no
     *         such unit is loaded
     */
    public List<QName> listProcesses(String packageName) {
        _rw.readLock().lock();
        try {
            final DeploymentUnitDir unit = _deploymentUnits.get(packageName);
            return unit == null ? null : toPids(unit.getProcessNames(), unit.getVersion());
        } finally {
            _rw.readLock().unlock();
        }
    }

    /**
     * Changes the state of a process, first in the database and then in memory, and fires a
     * state-change event if the state actually changed.
     *
     * <p>
     * Re-activating a retired process is refused while another process of the same type is active.
     * </p>
     *
     * @param pid
     *            process id
     * @param state
     *            new state
     * @throws ContextException
     *             if the process is unknown in memory or in the database, if the re-activation rule above
     *             is violated, or if the database transaction fails
     */
    public void setState(final QName pid, final ProcessState state) {
        if (__log.isDebugEnabled())
            __log.debug("Changing process state for " + pid + " to " + state);

        final ProcessConfImpl pconf;

        _rw.readLock().lock();
        try {
            pconf = _processes.get(pid);
            if (pconf == null) {
                String msg = __msgs.msgProcessNotFound(pid);
                __log.info(msg);
                throw new ContextException(msg);
            }
        } finally {
            _rw.readLock().unlock();
        }

        final DeploymentUnitDir dudir = pconf.getDeploymentUnit();

        // Update in the database.
        ProcessState old = exec(new Callable<ProcessState>() {
            public ProcessState call(ConfStoreConnection conn) {
                DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName());
                if (dudao == null) {
                    String errmsg = __msgs.msgProcessNotFound(pid);
                    __log.error(errmsg);
                    throw new ContextException(errmsg);
                }

                ProcessConfDAO dao = dudao.getProcess(pid);
                if (dao == null) {
                    String errmsg = __msgs.msgProcessNotFound(pid);
                    __log.error(errmsg);
                    throw new ContextException(errmsg);
                }

                ProcessState previous = dao.getState();

                // Only a RETIRED -> ACTIVE transition can clash with another active version, so the
                // scan over the cached processes is skipped for every other transition.
                if (ProcessState.RETIRED == previous && ProcessState.ACTIVE == state) {
                    for (ProcessConf cachedProcessConf : _processes.values()) {
                        if (ProcessState.ACTIVE == cachedProcessConf.getState()
                                && dao.getType().equals(cachedProcessConf.getType())) {
                            String errorMsg = "Can't activate the process with PID: " + dao.getPID() + " with version " + dao.getVersion() +
                                    ", as another version of the process with PID : " + cachedProcessConf.getProcessId() + " with version " +
                                    cachedProcessConf.getVersion() + " is already active.";
                            __log.error(errorMsg);
                            throw new ContextException(errorMsg);
                        }
                    }
                }

                // The in-memory configuration is deliberately not touched here: it is updated only
                // after the transaction has committed, so a rollback cannot leave memory and DB out of sync.
                dao.setState(state);
                return previous;
            }
        });

        pconf.setState(state);
        if (old != null && old != state)
            fireStateChange(pid, state, pconf.getDeploymentUnit().getName());
    }

    /**
     * Retires or re-activates every process of a deployment unit by calling
     * {@link #setState(QName, ProcessState)} for each of them.
     *
     * @param packageName
     *            deployment unit name
     * @param retired
     *            <code>true</code> to set the processes to RETIRED, <code>false</code> to set them to ACTIVE
     * @throws ContextException
     *             if no deployment unit with that name is loaded
     */
    public void setRetiredPackage(String packageName, boolean retired) {
        final DeploymentUnitDir unit = _deploymentUnits.get(packageName);
        if (unit == null) {
            throw new ContextException("Could not find package " + packageName);
        }

        final ProcessState target = retired ? ProcessState.RETIRED : ProcessState.ACTIVE;
        for (QName processName : unit.getProcessNames()) {
            QName pid = toPid(processName, unit.getVersion());
            setState(pid, target);
        }
    }

    /**
     * Looks up the in-memory configuration of a process.
     *
     * @param processId
     *            process id
     * @return the configuration, or <code>null</code> if the process is not loaded
     */
    public ProcessConf getProcessConfiguration(final QName processId) {
        _rw.readLock().lock();
        try {
            final ProcessConf conf = _processes.get(processId);
            return conf;
        } finally {
            _rw.readLock().unlock();
        }
    }

    /**
     * Sets a process property from a DOM node. The node is serialized with
     * <code>DOMUtils.domToStringLevel2</code> and the result is passed to
     * {@link #setProperty(QName, QName, String)}.
     *
     * @param pid
     *            process id
     * @param propName
     *            property name
     * @param value
     *            property value as a DOM node
     */
    public void setProperty(final QName pid, final QName propName, final Node value) {
        setProperty(pid, propName, DOMUtils.domToStringLevel2(value));
    }

    /**
     * Persists a process property in the database and fires a PROPERTY_CHANGED event.
     * If the deployment unit or the process has no database record, nothing is stored but the event is
     * still fired.
     *
     * @param pid
     *            process id
     * @param propName
     *            property name
     * @param value
     *            property value
     * @throws ContextException
     *             if the process is not loaded, or if the database transaction fails
     */
    public void setProperty(final QName pid, final QName propName, final String value) {
        if (__log.isDebugEnabled())
            __log.debug("Setting property " + propName + " on process " + pid);

        // _processes is guarded by _rw; look the process up under the read lock like setState() does.
        final ProcessConfImpl pconf;
        _rw.readLock().lock();
        try {
            pconf = _processes.get(pid);
        } finally {
            _rw.readLock().unlock();
        }
        if (pconf == null) {
            String msg = __msgs.msgProcessNotFound(pid);
            __log.info(msg);
            throw new ContextException(msg);
        }

        final DeploymentUnitDir dudir = pconf.getDeploymentUnit();
        exec(new ProcessStoreImpl.Callable<Object>() {
            public Object call(ConfStoreConnection conn) {
                DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName());
                if (dudao == null)
                    return null;
                ProcessConfDAO proc = dudao.getProcess(pid);
                if (proc == null)
                    return null;
                proc.setProperty(propName, value);
                return null;
            }
        });

        fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.PROPERTY_CHANGED, pid, dudir.getName()));
    }

    /**
     * Loads every deployment unit recorded in the store into memory; called on start-up.
     * Units that fail to load are logged and skipped. Afterwards a state-change event is fired for each
     * loaded process, ordered DISABLED, then RETIRED, then ACTIVE; a failure while firing an event is
     * logged and does not stop the remaining events.
     */
    public void loadAll() {
        final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>();
        exec(new Callable<Object>() {
            public Object call(ConfStoreConnection conn) {
                for (DeploymentUnitDAO du : conn.getDeploymentUnits()) {
                    try {
                        loaded.addAll(load(du));
                    } catch (Exception ex) {
                        __log.error("Error loading DU from store: " + du.getName(), ex);
                    }
                }
                return null;
            }
        });

        // Dispatch DISABLED, RETIRED and ACTIVE events in that order
        final Comparator<ProcessConf> byDispatchOrder = new Comparator<ProcessConf>() {
            public int compare(ProcessConf left, ProcessConf right) {
                int leftRank = stateValue(left.getState());
                int rightRank = stateValue(right.getState());
                return leftRank - rightRank;
            }
            int stateValue(ProcessState state) {
                if (ProcessState.DISABLED.equals(state)) {
                    return 0;
                } else if (ProcessState.RETIRED.equals(state)) {
                    return 1;
                } else if (ProcessState.ACTIVE.equals(state)) {
                    return 2;
                }
                throw new IllegalStateException("Unexpected process state: "+state);
            }
        };
        Collections.sort(loaded, byDispatchOrder);

        for (ProcessConfImpl conf : loaded) {
            try {
                fireStateChange(conf.getProcessId(), conf.getState(), conf.getDeploymentUnit().getName());
            } catch (Exception except) {
                __log.error("Error while activating process: pid=" + conf.getProcessId() + " package="+conf.getDeploymentUnit().getName(), except);
            }
        }
    }

    /**
     * Returns a snapshot of the ids of all processes currently held in memory.
     *
     * @return new list with the process ids
     */
    public List<QName> getProcesses() {
        _rw.readLock().lock();
        try {
            List<QName> pids = new ArrayList<QName>(_processes.keySet());
            return pids;
        } finally {
            _rw.readLock().unlock();
        }
    }

    /**
     * Returns the version number cached for the calling thread, or, if none is cached, fetches the
     * next version from the store, caches it for this thread and returns it.
     *
     * @return the current version for the calling thread
     */
    public long getCurrentVersion() {
        final Long cached = _currentVersion.get();
        if (cached != null) {
            return cached;
        }

        final long fetched = exec(new Callable<Long>() {
            public Long call(ConfStoreConnection conn) {
                return conn.getNextVersion();
            }
        });

        _currentVersion.set(fetched);
        return fetched;
    }

    /**
     * Delivers an event to every registered listener, in registration order.
     *
     * @param pse
     *            event to deliver
     */
    protected void fireEvent(ProcessStoreEvent pse) {
        __log.debug("firing event: " + pse);
        final Iterator<ProcessStoreListener> listeners = _listeners.iterator();
        while (listeners.hasNext()) {
            listeners.next().onProcessStoreEvent(pse);
        }
    }

    /**
     * Fires the store event that corresponds to a process state: ACTVIATED for ACTIVE, DISABLED for
     * DISABLED and RETIRED for RETIRED. No event is fired for any other state.
     *
     * @param processId
     *            process id
     * @param state
     *            state the process is in
     * @param duname
     *            name of the deployment unit the process belongs to
     */
    protected void fireStateChange(QName processId, ProcessState state, String duname) {
        final ProcessStoreEvent.Type eventType;
        switch (state) {
            case ACTIVE:
                eventType = ProcessStoreEvent.Type.ACTVIATED;
                break;
            case DISABLED:
                eventType = ProcessStoreEvent.Type.DISABLED;
                break;
            case RETIRED:
                eventType = ProcessStoreEvent.Type.RETIRED;
                break;
            default:
                // no event is defined for this state
                return;
        }
        fireEvent(new ProcessStoreEvent(eventType, processId, duname));
    }

    /**
     * Adds a listener to the list of listeners notified by {@link #fireEvent(ProcessStoreEvent)}.
     *
     * @param psl
     *            listener to add
     */
    public void registerListener(ProcessStoreListener psl) {
        __log.debug("Registering listener " + psl);
        _listeners.add(psl);
    }

    /**
     * Removes a listener from the list of listeners notified by {@link #fireEvent(ProcessStoreEvent)}.
     *
     * @param psl
     *            listener to remove
     */
    public void unregisterListener(ProcessStoreListener psl) {
        __log.debug("Unregistering listener " + psl);
        _listeners.remove(psl);
    }

    /**
     * Execute database transactions in an isolated context. The transaction is submitted to the
     * store's executor and the calling thread blocks until it has finished.
     *
     * @param <T>
     *            return type
     * @param callable
     *            transaction
     * @return the value returned by the transaction
     * @throws ContextException
     *             ("DbError") if the transaction fails or the wait for it is interrupted; the original
     *             exception is attached as the cause
     */
    synchronized <T> T exec(Callable<T> callable) {
        // We want to submit db jobs to an executor to isolate
        // them from the current thread,
        Future<T> future = _executor.submit(callable);
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new ContextException("DbError", e);
        } catch (Exception e) {
            throw new ContextException("DbError", e);
        }
    }

    /**
     * Obtains a store connection from the connection factory.
     *
     * @return the connection returned by the factory
     */
    private ConfStoreConnection getConnection() {
        return _cf.getConnection();
    }

    /**
     * Create a property mapping based on the deployment-unit properties and the initial values in the
     * deployment descriptor. Descriptor properties are added last and therefore replace deployment-unit
     * properties with the same name.
     *
     * @param properties
     *            deployment-unit level properties; each value is wrapped in a
     *            "temporary-simple-type-wrapper" element
     * @param dd
     *            process section of the deployment descriptor
     * @return map from property name to the DOM node holding its value
     */
    public static Map<QName, Node> calcInitialProperties(Properties properties, TDeployment.Process dd) {
        final HashMap<QName, Node> initial = new HashMap<QName, Node>();

        for (Object rawKey : properties.keySet()) {
            final String name = (String) rawKey;
            final Document wrapperDoc = DOMUtils.newDocument();
            wrapperDoc.appendChild(wrapperDoc.createElementNS(null, "temporary-simple-type-wrapper"));
            final Element wrapper = wrapperDoc.getDocumentElement();
            wrapper.appendChild(wrapperDoc.createTextNode(properties.getProperty(name)));

            initial.put(new QName(name), wrapper);
        }

        for (TDeployment.Process.Property declared : dd.getPropertyArray()) {
            final Element content = DOMUtils.getElementContent(declared.getDomNode());
            final Node value;
            if (content == null) {
                // no element content: use the first child node of the property as its value
                value = declared.getDomNode().getFirstChild();
            } else {
                // We'll need DOM Level 3
                final Document holder = DOMUtils.newDocument();
                holder.appendChild(holder.importNode(content, true));
                value = holder.getDocumentElement();
            }
            initial.put(declared.getName(), value);
        }
        return initial;
    }

    /**
     * Figure out the initial process state from the state in the deployment descriptor.
     * An explicit <code>retired == true</code> wins over an explicit <code>active == false</code>;
     * without either setting the process starts ACTIVE.
     *
     * @param dd
     *            deployment descriptor
     * @return RETIRED, DISABLED or ACTIVE as described above
     */
    private static ProcessState calcInitialState(TDeployment.Process dd) {
        final boolean explicitlyInactive = dd.isSetActive() && !dd.getActive();
        final boolean explicitlyRetired = dd.isSetRetired() && dd.getRetired();

        if (explicitlyRetired) {
            return ProcessState.RETIRED;
        }
        return explicitlyInactive ? ProcessState.DISABLED : ProcessState.ACTIVE;
    }

    /**
     * Load a deployment unit record stored in the db into memory.
     * The unit's directory is located and scanned, the unit is registered under the name stored in the
     * database, and a process configuration is created for every stored process whose descriptor can
     * still be found in the unit. Processes without a descriptor are logged and skipped.
     *
     * @param dudao
     *            database record of the deployment unit
     * @return the process configurations that were loaded
     * @throws ContextException
     *             if the deployment directory cannot be found
     */
    protected List<ProcessConfImpl> load(DeploymentUnitDAO dudao) {
        __log.debug("Loading deployment unit record from db: " + dudao.getName());

        final File location = findDeployDir(dudao);
        if (location == null || !location.exists()) {
            throw new ContextException("Deployed directory " + (location == null ? "(unknown)" : location) + " no longer there!");
        }

        final DeploymentUnitDir unit = new DeploymentUnitDir(location);
        // set the name with the one from database
        unit.setName(dudao.getName());
        unit.scan();

        final ArrayList<ProcessConfImpl> result = new ArrayList<ProcessConfImpl>();

        _rw.writeLock().lock();
        try {
            _deploymentUnits.put(unit.getName(), unit);

            long unitVersion = 0;
            for (ProcessConfDAO stored : dudao.getProcesses()) {
                final TDeployment.Process descriptor = unit.getProcessDeployInfo(stored.getType());
                if (descriptor == null) {
                    __log.warn("Cannot load " + stored.getPID() + "; cannot find descriptor.");
                    continue;
                }

                // TODO: update the props based on the values in the DB.
                final Map<QName, Node> initialProps = calcInitialProperties(unit.getProperties(), descriptor);

                final ProcessConfImpl conf = new ProcessConfImpl(stored.getPID(), stored.getType(), stored.getVersion(), unit,
                        descriptor, dudao.getDeployDate(), initialProps, stored.getState(), eprContext, _configDir,
                        generateProcessEventsAll);
                unitVersion = stored.getVersion();

                _processes.put(conf.getProcessId(), conf);
                result.add(conf);
            }

            // All processes and the DU have the same version
            unit.setVersion(unitVersion);
        } finally {
            _rw.writeLock().unlock();
        }

        return result;
    }

    /**
     * Locates the directory of a stored deployment unit. The path recorded in the database is tried
     * first; if it is missing or no longer exists, a directory named after the unit inside the deploy
     * directory is tried, and on success the database record is updated with its canonical path.
     *
     * @param dudao
     *            database record of the deployment unit
     * @return the existing directory, or <code>null</code> if none could be found
     */
    protected File findDeployDir(DeploymentUnitDAO dudao) {
        // The stored path can be absent: deploy() leaves it unset when the canonical path could not be
        // resolved, and new File((String) null) would throw a NullPointerException.
        String storedPath = dudao.getDeploymentUnitDir();
        if (storedPath != null) {
            File stored = new File(storedPath);
            if (stored.exists())
                return stored;
        }

        File f = new File(_deployDir, dudao.getName());
        if (f.exists()) {
            try {
                dudao.setDeploymentUnitDir(f.getCanonicalPath());
            } catch (IOException e) {
                __log.warn("Could not update deployment unit directory for " + dudao.getName(), e);
            }
            return f;
        }

        return null;
    }

    /**
     * Make sure that the deployment unit is loaded.
     *
     * @param duName
     *            deployment unit name
     * @return <code>true</code> if the unit is already in memory or was loaded from the database;
     *         <code>false</code> if it has no database record or loading failed
     */
    protected boolean load(final String duName) {
        // Pure lookup: the shared read lock is sufficient here.
        _rw.readLock().lock();
        try {
            if (_deploymentUnits.containsKey(duName))
                return true;
        } finally {
            _rw.readLock().unlock();
        }

        try {
            return exec(new Callable<Boolean>() {
                public Boolean call(ConfStoreConnection conn) {
                    DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
                    if (dudao == null)
                        return false;
                    load(dudao);
                    return true;
                }
            });
        } catch (Exception ex) {
            // pass the exception along so the root cause ends up in the log
            __log.error("Error loading deployment unit: " + duName, ex);
            return false;
        }

    }

    /**
     * Wrapper for database transactions. {@link #call()} begins a transaction on the connection
     * factory, runs {@link #call(ConfStoreConnection)} with a connection, and commits; if either the
     * work or the commit throws, the transaction is rolled back and the exception propagates.
     *
     * @author Maciej Szefler
     *
     * @param <V>
     *            return type
     */
    abstract class Callable<V> implements java.util.concurrent.Callable<V> {
        /**
         * Runs the transactional work inside a begin/commit pair.
         *
         * @return the value produced by {@link #call(ConfStoreConnection)}
         */
        public V call() {
            // stays false unless the commit below completes, which triggers the rollback in finally
            boolean success = false;
            // in JTA, transaction is bigger than the session
            _cf.beginTransaction();
            ConfStoreConnection conn = getConnection();
            try {
                V r = call(conn);
                _cf.commitTransaction();
                success = true;
                return r;
            } finally {
                if (!success)
                    try {
                        _cf.rollbackTransaction();
                    } catch (Exception ex) {
                        // a failed rollback is only logged so that it does not mask the original exception
                        __log.error("DbError", ex);
                    }
            }
            // session is closed automatically when committed or rolled back under JTA
        }

        /**
         * The transactional work.
         *
         * @param conn
         *            connection obtained after the transaction was begun
         * @return result of the work
         */
        abstract V call(ConfStoreConnection conn);
    }

    /**
     * Sets the directory deployment units are looked up in. A directory that does not exist yet is
     * created; a failure to create it is logged as an error.
     *
     * @param depDir
     *            deploy directory, may be <code>null</code>
     * @throws IllegalArgumentException
     *             if the path exists but is not a directory
     */
    public void setDeployDir(File depDir) {
        if (depDir != null) {
            if( !depDir.exists() ) {
                // mkdirs() reports failure through its return value, so only claim success when it succeeded.
                if (depDir.mkdirs()) {
                    __log.warn("Deploy directory: " + depDir.getAbsolutePath() + " does not exist; created it.");
                } else if (!depDir.isDirectory()) {
                    // mkdirs() also returns false when the directory appeared concurrently; that case is fine.
                    __log.error("Deploy directory: " + depDir.getAbsolutePath() + " does not exist and could not be created.");
                }
            } else if(!depDir.isDirectory()) {
                throw new IllegalArgumentException("Deploy directory is not a directory:  " + depDir);
            }
        }

        _deployDir = depDir;
    }

    /**
     * @return the deploy directory set via {@link #setDeployDir(File)}, or <code>null</code> if none was set
     */
    public File getDeployDir() {
        return _deployDir;
    }

    /**
     * @return the config directory set via {@link #setConfigDir(File)}, or <code>null</code> if none was set
     */
    public File getConfigDir() {
        return _configDir;
    }

    /**
     * Sets the configuration directory that is handed to newly created process configurations.
     *
     * @param configDir
     *            existing directory, or <code>null</code>
     * @throws IllegalArgumentException
     *             if the argument is non-null and is not an existing directory
     */
    public void setConfigDir(File configDir) {
        final boolean acceptable = configDir == null || configDir.isDirectory();
        if (!acceptable) {
            throw new IllegalArgumentException("Config directory is not a directory or does not exist: " + configDir);
        }
        this._configDir = configDir;
    }

    /**
     * Creates a data source for an in-memory H2 database. The database is configured with
     * <code>DB_CLOSE_DELAY=-1</code>; use {@link #shutdownInternalDB(DataSource)} to stop it.
     *
     * @param guid
     *            name of the in-memory database; if <code>null</code> or empty a fresh GUID is used
     * @return the data source, with user "sa"
     */
    public static DataSource createInternalDS(String guid) {
        // Honour the caller-supplied name instead of silently ignoring the parameter.
        String dbName = (guid != null && guid.length() > 0) ? guid : new GUID().toString();
        JdbcDataSource h2 = new JdbcDataSource();
        h2.setURL("jdbc:h2:mem:" + dbName + ";DB_CLOSE_DELAY=-1");
        h2.setUser("sa");
        return h2;
    }

    /**
     * Shuts down an internal database by executing <code>SHUTDOWN;</code> on it. Errors are logged,
     * never thrown. The JDBC statement and connection used for the command are always closed.
     *
     * @param ds
     *            data source of the database to shut down
     */
    public static void shutdownInternalDB(DataSource ds) {
        java.sql.Connection conn = null;
        java.sql.Statement stmt = null;
        try {
            conn = ds.getConnection();
            stmt = conn.createStatement();
            stmt.execute("SHUTDOWN;");
        } catch (SQLException e) {
            __log.error("Error shutting down.", e);
        } finally {
            // Release the JDBC resources. Closing can fail once the database is gone; that is harmless.
            if (stmt != null) {
                try {
                    stmt.close();
                } catch (SQLException ignored) {
                    __log.debug("Could not close statement after shutdown.", ignored);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (SQLException ignored) {
                    __log.debug("Could not close connection after shutdown.", ignored);
                }
            }
        }
    }

    /**
     * Converts process type names into process ids for one version.
     *
     * @param processTypes
     *            process type names
     * @param version
     *            version shared by all resulting ids
     * @return new list with one process id per type, in iteration order
     */
    private List<QName> toPids(Collection<QName> processTypes, long version) {
        final ArrayList<QName> pids = new ArrayList<QName>();
        final Iterator<QName> types = processTypes.iterator();
        while (types.hasNext()) {
            pids.add(toPid(types.next(), version));
        }
        return pids;
    }

    /**
     * Builds a process id from a process type and a version: same namespace, local part suffixed with
     * "-" and the version number.
     *
     * @param processType
     *            process type name
     * @param version
     *            version number
     * @return the process id
     */
    protected QName toPid(QName processType, long version) {
        final String versionedLocalPart = processType.getLocalPart() + "-" + version;
        return new QName(processType.getNamespaceURI(), versionedLocalPart);
    }

    /**
     * Thread factory for the transaction executor: creates daemon threads named
     * "ProcessStoreImpl-N", where N counts the threads created by this factory starting at 1.
     */
    private class SimpleThreadFactory implements ThreadFactory {
        int threadNumber = 0;

        public Thread newThread(Runnable r) {
            final int sequence = ++threadNumber;
            final Thread worker = new Thread(r, "ProcessStoreImpl-" + sequence);
            worker.setDaemon(true);
            return worker;
        }
    }

    /**
     * Fires a SCHEDULE_SETTINGS_CHANGED event for every process of a deployment unit. Does nothing if
     * the unit is not loaded.
     *
     * @param packageName
     *            deployment unit name
     */
    public void refreshSchedules(String packageName) {
        final List<QName> pids = listProcesses(packageName);
        if (pids == null) {
            return;
        }
        for (QName pid : pids) {
            ProcessStoreEvent changed = new ProcessStoreEvent(ProcessStoreEvent.Type.SCHEDULE_SETTINGS_CHANGED, pid, packageName);
            fireEvent(changed);
        }
    }

    /**
     * Builds the pattern that matches every version of the given deployment unit name. A trailing
     * version suffix ("-" or "." followed by digits) on the first path segment is made optional and
     * generic; all other text is matched literally.
     * For example "AbsenceRequest-2/AbsenceRequest.ode" yields a pattern matching
     * "AbsenceRequest/AbsenceRequest.ode", "AbsenceRequest-7/AbsenceRequest.ode" and
     * "AbsenceRequest.12/AbsenceRequest.ode".
     *
     * @param duName
     *            deployment unit name, segments separated by "/"
     * @return pattern to be matched against complete deployment unit names
     */
    protected Pattern getPreviousPackageVersionPattern(String duName) {
        //retire all the other versions of the same DU
        // limit -1 keeps empty segments, so the array always has at least one element
        String[] nameParts = duName.split("/", -1);
        /* Strip the version number (if any); it is replaced below by a regexp matching any version number */
        String baseName = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", "");

        StringBuilder duNameRegExp = new StringBuilder(duName.length() * 2 + 32);
        // Quote the literal parts: a name such as "my.pkg" or "a+b(1)" must not be read as regex syntax.
        duNameRegExp.append(Pattern.quote(baseName)).append("([-\\Q.\\E](\\d)+)?");
        for (int i = 1, n = nameParts.length; i < n; i++) {
            duNameRegExp.append("/").append(Pattern.quote(nameParts[i]));
        }
        return Pattern.compile(duNameRegExp.toString());
    }

    /**
     * Removes a deployment unit and its processes from memory. An UNDEPLOYED event is fired for each
     * removed process while the write lock is held.
     *
     * @param duName
     *            deployment unit name
     * @return ids of the removed processes; empty if the unit was not loaded
     */
    protected  Collection<QName> undeployProcesses(final String duName) {
        Collection<QName> removedPids = Collections.emptyList();
        _rw.writeLock().lock();
        try {
            final DeploymentUnitDir unit = _deploymentUnits.remove(duName);
            if (unit != null) {
                removedPids = toPids(unit.getProcessNames(), unit.getVersion());
            }

            // removedPids is empty when the unit was unknown, so unit is never dereferenced while null
            for (QName pid : removedPids) {
                fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pid, unit.getName()));
                __log.info(__msgs.msgProcessUndeployed(pid));
            }

            _processes.keySet().removeAll(removedPids);
        } finally {
            _rw.writeLock().unlock();
        }
        return removedPids;
    }
}
