Merge from fork branch sudharma/ODECluster
diff --git a/Rakefile b/Rakefile
index a8c5f06..ac9b74e 100644
--- a/Rakefile
+++ b/Rakefile
@@ -85,7 +85,7 @@
   desc "ODE Axis Integration Layer"
   define "axis2" do
     compile.with projects("bpel-api", "bpel-connector", "bpel-dao", "bpel-epr", "bpel-runtime",
-      "scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents"),
+      "scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents", "clustering"),
       AXIOM, AXIS2_ALL, COMMONS.lang, COMMONS.collections, COMMONS.httpclient, COMMONS.lang,
       DERBY, GERONIMO.kernel, GERONIMO.transaction, JAVAX.activation, JAVAX.servlet, JAVAX.stream,
       JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J
@@ -101,12 +101,12 @@
     libs = projects("axis2", "bpel-api", "bpel-compiler", "bpel-connector", "bpel-dao",
       "bpel-epr", "bpel-nobj", "bpel-ql", "bpel-runtime", "scheduler-simple",
       "bpel-schemas", "bpel-store", "dao-hibernate", "jca-ra", "jca-server",
-      "utils", "dao-jpa", "agents"),
+      "utils", "dao-jpa", "agents", "clustering"),
       AXIS2_ALL, ANNONGEN, BACKPORT, COMMONS.codec, COMMONS.collections, COMMONS.fileupload, COMMONS.io, COMMONS.httpclient, COMMONS.beanutils,
       COMMONS.lang, COMMONS.pool, DERBY, DERBY_TOOLS, JACOB, JAXEN, JAVAX.activation, JAVAX.ejb, JAVAX.javamail,
       JAVAX.connector, JAVAX.jms, JAVAX.persistence, JAVAX.transaction, JAVAX.stream,  JIBX,
       GERONIMO.connector, GERONIMO.kernel, GERONIMO.transaction, LOG4J, OPENJPA, SAXON, TRANQL,
-      WOODSTOX, WSDL4J, WS_COMMONS, XALAN, XERCES, XMLBEANS, SPRING, AXIS2_MODULES.libs, SLF4J, LOG4J
+      WOODSTOX, WSDL4J, WS_COMMONS, XALAN, XERCES, XMLBEANS, SPRING, AXIS2_MODULES.libs, SLF4J, LOG4J, HAZELCAST
 
     package(:war).with(:libs=>libs).path("WEB-INF").tap do |web_inf|
       web_inf.merge project("dao-jpa-ojpa-derby").package(:zip)
@@ -208,6 +208,12 @@
     package :jar
   end
 
+  desc "ODE Clustering"
+  define "clustering" do
+    compile.with projects("bpel-api", "bpel-store"), HAZELCAST, COMMONS.logging
+    package :jar
+  end
+
   desc "ODE BPEL Object Model"
   define "bpel-obj" do
     compile.with project("utils"), SAXON, WSDL4J, COMMONS.collections
@@ -233,12 +239,12 @@
 
   desc "ODE Runtime Engine"
   define "bpel-runtime" do
+
     compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-epr", "bpel-nobj", "bpel-schemas",
-      "bpel-store", "utils", "agents"),
+      "bpel-store", "utils", "agents","clustering"),
       COMMONS.collections, COMMONS.httpclient, JACOB, JAVAX.persistence, JAVAX.stream, JAXEN, SAXON, WSDL4J, XMLBEANS, SPRING, SLF4J, LOG4J,
 	  JACKSON, JAVAX.connector
 
-
     test.with projects("scheduler-simple", "dao-jpa", "dao-hibernate", "bpel-epr", "bpel-obj"),
 #         BACKPORT, COMMONS.pool, COMMONS.lang, COMMONS.io, DERBY, JAVAX.connector, JAVAX.transaction,
         GERONIMO.transaction, GERONIMO.kernel, GERONIMO.connector, TRANQL, HSQLDB, JAVAX.ejb, JAVAX.transaction,
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
new file mode 100644
index 0000000..bf1e99e
--- /dev/null
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
+  ~
+  ~ Licensed under the Apache License, Version 2.0 (the "License");
+  ~ you may not use this file except in compliance with the License.
+  ~ You may obtain a copy of the License at
+  ~
+  ~ http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.4.xsd"
+           xmlns="http://www.hazelcast.com/schema/config"
+           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+   <network>
+        <port auto-increment="true" port-count="100">5701</port>
+        <outbound-ports>
+            <ports>0</ports>
+        </outbound-ports>
+        <reuse-address>false</reuse-address>
+        <join>
+            <!-- Multicast discovery is switched off; members are listed under tcp-ip instead. -->
+            <multicast enabled="false">
+                <multicast-group>224.2.2.3</multicast-group>
+                <multicast-port>54327</multicast-port>
+            </multicast>
+            <!-- The default members are loopback addresses, i.e. two nodes on the same host. -->
+            <tcp-ip enabled="true">
+                <member>127.0.0.1:5701</member>
+                <member>127.0.0.1:5702</member>
+            </tcp-ip>
+            <!-- AWS discovery is switched off; the key, group and tag values below are
+                 placeholders only. -->
+            <aws enabled="false">
+                <access-key>my-access-key</access-key>
+                <secret-key>my-secret-key</secret-key>
+                <region>us-west-1</region>
+                <host-header>ec2.amazonaws.com</host-header>
+                <security-group-name>hazelcast-sg</security-group-name>
+                <tag-key>type</tag-key>
+                <tag-value>hz-nodes</tag-value>
+            </aws>
+        </join>
+        <interfaces enabled="false">
+            <interface>10.10.1.*</interface>
+        </interfaces>
+        <ssl enabled="false" />
+        <socket-interceptor enabled="false" />
+    </network>
+    <partition-group enabled="false"/>
+    <map name="ODE_DEPLOYMENT_LOCK"></map>
+    <map name="ODE_PROCESS_INSTANCE_LOCK"></map>
+    <topic name="ODE_DEPLOYMENT_TOPIC"></topic>
+</hazelcast>
+
+
+
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
index 253037c..03ac79c 100644
--- a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
@@ -94,4 +94,10 @@
 
 ## Event listeners
 #ode-axis2.event.listeners=
-#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
\ No newline at end of file
+#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
+
+## Enable clustering
+#ode-axis2.clustering.enabled=true
+
+## Clustering Implementation class.
+#ode-axis2.clustering.impl.class = org.apache.ode.clustering.hazelcast.HazelcastClusterImpl
diff --git a/axis2/src/main/java/org/apache/ode/axis2/Messages.java b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
index a95c30d..0581c72 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/Messages.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
@@ -58,6 +58,10 @@
         return format("Starting ODE ServiceEngine.");
     }
 
+    public String msgOdeClusteringNotInitialized() {
+        return format("Clustering has not been initialized.");
+    }
+
     public String msgOdeStarted() {
         return format("ODE Service Engine has been started.");
     }
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 7cbf142..0a13c4a 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -19,57 +19,27 @@
 
 package org.apache.ode.axis2;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.sql.DataSource;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
-
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.util.IdleConnectionTimeoutThread;
 import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
+import org.apache.commons.httpclient.util.IdleConnectionTimeoutThread;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.axis2.deploy.DeploymentPoller;
 import org.apache.ode.axis2.service.DeploymentWebService;
 import org.apache.ode.axis2.service.ManagementService;
 import org.apache.ode.axis2.util.ClusterUrlTransformer;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
+import org.apache.ode.bpel.clapi.ClusterProcessStore;
 import org.apache.ode.bpel.connector.BpelServerConnector;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
 import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
 import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
-import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.ProcessStoreEvent;
-import org.apache.ode.bpel.iapi.ProcessStoreListener;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.bpel.pmapi.InstanceManagement;
@@ -78,10 +48,25 @@
 import org.apache.ode.il.dbutil.Database;
 import org.apache.ode.scheduler.simple.JdbcDelegate;
 import org.apache.ode.scheduler.simple.SimpleScheduler;
+import org.apache.ode.store.ClusterProcessStoreImpl;
 import org.apache.ode.store.ProcessStoreImpl;
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
 
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.sql.DataSource;
+import javax.transaction.*;
+import javax.transaction.xa.XAResource;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
 /**
  * Server class called by our Axis hooks to handle all ODE lifecycle management.
  *
@@ -120,6 +105,8 @@
 
     protected Database _db;
 
+    protected ClusterManager _clusterManager;
+
     private DeploymentPoller _poller;
 
     private BpelServerConnector _connector;
@@ -133,6 +120,8 @@
     
     public Runnable txMgrCreatedCallback;
 
+    private boolean clusteringEnabled = false;
+
     public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException {
         init(config.getServletContext().getRealPath("/WEB-INF"), configContext);
     }
@@ -184,6 +173,12 @@
         if (txMgrCreatedCallback != null) {
             txMgrCreatedCallback.run();
         }
+
+        clusteringEnabled = _odeConfig.isClusteringEnabled();
+        if (clusteringEnabled) {
+            initClustering();
+        } else __log.info(__msgs.msgOdeClusteringNotInitialized());
+
         __log.debug("Creating data source.");
         initDataSource();
         __log.debug("Starting DAO.");
@@ -202,6 +197,12 @@
         registerExternalVariableModules();
 
         _store.loadAll();
+        if (_clusterManager != null) {
+            _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+            _clusterManager.setClusterProcessStore((ClusterProcessStore) _store);
+            _clusterManager.init(_configRoot);
+           ((SimpleScheduler)_scheduler).setNodeId(_clusterManager.getNodeID());
+        }
 
         try {
             _bpelServer.start();
@@ -221,7 +222,7 @@
 
         try {
             __log.debug("Initializing Deployment Web Service");
-            new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath());
+            new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath(),this);
         } catch (Exception e) {
             throw new ServletException(e);
         }
@@ -371,6 +372,17 @@
                 _txMgr = null;
             }
 
+            if (_clusterManager != null) {
+                try {
+                    __log.debug("shutting down cluster manager.");
+                    _clusterManager.shutdown();
+                    _clusterManager = null;
+                } catch (Exception ex) {
+                    __log.debug("Cluster manager shutdown failed.", ex);
+
+                }
+            }
+
             if (_connector != null) {
                 try {
                     __log.debug("shutdown BpelConnector");
@@ -455,6 +467,24 @@
         }
     }
 
+    public boolean isClusteringEnabled() {
+        return clusteringEnabled;
+    }
+
+    /**
+     * Loads and instantiates the configured ClusterManager; throws IllegalStateException on failure.
+     */
+    private void initClustering() {
+        String clusterImplName = _odeConfig.getClusteringImplClass();
+        try {
+            Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
+            _clusterManager = (ClusterManager) clusterImplClass.newInstance();
+        } catch (Exception ex) {
+            __log.error("Error while loading class : " + clusterImplName, ex);
+            throw new IllegalStateException("Error while loading class : " + clusterImplName, ex);
+        }
+    }
+
     /**
      * Initialize the DAO.
      *
@@ -477,18 +507,24 @@
         _store.registerListener(new ProcessStoreListenerImpl());
         _store.setDeployDir(
                 _odeConfig.getDeployDir() != null ?
-                    new File(_odeConfig.getDeployDir()) :
-                    new File(_workRoot, "processes"));
+                        new File(_odeConfig.getDeployDir()) :
+                        new File(_workRoot, "processes"));
         _store.setConfigDir(_configRoot);
     }
 
     protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) {
-        return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false);
+        if (clusteringEnabled)
+            return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, _clusterManager);
+        else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false);
     }
 
     protected Scheduler createScheduler() {
-        SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),
-                new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
+        SimpleScheduler scheduler;
+        if (clusteringEnabled) {
+            scheduler = new SimpleScheduler(_clusterManager.getNodeID(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), clusteringEnabled);
+            scheduler.setClusterManager(_clusterManager);
+        } else
+            scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
         scheduler.setExecutorService(_executorService);
         scheduler.setTransactionManager(_txMgr);
         return scheduler;
@@ -533,6 +569,7 @@
         _bpelServer.setCronScheduler(_cronScheduler);
 
         _bpelServer.setDaoConnectionFactory(_daoCF);
+        _bpelServer.setClusterManagerImpl(_clusterManager);
         _bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl()));
         _bpelServer.setEndpointReferenceContext(eprContext);
         _bpelServer.setMessageExchangeContext(new MessageExchangeContextImpl(this));
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index 6f82385..169ca4f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -41,6 +41,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.clapi.ClusterLock;
 import org.apache.ode.bpel.engine.cron.CronScheduler;
 import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
 import org.apache.ode.utils.WatchDog;
@@ -48,7 +49,6 @@
 import javax.xml.namespace.QName;
 import java.io.File;
 import java.io.FileFilter;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
@@ -74,6 +74,8 @@
 
     private SystemSchedulesConfig _systemSchedulesConf;
 
+    private boolean clusterEnabled;
+
     @SuppressWarnings("unchecked")
     private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
     @SuppressWarnings("unchecked")
@@ -96,6 +98,7 @@
     public DeploymentPoller(File deployDir, final ODEServer odeServer) {
         _odeServer = odeServer;
         _deployDir = deployDir;
+        clusterEnabled = _odeServer.isClusteringEnabled();
         if (!_deployDir.exists()) {
             boolean isDeployDirCreated = _deployDir.mkdir();
             if (!isDeployDirCreated) {
@@ -130,50 +133,66 @@
     @SuppressWarnings("unchecked")
     private void check() {
         File[] files = _deployDir.listFiles(_fileFilter);
+        boolean duLocked;
 
         // Checking for new deployment directories
         if (isDeploymentFromODEFileSystemAllowed() && files != null) {
             for (File file : files) {
-                File deployXml = new File(file, "deploy.xml");
-                File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
-
-                if (!deployXml.exists()) {
-                    // Skip if deploy.xml is abset
-                    if (__log.isDebugEnabled()) {
-                        __log.debug("Not deploying " + file + " (missing deploy.xml)");
-                    }
+                String duName = file.getName();
+                if (__log.isDebugEnabled()) {
+                __log.debug("Trying to acquire the lock for " + duName);
                 }
+                duLocked = pollerTryLock(duName);
 
-                WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+                if (duLocked) {
+                    try {
+                        File deployXml = new File(file, "deploy.xml");
+                        File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
 
-                if (deployedMarker.exists()) {
-                    checkDeployXmlWatchDog(ddWatchDog);
-                    continue;
-                }
+                        if (!deployXml.exists()) {
+                            // Skip if deploy.xml is absent
+                            if (__log.isDebugEnabled()) {
+                                __log.debug("Not deploying " + file + " (missing deploy.xml)");
+                            }
+                        }
 
-                try {
-                    boolean isCreated = deployedMarker.createNewFile();
-                    if (!isCreated) {
-                        __log.error("Error while creating  file "
+                        WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+
+                        if (deployedMarker.exists()) {
+                            checkDeployXmlWatchDog(ddWatchDog);
+                            continue;
+                        }
+
+                        try {
+                            boolean isCreated = deployedMarker.createNewFile();
+                            if (!isCreated) {
+                                __log.error("Error while creating  file "
                                         + file.getName()
                                         + ".deployed ,deployment could be inconsistent");
+                            }
+                        } catch (IOException e1) {
+                            __log.error("Error creating deployed marker file, " + file + " will not be deployed");
+                            continue;
+                        }
+
+                        try {
+                            _odeServer.getProcessStore().undeploy(file);
+                        } catch (Exception ex) {
+                            __log.error("Error undeploying " + file.getName());
+                        }
+
+                        try {
+                            Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
+                            __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed);
+                        } catch (Exception e) {
+                            __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
+                        }
+                    } finally {
+                        if (__log.isDebugEnabled()) {
+                        __log.debug("Trying to release the lock for " + file.getName());
+                        }
+                        unlock(file.getName());
                     }
-                } catch (IOException e1) {
-                    __log.error("Error creating deployed marker file, " + file + " will not be deployed");
-                    continue;
-                }
-
-                try {
-                    _odeServer.getProcessStore().undeploy(file);
-                } catch (Exception ex) {
-                    __log.error("Error undeploying " + file.getName());
-                }
-
-                try {
-                    Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
-                    __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed );
-                } catch (Exception e) {
-                    __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
                 }
             }
         }
@@ -184,16 +203,33 @@
             String pkg = file.getName().substring(0, file.getName().length() - ".deployed".length());
             File deployDir = new File(_deployDir, pkg);
             if (!deployDir.exists()) {
-                Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
-                boolean isDeleted = file.delete();
-                if (!isDeleted) {
-                    __log.error("Error while deleting file "
+                String duName = deployDir.getName();
+
+                if (__log.isDebugEnabled()) {
+                    __log.debug("Trying to acquire the lock for " + duName);
+                }
+
+                duLocked = pollerTryLock(duName);
+
+                if (duLocked) {
+                    try {
+                        Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
+                        boolean isDeleted = file.delete();
+                        if (!isDeleted) {
+                            __log.error("Error while deleting file "
                                     + file.getName()
                                     + ".deployed , please check if file is locked or if it really exist");
+                        }
+                        disposeDeployXmlWatchDog(deployDir);
+                        if (undeployed.size() > 0)
+                            __log.info("Successfully undeployed " + pkg);
+                    } finally {
+                        if (__log.isDebugEnabled()) {
+                            __log.debug("Trying to release the lock for " + duName);
+                        }
+                        unlock(duName);
+                    }
                 }
-                disposeDeployXmlWatchDog(deployDir);
-                if (undeployed.size() > 0)
-                    __log.info("Successfully undeployed " + pkg);
             }
         }
 
@@ -324,4 +360,23 @@
             _odeServer.getProcessStore().refreshSchedules(deploymentPakage);
         }
     }
+
+    /**
+     * Tries to take the cluster manager's deployment lock for the given key.
+     * Returns true without locking anything when clustering is disabled.
+     */
+    private boolean pollerTryLock(String key) {
+        if (!clusterEnabled) return true;
+        ClusterLock deploymentLock = _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock();
+        deploymentLock.putIfAbsent(key, key);
+        return deploymentLock.tryLock(key);
+    }
+
+    /** Releases the deployment lock for the key when clustering is enabled; always returns true. */
+    private boolean unlock(String key) {
+        if (clusterEnabled) {
+            _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock().unlock(key);
+        }
+        return true;
+    }
 }
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index aa769ed..01e003b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -20,24 +20,6 @@
 package org.apache.ode.axis2.service;
 
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import javax.activation.DataHandler;
-import javax.wsdl.Definition;
-import javax.wsdl.WSDLException;
-import javax.wsdl.factory.WSDLFactory;
-import javax.wsdl.xml.WSDLReader;
-import javax.xml.namespace.QName;
-
 import org.apache.axiom.om.OMAbstractFactory;
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMNamespace;
@@ -47,23 +29,35 @@
 import org.apache.axis2.AxisFault;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.AxisOperation;
 import org.apache.axis2.engine.AxisConfiguration;
 import org.apache.axis2.engine.AxisEngine;
 import org.apache.axis2.receivers.AbstractMessageReceiver;
 import org.apache.axis2.util.Utils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
+import org.apache.ode.axis2.ODEServer;
 import org.apache.ode.axis2.OdeFault;
 import org.apache.ode.axis2.deploy.DeploymentPoller;
 import org.apache.ode.axis2.hooks.ODEAxisService;
-import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.clapi.ClusterLock;
 import org.apache.ode.bpel.iapi.ProcessConf;
 import org.apache.ode.bpel.iapi.ProcessStore;
 import org.apache.ode.il.OMUtils;
-import org.apache.ode.utils.fs.FileUtils;
 import org.apache.ode.utils.Namespaces;
+import org.apache.ode.utils.fs.FileUtils;
+
+import javax.activation.DataHandler;
+import javax.wsdl.Definition;
+import javax.wsdl.WSDLException;
+import javax.wsdl.factory.WSDLFactory;
+import javax.wsdl.xml.WSDLReader;
+import javax.xml.namespace.QName;
+import java.io.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 /**
  * Axis wrapper for process deployment.
@@ -76,9 +70,11 @@
     private final OMNamespace _deployapi;
 
     private File _deployPath;
+    private  ODEServer _odeServer;
     private DeploymentPoller _poller;
     private ProcessStore _store;
 
+    private boolean clusterEnabled;
 
     public DeploymentWebService() {
         _pmapi = OMAbstractFactory.getOMFactory().createOMNamespace("http://www.apache.org/ode/pmapi","pmapi");
@@ -86,10 +82,12 @@
     }
 
     public void enableService(AxisConfiguration axisConfig, ProcessStore store,
-                              DeploymentPoller poller, String rootpath, String workPath) throws AxisFault, WSDLException {
+                              DeploymentPoller poller, String rootpath, String workPath, ODEServer odeServer) throws AxisFault, WSDLException {
         _deployPath = new File(workPath, "processes");
         _store = store;
         _poller = poller;
+        _odeServer = odeServer;
+        clusterEnabled = _odeServer.isClusteringEnabled();
 
         Definition def;
         WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader();
@@ -109,6 +107,7 @@
             String operation = messageContext.getAxisOperation().getName().getLocalPart();
             SOAPFactory factory = getSOAPFactory(messageContext);
             boolean unknown = false;
+            boolean duLocked;
 
             try {
                 if (operation.equals("deploy")) {
@@ -166,43 +165,56 @@
                         _poller.hold();
 
                         File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion());
-                        boolean createDir = dest.mkdir();
-                        if(!createDir){
-                        	throw new OdeFault("Error while creating file " + dest.getName());
+                        __log.info("Trying to acquire the lock for deploying: " + dest.getName());
+
+                        //lock on deployment unit directory name
+                        duLocked = lock(dest.getName());
+
+                        if (duLocked) {
+                            // Everything after lock() runs inside the try so the lock is released even if mkdir fails
+                            try {
+                                boolean createDir = dest.mkdir();
+                                if (!createDir) {
+                                    throw new OdeFault("Error while creating file " + dest.getName());
+                                }
+                                unzip(dest, (DataHandler) binaryNode.getDataHandler());
+
+                                // Check that we have a deploy.xml
+                                File deployXml = new File(dest, "deploy.xml");
+                                if (!deployXml.exists())
+                                    throw new OdeFault("The deployment doesn't appear to contain a deployment " +
+                                            "descriptor in its root directory named deploy.xml, aborting.");
+
+                                Collection<QName> deployed = _store.deploy(dest);
+
+                                File deployedMarker = new File(_deployPath, dest.getName() + ".deployed");
+                                if (!deployedMarker.createNewFile()) {
+                                    throw new OdeFault("Error while creating file " + deployedMarker.getName() + ", deployment failed");
+                                }
+
+                                // Telling the poller what we deployed so that it doesn't try to deploy it again
+                                _poller.markAsDeployed(dest);
+                                __log.info("Deployment of artifact " + dest.getName() + " successful.");
+
+                                OMElement response = factory.createOMElement("response", null);
+
+                                if (__log.isDebugEnabled()) __log.debug("Deployed package: " + dest.getName());
+                                OMElement d = factory.createOMElement("name", _deployapi);
+                                d.setText(dest.getName());
+                                response.addChild(d);
+
+                                for (QName pid : deployed) {
+                                    if (__log.isDebugEnabled()) __log.debug("Deployed PID: " + pid);
+                                    d = factory.createOMElement("id", _deployapi);
+                                    d.setText(pid);
+                                    response.addChild(d);
+                                }
+                                sendResponse(factory, messageContext, "deployResponse", response);
+                            } finally {
+                                __log.info("Trying to release the lock for deploying: " + dest.getName());
+                                unlock(dest.getName());
+                            }
                         }
-                        unzip(dest, (DataHandler) binaryNode.getDataHandler());
-
-                        // Check that we have a deploy.xml
-                        File deployXml = new File(dest, "deploy.xml");
-                        if (!deployXml.exists())
-                            throw new OdeFault("The deployment doesn't appear to contain a deployment " +
-                                    "descriptor in its root directory named deploy.xml, aborting.");
-
-                        Collection<QName> deployed = _store.deploy(dest);
-
-                        File deployedMarker = new File(_deployPath, dest.getName() + ".deployed");
-                        if(!deployedMarker.createNewFile()) {
-                        	throw new OdeFault("Error while creating file " + deployedMarker.getName() + "deployment failed");
-                        }
-
-                        // Telling the poller what we deployed so that it doesn't try to deploy it again
-                        _poller.markAsDeployed(dest);
-                        __log.info("Deployment of artifact " + dest.getName() + " successful.");
-
-                        OMElement response = factory.createOMElement("response", null);
-
-                        if (__log.isDebugEnabled()) __log.debug("Deployed package: "+dest.getName());
-                        OMElement d = factory.createOMElement("name", _deployapi);
-                        d.setText(dest.getName());
-                        response.addChild(d);
-
-                        for (QName pid : deployed) {
-                            if (__log.isDebugEnabled()) __log.debug("Deployed PID: "+pid);
-                            d = factory.createOMElement("id", _deployapi);
-                            d.setText(pid);
-                            response.addChild(d);
-                        }
-                        sendResponse(factory, messageContext, "deployResponse", response);
                     } finally {
                         _poller.release();
                     }
@@ -224,20 +236,30 @@
                         // Put the poller on hold to avoid undesired side effects
                         _poller.hold();
 
-                        Collection<QName> undeployed = _store.undeploy(deploymentDir);
+                        __log.info("Trying to acquire the lock for undeploying: " + deploymentDir.getName());
+                        duLocked = lock(deploymentDir.getName());
 
-                        File deployedMarker = new File(deploymentDir + ".deployed");
-                        boolean isDeleted = deployedMarker.delete();
+                        if (duLocked) {
+                            try {
+                                Collection<QName> undeployed = _store.undeploy(deploymentDir);
 
-                        if (!isDeleted)
-                            __log.error("Error while deleting file " + deployedMarker.getName());
+                                File deployedMarker = new File(deploymentDir + ".deployed");
+                                boolean isDeleted = deployedMarker.delete();
 
-                        FileUtils.deepDelete(deploymentDir);
+                                if (!isDeleted)
+                                    __log.error("Error while deleting file " + deployedMarker.getName());
 
-                        OMElement response = factory.createOMElement("response", null);
-                        response.setText("" + (undeployed.size() > 0));
-                        sendResponse(factory, messageContext, "undeployResponse", response);
-                        _poller.markAsUndeployed(deploymentDir);
+                                FileUtils.deepDelete(deploymentDir);
+
+                                OMElement response = factory.createOMElement("response", null);
+                                response.setText("" + (undeployed.size() > 0));
+                                sendResponse(factory, messageContext, "undeployResponse", response);
+                                _poller.markAsUndeployed(deploymentDir);
+                            } finally {
+                                __log.info("Trying to release the lock for undeploying: " + deploymentDir.getName());
+                                unlock(deploymentDir.getName());
+                            }
+                        }
                     } finally {
                         _poller.release();
                     }
@@ -352,6 +374,25 @@
         out.close();
     }
 
+    /** Takes the cluster manager's deployment lock for {@code key} before a web-service
+     *  deploy/undeploy; skipped entirely when clustering is disabled. Always returns true. */
+    private boolean lock(String key) {
+        if (!clusterEnabled) {
+            return true;
+        }
+        ClusterLock deploymentLock = _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock();
+        deploymentLock.putIfAbsent(key, key);
+        deploymentLock.lock(key);
+        return true;
+    }
 
-
+    /**
+     * Releases the deployment lock for {@code key} once a deploy/undeploy is done; does nothing without clustering.
+     */
+    private boolean unlock(String key) {
+        if (!clusterEnabled) return true;
+        ClusterLock deploymentLock = _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock();
+        deploymentLock.unlock(key);
+        return true;
+    }
 }
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java
new file mode 100644
index 0000000..9c9683f
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.util.concurrent.TimeUnit;
+
+public interface ClusterLock<E> {
+    /**
+     * Acquire the lock for the specified key.
+     *
+     * @param key key identifying the lock to acquire; unlike the timed overload, this
+     *            signature declares no time limit and no checked exception
+     */
+    void lock(E key);
+
+    /**
+     * Acquire the lock for the specified key and time period.
+     * @param key  key identifying the lock to acquire
+     * @param time length of the time period, expressed in {@code tu}
+     * @param tu   unit of {@code time}
+     * @throws TimeoutException presumably when the lock is not obtained within the period (confirm per implementation)
+     */
+    void lock(E key,int time,TimeUnit tu) throws InterruptedException, TimeoutException;
+
+    /**
+     * Release the lock acquired for the specified key.
+     *
+     * @param key key identifying the lock to release; presumably the same key that was
+     *            passed when the lock was acquired (not enforced by this interface)
+     */
+    void unlock(E key);
+
+    /**
+     * Tries to acquire the lock for the specified key.
+     * @param key key identifying the lock to acquire
+     * @return presumably {@code true} if the lock was acquired (by analogy with java.util.concurrent.locks.Lock; confirm per implementation)
+     */
+    boolean tryLock(E key);
+
+    /**
+     * Tries to acquire the lock for the specified key and time period.
+     * @param key  key identifying the lock to acquire
+     * @param time length of the time period, expressed in {@code tu}
+     * @param tu   unit of {@code time}
+     * @return presumably {@code true} if the lock was acquired within the period (confirm per implementation)
+     */
+    boolean tryLock(E key, int time, TimeUnit tu);
+
+    /**
+     * If there is no value for the given key, put the given value (the wording implies a backing map; implementation-specific).
+     * @param key    key to look up
+     * @param keyVal value to store for {@code key} when none is present
+     */
+    void putIfAbsent(E key, E keyVal);
+
+    /** Exception class indicating a time-out occurred while obtaining a lock. */
+    public static final class  TimeoutException extends Exception {
+        private static final long serialVersionUID = 7247629086692580285L;
+    }
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
new file mode 100644
index 0000000..5a2e0f9
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.File;
+import java.util.List;
+
+public interface ClusterManager {
+
+    /**
+     * Initialize the cluster manager.
+     * @param file file handed to the implementation (presumably the location of its configuration — confirm)
+     */
+    void init(File file);
+
+    /**
+     * Shut down the cluster instance.
+     */
+    void shutdown();
+
+    /**
+     * Report whether the local member is the master.
+     * @return {@code true} if the local member is the master, {@code false} otherwise
+     */
+    boolean isMaster();
+
+    /**
+     * Set the process store object that is used for clustering.
+     * @param ps the cluster-aware process store
+     */
+    void setClusterProcessStore(ClusterProcessStore ps);
+
+    /**
+     * Publish a process store event (deploy or undeploy) to the cluster; called by the node that initiated it.
+     * @param clusterEvent the event to publish
+     */
+    void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent);
+
+    /**
+     * Register the listener that receives process store cluster events.
+     */
+    void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener);
+    /**
+     * Register a cluster member listener (per the original note, the scheduler registers itself here).
+     * @param listener listener to register for the callbacks declared in ClusterMemberListener
+     */
+    void registerClusterMemberListener(ClusterMemberListener listener);
+
+    /**
+     * Return the deployment lock for the cluster.
+     */
+    ClusterLock getDeploymentLock();
+
+    /**
+     * Return the instance lock for the cluster.
+     */
+    ClusterLock getInstanceLock();
+
+    /**
+     * Return the list of active nodes in the cluster.
+     */
+    List<String> getActiveNodes();
+
+    /**
+     * Return the local member's UUID in the cluster.
+     */
+    String getNodeID();
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
new file mode 100644
index 0000000..541ab9c
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public interface ClusterMemberListener {
+    /** Presumably invoked when a node joins the cluster; nodeId identifies that node (confirm with the ClusterManager implementation). */
+    void memberAdded(String nodeId);
+    /** Presumably invoked when a node leaves the cluster; nodeId identifies that node (confirm with the ClusterManager implementation). */
+    void memberRemoved(String nodeId);
+    /** Presumably invoked when a node becomes master; masterId identifies the elected node (confirm with the implementation). */
+    void memberElectedAsMaster(String masterId);
+
+}
\ No newline at end of file
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterProcessStore.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterProcessStore.java
new file mode 100644
index 0000000..ca792bc
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterProcessStore.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.iapi.ProcessStore;
+
+public interface ClusterProcessStore extends ProcessStore {
+    void deployProcesses(String duName); // duName: presumably the deployment unit name carried by cluster events
+
+    Collection<QName> undeployProcesses(String duName); // same key as above; returns a collection of QNames
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
new file mode 100644
index 0000000..79d9a78
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.Serializable;
+
+public abstract class ProcessStoreClusterEvent implements Serializable {
+    protected static final long serialVersionUID = 1L; // NOTE(review): Java serialization reads this per declaring class; subclasses do not pick it up
+    /** Deployment unit this event refers to; returned by {@link #getDuName()}. */
+    private String deploymentUnit;
+    /** Short description assigned by subclasses in their constructors; returned by {@link #getInfo()}. */
+    protected String info ;
+
+    /** Unique ID of the node in the cluster generating the event. */
+    private String eventGeneratingNode;
+
+    public ProcessStoreClusterEvent(String deploymentUnit) {
+        this.deploymentUnit = deploymentUnit;
+    }
+
+    @Override
+    public String toString() {
+        return "{ProcessStoreClusterEvent#" + deploymentUnit +"}";
+    }
+
+    public void setEventGeneratingNode(String uuid) {
+        this.eventGeneratingNode = uuid;
+    }
+
+    public String getEventGeneratingNode() {
+        return eventGeneratingNode;
+    }
+
+    public String getDuName() {
+        return deploymentUnit;
+    }
+
+    public String getInfo() {
+        return info;
+    }
+
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
new file mode 100644
index 0000000..26f42cf
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.bpel.clapi;
+
+public interface ProcessStoreClusterListener {
+    void onProcessStoreClusterEvent(ProcessStoreClusterEvent message); // callback taking one process store cluster event
+}
\ No newline at end of file
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
new file mode 100644
index 0000000..c7cbb23
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public class ProcessStoreDeployedEvent extends ProcessStoreClusterEvent {
+    private static final long serialVersionUID = 1L; // declared here: the superclass's UID field is not used for this class
+    public ProcessStoreDeployedEvent(String deploymentUnit) {
+        super(deploymentUnit);
+        info = "Deploy Event";
+    }
+
+    @Override
+    public String toString() {
+        return "{ProcessStoreDeployedEvent#" + getDuName() +"}";
+    }
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
new file mode 100644
index 0000000..d4f5a49
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public class ProcessStoreUndeployedEvent extends ProcessStoreClusterEvent {
+    private static final long serialVersionUID = 1L; // declared here: the superclass's UID field is not used for this class
+    public ProcessStoreUndeployedEvent(String deploymentUnit) {
+        super(deploymentUnit);
+        info = "Undeploy Event";
+    }
+
+    @Override
+    public String toString() {
+        return "{ProcessStoreUndeployedEvent#" + getDuName() +"}";
+    }
+}
diff --git a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
index 16f1e62..cc519bd 100644
--- a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
+++ b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
@@ -19,6 +19,10 @@
 
 package org.apache.ode.il.config;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.utils.SystemUtils;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileNotFoundException;
@@ -26,10 +30,6 @@
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.utils.SystemUtils;
-
 /**
  * Configuration object used for configuring the intergration layer. The propereties are those likely to be common to all layers.
  *
@@ -106,6 +106,12 @@
 
     public static final String DEFAULT_TX_FACTORY_CLASS_NAME = "org.apache.ode.il.EmbeddedGeronimoFactory";
 
+    public static final String PROP_CLUSTERING_ENABLED = "clustering.enabled";
+
+    public static final String PROP_CLUSTERING_IMPL_CLASS = "clustering.impl.class";
+
+    public static final String DEFAULT_CLUSTERING_IMPL_CLASS_NAME = "org.apache.ode.clustering.hazelcast.HazelcastClusterImpl";
+
     private File _cfgFile;
 
     private String _prefix;
@@ -289,6 +295,14 @@
         return getProperty(OdeConfigProperties.PROP_DEPLOY_DIR);
     }
 
+    public boolean isClusteringEnabled() { // reads clustering.enabled, passing "false" as the default
+        return Boolean.parseBoolean(getProperty(PROP_CLUSTERING_ENABLED, "false"));
+    }
+
+    public String getClusteringImplClass() { // reads clustering.impl.class, passing DEFAULT_CLUSTERING_IMPL_CLASS_NAME as the default
+        return getProperty(OdeConfigProperties.PROP_CLUSTERING_IMPL_CLASS, DEFAULT_CLUSTERING_IMPL_CLASS_NAME);
+    }
+
     public String getTxFactoryClass() {
         return getProperty(OdeConfigProperties.PROP_TX_FACTORY_CLASS, DEFAULT_TX_FACTORY_CLASS_NAME);
     }
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index ded83ff..b0473bc 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -19,21 +19,9 @@
 
 package org.apache.ode.bpel.engine;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.wsdl.Operation;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterLock;
 import org.apache.ode.bpel.dao.MessageExchangeDAO;
 import org.apache.ode.bpel.dao.ProcessDAO;
 import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -54,7 +42,6 @@
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
 import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobType;
 import org.apache.ode.bpel.intercept.InterceptorInvoker;
 import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -70,6 +57,17 @@
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
  * transaction.
@@ -115,7 +113,7 @@
     private SharedEndpoints _sharedEps;
 
     /** Manage instance-level locks. */
-    private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+    private final ClusterLock<Long> _instanceLockManager;
 
     final Contexts _contexts;
 
@@ -124,8 +122,13 @@
 
     public BpelEngineImpl(Contexts contexts) {
         _contexts = contexts;
+        if(_contexts.clusterManager != null) {
+            _instanceLockManager = _contexts.clusterManager.getInstanceLock();
+        }
+        else _instanceLockManager = new InstanceLockManager();
         _sharedEps = new SharedEndpoints();
         _sharedEps.init();
+
     }
 
     public SharedEndpoints getSharedEndpoints() {
@@ -429,7 +432,8 @@
         // Note that we don't want to wait too long here to get our lock, since we are likely holding
         // on to scheduler's locks of various sorts.
         try {
-            _instanceLockManager.lock(iid, 1, TimeUnit.MICROSECONDS);
+            _instanceLockManager.
+                    lock(iid, 1, TimeUnit.MICROSECONDS);
             _contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
                 public void afterCompletion(boolean success) {
                     _instanceLockManager.unlock(iid);
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
index bc7fe29..2334262 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
@@ -35,6 +35,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.dao.BpelDAOConnection;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
@@ -534,6 +535,10 @@
         _contexts.bindingContext = bc;
     }
 
+    public void setClusterManagerImpl(ClusterManager cm) { // NOTE(review): BpelEngineImpl reads clusterManager in its constructor, so presumably this must be called before the engine is created
+        _contexts.clusterManager = cm;
+    }
+
     public DebuggerContext getDebugger(QName pid) throws BpelEngineException {
         return _engine._activeProcesses.get(pid)._debugger;
     }
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
index 9fa3258..a965d58 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
@@ -19,6 +19,7 @@
 
 package org.apache.ode.bpel.engine;
 
+import org.apache.ode.bpel.clapi.ClusterManager;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.iapi.BindingContext;
 import org.apache.ode.bpel.iapi.BpelEventListener;
@@ -46,6 +47,8 @@
 
     public CronScheduler cronScheduler;
 
+    public ClusterManager clusterManager;
+
     EndpointReferenceContext eprContext;
 
     BindingContext bindingContext;
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
index e49e8e9..1571eac 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
@@ -20,6 +20,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterLock;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -36,11 +37,15 @@
  *
  * @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m
  */
-public class InstanceLockManager {
+public class InstanceLockManager implements ClusterLock<Long> {
     private static final Log __log = LogFactory.getLog(InstanceLockManager.class);
 
     private final Lock _mutex = new java.util.concurrent.locks.ReentrantLock();
-    private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo> ();
+    private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo>();
+
+    public void lock(Long key) {
+        // Nothing to do here: this untimed overload is a no-op and acquires no lock.
+    }
 
     public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException, TimeoutException {
         if (iid == null) return;
@@ -105,6 +110,20 @@
 
     }
 
+    public boolean tryLock(Long key) {
+        // Nothing to do here: not implemented by this lock manager, so it always returns false.
+        return false;
+    }
+
+    public boolean tryLock(Long key, int time, TimeUnit tu) {
+        // Nothing to do here: not implemented by this lock manager, so it always returns false.
+        return false;
+    }
+
+    public void putIfAbsent(Long key, Long keyVal) {
+        // Nothing to do here: this lock manager stores nothing for the key in this method.
+    }
+
 
     @Override
     public String toString() {
@@ -135,9 +154,6 @@
             return "{Lock for Instance #" + iid +", acquired by " +  acquierer + "}";
         }
     }
-
-    /** Exception class indicating a time-out occured while obtaining a lock. */
-    public static final class TimeoutException extends Exception {
-        private static final long serialVersionUID = 7247629086692580285L;
-    }
 }
+
+
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
new file mode 100644
index 0000000..d701e23
--- /dev/null
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterProcessStore;
+import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
+import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.il.config.OdeConfigProperties;
+
+import javax.sql.DataSource;
+import javax.xml.namespace.QName;
+
+import java.io.File;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ClusterProcessStoreImpl extends ProcessStoreImpl implements ClusterProcessStore {
+    private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
+
+    /** Cluster manager used to publish deploy/undeploy events to the cluster. */
+    private final ClusterManager _clusterManager;
+
+    /** Delegates to the ProcessStoreImpl constructor and keeps the given cluster manager. */
+    public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
+        super(eprContext,ds,persistenceType,props,createDatamodel);
+        _clusterManager = clusterManager;
+    }
+
+    /** Deploys locally through the superclass, then publishes a deployed event named after the directory. */
+    public Collection<QName> deploy(final File deploymentUnitDirectory) {
+        Collection<QName> deployed = super.deploy(deploymentUnitDirectory);
+        publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName());
+        return deployed;
+    }
+
+    private void publishProcessStoreDeployedEvent(String duName){
+        ProcessStoreDeployedEvent deployedEvent = new ProcessStoreDeployedEvent(duName);
+        _clusterManager.publishProcessStoreClusterEvent(deployedEvent);
+        __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getEventGeneratingNode());
+    }
+
+    /** Retires ACTIVE processes of every known unit matching duName's version pattern, loads duName from the store, then fires state changes. */
+    public void deployProcesses(final String duName) {
+        final ArrayList<ProcessConfImpl> confs = new ArrayList<ProcessConfImpl>();
+        // NOTE(review): _deploymentUnits/_processes are read here without ProcessStoreImpl's private _rw lock - confirm this is safe.
+        Pattern duNamePattern = getPreviousPackageVersionPattern(duName);
+
+        for (String packageName : _deploymentUnits.keySet()) {
+            Matcher matcher = duNamePattern.matcher(packageName);
+            if (matcher.matches()) {
+                DeploymentUnitDir duDir = _deploymentUnits.get(packageName);
+                if (duDir == null) throw new ContextException("Could not find package " + packageName);
+                for (QName processName : duDir.getProcessNames()) {
+                    QName pid = toPid(processName, duDir.getVersion());
+                    ProcessConfImpl pconf = _processes.get(pid);
+                    if (pconf != null && pconf.getState() == ProcessState.ACTIVE) {
+                        pconf.setState(ProcessState.RETIRED);
+                        __log.info("Set state of " + pconf.getProcessId() + " to " + pconf.getState());
+                        confs.add(pconf);
+                    }
+                }
+            }
+        }
+
+        try {
+            exec(new Callable<Object>() {
+                public Object call(ConfStoreConnection conn) {
+                    DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
+                    if (dudao != null) {
+                        List<ProcessConfImpl> load = load(dudao);
+                        __log.info("Loading DU from store: " + duName);
+                        confs.addAll(load);
+                    }
+                    return null;
+                }
+            });
+        } catch (Exception ex) {
+            __log.error("Error loading DU from store: " + duName, ex);
+        }
+
+        for (ProcessConfImpl p : confs) {
+            try {
+                __log.info("Fire event of " + p.getProcessId() + " " + p.getState());
+                fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName());
+            } catch (Exception except) {
+                __log.error("Error with process retiring or activating : pid=" + p.getProcessId() + " package=" + p.getDeploymentUnit().getName(), except);
+            }
+        }
+    }
+
+    /** Undeploys locally through the superclass, then publishes an undeployed event named after the directory. */
+    public Collection<QName> undeploy(final File dir) {
+        Collection<QName> undeployed = super.undeploy(dir);
+        publishProcessStoreUndeployedEvent(dir.getName());
+        return undeployed;
+    }
+
+    private void publishProcessStoreUndeployedEvent(String duName){
+        ProcessStoreUndeployedEvent undeployedEvent = new ProcessStoreUndeployedEvent(duName);
+        _clusterManager.publishProcessStoreClusterEvent(undeployedEvent);
+        __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getEventGeneratingNode());
+    }
+
+    /**
+     * Unregisters the processes of a deployment unit when that unit is undeployed.
+     * @param duName name of the deployment unit
+     * @return whatever the superclass implementation returns for this unit
+     */
+    public Collection<QName> undeployProcesses(final String duName) {
+        return super.undeployProcesses(duName);
+    }
+}
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index ba911a7..05252a1 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -71,9 +71,9 @@
 
     private final CopyOnWriteArrayList<ProcessStoreListener> _listeners = new CopyOnWriteArrayList<ProcessStoreListener>();
 
-    private Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>();
+    protected Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>();
 
-    private Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>();
+    protected Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>();
 
     /** Guards access to the _processes and _deploymentUnits */
     private final ReadWriteLock _rw = new ReentrantReadWriteLock();
@@ -88,6 +88,8 @@
 
     protected File _configDir;
 
+
+
     /**
      * Executor used to process DB transactions. Allows us to isolate the TX context, and to ensure that only one TX gets executed a
      * time. We don't really care to parallelize these operations because: i) HSQL does not isolate transactions and we don't want
@@ -345,18 +347,9 @@
      * "AbsenceRequest-2/AbsenceRequest.ode" and setRetirePackage() will be called accordingly.
      */
     private void retirePreviousPackageVersions(DeploymentUnitDir du) {
-        //retire all the other versions of the same DU
-        String[] nameParts = du.getName().split("/");
-           /* Replace the version number (if any) with regexp to match any version number */
-            nameParts[0] = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", "");
-            nameParts[0] += "([-\\Q.\\E](\\d)+)?";
-        StringBuilder duNameRegExp = new StringBuilder(du.getName().length() * 2);
-        for (int i = 0, n = nameParts.length; i < n; i++) {
-            if (i > 0) duNameRegExp.append("/");
-            duNameRegExp.append(nameParts[i]);
-        }
 
-        Pattern duNamePattern = Pattern.compile(duNameRegExp.toString());
+        Pattern duNamePattern = getPreviousPackageVersionPattern(du.getName());
+
         for (String deployedDUname : _deploymentUnits.keySet()) {
             Matcher matcher = duNamePattern.matcher(deployedDUname);
             if (matcher.matches()) {
@@ -383,26 +376,7 @@
             __log.error("Error synchronizing with data store; " + duName + " may be reappear after restart!");
         }
 
-        Collection<QName> undeployed = Collections.emptyList();
-        DeploymentUnitDir du;
-        _rw.writeLock().lock();
-        try {
-            du = _deploymentUnits.remove(duName);
-            if (du != null) {
-                undeployed = toPids(du.getProcessNames(), du.getVersion());
-            }
-
-            for (QName pn : undeployed) {
-                fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName()));
-                __log.info(__msgs.msgProcessUndeployed(pn));
-            }
-
-            _processes.keySet().removeAll(undeployed);
-        } finally {
-            _rw.writeLock().unlock();
-        }
-
-        return undeployed;
+        return undeployProcesses(duName);
     }
 
     public Collection<String> getPackages() {
@@ -612,7 +586,7 @@
             psl.onProcessStoreEvent(pse);
     }
 
-    private void fireStateChange(QName processId, ProcessState state, String duname) {
+    protected void fireStateChange(QName processId, ProcessState state, String duname) {
         switch (state) {
             case ACTIVE:
                 fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId, duname));
@@ -894,7 +868,7 @@
         return result;
     }
 
-    private QName toPid(QName processType, long version) {
+    protected QName toPid(QName processType, long version) {
         return new QName(processType.getNamespaceURI(), processType.getLocalPart() + "-" + version);
     }
 
@@ -916,4 +890,41 @@
             }
         }
     }
+
+    /** Builds a pattern matching every version of the deployment unit duName, i.e. the name
+     *  with any trailing "-N" or ".N" version suffix on its first "/"-separated segment. */
+    protected Pattern getPreviousPackageVersionPattern(String duName) {
+        String[] nameParts = duName.split("/");
+        /* Strip the version number (if any); a regexp matching any version number is appended below */
+        String baseName = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", "");
+        // Quote the literal segments so regex metacharacters in a DU name (e.g. '.' or '+') only match themselves.
+        StringBuilder duNameRegExp = new StringBuilder(duName.length() * 2);
+        duNameRegExp.append(Pattern.quote(baseName)).append("([-\\Q.\\E](\\d)+)?");
+        for (int i = 1, n = nameParts.length; i < n; i++) {
+            duNameRegExp.append("/").append(Pattern.quote(nameParts[i]));
+        }
+        return Pattern.compile(duNameRegExp.toString());
+    }
+
+    /** Removes duName under the write lock, fires UNDEPLOYED for each of its processes and returns their ids. */
+    protected  Collection<QName> undeployProcesses(final String duName) {
+        _rw.writeLock().lock();
+        try {
+            DeploymentUnitDir removedDu = _deploymentUnits.remove(duName);
+            if (removedDu == null) {
+                // Unknown deployment unit: no events to fire and nothing to remove from _processes.
+                return Collections.<QName>emptyList();
+            }
+            Collection<QName> pids = toPids(removedDu.getProcessNames(), removedDu.getVersion());
+            for (QName pid : pids) {
+                fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pid, removedDu.getName()));
+                __log.info(__msgs.msgProcessUndeployed(pid));
+            }
+
+            _processes.keySet().removeAll(pids);
+            return pids;
+        } finally {
+            _rw.writeLock().unlock();
+        }
+    }
 }
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index d24b59b..00bdf7d 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -18,44 +18,14 @@
  */
 package org.apache.ode.test;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ode.bpel.common.evt.DebugBpelEventListener;
 import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
 import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.*;
 import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
 import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.iapi.ProcessStore;
-import org.apache.ode.bpel.iapi.ProcessStoreEvent;
-import org.apache.ode.bpel.iapi.ProcessStoreListener;
 import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
 import org.apache.ode.il.EmbeddedGeronimoFactory;
 import org.apache.ode.il.config.OdeConfigProperties;
@@ -71,6 +41,25 @@
 import org.junit.Before;
 import org.w3c.dom.Element;
 
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public abstract class BPELTestAbstract {
     private static final Log log = LogFactory.getLog(BPELTestAbstract.class);
     public static final long WAIT_BEFORE_INVOKE_TIMEOUT = 2000;
diff --git a/clustering/pom.xml b/clustering/pom.xml
new file mode 100644
index 0000000..d0e4b7c
--- /dev/null
+++ b/clustering/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements. See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership. The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing,
+~ software distributed under the License is distributed on an
+~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+~ KIND, either express or implied. See the License for the
+~ specific language governing permissions and limitations
+~ under the License.
+-->
+<project>
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.apache.ode</groupId>
+    <artifactId>ode-clustering</artifactId>
+    <name>ODE :: Clustering</name>
+    <parent>
+        <groupId>org.apache.ode</groupId>
+        <artifactId>ode</artifactId>
+        <version>1.4-SNAPSHOT</version>
+    </parent>
+</project>
\ No newline at end of file
diff --git a/clustering/src/main/java/org/apache/ode/clustering/Test.java b/clustering/src/main/java/org/apache/ode/clustering/Test.java
new file mode 100644
index 0000000..0e25733
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/Test.java
@@ -0,0 +1,4 @@
+package org.apache.ode.clustering;
+
+public class Test { // NOTE(review): empty placeholder class in main sources - confirm it is still needed
+}
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
new file mode 100644
index 0000000..4c5cad5
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.ListenerConfig;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.ode.bpel.clapi.*;
+
+/**
+ * This class implements necessary methods to build the cluster using hazelcast
+ */
+public class HazelcastClusterImpl implements ClusterManager, ProcessStoreClusterListener {
+    private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
+
+    private HazelcastInstance _hazelcastInstance;
+    private boolean isMaster = false;
+    private String nodeHostName;
+    private String nodeID;
+    private IMap<String, String> deployment_lock_map;
+    private IMap<Long, Long> instance_lock_map;
+    private ITopic<ProcessStoreClusterEvent> clusterDeploymentMessageTopic;
+    private ClusterProcessStore _clusterProcessStore;
+    private HazelcastDeploymentLock hazelcastDeploymentLock;
+    private HazelcastInstanceLock hazelcastInstanceLock;
+    private ClusterDeploymentMessageListener clusterDeploymentMessageListener;
+    private ClusterMemberShipListener clusterMemberShipListener;
+    private List<ClusterMemberListener> clusterMemberListenerList = null;
+
+    public HazelcastClusterImpl() {
+        clusterMemberShipListener = new ClusterMemberShipListener();
+        clusterDeploymentMessageListener = new ClusterDeploymentMessageListener();
+        clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this);
+        hazelcastDeploymentLock = new HazelcastDeploymentLock();
+        hazelcastInstanceLock = new HazelcastInstanceLock();
+    }
+
+
+    /** Starts this node's Hazelcast instance (default instance when the hazelcast.config system property is set,
+     *  otherwise configRoot/hazelcast.xml via loadConfig()) and wires up the lock maps and deployment topic. */
+    public void init(File configRoot) {
+        String hzConfig = System.getProperty("hazelcast.config");
+        if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
+        else {
+            File hzXml = new File(configRoot, "hazelcast.xml");
+            if (!hzXml.isFile())
+                __log.error("hazelcast.xml does not exist or is not a file");
+            else
+                try {
+                    Config config = loadConfig(hzXml);
+                    _hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+                } catch (FileNotFoundException fnf) {
+                    __log.error(fnf);
+                }
+        }
+
+        if (_hazelcastInstance != null) {
+            Member localMember = _hazelcastInstance.getCluster().getLocalMember();
+            nodeHostName = localMember.getSocketAddress().getHostName() + ":" + localMember.getSocketAddress().getPort();
+            nodeID = localMember.getUuid();
+            __log.info("Registering HZ localMember:" + nodeHostName);
+
+            deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
+            instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
+            clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
+            hazelcastDeploymentLock.setLockMap(deployment_lock_map);
+            hazelcastInstanceLock.setLockMap(instance_lock_map);
+            if (hzConfig != null) {
+                // loadConfig() was bypassed, so its listener registrations never happened: attach them here.
+                _hazelcastInstance.getCluster().addMembershipListener(clusterMemberShipListener);
+                clusterDeploymentMessageTopic.addMessageListener(clusterDeploymentMessageListener);
+            }
+            markAsMaster();
+        }
+    }
+
+    protected Config loadConfig(File hazelcastConfigFile) throws FileNotFoundException {
+        Config config = new FileSystemXmlConfig(hazelcastConfigFile);
+        // The file-based configuration is extended below with this node's listeners before being returned.
+        // Register the cluster membership listener on the configuration.
+        ListenerConfig clusterMemberShipListenerConfig = new ListenerConfig();
+        clusterMemberShipListenerConfig.setImplementation(clusterMemberShipListener);
+        config.addListenerConfig(clusterMemberShipListenerConfig);
+
+        // Register the deployment message listener on the ODE deployment topic's configuration.
+        ListenerConfig topicListenerConfig = new ListenerConfig();
+        topicListenerConfig.setImplementation(clusterDeploymentMessageListener);
+        TopicConfig topicConfig = config.getTopicConfig(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
+        topicConfig.addMessageListenerConfig(topicListenerConfig);
+
+        return config;
+    }
+
+    class ClusterMemberShipListener implements MembershipListener {
+
+        public ClusterMemberShipListener() {
+            clusterMemberListenerList = new ArrayList<ClusterMemberListener>();
+        }
+
+        public void registerClusterMemberListener(ClusterMemberListener listener) {
+            clusterMemberListenerList.add(listener);
+        }
+
+        @Override
+        public void memberAdded(MembershipEvent membershipEvent) {
+            String eventNodeID = membershipEvent.getMember().getUuid();
+            __log.info("Member Added " + eventNodeID);
+            if (isMaster) {
+                for (ClusterMemberListener listener : clusterMemberListenerList) {
+                    listener.memberAdded(eventNodeID);
+                }
+            }
+        }
+
+        @Override
+        public void memberRemoved(MembershipEvent membershipEvent) {
+            String eventNodeID = membershipEvent.getMember().getUuid();
+            __log.info("Member Removed " + eventNodeID);
+            markAsMaster();
+            if (isMaster) {
+                for (ClusterMemberListener listener : clusterMemberListenerList) {
+                    listener.memberRemoved(eventNodeID);
+                }
+            }
+        }
+
+        @Override
+        public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+            // Nothing to do here: member attribute changes are ignored.
+        }
+    }
+
+    public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) {
+        clusterEvent.setEventGeneratingNode(nodeID);
+        __log.info("Send " + clusterEvent.getInfo() + " Cluster Message " + "for " + clusterEvent.getDuName() + " [" + nodeHostName + "]");
+        clusterDeploymentMessageTopic.publish(clusterEvent);
+    }
+
+
+    class ClusterDeploymentMessageListener implements MessageListener<ProcessStoreClusterEvent> {
+        List<ProcessStoreClusterListener> clusterProcessStoreListenerList = null;
+
+        public ClusterDeploymentMessageListener() {
+            clusterProcessStoreListenerList = new ArrayList<ProcessStoreClusterListener>();
+        }
+
+        public void registerClusterProcessStoreListener(ProcessStoreClusterListener listener) {
+            clusterProcessStoreListenerList.add(listener);
+        }
+
+        @Override
+        public void onMessage(Message<ProcessStoreClusterEvent> msg) {
+            for (ProcessStoreClusterListener listener : clusterProcessStoreListenerList) {
+                listener.onProcessStoreClusterEvent(msg.getMessageObject());
+            }
+        }
+    }
+
+    /** Applies a deploy/undeploy event published by another node to the local cluster process store. */
+    public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message) {
+        if (_clusterProcessStore == null) {
+            // The store arrives later through setClusterProcessStore(); without it the event cannot be applied.
+            __log.warn("No cluster process store registered, ignoring " + message.getInfo() + " Cluster Message for " + message.getDuName());
+            return;
+        }
+        if (message instanceof ProcessStoreDeployedEvent) {
+            ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
+            // Skip events this node published itself.
+            if (!nodeID.equals(event.getEventGeneratingNode())) {
+                __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]");
+                _clusterProcessStore.deployProcesses(event.getDuName());
+            }
+        } else if (message instanceof ProcessStoreUndeployedEvent) {
+            ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
+            if (!nodeID.equals(event.getEventGeneratingNode())) {
+                __log.info("Receive " + event.getInfo() + "  Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]");
+                _clusterProcessStore.undeployProcesses(event.getDuName());
+            }
+        }
+    }
+
+    /** Marks this node as master when it is the first member the cluster returns, and notifies the member listeners. */
+    private void markAsMaster() {
+        // NOTE(review): relies on getMembers() listing the oldest member first - confirm for the Hazelcast version in use.
+        Member first = _hazelcastInstance.getCluster().getMembers().iterator().next();
+        if (!isMaster && first.localMember()) {
+            isMaster = true;
+            for (ClusterMemberListener listener : clusterMemberListenerList) listener.memberElectedAsMaster(nodeID);
+        }
+        __log.info("Master node: " +isMaster);
+    }
+
+    public boolean isMaster() {
+        return isMaster;
+    }
+
+    public String getNodeID() {
+        return nodeID;
+    }
+
+    public void setClusterProcessStore(ClusterProcessStore store) {
+        _clusterProcessStore = store;
+    }
+
+    public void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener) {
+        clusterDeploymentMessageListener.registerClusterProcessStoreListener(listener);
+    }
+
+    public void registerClusterMemberListener(ClusterMemberListener listener) {
+        clusterMemberShipListener.registerClusterMemberListener(listener);
+    }
+
+    public void shutdown() {
+        if (_hazelcastInstance != null) _hazelcastInstance.shutdown();
+    }
+
+    public ClusterLock<String> getDeploymentLock(){
+        return hazelcastDeploymentLock; // HazelcastDeploymentLock already implements ClusterLock<String>; no raw cast needed
+    }
+
+    public ClusterLock<Long> getInstanceLock(){
+        return (ClusterLock)hazelcastInstanceLock;
+    }
+
+    public List<String> getActiveNodes() {
+        List<String> nodeList = new ArrayList<String>();
+        if (_hazelcastInstance == null) return nodeList; // init() leaves the instance null when startup fails
+        for (Member m : _hazelcastInstance.getCluster().getMembers()) nodeList.add(m.getUuid());
+        return nodeList;
+    }
+}
+
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
new file mode 100644
index 0000000..8f204ff
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+/**
+ * Names of the Hazelcast maps and topic used by the Hazelcast based clustering implementation.
+ */
+public final class HazelcastConstants {
+    public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "ODE_DEPLOYMENT_LOCK";
+    public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK = "ODE_PROCESS_INSTANCE_LOCK";
+    public static final String ODE_CLUSTER_DEPLOYMENT_TOPIC = "ODE_DEPLOYMENT_TOPIC";
+
+    private HazelcastConstants() {
+    }
+}
\ No newline at end of file
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
new file mode 100644
index 0000000..b753305
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+import com.hazelcast.core.IMap;
+import org.apache.ode.bpel.clapi.ClusterLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link ClusterLock} over String keys that delegates to the per-key locks of a Hazelcast
+ * {@link IMap}. The map must be supplied through {@link #setLockMap} before any other call.
+ */
+public class HazelcastDeploymentLock implements ClusterLock<String>{
+    private static final Log __log = LogFactory.getLog(HazelcastDeploymentLock.class);
+
+    private IMap<String, String> _lock_map;
+
+    /** Supplies the Hazelcast map whose per-key locks back this lock. */
+    public void setLockMap(IMap<String, String> lock_map) {
+        _lock_map = lock_map;
+    }
+
+    /** Stores {@code keyVal} under {@code key} in the lock map unless the key already has a value. */
+    public void putIfAbsent(String key, String keyVal) {
+        _lock_map.putIfAbsent(key, keyVal);
+    }
+
+    /** Acquires the lock for {@code key}, waiting as long as necessary. */
+    public void lock(String key) {
+        _lock_map.lock(key);
+        if (__log.isDebugEnabled()) {
+            __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + true);
+        }
+    }
+
+    /** Releases the lock for {@code key}. */
+    public void unlock(String key) {
+        _lock_map.unlock(key);
+        if (__log.isDebugEnabled()) {
+            __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after unlocking: " + false);
+        }
+    }
+
+    /**
+     * Attempts to acquire the lock for {@code key} without waiting.
+     *
+     * @return true if the lock was acquired
+     */
+    public boolean tryLock(String key) {
+        boolean state = _lock_map.tryLock(key);
+        if (__log.isDebugEnabled()) {
+            __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
+        }
+        return state;
+    }
+
+    /**
+     * Attempts to acquire the lock for {@code key}, waiting at most the given time.
+     *
+     * @return true if the lock was acquired; false on timeout or when the wait was interrupted
+     */
+    public boolean tryLock(String key,int time, TimeUnit tu) {
+        boolean state;
+        try {
+            state = _lock_map.tryLock(key, time, tu);
+        } catch (InterruptedException ie) {
+            // This signature declares no checked exception: keep the interrupt flag set and report failure.
+            Thread.currentThread().interrupt();
+            state = false;
+        }
+        if (__log.isDebugEnabled()) {
+            __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
+        }
+        return state;
+    }
+
+    /**
+     * Acquires the lock for {@code key}, waiting at most the given time.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     * @throws TimeoutException if the lock could not be acquired within the given time
+     */
+    public void lock(String key,int time, TimeUnit tu) throws InterruptedException, TimeoutException {
+        if (!_lock_map.tryLock(key, time, tu)) {
+            throw new TimeoutException();
+        }
+        if (__log.isDebugEnabled()) {
+            __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + true);
+        }
+    }
+}
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
new file mode 100644
index 0000000..8ac11f8
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+import com.hazelcast.core.IMap;
+import org.apache.ode.bpel.clapi.ClusterLock;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * {@link ClusterLock} over instance ids that delegates to the per-key locks of a Hazelcast
+ * {@link IMap}. A {@code null} instance id is never locked or unlocked: such calls return at once.
+ */
+public class HazelcastInstanceLock implements ClusterLock<Long> {
+    private static final Log __log = LogFactory.getLog(HazelcastInstanceLock.class);
+
+    private IMap<Long, Long> _lock_map;
+
+    /** Supplies the Hazelcast map whose per-key locks back this lock. */
+    public void setLockMap(IMap<Long, Long> lock_map) {
+        _lock_map = lock_map;
+    }
+
+    /** Stores {@code keyVal} under {@code key} in the lock map unless the key already has a value. */
+    public void putIfAbsent(Long key, Long keyVal) {
+        _lock_map.putIfAbsent(key, keyVal);
+    }
+
+    /** Acquires the lock for {@code key}, waiting as long as necessary; a {@code null} key is ignored. */
+    public void lock(Long key) {
+        if (key == null) {
+            if (__log.isDebugEnabled())
+                __log.debug(" Instance Id null at lock[]");
+            return;
+        }
+
+        // Same preparation as the timed variant: make sure the key has an entry before locking it.
+        putIfAbsent(key, key);
+        _lock_map.lock(key);
+
+        if (__log.isDebugEnabled())
+            __log.debug(Thread.currentThread().toString() + ": lock(iid=" + key + ")");
+    }
+
+    /**
+     * Acquires the lock for {@code iid}, waiting at most the given time; a {@code null} id is ignored.
+     *
+     * @throws InterruptedException if the thread is interrupted while waiting
+     * @throws TimeoutException if the lock could not be acquired within the given time
+     */
+    public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException,TimeoutException {
+        if (iid == null) {
+            if (__log.isDebugEnabled())
+                __log.debug(" Instance Id null at lock[]");
+            return;
+        }
+
+        String thrd = Thread.currentThread().toString();
+
+        if (__log.isDebugEnabled())
+            __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu + ")");
+
+        putIfAbsent(iid, iid);
+
+        if (!_lock_map.tryLock(iid, time, tu)) {
+
+            if (__log.isDebugEnabled())
+                __log.debug(thrd + ": lock(iid=" + iid + ", " +
+                        "time=" + time + tu + ")-->TIMEOUT");
+            throw new TimeoutException();
+        }
+
+    }
+
+    /** Releases the lock for {@code iid}; a {@code null} id is ignored. */
+    public void unlock(Long iid) {
+        if (iid == null) {
+            if (__log.isDebugEnabled())
+                __log.debug(" unlock, instance id is null");
+            return;
+        }
+
+        String thrd = Thread.currentThread().toString();
+
+        _lock_map.unlock(iid);
+
+        if (__log.isDebugEnabled())
+            __log.debug(thrd + " unlock(iid=" + iid + ")");
+    }
+
+    /**
+     * Attempts to acquire the lock for {@code key} without waiting.
+     *
+     * @return true if the lock was acquired; false otherwise, including for a {@code null} key
+     */
+    public boolean tryLock(Long key) {
+        if (key == null) {
+            return false;
+        }
+        putIfAbsent(key, key);
+        return _lock_map.tryLock(key);
+    }
+
+    /**
+     * Attempts to acquire the lock for {@code key}, waiting at most the given time.
+     *
+     * @return true if the lock was acquired; false on timeout, interruption or a {@code null} key
+     */
+    public boolean tryLock(Long key, int time, TimeUnit tu) {
+        if (key == null) {
+            return false;
+        }
+        putIfAbsent(key, key);
+        try {
+            return _lock_map.tryLock(key, time, tu);
+        } catch (InterruptedException ie) {
+            // This signature declares no checked exception: keep the interrupt flag set and report failure.
+            Thread.currentThread().interrupt();
+            return false;
+        }
+    }
+}
diff --git a/dependencies.rb b/dependencies.rb
index 0857cf1..f469c02 100644
--- a/dependencies.rb
+++ b/dependencies.rb
@@ -75,6 +75,7 @@
   :transaction      =>"org.apache.geronimo.components:geronimo-transaction:jar:2.0.1",
   :connector        =>"org.apache.geronimo.components:geronimo-connector:jar:2.0.1"
 )
+HAZELCAST           = "com.hazelcast:hazelcast:jar:3.4.2"
 HIBERNATE           = [ "org.hibernate:hibernate-core:jar:3.3.2.GA", "javassist:javassist:jar:3.9.0.GA", "antlr:antlr:jar:2.7.6",
                         "asm:asm:jar:3.3.1", "cglib:cglib:jar:2.2", "net.sf.ehcache:ehcache:jar:1.2.3" ]
 HSQLDB              = "org.hsqldb:hsqldb:jar:2.3.3"
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 40fb044..c885d13 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -19,18 +19,6 @@
 
 package org.apache.ode.jbi;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.concurrent.Executors;
-
-import javax.jbi.JBIException;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.component.ComponentLifeCycle;
-import javax.jbi.component.ServiceUnitManager;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
@@ -50,6 +38,17 @@
 import org.apache.ode.utils.GUID;
 import org.apache.ode.utils.fs.TempFileManager;
 
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.component.ComponentLifeCycle;
+import javax.jbi.component.ServiceUnitManager;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.transaction.TransactionManager;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.concurrent.Executors;
+
 /**
  * This class implements ComponentLifeCycle. The JBI framework will start this engine class automatically when JBI framework starts
  * up.
diff --git a/pom.xml b/pom.xml
index c43c2d0..f36373c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
         <module>jbi-karaf-pmapi-httpbinding</module>
         <module>axis2-war</module>
         <module>bpel-itest</module>
+        <module>clustering</module>
     </modules>
 
     <build>
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a56b86e..517045d 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -19,33 +19,20 @@
 
 package org.apache.ode.scheduler.simple;
 
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
 import org.apache.ode.bpel.iapi.ContextException;
 import org.apache.ode.bpel.iapi.Scheduler;
 
+import javax.transaction.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * A reliable and relatively simple scheduler that uses a database to persist information about
  * scheduled tasks.
@@ -66,7 +53,7 @@
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
-public class SimpleScheduler implements Scheduler, TaskRunner {
+public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberListener {
     private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
 
     private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
@@ -114,11 +101,18 @@
 
     private DatabaseDelegate _db;
 
+    private boolean _isClusterEnabled;
+
+    private ClusterManager _clusterManager;
+
     /** All the nodes we know about */
     private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
 
+    /** All the stale nodes */
+    private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
+
     /** When we last heard from our nodes. */
-    private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
+    //private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
 
     /** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or dispatched yet.
         Used to avoid cases where a job would be dispatched twice if the server is under high load and
@@ -148,8 +142,13 @@
     private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
 
     public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+       this(nodeId,del,conf,false);
+    }
+
+    public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
         _nodeId = nodeId;
         _db = del;
+        _isClusterEnabled = clusterState;
         _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
         _immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
         _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
@@ -183,6 +182,10 @@
         _nodeId = nodeId;
     }
 
+    public void setClusterManager(ClusterManager cm) { // required in cluster mode: doUpgrade() dereferences it unchecked
+        _clusterManager = cm;
+    }
+
     public void setStaleInterval(long staleInterval) {
         _staleInterval = staleInterval;
     }
@@ -466,34 +469,21 @@
         _outstandingJobs.clear();
 
         _knownNodes.clear();
-
-        try {
-            execTransaction(new Callable<Void>() {
-
-                public Void call() throws Exception {
-                    _knownNodes.addAll(_db.getNodeIds());
-                    return null;
-                }
-
-            });
-        } catch (Exception ex) {
-            __log.error("Error retrieving node list.", ex);
-            throw new ContextException("Error retrieving node list.", ex);
-        }
+        _staleNodes.clear();
 
         long now = System.currentTimeMillis();
 
         // Pretend we got a heartbeat...
-        for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+        //for (String s : _knownNodes) _lastHeartBeat.put(s, now);
 
         // schedule immediate job loading for now!
         _todo.enqueue(new LoadImmediateTask(now));
 
-        // schedule check for stale nodes, make it random so that the nodes don't overlap.
-        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+        if(!_isClusterEnabled) enqueueTasksReadnodeIds(now);
 
-        // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+        else {
+            if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(now);
+        }
 
         _todo.start();
         _running = true;
@@ -521,6 +511,47 @@
         _running = false;
     }
 
+    /** Cluster membership callback (presumably via ClusterMemberListener): remembers the added node. */
+    public void memberAdded(String nodeId) {
+        _knownNodes.add(nodeId);
+    }
+
+    /** Cluster membership callback: marks the removed node as stale so CheckStaleNodes can recover it. */
+    public void memberRemoved(String nodeId) {
+        _staleNodes.add(nodeId);
+    }
+
+    /** Enqueues CheckStaleNodes and UpgradeJobsTask once a new master is identified; masterId is not inspected. */
+    public void memberElectedAsMaster(String masterId) {
+        enqueueTasksReadnodeIds(System.currentTimeMillis());
+    }
+
+    /** Refreshes _knownNodes from the database and the cluster, then schedules the stale-node check and the upgrade. */
+    private void enqueueTasksReadnodeIds(long now) {
+        try {
+            execTransaction(new Callable<Void>() {
+                public Void call() throws Exception {
+                    _knownNodes.addAll(_db.getNodeIds());
+                    return null;
+                }
+            });
+        } catch (Exception ex) {
+            __log.error("Error retrieving node list.", ex);
+            throw new ContextException("Error retrieving node list.", ex);
+        }
+        // make double sure all the active nodes are included into _knownNodes
+        if (_isClusterEnabled) {
+            _knownNodes.addAll(_clusterManager.getActiveNodes());
+        } else {
+            _knownNodes.add(_nodeId);
+        }
+
+        // schedule check for stale nodes, make it random so that the nodes don't overlap.
+        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+        // do the upgrade sometime (random) in the immediate interval.
+        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+    }
+
     class RunJob implements Callable<Void> {
         final Job job;
         final JobProcessor processor;
@@ -697,10 +728,10 @@
         if (nodeId == null)
             return;
 
-        if (_nodeId.equals(nodeId))
-            return;
+        /*if (_nodeId.equals(nodeId))
+            return;*/
 
-        _lastHeartBeat.put(nodeId, System.currentTimeMillis());
+        //_lastHeartBeat.put(nodeId, System.currentTimeMillis());
         _knownNodes.add(nodeId);
     }
 
@@ -780,10 +811,21 @@
 
     boolean doUpgrade() {
         __log.debug("UPGRADE started");
-        final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
-        // Don't forget about self.
-        knownNodes.add(_nodeId);
-        Collections.sort(knownNodes);
+        final ArrayList<String> activeNodes = new ArrayList<String>();
+
+        if (_isClusterEnabled) {
+            // Cluster mode: only the master redistributes jobs. Any other member has nothing to
+            // upgrade, so report success and let the task be rescheduled at its normal interval.
+            if (!_clusterManager.isMaster()) return true;
+            // Copy the active nodes rather than casting: the concrete collection type is not
+            // guaranteed here, and the sort below must not reorder the cluster manager's own list.
+            activeNodes.addAll(_clusterManager.getActiveNodes());
+        } else {
+            // Standalone ODE deployment: this node is the only one jobs are assigned to.
+            activeNodes.add(_nodeId);
+        }
+
+        Collections.sort(activeNodes);
 
         // We're going to try to upgrade near future jobs using the db only.
         // We assume that the distribution of the trailing digits in the
@@ -795,9 +837,9 @@
             return execTransaction(new Callable<Boolean>() {
 
                 public Boolean call() throws Exception {
-                    int numNodes = knownNodes.size();
+                    int numNodes = activeNodes.size();
                     for (int i = 0; i < numNodes; ++i) {
-                        String node = knownNodes.get(i);
+                        String node = activeNodes.get(i);
                         _db.updateAssignToNode(node, i, numNodes, maxtime);
                     }
                     return true;
@@ -833,10 +875,15 @@
                 __log.debug("reassigned " + numrows + " jobs to self. ");
             }
 
+            if(_isClusterEnabled) _staleNodes.remove(nodeId);
+
+            // If the stale node id is in _knownNodes, remove it.
+            _knownNodes.remove(nodeId);
+
             // We can now forget about this node, if we see it again, it will be
             // "new to us"
-            _knownNodes.remove(nodeId);
-            _lastHeartBeat.remove(nodeId);
+            //_knownNodes.remove(nodeId);
+            //_lastHeartBeat.remove(nodeId);
 
             // Force a load-immediate to catch anything new from the recovered node.
             doLoadImmediate();
@@ -911,7 +958,7 @@
 
             boolean success = false;
             try {
-                success = doUpgrade();
+              success = doUpgrade();
             } finally {
                 long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
                 _nextUpgrade.set(future);
@@ -934,14 +981,40 @@
         public void run() {
             _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
             __log.debug("CHECK STALE NODES started");
-            for (String nodeId : _knownNodes) {
+
+            // Work on a snapshot so that removeAll() below does not modify _knownNodes itself.
+            ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
+
+            if (_isClusterEnabled) {
+                // Cluster mode: only the master detects and recovers stale nodes.
+                if (_clusterManager.isMaster()) {
+                    // Known nodes that are not active cluster members are stale. removeAll() takes the
+                    // collection as returned, so no cast to a concrete list type is needed.
+                    knownNodes.removeAll(_clusterManager.getActiveNodes());
+                    _staleNodes.addAll(knownNodes);
+
+                    // _staleNodes also contains the nodes reported through memberRemoved().
+                    for (String nodeId : _staleNodes) {
+                        recoverStaleNode(nodeId);
+                    }
+                }
+            } else {
+                // Standalone mode: every known node other than this one is recovered.
+                // NOTE(review): no liveness check is made here any more - confirm that is intended.
+                for (String nodeId : knownNodes) {
+                    if (!_nodeId.equals(nodeId)) {
+                        recoverStaleNode(nodeId);
+                    }
+                }
+            }
+            /*for (String nodeId : _knownNodes) {
                 Long lastSeen = _lastHeartBeat.get(nodeId);
                 if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
-                    && !_nodeId.equals(nodeId))
+                        && !_nodeId.equals(nodeId))
                 {
                     recoverStaleNode(nodeId);
                 }
-            }
+            }*/
         }
     }
 
diff --git a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
index 4c89ae9..10e86fc 100644
--- a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
+++ b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
@@ -19,7 +19,15 @@
 
 package org.apache.ode.scheduler.simple;
 
-import java.util.*;
+import junit.framework.Assert;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
+import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
+import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 import javax.transaction.RollbackException;
 import javax.transaction.Status;
@@ -27,18 +35,9 @@
 import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
 
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
-import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
-import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Properties;
 
 public class SimpleSchedulerTest extends Assert implements JobProcessor {
 
@@ -210,10 +209,10 @@
         _scheduler.setImmediateInterval(1000);
         _scheduler.setStaleInterval(1000);
         _scheduler.start();
-        for (int i = 0; i < 40; ++i) {
-            _scheduler.updateHeartBeat("n1");
+        /*for (int i = 0; i < 40; ++i) {
+           _scheduler.updateHeartBeat("n1");
             Thread.sleep(100);
-        }
+        }*/
 
         _scheduler.stop();
         Thread.sleep(1000);