| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.ode.store; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.ode.bpel.compiler.api.CompilationException; |
| import org.apache.ode.bpel.dd.DeployDocument; |
| import org.apache.ode.bpel.dd.TDeployment; |
| import org.apache.ode.bpel.iapi.*; |
| import org.apache.ode.il.config.OdeConfigProperties; |
| import org.apache.ode.store.DeploymentUnitDir.CBPInfo; |
| import org.apache.ode.utils.DOMUtils; |
| import org.apache.ode.utils.GUID; |
| import org.apache.ode.utils.msg.MessageBundle; |
| import org.h2.jdbcx.JdbcDataSource; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| import org.w3c.dom.Node; |
| |
| import javax.sql.DataSource; |
| import javax.xml.namespace.QName; |
| import java.io.File; |
| import java.io.IOException; |
| import java.sql.SQLException; |
| import java.util.*; |
| import java.util.concurrent.*; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| |
| /** |
| * <p> |
| * JDBC-based implementation of a process store. Also provides an "in-memory" store by way of H2 database. |
| * </p> |
| * |
| * <p> |
| * The philsophy here is to keep things simple. Process store operations are relatively infrequent. Performance of the public |
| * methods is not a concern. However, note that the {@link org.apache.ode.bpel.iapi.ProcessConf} objects returned by the class are |
| * going to be used from within the engine runtime, and hence their performance needs to be very good. Similarly, these objects |
| * should be immutable so as not to confuse the engine. |
| * |
| * Note the way that the database is used in this class, it is more akin to a recovery log, this is intentional: we want to start |
| * up, load stuff from the database and then pretty much forget about it when it comes to reads. |
| * |
| * @author Maciej Szefler <mszefler at gmail dot com> |
| * @author mriou <mriou at apache dot org> |
| */ |
| public class ProcessStoreImpl implements ProcessStore { |
| |
| private static final Logger __log = LoggerFactory.getLogger(ProcessStoreImpl.class); |
| |
| private static final Messages __msgs = MessageBundle.getMessages(Messages.class); |
| |
| private final CopyOnWriteArrayList<ProcessStoreListener> _listeners = new CopyOnWriteArrayList<ProcessStoreListener>(); |
| |
| protected Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>(); |
| |
| protected Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>(); |
| |
| /** Guards access to the _processes and _deploymentUnits */ |
| private final ReadWriteLock _rw = new ReentrantReadWriteLock(); |
| |
| private ConfStoreConnectionFactory _cf; |
| |
| private EndpointReferenceContext eprContext; |
| |
| private boolean generateProcessEventsAll; |
| |
| protected File _deployDir; |
| |
| protected File _configDir; |
| |
| /** |
| * Executor used to process DB transactions. Allows us to isolate the TX context, and to ensure that only one TX gets executed a |
| * time. We don't really care to parallelize these operations because: i) HSQL does not isolate transactions and we don't want |
| * to get confused ii) we're already serializing all the operations with a read/write lock. iii) we don't care about |
| * performance, these are infrequent operations. |
| */ |
| private ExecutorService _executor = Executors.newSingleThreadExecutor(new SimpleThreadFactory()); |
| |
| /** |
| * In-memory DataSource, or <code>null</code> if we are using a real DS. We need this to shutdown the DB. |
| */ |
| private DataSource _inMemDs; |
| |
| private static final ThreadLocal<Long> _currentVersion = new ThreadLocal<Long>(); |
| |
| public ProcessStoreImpl() { |
| this(null, null, "", new OdeConfigProperties(new Properties(), ""), true); |
| } |
| |
| public ProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel) { |
| this.eprContext = eprContext; |
| this.generateProcessEventsAll = props.getProperty("generateProcessEvents", "all").equals("all"); |
| if (ds != null) { |
| // ugly hack |
| if (persistenceType.toLowerCase().indexOf("hib") != -1) { |
| _cf = new org.apache.ode.store.hib.DbConfStoreConnectionFactory(ds, props.getProperties(), createDatamodel, props.getTxFactoryClass()); |
| } else { |
| _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(ds, props.getProperties(), createDatamodel, props.getTxFactoryClass()); |
| } |
| } else { |
| // If the datasource is not provided, then we create a H2-based |
| // in-memory database. Makes testing a bit simpler. |
| DataSource h2 = createInternalDS(new GUID().toString()); |
| if ("hibernate".equalsIgnoreCase(persistenceType)) { |
| _cf = new org.apache.ode.store.hib.DbConfStoreConnectionFactory(h2, props.getProperties(), createDatamodel, props.getTxFactoryClass()); |
| } else { |
| _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(h2, props.getProperties(), createDatamodel, props.getTxFactoryClass()); |
| } |
| _inMemDs = h2; |
| } |
| |
| |
| } |
| |
| |
| /** |
| * Constructor that hardwires OpenJPA on a new in-memory database. Suitable for tests. |
| */ |
| public ProcessStoreImpl(EndpointReferenceContext eprContext, DataSource inMemDs) { |
| this.eprContext = eprContext; |
| DataSource h2 = createInternalDS(new GUID().toString()); |
| //when in memory we always create the model as we are starting from scratch |
| _cf = new org.apache.ode.store.jpa.DbConfStoreConnectionFactory(h2, true, OdeConfigProperties.DEFAULT_TX_FACTORY_CLASS_NAME); |
| _inMemDs = h2; |
| } |
| |
| public void shutdown() { |
| if (_inMemDs != null) { |
| shutdownInternalDB(_inMemDs); |
| _inMemDs = null; |
| } |
| if (_executor != null) { |
| _executor.shutdownNow(); |
| _executor = null; |
| } |
| } |
| |
| @Override |
| protected void finalize() throws Throwable { |
| // force a shutdown so that HSQL cleans up its mess. |
| try { |
| shutdown(); |
| } catch (Throwable t) { |
| ; // we tried, no worries. |
| } |
| super.finalize(); |
| } |
| |
| /** |
| * Deploys a process. |
| */ |
| public Collection<QName> deploy(final File deploymentUnitDirectory, boolean autoincrementVersion) { |
| return deploy(deploymentUnitDirectory, true, null, autoincrementVersion); |
| } |
| |
| public Collection<QName> deploy(final File deploymentUnitDirectory) { |
| return deploy(deploymentUnitDirectory, true, null, OdeGlobalConfig.autoincrementVersion()); |
| } |
| |
| /** |
| * Deploys a process. |
| */ |
| public Collection<QName> deploy(final File deploymentUnitDirectory, boolean activate, String duName, boolean autoincrementVersion) { |
| __log.info(__msgs.msgDeployStarting(deploymentUnitDirectory)); |
| |
| final Date deployDate = new Date(); |
| |
| // Create the DU and compile/scan it before acquiring lock. |
| final DeploymentUnitDir du = new DeploymentUnitDir(deploymentUnitDirectory); |
| if( duName != null ) { |
| // Override the package name if given from the parameter |
| du.setName(duName); |
| } |
| |
| long version; |
| if (autoincrementVersion || du.getStaticVersion() == -1) { |
| // Process and DU use a monotonically increased single version number by default. |
| try { |
| version = getCurrentVersion(); |
| } finally { |
| //we need to reset the current version thread local value. |
| _currentVersion.set(null); |
| } |
| } else { |
| version = du.getStaticVersion(); |
| } |
| du.setVersion(version); |
| |
| try { |
| du.compile(); |
| } catch (CompilationException ce) { |
| String errmsg = __msgs.msgDeployFailCompileErrors(ce); |
| __log.error(errmsg, ce); |
| throw new ContextException(errmsg, ce); |
| } |
| |
| du.scan(); |
| final DeployDocument dd = du.getDeploymentDescriptor(); |
| final ArrayList<ProcessConfImpl> processes = new ArrayList<ProcessConfImpl>(); |
| Collection<QName> deployed; |
| |
| _rw.writeLock().lock(); |
| |
| try { |
| if (_deploymentUnits.containsKey(du.getName())) { |
| String errmsg = __msgs.msgDeployFailDuplicateDU(du.getName()); |
| __log.error(errmsg); |
| throw new ContextException(errmsg); |
| } |
| |
| retirePreviousPackageVersions(du); |
| |
| for (TDeployment.Process processDD : dd.getDeploy().getProcessArray()) { |
| QName pid = toPid(processDD.getName(), version); |
| |
| if (_processes.containsKey(pid)) { |
| String errmsg = __msgs.msgDeployFailDuplicatePID(processDD.getName(), du.getName()); |
| __log.error(errmsg); |
| throw new ContextException(errmsg); |
| } |
| |
| QName type = processDD.getType() != null ? processDD.getType() : processDD.getName(); |
| |
| CBPInfo cbpInfo = du.getCBPInfo(type); |
| if (cbpInfo == null) { |
| String errmsg = __msgs.msgDeployFailedProcessNotFound(processDD.getName(), du.getName()); |
| __log.error(errmsg); |
| throw new ContextException(errmsg); |
| } |
| |
| ProcessConfImpl pconf = new ProcessConfImpl(pid, processDD.getName(), version, du, processDD, deployDate, |
| calcInitialProperties(du.getProperties(), processDD), calcInitialState(processDD), eprContext, _configDir, generateProcessEventsAll); |
| processes.add(pconf); |
| } |
| |
| _deploymentUnits.put(du.getName(), du); |
| |
| for (ProcessConfImpl process : processes) { |
| __log.info(__msgs.msgProcessDeployed(du.getDeployDir(), process.getProcessId())); |
| _processes.put(process.getProcessId(), process); |
| |
| } |
| |
| } finally { |
| _rw.writeLock().unlock(); |
| } |
| |
| // Do the deployment in the DB. We need this so that we remember deployments across system shutdowns. |
| // We don't fail if there is a DB error, simply print some errors. |
| deployed = exec(new Callable<Collection<QName>>() { |
| public Collection<QName> call(ConfStoreConnection conn) { |
| // Check that this deployment unit is not deployed. |
| DeploymentUnitDAO dudao = conn.getDeploymentUnit(du.getName()); |
| if (dudao != null) { |
| String errmsg = "Database out of synch for DU " + du.getName(); |
| __log.warn(errmsg); |
| dudao.delete(); |
| } |
| |
| dudao = conn.createDeploymentUnit(du.getName()); |
| try { |
| dudao.setDeploymentUnitDir(deploymentUnitDirectory.getCanonicalPath()); |
| } catch (IOException e1) { |
| String errmsg = "Error getting canonical path for " + du.getName() |
| + "; deployment unit will not be available after restart!"; |
| __log.error(errmsg); |
| |
| } |
| |
| ArrayList<QName> deployed = new ArrayList<QName>(); |
| // Going trough each process declared in the dd |
| for (ProcessConfImpl pc : processes) { |
| try { |
| ProcessConfDAO newDao = dudao.createProcess(pc.getProcessId(), pc.getType(), pc.getVersion()); |
| newDao.setState(pc.getState()); |
| for (Map.Entry<QName, Node> prop : pc.getProcessProperties().entrySet()) { |
| newDao.setProperty(prop.getKey(), DOMUtils.domToString(prop.getValue())); |
| } |
| deployed.add(pc.getProcessId()); |
| } catch (Throwable e) { |
| String errmsg = "Error persisting deployment record for " + pc.getProcessId() |
| + "; process will not be available after restart!"; |
| __log.error(errmsg, e); |
| } |
| } |
| return deployed; |
| } |
| |
| }); |
| |
| |
| _rw.readLock().lock(); |
| boolean readLockHeld = true; |
| try { |
| for (ProcessConfImpl process : processes) { |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DEPLOYED, process.getProcessId(), process.getDeploymentUnit() |
| .getName())); |
| fireStateChange(process.getProcessId(), process.getState(), process.getDeploymentUnit().getName()); |
| } |
| } catch (Exception e) { |
| //need to unlock as undeploy operation will need a writeLock |
| _rw.readLock().unlock(); |
| readLockHeld = false; |
| |
| // A problem at that point means that engine deployment failed, we don't want the store to keep the du |
| __log.warn("Deployment failed within the engine, store undeploying process.", e); |
| undeploy(deploymentUnitDirectory); |
| if (e instanceof ContextException) throw (ContextException) e; |
| else throw new ContextException("Deployment failed within the engine. " + e.getMessage(), e); |
| } finally { |
| if(readLockHeld) |
| _rw.readLock().unlock(); |
| } |
| |
| return deployed; |
| } |
| |
| /** |
| * Retire all the other versions of the same DU: |
| * first take the DU name and insert version regexp, |
| * than try to match the this string against names of already deployed DUs. |
| * For instance if we are deploying DU "AbsenceRequest-2/AbsenceRequest.ode" and |
| * there's already version 2 than regexp |
| * "AbsenceRequest([-\\.](\d)+)?/AbsenceRequest.ode" will be matched against |
| * "AbsenceRequest-2/AbsenceRequest.ode" and setRetirePackage() will be called accordingly. |
| */ |
| private void retirePreviousPackageVersions(DeploymentUnitDir du) { |
| |
| Pattern duNamePattern = getPreviousPackageVersionPattern(du.getName()); |
| |
| for (String deployedDUname : _deploymentUnits.keySet()) { |
| Matcher matcher = duNamePattern.matcher(deployedDUname); |
| if (matcher.matches()) { |
| setRetiredPackage(deployedDUname, true); |
| } |
| } |
| } |
| |
| public Collection<QName> undeploy(final File dir) { |
| return undeploy(dir.getName()); |
| } |
| |
| public Collection<QName> undeploy(final String duName) { |
| try { |
| exec(new Callable<Collection<QName>>() { |
| public Collection<QName> call(ConfStoreConnection conn) { |
| DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName); |
| if (dudao != null) |
| dudao.delete(); |
| return null; |
| } |
| }); |
| } catch (Exception ex) { |
| __log.error("Error synchronizing with data store; " + duName + " may be reappear after restart!"); |
| } |
| |
| return undeployProcesses(duName); |
| } |
| |
| public Collection<String> getPackages() { |
| _rw.readLock().lock(); |
| try { |
| return new ArrayList<String>(_deploymentUnits.keySet()); |
| } finally { |
| _rw.readLock().unlock(); |
| } |
| } |
| |
| public List<QName> listProcesses(String packageName) { |
| _rw.readLock().lock(); |
| try { |
| DeploymentUnitDir du = _deploymentUnits.get(packageName); |
| if (du == null) |
| return null; |
| return toPids(du.getProcessNames(), du.getVersion()); |
| } finally { |
| _rw.readLock().unlock(); |
| } |
| } |
| |
| public void setState(final QName pid, final ProcessState state) { |
| __log.debug("Changing process state for " + pid + " to " + state); |
| |
| final ProcessConfImpl pconf; |
| |
| _rw.readLock().lock(); |
| try { |
| pconf = _processes.get(pid); |
| if (pconf == null) { |
| String msg = __msgs.msgProcessNotFound(pid); |
| __log.info(msg); |
| throw new ContextException(msg); |
| } |
| } finally { |
| _rw.readLock().unlock(); |
| } |
| |
| final DeploymentUnitDir dudir = pconf.getDeploymentUnit(); |
| |
| // Update in the database. |
| ProcessState old = exec(new Callable<ProcessState>() { |
| public ProcessState call(ConfStoreConnection conn) { |
| DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName()); |
| if (dudao == null) { |
| String errmsg = __msgs.msgProcessNotFound(pid); |
| __log.error(errmsg); |
| throw new ContextException(errmsg); |
| } |
| |
| ProcessConfDAO dao = dudao.getProcess(pid); |
| if (dao == null) { |
| String errmsg = __msgs.msgProcessNotFound(pid); |
| __log.error(errmsg); |
| throw new ContextException(errmsg); |
| } |
| |
| Set processKeys = _processes.keySet(); |
| Iterator processConfQNameItr = processKeys.iterator(); |
| |
| while(processConfQNameItr.hasNext()){ |
| ProcessConf cachedProcessConf = _processes.get(processConfQNameItr.next()); |
| if(dao.getType().equals(cachedProcessConf.getType())){ |
| if (ProcessState.ACTIVE == cachedProcessConf.getState() |
| && ProcessState.RETIRED == dao.getState() |
| && ProcessState.ACTIVE == state) { |
| String errorMsg = "Can't activate the process with PID: " + dao.getPID() + " with version " + dao.getVersion() + |
| ", as another version of the process with PID : " + cachedProcessConf.getProcessId() + " with version " + |
| cachedProcessConf.getVersion() + " is already active."; |
| __log.error(errorMsg); |
| throw new ContextException(errorMsg); |
| } |
| } |
| } |
| |
| ProcessState old = dao.getState(); |
| dao.setState(state); |
| pconf.setState(state); |
| return old; |
| } |
| }); |
| |
| pconf.setState(state); |
| if (old != null && old != state) |
| fireStateChange(pid, state, pconf.getDeploymentUnit().getName()); |
| } |
| |
| public void setRetiredPackage(String packageName, boolean retired) { |
| DeploymentUnitDir duDir = _deploymentUnits.get(packageName); |
| if (duDir == null) throw new ContextException("Could not find package " + packageName); |
| for (QName processName : duDir.getProcessNames()) { |
| setState(toPid(processName, duDir.getVersion()), retired ? ProcessState.RETIRED : ProcessState.ACTIVE); |
| } |
| } |
| |
| public ProcessConf getProcessConfiguration(final QName processId) { |
| _rw.readLock().lock(); |
| try { |
| return _processes.get(processId); |
| } finally { |
| _rw.readLock().unlock(); |
| } |
| } |
| |
| public void setProperty(final QName pid, final QName propName, final Node value) { |
| setProperty(pid, propName, DOMUtils.domToStringLevel2(value)); |
| } |
| |
| public void setProperty(final QName pid, final QName propName, final String value) { |
| if (__log.isDebugEnabled()) |
| __log.debug("Setting property " + propName + " on process " + pid); |
| |
| ProcessConfImpl pconf = _processes.get(pid); |
| if (pconf == null) { |
| String msg = __msgs.msgProcessNotFound(pid); |
| __log.info(msg); |
| throw new ContextException(msg); |
| } |
| |
| final DeploymentUnitDir dudir = pconf.getDeploymentUnit(); |
| exec(new ProcessStoreImpl.Callable<Object>() { |
| public Object call(ConfStoreConnection conn) { |
| DeploymentUnitDAO dudao = conn.getDeploymentUnit(dudir.getName()); |
| if (dudao == null) |
| return null; |
| ProcessConfDAO proc = dudao.getProcess(pid); |
| if (proc == null) |
| return null; |
| proc.setProperty(propName, value); |
| return null; |
| } |
| }); |
| |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.PROPERTY_CHANGED, pid, dudir.getName())); |
| } |
| |
| /** |
| * Load all the deployment units out of the store. Called on start-up. |
| * |
| */ |
| public void loadAll() { |
| final ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>(); |
| exec(new Callable<Object>() { |
| public Object call(ConfStoreConnection conn) { |
| Collection<DeploymentUnitDAO> dus = conn.getDeploymentUnits(); |
| for (DeploymentUnitDAO du : dus) |
| try { |
| loaded.addAll(load(du)); |
| } catch (Exception ex) { |
| __log.error("Error loading DU from store: " + du.getName(), ex); |
| } |
| return null; |
| } |
| }); |
| |
| // Dispatch DISABLED, RETIRED and ACTIVE events in that order |
| Collections.sort(loaded, new Comparator<ProcessConf>() { |
| public int compare(ProcessConf o1, ProcessConf o2) { |
| return stateValue(o1.getState()) - stateValue(o2.getState()); |
| } |
| int stateValue(ProcessState state) { |
| if (ProcessState.DISABLED.equals(state)) return 0; |
| if (ProcessState.RETIRED.equals(state)) return 1; |
| if (ProcessState.ACTIVE.equals(state)) return 2; |
| throw new IllegalStateException("Unexpected process state: "+state); |
| } |
| }); |
| for (ProcessConfImpl p : loaded) { |
| try { |
| fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName()); |
| } catch (Exception except) { |
| __log.error("Error while activating process: pid=" + p.getProcessId() + " package="+p.getDeploymentUnit().getName(), except); |
| } |
| } |
| |
| } |
| |
| public List<QName> getProcesses() { |
| _rw.readLock().lock(); |
| try { |
| return new ArrayList<QName>(_processes.keySet()); |
| } finally { |
| _rw.readLock().unlock(); |
| } |
| } |
| |
| public long getCurrentVersion() { |
| if (_currentVersion.get() != null){ |
| return _currentVersion.get(); |
| } |
| |
| long version = exec(new Callable<Long>() { |
| public Long call(ConfStoreConnection conn) { |
| return conn.getNextVersion(); |
| } |
| }); |
| |
| _currentVersion.set(version); |
| return _currentVersion.get(); |
| } |
| |
| protected void fireEvent(ProcessStoreEvent pse) { |
| __log.debug("firing event: " + pse); |
| for (ProcessStoreListener psl : _listeners) |
| psl.onProcessStoreEvent(pse); |
| } |
| |
| protected void fireStateChange(QName processId, ProcessState state, String duname) { |
| switch (state) { |
| case ACTIVE: |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId, duname)); |
| break; |
| case DISABLED: |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.DISABLED, processId, duname)); |
| break; |
| case RETIRED: |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.RETIRED, processId, duname)); |
| break; |
| } |
| |
| } |
| |
| public void registerListener(ProcessStoreListener psl) { |
| __log.debug("Registering listener " + psl); |
| _listeners.add(psl); |
| } |
| |
| public void unregisterListener(ProcessStoreListener psl) { |
| __log.debug("Unregistering listener " + psl); |
| _listeners.remove(psl); |
| } |
| |
| /** |
| * Execute database transactions in an isolated context. |
| * |
| * @param <T> |
| * return type |
| * @param callable |
| * transaction |
| * @return |
| */ |
| synchronized <T> T exec(Callable<T> callable) { |
| // We want to submit db jobs to an executor to isolate |
| // them from the current thread, |
| Future<T> future = _executor.submit(callable); |
| try { |
| return future.get(); |
| } catch (Exception e) { |
| throw new ContextException("DbError", e); |
| } |
| } |
| |
| private ConfStoreConnection getConnection() { |
| return _cf.getConnection(); |
| } |
| |
| /** |
| * Create a property mapping based on the initial values in the deployment descriptor. |
| * |
| * @param dd |
| * @return |
| */ |
| public static Map<QName, Node> calcInitialProperties(Properties properties, TDeployment.Process dd) { |
| HashMap<QName, Node> ret = new HashMap<QName, Node>(); |
| |
| for (Object key1 : properties.keySet()) { |
| String key = (String) key1; |
| Document doc = DOMUtils.newDocument(); |
| doc.appendChild(doc.createElementNS(null, "temporary-simple-type-wrapper")); |
| doc.getDocumentElement().appendChild(doc.createTextNode(properties.getProperty(key))); |
| |
| ret.put(new QName(key), doc.getDocumentElement()); |
| } |
| |
| for (TDeployment.Process.Property property : dd.getPropertyArray()) { |
| Element elmtContent = DOMUtils.getElementContent(property.getDomNode()); |
| if (elmtContent != null) { |
| // We'll need DOM Level 3 |
| Document doc = DOMUtils.newDocument(); |
| doc.appendChild(doc.importNode(elmtContent, true)); |
| ret.put(property.getName(), doc.getDocumentElement()); |
| } else |
| ret.put(property.getName(), property.getDomNode().getFirstChild()); |
| |
| } |
| return ret; |
| } |
| |
| /** |
| * Figure out the initial process state from the state in the deployment descriptor. |
| * |
| * @param dd |
| * deployment descriptor |
| * @return |
| */ |
| private static ProcessState calcInitialState(TDeployment.Process dd) { |
| ProcessState state = ProcessState.ACTIVE; |
| |
| if (dd.isSetActive() && dd.getActive() == false) |
| state = ProcessState.DISABLED; |
| if (dd.isSetRetired() && dd.getRetired() == true) |
| state = ProcessState.RETIRED; |
| |
| return state; |
| } |
| |
| /** |
| * Load a deployment unit record stored in the db into memory. |
| * |
| * @param dudao |
| */ |
| protected List<ProcessConfImpl> load(DeploymentUnitDAO dudao) { |
| __log.debug("Loading deployment unit record from db: " + dudao.getName()); |
| |
| File dudir = findDeployDir(dudao); |
| |
| if (dudir == null || !dudir.exists()) |
| throw new ContextException("Deployed directory " + (dudir == null ? "(unknown)" : dudir) + " no longer there!"); |
| DeploymentUnitDir dud = new DeploymentUnitDir(dudir); |
| // set the name with the one from database |
| dud.setName(dudao.getName()); |
| dud.scan(); |
| |
| ArrayList<ProcessConfImpl> loaded = new ArrayList<ProcessConfImpl>(); |
| |
| _rw.writeLock().lock(); |
| try { |
| _deploymentUnits.put(dud.getName(), dud); |
| |
| long version = 0; |
| for (ProcessConfDAO p : dudao.getProcesses()) { |
| TDeployment.Process pinfo = dud.getProcessDeployInfo(p.getType()); |
| if (pinfo == null) { |
| __log.warn("Cannot load " + p.getPID() + "; cannot find descriptor."); |
| continue; |
| } |
| |
| Map<QName, Node> props = calcInitialProperties(dud.getProperties(), pinfo); |
| // TODO: update the props based on the values in the DB. |
| |
| ProcessConfImpl pconf = new ProcessConfImpl(p.getPID(), p.getType(), p.getVersion(), dud, pinfo, dudao |
| .getDeployDate(), props, p.getState(), eprContext, _configDir, generateProcessEventsAll); |
| version = p.getVersion(); |
| |
| _processes.put(pconf.getProcessId(), pconf); |
| loaded.add(pconf); |
| } |
| |
| // All processes and the DU have the same version |
| dud.setVersion(version); |
| } finally { |
| _rw.writeLock().unlock(); |
| } |
| |
| return loaded; |
| } |
| |
| protected File findDeployDir(DeploymentUnitDAO dudao) { |
| File f = new File(dudao.getDeploymentUnitDir()); |
| if (f.exists()) |
| return f; |
| f = new File(_deployDir, dudao.getName()); |
| if (f.exists()) { |
| try { |
| dudao.setDeploymentUnitDir(f.getCanonicalPath()); |
| } catch (IOException e) { |
| __log.warn("Could not update deployment unit directory for " + dudao.getName(), e); |
| } |
| return f; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Make sure that the deployment unit is loaded. |
| * |
| * @param duName |
| * deployment unit name |
| */ |
| protected boolean load(final String duName) { |
| _rw.writeLock().lock(); |
| try { |
| if (_deploymentUnits.containsKey(duName)) |
| return true; |
| } finally { |
| _rw.writeLock().unlock(); |
| } |
| |
| try { |
| return exec(new Callable<Boolean>() { |
| public Boolean call(ConfStoreConnection conn) { |
| DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName); |
| if (dudao == null) |
| return false; |
| load(dudao); |
| return true; |
| } |
| }); |
| } catch (Exception ex) { |
| __log.error("Error loading deployment unit: " + duName); |
| return false; |
| } |
| |
| } |
| |
| /** |
| * Wrapper for database transactions. |
| * |
| * @author Maciej Szefler |
| * |
| * @param <V> |
| * return type |
| */ |
| abstract class Callable<V> implements java.util.concurrent.Callable<V> { |
| public V call() { |
| boolean success = false; |
| // in JTA, transaction is bigger than the session |
| _cf.beginTransaction(); |
| ConfStoreConnection conn = getConnection(); |
| try { |
| V r = call(conn); |
| _cf.commitTransaction(); |
| success = true; |
| return r; |
| } finally { |
| if (!success) |
| try { |
| _cf.rollbackTransaction(); |
| } catch (Exception ex) { |
| __log.error("DbError", ex); |
| } |
| } |
| // session is closed automatically when committed or rolled back under JTA |
| } |
| |
| abstract V call(ConfStoreConnection conn); |
| } |
| |
| public void setDeployDir(File depDir) { |
| if (depDir != null) { |
| if( !depDir.exists() ) { |
| depDir.mkdirs(); |
| __log.warn("Deploy directory: " + depDir.getAbsolutePath() + " does not exist; created it."); |
| } else if(!depDir.isDirectory()) { |
| throw new IllegalArgumentException("Deploy directory is not a directory: " + depDir); |
| } |
| } |
| |
| _deployDir = depDir; |
| } |
| |
| public File getDeployDir() { |
| return _deployDir; |
| } |
| |
| public File getConfigDir() { |
| return _configDir; |
| } |
| |
| public void setConfigDir(File configDir) { |
| if (configDir != null && !configDir.isDirectory()) |
| throw new IllegalArgumentException("Config directory is not a directory or does not exist: " + configDir); |
| this._configDir = configDir; |
| } |
| |
| public static DataSource createInternalDS(String guid) { |
| JdbcDataSource h2 = new JdbcDataSource(); |
| h2.setURL("jdbc:h2:mem:" + new GUID().toString()+";DB_CLOSE_DELAY=-1"); |
| h2.setUser("sa"); |
| return h2; |
| } |
| |
| public static void shutdownInternalDB(DataSource ds) { |
| try { |
| ds.getConnection().createStatement().execute("SHUTDOWN;"); |
| } catch (SQLException e) { |
| __log.error("Error shutting down.", e); |
| } |
| } |
| |
| private List<QName> toPids(Collection<QName> processTypes, long version) { |
| ArrayList<QName> result = new ArrayList<QName>(); |
| for (QName pqName : processTypes) { |
| result.add(toPid(pqName, version)); |
| } |
| return result; |
| } |
| |
| protected QName toPid(QName processType, long version) { |
| return new QName(processType.getNamespaceURI(), processType.getLocalPart() + "-" + version); |
| } |
| |
| private class SimpleThreadFactory implements ThreadFactory { |
| int threadNumber = 0; |
| public Thread newThread(Runnable r) { |
| threadNumber += 1; |
| Thread t = new Thread(r, "ProcessStoreImpl-"+threadNumber); |
| t.setDaemon(true); |
| return t; |
| } |
| } |
| |
| public void refreshSchedules(String packageName) { |
| List<QName> pids = listProcesses(packageName); |
| if (pids != null) { |
| for( QName pid : pids ) { |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.SCHEDULE_SETTINGS_CHANGED, pid, packageName)); |
| } |
| } |
| } |
| |
| protected Pattern getPreviousPackageVersionPattern(String duName) { |
| //retire all the other versions of the same DU |
| String[] nameParts = duName.split("/"); |
| /* Replace the version number (if any) with regexp to match any version number */ |
| nameParts[0] = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", ""); |
| nameParts[0] += "([-\\Q.\\E](\\d)+)?"; |
| StringBuilder duNameRegExp = new StringBuilder(duName.length() * 2); |
| for (int i = 0, n = nameParts.length; i < n; i++) { |
| if (i > 0) duNameRegExp.append("/"); |
| duNameRegExp.append(nameParts[i]); |
| } |
| Pattern duNamePattern = Pattern.compile(duNameRegExp.toString()); |
| return duNamePattern; |
| } |
| |
| protected Collection<QName> undeployProcesses(final String duName) { |
| Collection<QName> undeployed = Collections.emptyList(); |
| DeploymentUnitDir du; |
| _rw.writeLock().lock(); |
| try { |
| du = _deploymentUnits.remove(duName); |
| if (du != null) { |
| undeployed = toPids(du.getProcessNames(), du.getVersion()); |
| } |
| |
| for (QName pn : undeployed) { |
| fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName())); |
| __log.info(__msgs.msgProcessUndeployed(pn)); |
| } |
| |
| _processes.keySet().removeAll(undeployed); |
| } finally { |
| _rw.writeLock().unlock(); |
| } |
| return undeployed; |
| } |
| } |