Merge from fork branch sudharma/ODECluster
diff --git a/Rakefile b/Rakefile
index a8c5f06..ac9b74e 100644
--- a/Rakefile
+++ b/Rakefile
@@ -85,7 +85,7 @@
desc "ODE Axis Integration Layer"
define "axis2" do
compile.with projects("bpel-api", "bpel-connector", "bpel-dao", "bpel-epr", "bpel-runtime",
- "scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents"),
+ "scheduler-simple", "bpel-schemas", "bpel-store", "utils", "agents", "clustering"),
AXIOM, AXIS2_ALL, COMMONS.lang, COMMONS.collections, COMMONS.httpclient, COMMONS.lang,
DERBY, GERONIMO.kernel, GERONIMO.transaction, JAVAX.activation, JAVAX.servlet, JAVAX.stream,
JAVAX.transaction, JENCKS, WSDL4J, WS_COMMONS, XMLBEANS, AXIS2_MODULES.libs, SLF4J, LOG4J
@@ -101,12 +101,12 @@
libs = projects("axis2", "bpel-api", "bpel-compiler", "bpel-connector", "bpel-dao",
"bpel-epr", "bpel-nobj", "bpel-ql", "bpel-runtime", "scheduler-simple",
"bpel-schemas", "bpel-store", "dao-hibernate", "jca-ra", "jca-server",
- "utils", "dao-jpa", "agents"),
+ "utils", "dao-jpa", "agents", "clustering"),
AXIS2_ALL, ANNONGEN, BACKPORT, COMMONS.codec, COMMONS.collections, COMMONS.fileupload, COMMONS.io, COMMONS.httpclient, COMMONS.beanutils,
COMMONS.lang, COMMONS.pool, DERBY, DERBY_TOOLS, JACOB, JAXEN, JAVAX.activation, JAVAX.ejb, JAVAX.javamail,
JAVAX.connector, JAVAX.jms, JAVAX.persistence, JAVAX.transaction, JAVAX.stream, JIBX,
GERONIMO.connector, GERONIMO.kernel, GERONIMO.transaction, LOG4J, OPENJPA, SAXON, TRANQL,
- WOODSTOX, WSDL4J, WS_COMMONS, XALAN, XERCES, XMLBEANS, SPRING, AXIS2_MODULES.libs, SLF4J, LOG4J
+ WOODSTOX, WSDL4J, WS_COMMONS, XALAN, XERCES, XMLBEANS, SPRING, AXIS2_MODULES.libs, SLF4J, LOG4J, HAZELCAST
package(:war).with(:libs=>libs).path("WEB-INF").tap do |web_inf|
web_inf.merge project("dao-jpa-ojpa-derby").package(:zip)
@@ -208,6 +208,12 @@
package :jar
end
+ desc "ODE Clustering"
+ define "clustering" do
+ compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
+ package :jar
+ end
+
desc "ODE BPEL Object Model"
define "bpel-obj" do
compile.with project("utils"), SAXON, WSDL4J, COMMONS.collections
@@ -233,12 +239,12 @@
desc "ODE Runtime Engine"
define "bpel-runtime" do
+
compile.with projects("bpel-api", "bpel-compiler", "bpel-dao", "bpel-epr", "bpel-nobj", "bpel-schemas",
- "bpel-store", "utils", "agents"),
+ "bpel-store", "utils", "agents","clustering"),
COMMONS.collections, COMMONS.httpclient, JACOB, JAVAX.persistence, JAVAX.stream, JAXEN, SAXON, WSDL4J, XMLBEANS, SPRING, SLF4J, LOG4J,
JACKSON, JAVAX.connector
-
test.with projects("scheduler-simple", "dao-jpa", "dao-hibernate", "bpel-epr", "bpel-obj"),
# BACKPORT, COMMONS.pool, COMMONS.lang, COMMONS.io, DERBY, JAVAX.connector, JAVAX.transaction,
GERONIMO.transaction, GERONIMO.kernel, GERONIMO.connector, TRANQL, HSQLDB, JAVAX.ejb, JAVAX.transaction,
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
new file mode 100644
index 0000000..bf1e99e
--- /dev/null
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/hazelcast.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.4.xsd"
+ xmlns="http://www.hazelcast.com/schema/config"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+ <network>
+ <port auto-increment="true" port-count="100">5701</port>
+ <outbound-ports>
+ <ports>0</ports>
+ </outbound-ports>
+ <reuse-address>false</reuse-address>
+ <join>
+ <multicast enabled="false">
+ <multicast-group>224.2.2.3</multicast-group>
+ <multicast-port>54327</multicast-port>
+ </multicast>
+ <tcp-ip enabled="true">
+ <member>127.0.0.1:5701</member>
+ <member>127.0.0.1:5702</member>
+ </tcp-ip>
+ <aws enabled="false">
+ <access-key>my-access-key</access-key>
+ <secret-key>my-secret-key</secret-key>
+ <region>us-west-1</region>
+ <host-header>ec2.amazonaws.com</host-header>
+ <security-group-name>hazelcast-sg</security-group-name>
+ <tag-key>type</tag-key>
+ <tag-value>hz-nodes</tag-value>
+ <multicast enabled="false">
+ <multicast-group>224.2.2.3</multicast-group>
+ <multicast-port>54327</multicast-port>
+ </multicast>
+ </aws>
+ </join>
+ <interfaces enabled="false">
+ <interface>10.10.1.*</interface>
+ </interfaces>
+ <ssl enabled="false" />
+ <socket-interceptor enabled="false" />
+ </network>
+ <partition-group enabled="false"/>
+ <map name="ODE_DEPLOYMENT_LOCK"></map>
+ <map name="ODE_PROCESS_INSTANCE_LOCK"></map>
+ <topic name="ODE_DEPLOYMENT_TOPIC"></topic>
+</hazelcast>
+
+
+
diff --git a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
index 253037c..03ac79c 100644
--- a/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
+++ b/axis2-war/src/main/webapp/WEB-INF/conf/ode-axis2.properties
@@ -94,4 +94,10 @@
## Event listeners
#ode-axis2.event.listeners=
-#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
\ No newline at end of file
+#ode-axis2.event.listeners=org.apache.ode.bpel.common.evt.DebugBpelEventListener
+
+## Enable clustering
+#ode-axis2.clustering.enabled=true
+
+## Clustering Implementation class.
+#ode-axis2.clustering.impl.class = org.apache.ode.clustering.hazelcast.HazelcastClusterImpl
diff --git a/axis2/src/main/java/org/apache/ode/axis2/Messages.java b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
index a95c30d..0581c72 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/Messages.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/Messages.java
@@ -58,6 +58,10 @@
return format("Starting ODE ServiceEngine.");
}
+ public String msgOdeClusteringNotInitialized() {
+ return format("Clustering has not been initialized.");
+ }
+
public String msgOdeStarted() {
return format("ODE Service Engine has been started.");
}
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 7cbf142..0a13c4a 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -19,57 +19,27 @@
package org.apache.ode.axis2;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
-import javax.servlet.ServletConfig;
-import javax.servlet.ServletException;
-import javax.sql.DataSource;
-import javax.transaction.HeuristicMixedException;
-import javax.transaction.HeuristicRollbackException;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.NotSupportedException;
-import javax.transaction.RollbackException;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import javax.transaction.xa.XAResource;
-
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-import org.apache.commons.httpclient.util.IdleConnectionTimeoutThread;
import org.apache.commons.httpclient.params.HttpConnectionManagerParams;
+import org.apache.commons.httpclient.util.IdleConnectionTimeoutThread;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.axis2.deploy.DeploymentPoller;
import org.apache.ode.axis2.service.DeploymentWebService;
import org.apache.ode.axis2.service.ManagementService;
import org.apache.ode.axis2.util.ClusterUrlTransformer;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
+import org.apache.ode.bpel.clapi.ClusterProcessStore;
import org.apache.ode.bpel.connector.BpelServerConnector;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
import org.apache.ode.bpel.engine.CountLRUDehydrationPolicy;
import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.extvar.jdbc.JdbcExternalVariableModule;
-import org.apache.ode.bpel.iapi.BpelEventListener;
-import org.apache.ode.bpel.iapi.EndpointReferenceContext;
-import org.apache.ode.bpel.iapi.ProcessConf;
-import org.apache.ode.bpel.iapi.ProcessStoreEvent;
-import org.apache.ode.bpel.iapi.ProcessStoreListener;
-import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.bpel.pmapi.InstanceManagement;
@@ -78,10 +48,25 @@
import org.apache.ode.il.dbutil.Database;
import org.apache.ode.scheduler.simple.JdbcDelegate;
import org.apache.ode.scheduler.simple.SimpleScheduler;
+import org.apache.ode.store.ClusterProcessStoreImpl;
import org.apache.ode.store.ProcessStoreImpl;
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.sql.DataSource;
+import javax.transaction.*;
+import javax.transaction.xa.XAResource;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
/**
* Server class called by our Axis hooks to handle all ODE lifecycle management.
*
@@ -120,6 +105,8 @@
protected Database _db;
+ protected ClusterManager _clusterManager;
+
private DeploymentPoller _poller;
private BpelServerConnector _connector;
@@ -133,6 +120,8 @@
public Runnable txMgrCreatedCallback;
+ private boolean clusteringEnabled = false;
+
public void init(ServletConfig config, ConfigurationContext configContext) throws ServletException {
init(config.getServletContext().getRealPath("/WEB-INF"), configContext);
}
@@ -184,6 +173,12 @@
if (txMgrCreatedCallback != null) {
txMgrCreatedCallback.run();
}
+
+ clusteringEnabled = _odeConfig.isClusteringEnabled();
+ if (clusteringEnabled) {
+ initClustering();
+ } else __log.info(__msgs.msgOdeClusteringNotInitialized());
+
__log.debug("Creating data source.");
initDataSource();
__log.debug("Starting DAO.");
@@ -202,6 +197,12 @@
registerExternalVariableModules();
_store.loadAll();
+ if (_clusterManager != null) {
+ _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+ _clusterManager.setClusterProcessStore((ClusterProcessStore) _store);
+ _clusterManager.init(_configRoot);
+ ((SimpleScheduler)_scheduler).setNodeId(_clusterManager.getNodeID());
+ }
try {
_bpelServer.start();
@@ -221,7 +222,7 @@
try {
__log.debug("Initializing Deployment Web Service");
- new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath());
+ new DeploymentWebService().enableService(_configContext.getAxisConfiguration(), _store, _poller, _appRoot.getAbsolutePath(), _workRoot.getAbsolutePath(),this);
} catch (Exception e) {
throw new ServletException(e);
}
@@ -371,6 +372,17 @@
_txMgr = null;
}
+ if (_clusterManager != null) {
+ try {
+ __log.debug("shutting down cluster manager.");
+ _clusterManager.shutdown();
+ _clusterManager = null;
+ } catch (Exception ex) {
+ __log.debug("Cluster manager shutdown failed.", ex);
+
+ }
+ }
+
if (_connector != null) {
try {
__log.debug("shutdown BpelConnector");
@@ -455,6 +467,24 @@
}
}
+ public boolean isClusteringEnabled() {
+ return clusteringEnabled;
+ }
+
+ /**
+ * Initialize the clustering if it is enabled
+ */
+ private void initClustering() {
+ String clusterImplName = _odeConfig.getClusteringImplClass();
+ try {
+ Class<?> clusterImplClass = this.getClass().getClassLoader().loadClass(clusterImplName);
+ _clusterManager = (ClusterManager) clusterImplClass.newInstance();
+ } catch (Exception ex) {
+ __log.error("Error while loading class : " + clusterImplName, ex);
+ }
+ }
+
+
/**
* Initialize the DAO.
*
@@ -477,18 +507,24 @@
_store.registerListener(new ProcessStoreListenerImpl());
_store.setDeployDir(
_odeConfig.getDeployDir() != null ?
- new File(_odeConfig.getDeployDir()) :
- new File(_workRoot, "processes"));
+ new File(_odeConfig.getDeployDir()) :
+ new File(_workRoot, "processes"));
_store.setConfigDir(_configRoot);
}
protected ProcessStoreImpl createProcessStore(EndpointReferenceContext eprContext, DataSource ds) {
- return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false);
+ if (clusteringEnabled)
+ return new ClusterProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false, _clusterManager);
+ else return new ProcessStoreImpl(eprContext, ds, _odeConfig.getDAOConnectionFactory(), _odeConfig, false);
}
protected Scheduler createScheduler() {
- SimpleScheduler scheduler = new SimpleScheduler(new GUID().toString(),
- new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
+ SimpleScheduler scheduler;
+ if (clusteringEnabled) {
+ scheduler = new SimpleScheduler(_clusterManager.getNodeID(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), clusteringEnabled);
+ scheduler.setClusterManager(_clusterManager);
+ } else
+ scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
scheduler.setExecutorService(_executorService);
scheduler.setTransactionManager(_txMgr);
return scheduler;
@@ -533,6 +569,7 @@
_bpelServer.setCronScheduler(_cronScheduler);
_bpelServer.setDaoConnectionFactory(_daoCF);
+ _bpelServer.setClusterManagerImpl(_clusterManager);
_bpelServer.setInMemDaoConnectionFactory(new BpelDAOConnectionFactoryImpl(_scheduler, _odeConfig.getInMemMexTtl()));
_bpelServer.setEndpointReferenceContext(eprContext);
_bpelServer.setMessageExchangeContext(new MessageExchangeContextImpl(this));
diff --git a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
index 6f82385..169ca4f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/deploy/DeploymentPoller.java
@@ -41,6 +41,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.axis2.ODEServer;
+import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.ode.bpel.engine.cron.CronScheduler;
import org.apache.ode.bpel.engine.cron.SystemSchedulesConfig;
import org.apache.ode.utils.WatchDog;
@@ -48,7 +49,6 @@
import javax.xml.namespace.QName;
import java.io.File;
import java.io.FileFilter;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
@@ -74,6 +74,8 @@
private SystemSchedulesConfig _systemSchedulesConf;
+ private boolean clusterEnabled;
+
@SuppressWarnings("unchecked")
private Map<String, WatchDog> dDWatchDogsByPath = new HashMap<String, WatchDog>();
@SuppressWarnings("unchecked")
@@ -96,6 +98,7 @@
public DeploymentPoller(File deployDir, final ODEServer odeServer) {
_odeServer = odeServer;
_deployDir = deployDir;
+ clusterEnabled = _odeServer.isClusteringEnabled();
if (!_deployDir.exists()) {
boolean isDeployDirCreated = _deployDir.mkdir();
if (!isDeployDirCreated) {
@@ -130,50 +133,66 @@
@SuppressWarnings("unchecked")
private void check() {
File[] files = _deployDir.listFiles(_fileFilter);
+ boolean duLocked;
// Checking for new deployment directories
if (isDeploymentFromODEFileSystemAllowed() && files != null) {
for (File file : files) {
- File deployXml = new File(file, "deploy.xml");
- File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
-
- if (!deployXml.exists()) {
- // Skip if deploy.xml is abset
- if (__log.isDebugEnabled()) {
- __log.debug("Not deploying " + file + " (missing deploy.xml)");
- }
+ String duName = file.getName();
+ if (__log.isDebugEnabled()) {
+ __log.debug("Trying to acquire the lock for " + duName);
}
+ duLocked = pollerTryLock(duName);
- WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+ if (duLocked) {
+ try {
+ File deployXml = new File(file, "deploy.xml");
+ File deployedMarker = new File(_deployDir, file.getName() + ".deployed");
- if (deployedMarker.exists()) {
- checkDeployXmlWatchDog(ddWatchDog);
- continue;
- }
+ if (!deployXml.exists()) {
+ // Skip if deploy.xml is absent
+ if (__log.isDebugEnabled()) {
+ __log.debug("Not deploying " + file + " (missing deploy.xml)");
+ }
+ }
- try {
- boolean isCreated = deployedMarker.createNewFile();
- if (!isCreated) {
- __log.error("Error while creating file "
+ WatchDog ddWatchDog = ensureDeployXmlWatchDog(file, deployXml);
+
+ if (deployedMarker.exists()) {
+ checkDeployXmlWatchDog(ddWatchDog);
+ continue;
+ }
+
+ try {
+ boolean isCreated = deployedMarker.createNewFile();
+ if (!isCreated) {
+ __log.error("Error while creating file "
+ file.getName()
+ ".deployed ,deployment could be inconsistent");
+ }
+ } catch (IOException e1) {
+ __log.error("Error creating deployed marker file, " + file + " will not be deployed");
+ continue;
+ }
+
+ try {
+ _odeServer.getProcessStore().undeploy(file);
+ } catch (Exception ex) {
+ __log.error("Error undeploying " + file.getName());
+ }
+
+ try {
+ Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
+ __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed);
+ } catch (Exception e) {
+ __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
+ }
+ } finally {
+ if (__log.isDebugEnabled()) {
+ __log.debug("Trying to release the lock for " + file.getName());
+ }
+ unlock(file.getName());
}
- } catch (IOException e1) {
- __log.error("Error creating deployed marker file, " + file + " will not be deployed");
- continue;
- }
-
- try {
- _odeServer.getProcessStore().undeploy(file);
- } catch (Exception ex) {
- __log.error("Error undeploying " + file.getName());
- }
-
- try {
- Collection<QName> deployed = _odeServer.getProcessStore().deploy(file);
- __log.info("Deployment of artifact " + file.getName() + " successful: " + deployed );
- } catch (Exception e) {
- __log.error("Deployment of " + file.getName() + " failed, aborting for now.", e);
}
}
}
@@ -184,16 +203,33 @@
String pkg = file.getName().substring(0, file.getName().length() - ".deployed".length());
File deployDir = new File(_deployDir, pkg);
if (!deployDir.exists()) {
- Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
- boolean isDeleted = file.delete();
- if (!isDeleted) {
- __log.error("Error while deleting file "
+ String duName = deployDir.getName();
+
+ if (__log.isDebugEnabled()) {
+ __log.debug("Trying to acquire the lock for " + duName);
+ }
+
+ duLocked = pollerTryLock(duName);
+
+ if (duLocked) {
+ try {
+ Collection<QName> undeployed = _odeServer.getProcessStore().undeploy(deployDir);
+ boolean isDeleted = file.delete();
+ if (!isDeleted) {
+ __log.error("Error while deleting file "
+ file.getName()
+ ".deployed , please check if file is locked or if it really exist");
+ }
+ disposeDeployXmlWatchDog(deployDir);
+ if (undeployed.size() > 0)
+ __log.info("Successfully undeployed " + pkg);
+ } finally {
+ if (__log.isDebugEnabled()) {
+ __log.debug("Trying to release the lock for " + duName);
+ }
+ unlock(duName);
+ }
}
- disposeDeployXmlWatchDog(deployDir);
- if (undeployed.size() > 0)
- __log.info("Successfully undeployed " + pkg);
}
}
@@ -324,4 +360,23 @@
_odeServer.getProcessStore().refreshSchedules(deploymentPakage);
}
}
+
+ /**
+ * Used by the poller to acquire the lock
+ */
+ private boolean pollerTryLock(String key) {
+ if(clusterEnabled) {
+ ClusterLock clusterLock = _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock();
+ clusterLock.putIfAbsent(key,key);
+ return clusterLock.tryLock(key);
+ }
+ else return true;
+ }
+
+ private boolean unlock(String key) {
+ if (clusterEnabled) {
+ _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock().unlock(key);
+ }
+ return true;
+ }
}
diff --git a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
index aa769ed..01e003b 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/service/DeploymentWebService.java
@@ -20,24 +20,6 @@
package org.apache.ode.axis2.service;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Collection;
-import java.util.List;
-import java.util.zip.ZipEntry;
-import java.util.zip.ZipInputStream;
-
-import javax.activation.DataHandler;
-import javax.wsdl.Definition;
-import javax.wsdl.WSDLException;
-import javax.wsdl.factory.WSDLFactory;
-import javax.wsdl.xml.WSDLReader;
-import javax.xml.namespace.QName;
-
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMNamespace;
@@ -47,23 +29,35 @@
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.description.AxisService;
-import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.receivers.AbstractMessageReceiver;
import org.apache.axis2.util.Utils;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.lang.StringUtils;
+import org.apache.ode.axis2.ODEServer;
import org.apache.ode.axis2.OdeFault;
import org.apache.ode.axis2.deploy.DeploymentPoller;
import org.apache.ode.axis2.hooks.ODEAxisService;
-import org.apache.ode.bpel.iapi.BpelServer;
+import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessStore;
import org.apache.ode.il.OMUtils;
-import org.apache.ode.utils.fs.FileUtils;
import org.apache.ode.utils.Namespaces;
+import org.apache.ode.utils.fs.FileUtils;
+
+import javax.activation.DataHandler;
+import javax.wsdl.Definition;
+import javax.wsdl.WSDLException;
+import javax.wsdl.factory.WSDLFactory;
+import javax.wsdl.xml.WSDLReader;
+import javax.xml.namespace.QName;
+import java.io.*;
+import java.util.Collection;
+import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
/**
* Axis wrapper for process deployment.
@@ -76,9 +70,11 @@
private final OMNamespace _deployapi;
private File _deployPath;
+ private ODEServer _odeServer;
private DeploymentPoller _poller;
private ProcessStore _store;
+ private boolean clusterEnabled;
public DeploymentWebService() {
_pmapi = OMAbstractFactory.getOMFactory().createOMNamespace("http://www.apache.org/ode/pmapi","pmapi");
@@ -86,10 +82,12 @@
}
public void enableService(AxisConfiguration axisConfig, ProcessStore store,
- DeploymentPoller poller, String rootpath, String workPath) throws AxisFault, WSDLException {
+ DeploymentPoller poller, String rootpath, String workPath, ODEServer odeServer) throws AxisFault, WSDLException {
_deployPath = new File(workPath, "processes");
_store = store;
_poller = poller;
+ _odeServer = odeServer;
+ clusterEnabled = _odeServer.isClusteringEnabled();
Definition def;
WSDLReader wsdlReader = WSDLFactory.newInstance().newWSDLReader();
@@ -109,6 +107,7 @@
String operation = messageContext.getAxisOperation().getName().getLocalPart();
SOAPFactory factory = getSOAPFactory(messageContext);
boolean unknown = false;
+ boolean duLocked;
try {
if (operation.equals("deploy")) {
@@ -166,43 +165,56 @@
_poller.hold();
File dest = new File(_deployPath, bundleName + "-" + _store.getCurrentVersion());
- boolean createDir = dest.mkdir();
- if(!createDir){
- throw new OdeFault("Error while creating file " + dest.getName());
+ __log.info("Trying to acquire the lock for deploying: " + dest.getName());
+
+ //lock on deployment unit directory name
+ duLocked = lock(dest.getName());
+
+ if (duLocked) {
+ boolean createDir = dest.mkdir();
+ if (!createDir) {
+ throw new OdeFault("Error while creating file " + dest.getName());
+ }
+ try {
+ unzip(dest, (DataHandler) binaryNode.getDataHandler());
+
+ // Check that we have a deploy.xml
+ File deployXml = new File(dest, "deploy.xml");
+ if (!deployXml.exists())
+ throw new OdeFault("The deployment doesn't appear to contain a deployment " +
+ "descriptor in its root directory named deploy.xml, aborting.");
+
+ Collection<QName> deployed = _store.deploy(dest);
+
+ File deployedMarker = new File(_deployPath, dest.getName() + ".deployed");
+ if (!deployedMarker.createNewFile()) {
+ throw new OdeFault("Error while creating file " + deployedMarker.getName() + "deployment failed");
+ }
+
+ // Telling the poller what we deployed so that it doesn't try to deploy it again
+ _poller.markAsDeployed(dest);
+ __log.info("Deployment of artifact " + dest.getName() + " successful.");
+
+
+ OMElement response = factory.createOMElement("response", null);
+
+ if (__log.isDebugEnabled()) __log.debug("Deployed package: " + dest.getName());
+ OMElement d = factory.createOMElement("name", _deployapi);
+ d.setText(dest.getName());
+ response.addChild(d);
+
+ for (QName pid : deployed) {
+ if (__log.isDebugEnabled()) __log.debug("Deployed PID: " + pid);
+ d = factory.createOMElement("id", _deployapi);
+ d.setText(pid);
+ response.addChild(d);
+ }
+ sendResponse(factory, messageContext, "deployResponse", response);
+ } finally {
+ __log.info("Trying to release the lock for deploying: " + dest.getName());
+ unlock(dest.getName());
+ }
}
- unzip(dest, (DataHandler) binaryNode.getDataHandler());
-
- // Check that we have a deploy.xml
- File deployXml = new File(dest, "deploy.xml");
- if (!deployXml.exists())
- throw new OdeFault("The deployment doesn't appear to contain a deployment " +
- "descriptor in its root directory named deploy.xml, aborting.");
-
- Collection<QName> deployed = _store.deploy(dest);
-
- File deployedMarker = new File(_deployPath, dest.getName() + ".deployed");
- if(!deployedMarker.createNewFile()) {
- throw new OdeFault("Error while creating file " + deployedMarker.getName() + "deployment failed");
- }
-
- // Telling the poller what we deployed so that it doesn't try to deploy it again
- _poller.markAsDeployed(dest);
- __log.info("Deployment of artifact " + dest.getName() + " successful.");
-
- OMElement response = factory.createOMElement("response", null);
-
- if (__log.isDebugEnabled()) __log.debug("Deployed package: "+dest.getName());
- OMElement d = factory.createOMElement("name", _deployapi);
- d.setText(dest.getName());
- response.addChild(d);
-
- for (QName pid : deployed) {
- if (__log.isDebugEnabled()) __log.debug("Deployed PID: "+pid);
- d = factory.createOMElement("id", _deployapi);
- d.setText(pid);
- response.addChild(d);
- }
- sendResponse(factory, messageContext, "deployResponse", response);
} finally {
_poller.release();
}
@@ -224,20 +236,30 @@
// Put the poller on hold to avoid undesired side effects
_poller.hold();
- Collection<QName> undeployed = _store.undeploy(deploymentDir);
+ __log.info("Trying to acquire the lock for undeploying: " + deploymentDir.getName());
+ duLocked = lock(deploymentDir.getName());
- File deployedMarker = new File(deploymentDir + ".deployed");
- boolean isDeleted = deployedMarker.delete();
+ if (duLocked) {
+ try {
+ Collection<QName> undeployed = _store.undeploy(deploymentDir);
- if (!isDeleted)
- __log.error("Error while deleting file " + deployedMarker.getName());
+ File deployedMarker = new File(deploymentDir + ".deployed");
+ boolean isDeleted = deployedMarker.delete();
- FileUtils.deepDelete(deploymentDir);
+ if (!isDeleted)
+ __log.error("Error while deleting file " + deployedMarker.getName());
- OMElement response = factory.createOMElement("response", null);
- response.setText("" + (undeployed.size() > 0));
- sendResponse(factory, messageContext, "undeployResponse", response);
- _poller.markAsUndeployed(deploymentDir);
+ FileUtils.deepDelete(deploymentDir);
+
+ OMElement response = factory.createOMElement("response", null);
+ response.setText("" + (undeployed.size() > 0));
+ sendResponse(factory, messageContext, "undeployResponse", response);
+ _poller.markAsUndeployed(deploymentDir);
+ } finally {
+ __log.info("Trying to release the lock for undeploying: " + deploymentDir.getName());
+ unlock(deploymentDir.getName());
+ }
+ }
} finally {
_poller.release();
}
@@ -352,6 +374,25 @@
out.close();
}
+ /**
+ * Acquire the lock when deploying using web service
+ */
+ private boolean lock(String key) {
+ if(clusterEnabled) {
+ ClusterLock clusterLock = _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock();
+ clusterLock.putIfAbsent(key,key);
+ clusterLock.lock(key);
+ }
+ return true;
+ }
-
+ /**
+ * Release the lock after completing deploy process
+ */
+ private boolean unlock(String key) {
+ if(clusterEnabled) {
+ _odeServer.getBpelServer().getContexts().clusterManager.getDeploymentLock().unlock(key);
+ }
+ return true;
+ }
}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java
new file mode 100644
index 0000000..9c9683f
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterLock.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.util.concurrent.TimeUnit;
+
+public interface ClusterLock<E> {
+ /**
+ * Acquire the lock for specified key
+ *
+ * @param key
+ * @return
+ */
+ void lock(E key);
+
+ /**
+ * Acquire the lock for specified key and time period
+ *
+ *
+ * @param key
+ * @return
+ */
+ void lock(E key,int time,TimeUnit tu) throws InterruptedException, TimeoutException;
+
+ /**
+ * Release the lock acquired for specified key
+ *
+ * @param key
+ * @return
+ */
+ void unlock(E key);
+
+ /**
+ * Tries to acquire the lock for the specified key
+ * @param key
+ * @return
+ */
+ boolean tryLock(E key);
+
+ /**
+ * Tries to acquire the lock for the specified key and time period.
+ * @param key
+ * @param time
+ * @param tu
+ * @return
+ */
+ boolean tryLock(E key, int time, TimeUnit tu);
+
+ /**
+ * Check whether the map has a value for given key, if absent put the value to map
+ * @param key
+ * @param keyVal
+ */
+ void putIfAbsent(E key, E keyVal);
+
+ /** Exception class indicating a time-out occurred while obtaining a lock. */
+ public static final class TimeoutException extends Exception {
+ private static final long serialVersionUID = 7247629086692580285L;
+ }
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
new file mode 100644
index 0000000..5a2e0f9
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.File;
+import java.util.List;
+
+public interface ClusterManager {
+
+ /**
+ * Initialization of the cluster
+ * @param file
+ */
+ void init(File file);
+
+ /**
+ * shutdown the cluster instance
+ */
+ void shutdown();
+
+ /**
+ * Return whether the local member is Master or not
+ * @return
+ */
+ boolean isMaster();
+
+ /**
+     * Set the Process Store object which is used for clustering
+ * @param ps
+ */
+ void setClusterProcessStore(ClusterProcessStore ps);
+
+ /**
+ * Publish Deploy event to the cluster by deploy initiator
+ * @param clusterEvent
+ */
+ void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent);
+
+ /**
+     * Register the process store message listener with the cluster
+ */
+ void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener);
+ /**
+ * Register Scheduler as ClusterMemberListener
+ * @param listener
+ */
+ void registerClusterMemberListener(ClusterMemberListener listener);
+
+ /**
+ * Return deployment lock for cluster
+ */
+ ClusterLock getDeploymentLock();
+
+ /**
+ * Return instance lock for cluster
+ */
+ ClusterLock getInstanceLock();
+
+ /**
+ * Return active node list in the cluster
+ */
+ List<String> getActiveNodes();
+
+ /**
+ * Return local member's uuid in the cluster
+ */
+ String getNodeID();
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
new file mode 100644
index 0000000..541ab9c
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public interface ClusterMemberListener {
+
+ void memberAdded(String nodeId);
+
+ void memberRemoved(String nodeId);
+
+ void memberElectedAsMaster(String masterId);
+
+}
\ No newline at end of file
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterProcessStore.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterProcessStore.java
new file mode 100644
index 0000000..ca792bc
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterProcessStore.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
+import org.apache.ode.bpel.iapi.ProcessStore;
+
+public interface ClusterProcessStore extends ProcessStore {
+ public void deployProcesses(String duName);
+
+ public Collection<QName> undeployProcesses(String duName);
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
new file mode 100644
index 0000000..79d9a78
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterEvent.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+import java.io.Serializable;
+
+public abstract class ProcessStoreClusterEvent implements Serializable {
+ protected static final long serialVersionUID = 1L;
+
+ private String deploymentUnit;
+
+ protected String info ;
+
+ /** Unique ID of the Node in the Cluster generating the Event */
+ private String eventGeneratingNode;
+
+ public ProcessStoreClusterEvent(String deploymentUnit) {
+ this.deploymentUnit = deploymentUnit;
+ }
+
+ @Override
+ public String toString() {
+ return "{ProcessStoreClusterEvent#" + deploymentUnit +"}";
+ }
+
+ public void setEventGeneratingNode(String uuid) {
+ this.eventGeneratingNode = uuid;
+ }
+
+ public String getEventGeneratingNode() {
+ return eventGeneratingNode;
+ }
+
+ public String getDuName() {
+ return deploymentUnit;
+ }
+
+ public String getInfo() {
+ return info;
+ }
+
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
new file mode 100644
index 0000000..26f42cf
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreClusterListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ode.bpel.clapi;
+
+public interface ProcessStoreClusterListener {
+ public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message);
+}
\ No newline at end of file
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
new file mode 100644
index 0000000..c7cbb23
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreDeployedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public class ProcessStoreDeployedEvent extends ProcessStoreClusterEvent {
+
+ public ProcessStoreDeployedEvent(String deploymentUnit) {
+ super(deploymentUnit);
+ info = "Deploy Event";
+ }
+
+ @Override
+ public String toString() {
+ return "{ProcessStoreDeployedEvent#" + getDuName() +"}";
+ }
+}
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
new file mode 100644
index 0000000..d4f5a49
--- /dev/null
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ProcessStoreUndeployedEvent.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.bpel.clapi;
+
+public class ProcessStoreUndeployedEvent extends ProcessStoreClusterEvent {
+
+ public ProcessStoreUndeployedEvent(String deploymentUnit) {
+ super(deploymentUnit);
+ info = "Undeploy Event";
+ }
+
+ @Override
+ public String toString() {
+ return "{ProcessStoreUndeployedEvent#" + getDuName() +"}";
+ }
+}
diff --git a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
index 16f1e62..cc519bd 100644
--- a/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
+++ b/bpel-epr/src/main/java/org/apache/ode/il/config/OdeConfigProperties.java
@@ -19,6 +19,10 @@
package org.apache.ode.il.config;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.utils.SystemUtils;
+
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -26,10 +30,6 @@
import java.util.Map;
import java.util.Properties;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ode.utils.SystemUtils;
-
/**
* Configuration object used for configuring the intergration layer. The propereties are those likely to be common to all layers.
*
@@ -106,6 +106,12 @@
public static final String DEFAULT_TX_FACTORY_CLASS_NAME = "org.apache.ode.il.EmbeddedGeronimoFactory";
+ public static final String PROP_CLUSTERING_ENABLED = "clustering.enabled";
+
+ public static final String PROP_CLUSTERING_IMPL_CLASS = "clustering.impl.class";
+
+ public static final String DEFAULT_CLUSTERING_IMPL_CLASS_NAME = "org.apache.ode.clustering.hazelcast.HazelcastClusterImpl";
+
private File _cfgFile;
private String _prefix;
@@ -289,6 +295,14 @@
return getProperty(OdeConfigProperties.PROP_DEPLOY_DIR);
}
+ public boolean isClusteringEnabled() {
+ return Boolean.valueOf(getProperty(OdeConfigProperties.PROP_CLUSTERING_ENABLED, "false"));
+ }
+
+ public String getClusteringImplClass() {
+ return getProperty(OdeConfigProperties.PROP_CLUSTERING_IMPL_CLASS, DEFAULT_CLUSTERING_IMPL_CLASS_NAME);
+ }
+
public String getTxFactoryClass() {
return getProperty(OdeConfigProperties.PROP_TX_FACTORY_CLASS, DEFAULT_TX_FACTORY_CLASS_NAME);
}
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
index ded83ff..b0473bc 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelEngineImpl.java
@@ -19,21 +19,9 @@
package org.apache.ode.bpel.engine;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-
-import javax.wsdl.Operation;
-import javax.wsdl.PortType;
-import javax.xml.namespace.QName;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterLock;
import org.apache.ode.bpel.dao.MessageExchangeDAO;
import org.apache.ode.bpel.dao.ProcessDAO;
import org.apache.ode.bpel.dao.ProcessInstanceDAO;
@@ -54,7 +42,6 @@
import org.apache.ode.bpel.iapi.MessageExchange.Status;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
import org.apache.ode.bpel.iapi.Scheduler.JobDetails;
-import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobType;
import org.apache.ode.bpel.intercept.InterceptorInvoker;
import org.apache.ode.bpel.intercept.MessageExchangeInterceptor;
@@ -70,6 +57,17 @@
import org.w3c.dom.Document;
import org.w3c.dom.Element;
+import javax.wsdl.Operation;
+import javax.wsdl.PortType;
+import javax.xml.namespace.QName;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
/**
* Implementation of the {@link BpelEngine} interface: provides the server methods that should be invoked in the context of a
* transaction.
@@ -115,7 +113,7 @@
private SharedEndpoints _sharedEps;
/** Manage instance-level locks. */
- private final InstanceLockManager _instanceLockManager = new InstanceLockManager();
+ private final ClusterLock<Long> _instanceLockManager;
final Contexts _contexts;
@@ -124,8 +122,13 @@
public BpelEngineImpl(Contexts contexts) {
_contexts = contexts;
+ if(_contexts.clusterManager != null) {
+ _instanceLockManager = _contexts.clusterManager.getInstanceLock();
+ }
+ else _instanceLockManager = new InstanceLockManager();
_sharedEps = new SharedEndpoints();
_sharedEps.init();
+
}
public SharedEndpoints getSharedEndpoints() {
@@ -429,7 +432,8 @@
// Note that we don't want to wait too long here to get our lock, since we are likely holding
// on to scheduler's locks of various sorts.
try {
- _instanceLockManager.lock(iid, 1, TimeUnit.MICROSECONDS);
+ _instanceLockManager.
+ lock(iid, 1, TimeUnit.MICROSECONDS);
_contexts.scheduler.registerSynchronizer(new Scheduler.Synchronizer() {
public void afterCompletion(boolean success) {
_instanceLockManager.unlock(iid);
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
index bc7fe29..2334262 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/BpelServerImpl.java
@@ -35,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.dao.DeferredProcessInstanceCleanable;
@@ -534,6 +535,10 @@
_contexts.bindingContext = bc;
}
+ public void setClusterManagerImpl(ClusterManager cm) {
+ _contexts.clusterManager = cm;
+ }
+
public DebuggerContext getDebugger(QName pid) throws BpelEngineException {
return _engine._activeProcesses.get(pid)._debugger;
}
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
index 9fa3258..a965d58 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/Contexts.java
@@ -19,6 +19,7 @@
package org.apache.ode.bpel.engine;
+import org.apache.ode.bpel.clapi.ClusterManager;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.iapi.BindingContext;
import org.apache.ode.bpel.iapi.BpelEventListener;
@@ -46,6 +47,8 @@
public CronScheduler cronScheduler;
+ public ClusterManager clusterManager;
+
EndpointReferenceContext eprContext;
BindingContext bindingContext;
diff --git a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
index e49e8e9..1571eac 100644
--- a/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
+++ b/bpel-runtime/src/main/java/org/apache/ode/bpel/engine/InstanceLockManager.java
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterLock;
import java.util.HashMap;
import java.util.Map;
@@ -36,11 +37,15 @@
*
* @author Maciej Szefler - m s z e f l e r @ g m a i l . c o m
*/
-public class InstanceLockManager {
+public class InstanceLockManager implements ClusterLock<Long> {
private static final Log __log = LogFactory.getLog(InstanceLockManager.class);
private final Lock _mutex = new java.util.concurrent.locks.ReentrantLock();
- private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo> ();
+ private final Map<Long, InstanceInfo> _locks = new HashMap<Long,InstanceInfo>();
+
+ public void lock(Long key) {
+        // Nothing to do here.
+ }
public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException, TimeoutException {
if (iid == null) return;
@@ -105,6 +110,20 @@
}
+ public boolean tryLock(Long key) {
+        // Nothing to do here.
+ return false;
+ }
+
+ public boolean tryLock(Long key, int time, TimeUnit tu) {
+        // Nothing to do here.
+ return false;
+ }
+
+ public void putIfAbsent(Long key, Long keyVal) {
+        // Nothing to do here.
+ }
+
@Override
public String toString() {
@@ -135,9 +154,6 @@
return "{Lock for Instance #" + iid +", acquired by " + acquierer + "}";
}
}
-
- /** Exception class indicating a time-out occured while obtaining a lock. */
- public static final class TimeoutException extends Exception {
- private static final long serialVersionUID = 7247629086692580285L;
- }
}
+
+
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
new file mode 100644
index 0000000..d701e23
--- /dev/null
+++ b/bpel-store/src/main/java/org/apache/ode/store/ClusterProcessStoreImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.store;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterProcessStore;
+import org.apache.ode.bpel.clapi.ProcessStoreDeployedEvent;
+import org.apache.ode.bpel.clapi.ProcessStoreUndeployedEvent;
+import org.apache.ode.bpel.iapi.ContextException;
+import org.apache.ode.bpel.iapi.ProcessState;
+import org.apache.ode.bpel.iapi.EndpointReferenceContext;
+import org.apache.ode.il.config.OdeConfigProperties;
+
+import javax.sql.DataSource;
+import javax.xml.namespace.QName;
+
+import java.io.File;
+import java.util.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class ClusterProcessStoreImpl extends ProcessStoreImpl implements ClusterProcessStore {
+ private static final Log __log = LogFactory.getLog(ClusterProcessStoreImpl.class);
+
+ private ClusterManager _clusterManager;
+ private ProcessStoreDeployedEvent deployedEvent;
+ private ProcessStoreUndeployedEvent undeployedEvent;
+
+ public ClusterProcessStoreImpl(EndpointReferenceContext eprContext, DataSource ds, String persistenceType, OdeConfigProperties props, boolean createDatamodel, ClusterManager clusterManager) {
+ super(eprContext,ds,persistenceType,props,createDatamodel);
+ _clusterManager = clusterManager;
+ }
+
+ public Collection<QName> deploy(final File deploymentUnitDirectory) {
+ Collection<QName> deployed = super.deploy(deploymentUnitDirectory);
+ publishProcessStoreDeployedEvent(deploymentUnitDirectory.getName());
+ return deployed;
+ }
+
+ private void publishProcessStoreDeployedEvent(String duName){
+ deployedEvent = new ProcessStoreDeployedEvent(duName);
+ _clusterManager.publishProcessStoreClusterEvent(deployedEvent);
+ __log.info("Completed actual deployment for " +duName +" by " +deployedEvent.getEventGeneratingNode());
+ }
+
+ public void deployProcesses(final String duName) {
+ final ArrayList<ProcessConfImpl> confs = new ArrayList<ProcessConfImpl>();
+ ProcessState state = ProcessState.ACTIVE;
+
+ Pattern duNamePattern = getPreviousPackageVersionPattern(duName);
+
+ for (String packageName : _deploymentUnits.keySet()) {
+ Matcher matcher = duNamePattern.matcher(packageName);
+ if (matcher.matches()) {
+ DeploymentUnitDir duDir = _deploymentUnits.get(packageName);
+ if (duDir == null) throw new ContextException("Could not find package " + packageName);
+ for (QName processName : duDir.getProcessNames()) {
+ QName pid = toPid(processName, duDir.getVersion());
+ ProcessConfImpl pconf = _processes.get(pid);
+ if (pconf.getState().equals(state)) {
+ pconf.setState(ProcessState.RETIRED);
+ __log.info("Set state of " + pconf.getProcessId() + "to " + pconf.getState());
+ confs.add(pconf);
+ }
+ }
+ }
+ }
+
+ try {
+ exec(new Callable<Object>() {
+ public Object call(ConfStoreConnection conn) {
+ DeploymentUnitDAO dudao = conn.getDeploymentUnit(duName);
+ if (dudao != null) {
+ List<ProcessConfImpl> load = load(dudao);
+ __log.info("Loading DU from store: " + duName);
+ confs.addAll(load);
+ }
+ return null;
+ }
+ });
+ } catch (Exception ex) {
+ __log.error("Error loading DU from store: " + duName, ex);
+ }
+
+ for (ProcessConfImpl p : confs) {
+ try {
+ __log.info("Fire event of " + p.getProcessId() + " " + p.getState());
+ fireStateChange(p.getProcessId(), p.getState(), p.getDeploymentUnit().getName());
+ } catch (Exception except) {
+ __log.error("Error with process retiring or activating : pid=" + p.getProcessId() + " package=" + p.getDeploymentUnit().getName(), except);
+ }
+ }
+ }
+
+
+ public Collection<QName> undeploy(final File dir) {
+ Collection<QName> undeployed = super.undeploy(dir);
+ publishProcessStoreUndeployedEvent(dir.getName());
+ return undeployed;
+ }
+
+ private void publishProcessStoreUndeployedEvent(String duName){
+ undeployedEvent = new ProcessStoreUndeployedEvent(duName);
+ _clusterManager.publishProcessStoreClusterEvent(undeployedEvent);
+ __log.info("Completed actual undeployment for " +duName +" by " +undeployedEvent.getEventGeneratingNode());
+ }
+
+ /**
+     * Used to unregister processes when a deployment unit is undeployed
+ * @param duName
+ * @return
+ */
+ public Collection<QName> undeployProcesses(final String duName) {
+ Collection<QName> undeployed = super.undeployProcesses(duName);
+ return undeployed;
+ }
+}
diff --git a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
index ba911a7..05252a1 100644
--- a/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
+++ b/bpel-store/src/main/java/org/apache/ode/store/ProcessStoreImpl.java
@@ -71,9 +71,9 @@
private final CopyOnWriteArrayList<ProcessStoreListener> _listeners = new CopyOnWriteArrayList<ProcessStoreListener>();
- private Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>();
+ protected Map<QName, ProcessConfImpl> _processes = new HashMap<QName, ProcessConfImpl>();
- private Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>();
+ protected Map<String, DeploymentUnitDir> _deploymentUnits = new HashMap<String, DeploymentUnitDir>();
/** Guards access to the _processes and _deploymentUnits */
private final ReadWriteLock _rw = new ReentrantReadWriteLock();
@@ -88,6 +88,8 @@
protected File _configDir;
+
+
/**
* Executor used to process DB transactions. Allows us to isolate the TX context, and to ensure that only one TX gets executed a
* time. We don't really care to parallelize these operations because: i) HSQL does not isolate transactions and we don't want
@@ -345,18 +347,9 @@
* "AbsenceRequest-2/AbsenceRequest.ode" and setRetirePackage() will be called accordingly.
*/
private void retirePreviousPackageVersions(DeploymentUnitDir du) {
- //retire all the other versions of the same DU
- String[] nameParts = du.getName().split("/");
- /* Replace the version number (if any) with regexp to match any version number */
- nameParts[0] = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", "");
- nameParts[0] += "([-\\Q.\\E](\\d)+)?";
- StringBuilder duNameRegExp = new StringBuilder(du.getName().length() * 2);
- for (int i = 0, n = nameParts.length; i < n; i++) {
- if (i > 0) duNameRegExp.append("/");
- duNameRegExp.append(nameParts[i]);
- }
- Pattern duNamePattern = Pattern.compile(duNameRegExp.toString());
+ Pattern duNamePattern = getPreviousPackageVersionPattern(du.getName());
+
for (String deployedDUname : _deploymentUnits.keySet()) {
Matcher matcher = duNamePattern.matcher(deployedDUname);
if (matcher.matches()) {
@@ -383,26 +376,7 @@
__log.error("Error synchronizing with data store; " + duName + " may be reappear after restart!");
}
- Collection<QName> undeployed = Collections.emptyList();
- DeploymentUnitDir du;
- _rw.writeLock().lock();
- try {
- du = _deploymentUnits.remove(duName);
- if (du != null) {
- undeployed = toPids(du.getProcessNames(), du.getVersion());
- }
-
- for (QName pn : undeployed) {
- fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName()));
- __log.info(__msgs.msgProcessUndeployed(pn));
- }
-
- _processes.keySet().removeAll(undeployed);
- } finally {
- _rw.writeLock().unlock();
- }
-
- return undeployed;
+ return undeployProcesses(duName);
}
public Collection<String> getPackages() {
@@ -612,7 +586,7 @@
psl.onProcessStoreEvent(pse);
}
- private void fireStateChange(QName processId, ProcessState state, String duname) {
+ protected void fireStateChange(QName processId, ProcessState state, String duname) {
switch (state) {
case ACTIVE:
fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.ACTVIATED, processId, duname));
@@ -894,7 +868,7 @@
return result;
}
- private QName toPid(QName processType, long version) {
+ protected QName toPid(QName processType, long version) {
return new QName(processType.getNamespaceURI(), processType.getLocalPart() + "-" + version);
}
@@ -916,4 +890,41 @@
}
}
}
+
+ protected Pattern getPreviousPackageVersionPattern(String duName) {
+ //retire all the other versions of the same DU
+ String[] nameParts = duName.split("/");
+ /* Replace the version number (if any) with regexp to match any version number */
+ nameParts[0] = nameParts[0].replaceAll("([-\\Q.\\E](\\d)+)?\\z", "");
+ nameParts[0] += "([-\\Q.\\E](\\d)+)?";
+ StringBuilder duNameRegExp = new StringBuilder(duName.length() * 2);
+ for (int i = 0, n = nameParts.length; i < n; i++) {
+ if (i > 0) duNameRegExp.append("/");
+ duNameRegExp.append(nameParts[i]);
+ }
+ Pattern duNamePattern = Pattern.compile(duNameRegExp.toString());
+ return duNamePattern;
+ }
+
+ protected Collection<QName> undeployProcesses(final String duName) {
+ Collection<QName> undeployed = Collections.emptyList();
+ DeploymentUnitDir du;
+ _rw.writeLock().lock();
+ try {
+ du = _deploymentUnits.remove(duName);
+ if (du != null) {
+ undeployed = toPids(du.getProcessNames(), du.getVersion());
+ }
+
+ for (QName pn : undeployed) {
+ fireEvent(new ProcessStoreEvent(ProcessStoreEvent.Type.UNDEPLOYED, pn, du.getName()));
+ __log.info(__msgs.msgProcessUndeployed(pn));
+ }
+
+ _processes.keySet().removeAll(undeployed);
+ } finally {
+ _rw.writeLock().unlock();
+ }
+ return undeployed;
+ }
}
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index d24b59b..00bdf7d 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -18,44 +18,14 @@
*/
package org.apache.ode.test;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.sql.Connection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.persistence.EntityManager;
-import javax.persistence.EntityManagerFactory;
-import javax.sql.DataSource;
-import javax.transaction.TransactionManager;
-import javax.xml.namespace.QName;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.evt.DebugBpelEventListener;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
-import org.apache.ode.bpel.iapi.Message;
-import org.apache.ode.bpel.iapi.MessageExchange;
+import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.MessageExchange.Status;
-import org.apache.ode.bpel.iapi.MyRoleMessageExchange;
import org.apache.ode.bpel.iapi.MyRoleMessageExchange.CorrelationStatus;
-import org.apache.ode.bpel.iapi.ProcessStore;
-import org.apache.ode.bpel.iapi.ProcessStoreEvent;
-import org.apache.ode.bpel.iapi.ProcessStoreListener;
import org.apache.ode.bpel.memdao.BpelDAOConnectionFactoryImpl;
import org.apache.ode.il.EmbeddedGeronimoFactory;
import org.apache.ode.il.config.OdeConfigProperties;
@@ -71,6 +41,25 @@
import org.junit.Before;
import org.w3c.dom.Element;
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.sql.DataSource;
+import javax.transaction.TransactionManager;
+import javax.xml.namespace.QName;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.*;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
public abstract class BPELTestAbstract {
private static final Log log = LogFactory.getLog(BPELTestAbstract.class);
public static final long WAIT_BEFORE_INVOKE_TIMEOUT = 2000;
diff --git a/clustering/pom.xml b/clustering/pom.xml
new file mode 100644
index 0000000..d0e4b7c
--- /dev/null
+++ b/clustering/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<!--
+~ Licensed to the Apache Software Foundation (ASF) under one
+~ or more contributor license agreements. See the NOTICE file
+~ distributed with this work for additional information
+~ regarding copyright ownership. The ASF licenses this file
+~ to you under the Apache License, Version 2.0 (the
+~ "License"); you may not use this file except in compliance
+~ with the License. You may obtain a copy of the License at
+~
+~ http://www.apache.org/licenses/LICENSE-2.0
+~
+~ Unless required by applicable law or agreed to in writing,
+~ software distributed under the License is distributed on an
+~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+~ KIND, either express or implied. See the License for the
+~ specific language governing permissions and limitations
+~ under the License.
+-->
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode-clustering</artifactId>
+ <name>ODE :: Clustering</name>
+ <parent>
+ <groupId>org.apache.ode</groupId>
+ <artifactId>ode</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ </parent>
+</project>
\ No newline at end of file
diff --git a/clustering/src/main/java/org/apache/ode/clustering/Test.java b/clustering/src/main/java/org/apache/ode/clustering/Test.java
new file mode 100644
index 0000000..0e25733
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/Test.java
@@ -0,0 +1,4 @@
+package org.apache.ode.clustering;
+
+public class Test {
+}
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
new file mode 100644
index 0000000..4c5cad5
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.config.FileSystemXmlConfig;
+import com.hazelcast.config.ListenerConfig;
+import com.hazelcast.config.TopicConfig;
+import com.hazelcast.core.Hazelcast;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.core.IMap;
+import com.hazelcast.core.ITopic;
+import com.hazelcast.core.Member;
+import com.hazelcast.core.MemberAttributeEvent;
+import com.hazelcast.core.MembershipEvent;
+import com.hazelcast.core.MembershipListener;
+import com.hazelcast.core.Message;
+import com.hazelcast.core.MessageListener;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.ode.bpel.clapi.*;
+
+/**
+ * This class implements the necessary methods to build the cluster using Hazelcast.
+ */
+public class HazelcastClusterImpl implements ClusterManager, ProcessStoreClusterListener {
+ private static final Log __log = LogFactory.getLog(HazelcastClusterImpl.class);
+
+ private HazelcastInstance _hazelcastInstance;
+ private boolean isMaster = false;
+ private String nodeHostName;
+ private String nodeID;
+ private IMap<String, String> deployment_lock_map;
+ private IMap<Long, Long> instance_lock_map;
+ private ITopic<ProcessStoreClusterEvent> clusterDeploymentMessageTopic;
+ private ClusterProcessStore _clusterProcessStore;
+ private HazelcastDeploymentLock hazelcastDeploymentLock;
+ private HazelcastInstanceLock hazelcastInstanceLock;
+ private ClusterDeploymentMessageListener clusterDeploymentMessageListener;
+ private ClusterMemberShipListener clusterMemberShipListener;
+ private List<ClusterMemberListener> clusterMemberListenerList = null;
+
+ public HazelcastClusterImpl() {
+ clusterMemberShipListener = new ClusterMemberShipListener();
+ clusterDeploymentMessageListener = new ClusterDeploymentMessageListener();
+ clusterDeploymentMessageListener.registerClusterProcessStoreListener((ProcessStoreClusterListener)this);
+ hazelcastDeploymentLock = new HazelcastDeploymentLock();
+ hazelcastInstanceLock = new HazelcastInstanceLock();
+ }
+
+
+ public void init(File configRoot) {
+
+        /* First, look for the hazelcast.config system property. If it is set, its value is used as the config path.
+        Otherwise, load the hazelcast.xml file from configRoot using FileSystemXmlConfig(). */
+
+ String hzConfig = System.getProperty("hazelcast.config");
+ if (hzConfig != null) _hazelcastInstance = Hazelcast.newHazelcastInstance();
+ else {
+ File hzXml = new File(configRoot, "hazelcast.xml");
+ if (!hzXml.isFile())
+ __log.error("hazelcast.xml does not exist or is not a file");
+ else
+ try {
+ Config config = loadConfig(hzXml);
+ _hazelcastInstance = Hazelcast.newHazelcastInstance(config);
+ } catch (FileNotFoundException fnf) {
+ __log.error(fnf);
+ }
+ }
+
+ if (_hazelcastInstance != null) {
+ // Registering this node in the cluster.
+ //_hazelcastInstance.getCluster().addMembershipListener(new ClusterMemberShipListener());
+ Member localMember = _hazelcastInstance.getCluster().getLocalMember();
+ nodeHostName = localMember.getSocketAddress().getHostName() + ":" + localMember.getSocketAddress().getPort();
+ nodeID = localMember.getUuid();
+ __log.info("Registering HZ localMember:" + nodeHostName);
+
+ deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
+ instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
+ clusterDeploymentMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
+
+ hazelcastDeploymentLock.setLockMap(deployment_lock_map);
+ hazelcastInstanceLock.setLockMap(instance_lock_map);
+ markAsMaster();
+ }
+ }
+
+ protected Config loadConfig(File hazelcastConfigFile) throws FileNotFoundException {
+ Config config = new FileSystemXmlConfig(hazelcastConfigFile);
+
+ //add Cluster membership listener
+ ListenerConfig clusterMemberShipListenerConfig = new ListenerConfig();
+ clusterMemberShipListenerConfig.setImplementation(clusterMemberShipListener);
+ config.addListenerConfig(clusterMemberShipListenerConfig);
+
+ //set topic message listener
+ ListenerConfig topicListenerConfig = new ListenerConfig();
+ topicListenerConfig.setImplementation(clusterDeploymentMessageListener);
+ TopicConfig topicConfig = config.getTopicConfig(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_TOPIC);
+ topicConfig.addMessageListenerConfig(topicListenerConfig);
+
+ return config;
+ }
+
+ class ClusterMemberShipListener implements MembershipListener {
+
+ public ClusterMemberShipListener() {
+ clusterMemberListenerList = new ArrayList<ClusterMemberListener>();
+ }
+
+ public void registerClusterMemberListener(ClusterMemberListener listener) {
+ clusterMemberListenerList.add(listener);
+ }
+
+ @Override
+ public void memberAdded(MembershipEvent membershipEvent) {
+ String eventNodeID = membershipEvent.getMember().getUuid();
+ __log.info("Member Added " + eventNodeID);
+ if (isMaster) {
+ for (ClusterMemberListener listener : clusterMemberListenerList) {
+ listener.memberAdded(eventNodeID);
+ }
+ }
+ }
+
+ @Override
+ public void memberRemoved(MembershipEvent membershipEvent) {
+ String eventNodeID = membershipEvent.getMember().getUuid();
+ __log.info("Member Removed " + eventNodeID);
+ markAsMaster();
+ if (isMaster) {
+ for (ClusterMemberListener listener : clusterMemberListenerList) {
+ listener.memberRemoved(eventNodeID);
+ }
+ }
+ }
+
+ @Override
+ public void memberAttributeChanged(MemberAttributeEvent memberAttributeEvent) {
+            // Nothing to do here.
+ }
+ }
+
+ public void publishProcessStoreClusterEvent(ProcessStoreClusterEvent clusterEvent) {
+ clusterEvent.setEventGeneratingNode(nodeID);
+ __log.info("Send " + clusterEvent.getInfo() + " Cluster Message " + "for " + clusterEvent.getDuName() + " [" + nodeHostName + "]");
+ clusterDeploymentMessageTopic.publish(clusterEvent);
+ }
+
+
+ class ClusterDeploymentMessageListener implements MessageListener<ProcessStoreClusterEvent> {
+ List<ProcessStoreClusterListener> clusterProcessStoreListenerList = null;
+
+ public ClusterDeploymentMessageListener() {
+ clusterProcessStoreListenerList = new ArrayList<ProcessStoreClusterListener>();
+ }
+
+ public void registerClusterProcessStoreListener(ProcessStoreClusterListener listener) {
+ clusterProcessStoreListenerList.add(listener);
+ }
+
+ @Override
+ public void onMessage(Message<ProcessStoreClusterEvent> msg) {
+ for (ProcessStoreClusterListener listener : clusterProcessStoreListenerList) {
+ listener.onProcessStoreClusterEvent(msg.getMessageObject());
+ }
+ }
+ }
+
+ public void onProcessStoreClusterEvent(ProcessStoreClusterEvent message) {
+ if (message instanceof ProcessStoreDeployedEvent) {
+ ProcessStoreDeployedEvent event = (ProcessStoreDeployedEvent) message;
+ String eventUuid = event.getEventGeneratingNode();
+ if (!nodeID.equals(eventUuid)) {
+ String duName = event.getDuName();
+ __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]");
+ _clusterProcessStore.deployProcesses(duName);
+ }
+ }
+
+ else if (message instanceof ProcessStoreUndeployedEvent) {
+ ProcessStoreUndeployedEvent event = (ProcessStoreUndeployedEvent) message;
+ String eventUuid = event.getEventGeneratingNode();
+ if (!nodeID.equals(eventUuid)) {
+ String duName = event.getDuName();
+ __log.info("Receive " + event.getInfo() + " Cluster Message " + "for " + event.getDuName() + " [" + nodeHostName + "]");
+ _clusterProcessStore.undeployProcesses(duName);
+ }
+ }
+
+ }
+
+ private void markAsMaster() {
+ Member member = _hazelcastInstance.getCluster().getMembers().iterator().next();
+ if (member.localMember() && isMaster == false) {
+ isMaster = true;
+ for (ClusterMemberListener listener : clusterMemberListenerList) {
+ listener.memberElectedAsMaster(nodeID);
+ }
+ }
+ __log.info("Master node: " +isMaster);
+ }
+
+ public boolean isMaster() {
+ return isMaster;
+ }
+
+ public String getNodeID() {
+ return nodeID;
+ }
+
+ public void setClusterProcessStore(ClusterProcessStore store) {
+ _clusterProcessStore = store;
+ }
+
+ public void registerClusterProcessStoreMessageListener(ProcessStoreClusterListener listener) {
+ clusterDeploymentMessageListener.registerClusterProcessStoreListener(listener);
+ }
+
+ public void registerClusterMemberListener(ClusterMemberListener listener) {
+ clusterMemberShipListener.registerClusterMemberListener(listener);
+ }
+
+ public void shutdown() {
+ if (_hazelcastInstance != null) _hazelcastInstance.shutdown();
+ }
+
+ public ClusterLock<String> getDeploymentLock(){
+ return (ClusterLock)hazelcastDeploymentLock;
+ }
+
+ public ClusterLock<Long> getInstanceLock(){
+ return (ClusterLock)hazelcastInstanceLock;
+ }
+
+ public List<String> getActiveNodes() {
+ List<String> nodeList = new ArrayList<String>();
+ for(Member m : _hazelcastInstance.getCluster().getMembers())
+ nodeList.add(m.getUuid()) ;
+ return nodeList;
+ }
+}
+
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
new file mode 100644
index 0000000..8f204ff
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastConstants.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+/**
+ * Constants used in Hazelcast based clustering implementation
+ */
+public final class HazelcastConstants {
+ public static final String ODE_CLUSTER_DEPLOYMENT_LOCK = "ODE_DEPLOYMENT_LOCK";
+ public static final String ODE_CLUSTER_PROCESS_INSTANCE_LOCK = "ODE_PROCESS_INSTANCE_LOCK ";
+ public static final String ODE_CLUSTER_DEPLOYMENT_TOPIC = "ODE_DEPLOYMENT_TOPIC";
+
+ private HazelcastConstants() {
+ }
+}
\ No newline at end of file
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
new file mode 100644
index 0000000..b753305
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastDeploymentLock.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+import com.hazelcast.core.IMap;
+import org.apache.ode.bpel.clapi.ClusterLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.concurrent.TimeUnit;
+
+public class HazelcastDeploymentLock implements ClusterLock<String>{
+ private static final Log __log = LogFactory.getLog(HazelcastDeploymentLock.class);
+
+ private IMap<String, String> _lock_map;
+
+ public void setLockMap(IMap<String, String> lock_map) {
+ _lock_map = lock_map;
+ }
+
+ public void putIfAbsent(String key, String keyVal) {
+ _lock_map.putIfAbsent(key, keyVal);
+ }
+
+ public void lock(String key) {
+ _lock_map.lock(key);
+ if (__log.isDebugEnabled()) {
+ __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + true);
+ }
+ }
+
+ public void unlock(String key) {
+ _lock_map.unlock(key);
+ if (__log.isDebugEnabled()) {
+ __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after unlocking: " + false);
+ }
+ }
+
+ public boolean tryLock(String key) {
+ boolean state = _lock_map.tryLock(key);
+ if (__log.isDebugEnabled()) {
+ __log.debug("ThreadID:" + Thread.currentThread().getId() + " duLocked value for " + key + " file" + " after locking: " + state);
+ }
+ return state;
+ }
+
+ public boolean tryLock(String key,int time, TimeUnit tu) {
+        // Nothing to do here.
+ return false;
+
+ }
+
+ public void lock(String key,int time, TimeUnit tu) {
+        // Nothing to do here.
+ }
+}
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
new file mode 100644
index 0000000..8ac11f8
--- /dev/null
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastInstanceLock.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.ode.clustering.hazelcast;
+
+import com.hazelcast.core.IMap;
+import org.apache.ode.bpel.clapi.ClusterLock;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class HazelcastInstanceLock implements ClusterLock<Long> {
+ private static final Log __log = LogFactory.getLog(HazelcastInstanceLock.class);
+
+ private IMap<Long, Long> _lock_map;
+
+ public void setLockMap(IMap<Long, Long> lock_map) {
+ _lock_map = lock_map;
+ }
+
+ public void putIfAbsent(Long key, Long keyVal) {
+ _lock_map.putIfAbsent(key, keyVal);
+ }
+
+ public void lock(Long key) {
+        // Nothing to do here.
+ }
+
+ public void lock(Long iid, int time, TimeUnit tu) throws InterruptedException,TimeoutException {
+ if (iid == null) {
+ if (__log.isDebugEnabled())
+ __log.debug(" Instance Id null at lock[]");
+ return;
+ }
+
+ String thrd = Thread.currentThread().toString();
+
+ if (__log.isDebugEnabled())
+ __log.debug(thrd + ": lock(iid=" + iid + ", time=" + time + tu + ")");
+
+ putIfAbsent(iid, iid);
+
+ if (!_lock_map.tryLock(iid, time, tu)) {
+
+ if (__log.isDebugEnabled())
+ __log.debug(thrd + ": lock(iid=" + iid + ", " +
+ "time=" + time + tu + ")-->TIMEOUT");
+ throw new TimeoutException();
+ }
+
+ }
+
+ public void unlock(Long iid) {
+ if (iid == null) {
+ if (__log.isDebugEnabled())
+ __log.debug(" unlock, instance id is null");
+ return;
+ }
+
+ String thrd = Thread.currentThread().toString();
+
+ _lock_map.unlock(iid);
+
+ if (__log.isDebugEnabled())
+ __log.debug(thrd + " unlock(iid=" + iid + ")");
+ }
+
+ public boolean tryLock(Long key) {
+        // Nothing to do here.
+ return false;
+ }
+
+ public boolean tryLock(Long key, int time, TimeUnit tu) {
+        // Nothing to do here.
+ return false;
+ }
+}
diff --git a/dependencies.rb b/dependencies.rb
index 0857cf1..f469c02 100644
--- a/dependencies.rb
+++ b/dependencies.rb
@@ -75,6 +75,7 @@
:transaction =>"org.apache.geronimo.components:geronimo-transaction:jar:2.0.1",
:connector =>"org.apache.geronimo.components:geronimo-connector:jar:2.0.1"
)
+HAZELCAST ="com.hazelcast:hazelcast:jar:3.4.2"
HIBERNATE = [ "org.hibernate:hibernate-core:jar:3.3.2.GA", "javassist:javassist:jar:3.9.0.GA", "antlr:antlr:jar:2.7.6",
"asm:asm:jar:3.3.1", "cglib:cglib:jar:2.2", "net.sf.ehcache:ehcache:jar:1.2.3" ]
HSQLDB = "org.hsqldb:hsqldb:jar:2.3.3"
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 40fb044..c885d13 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -19,18 +19,6 @@
package org.apache.ode.jbi;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.util.concurrent.Executors;
-
-import javax.jbi.JBIException;
-import javax.jbi.component.ComponentContext;
-import javax.jbi.component.ComponentLifeCycle;
-import javax.jbi.component.ServiceUnitManager;
-import javax.management.MBeanServer;
-import javax.management.ObjectName;
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
@@ -50,6 +38,17 @@
import org.apache.ode.utils.GUID;
import org.apache.ode.utils.fs.TempFileManager;
+import javax.jbi.JBIException;
+import javax.jbi.component.ComponentContext;
+import javax.jbi.component.ComponentLifeCycle;
+import javax.jbi.component.ServiceUnitManager;
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.transaction.TransactionManager;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.concurrent.Executors;
+
/**
* This class implements ComponentLifeCycle. The JBI framework will start this engine class automatically when JBI framework starts
* up.
diff --git a/pom.xml b/pom.xml
index c43c2d0..f36373c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
<module>jbi-karaf-pmapi-httpbinding</module>
<module>axis2-war</module>
<module>bpel-itest</module>
+ <module>clustering</module>
</modules>
<build>
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a56b86e..517045d 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -19,33 +19,20 @@
package org.apache.ode.scheduler.simple;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
+import javax.transaction.*;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* A reliable and relatively simple scheduler that uses a database to persist information about
* scheduled tasks.
@@ -66,7 +53,7 @@
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
-public class SimpleScheduler implements Scheduler, TaskRunner {
+public class SimpleScheduler implements Scheduler, TaskRunner, ClusterMemberListener {
private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
private static final int DEFAULT_TRANSACTION_TIMEOUT = 60 * 1000;
@@ -114,11 +101,18 @@
private DatabaseDelegate _db;
+ private boolean _isClusterEnabled;
+
+ private ClusterManager _clusterManager;
+
/** All the nodes we know about */
private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
+ /** All the stale nodes */
+ private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
+
/** When we last heard from our nodes. */
- private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
+ //private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
/** Set of outstanding jobs, i.e., jobs that have been enqueued but not dequeued or dispatched yet.
Used to avoid cases where a job would be dispatched twice if the server is under high load and
@@ -148,8 +142,13 @@
private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+ this(nodeId,del,conf,false);
+ }
+
+ public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
_nodeId = nodeId;
_db = del;
+ _isClusterEnabled = clusterState;
_todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
_immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
_nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
@@ -183,6 +182,10 @@
_nodeId = nodeId;
}
+ public void setClusterManager(ClusterManager cm) {
+ _clusterManager = cm;
+ }
+
public void setStaleInterval(long staleInterval) {
_staleInterval = staleInterval;
}
@@ -466,34 +469,21 @@
_outstandingJobs.clear();
_knownNodes.clear();
-
- try {
- execTransaction(new Callable<Void>() {
-
- public Void call() throws Exception {
- _knownNodes.addAll(_db.getNodeIds());
- return null;
- }
-
- });
- } catch (Exception ex) {
- __log.error("Error retrieving node list.", ex);
- throw new ContextException("Error retrieving node list.", ex);
- }
+ _staleNodes.clear();
long now = System.currentTimeMillis();
// Pretend we got a heartbeat...
- for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+ //for (String s : _knownNodes) _lastHeartBeat.put(s, now);
// schedule immediate job loading for now!
_todo.enqueue(new LoadImmediateTask(now));
- // schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+ if(!_isClusterEnabled) enqueueTasksReadnodeIds(now);
- // do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ else {
+ if (_clusterManager.isMaster()) enqueueTasksReadnodeIds(now);
+ }
_todo.start();
_running = true;
@@ -521,6 +511,47 @@
_running = false;
}
+ public void memberAdded(String nodeId) {
+ _knownNodes.add(nodeId);
+ }
+
+ public void memberRemoved(String nodeId) {
+ _staleNodes.add(nodeId);
+ }
+
+    // Enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified.
+ public void memberElectedAsMaster(String masterId) {
+ long now = System.currentTimeMillis();
+ enqueueTasksReadnodeIds(now);
+ }
+
+ private void enqueueTasksReadnodeIds(long now) {
+ try {
+ execTransaction(new Callable<Void>() {
+
+ public Void call() throws Exception {
+ _knownNodes.addAll(_db.getNodeIds());
+ return null;
+ }
+
+ });
+ } catch (Exception ex) {
+ __log.error("Error retrieving node list.", ex);
+ throw new ContextException("Error retrieving node list.", ex);
+ }
+
+        // Make sure all the active nodes are included in _knownNodes.
+ if(_isClusterEnabled) _knownNodes.addAll(_clusterManager.getActiveNodes());
+
+ else _knownNodes.add(_nodeId);
+
+ // schedule check for stale nodes, make it random so that the nodes don't overlap.
+ _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+
+ // do the upgrade sometime (random) in the immediate interval.
+ _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ }
+
class RunJob implements Callable<Void> {
final Job job;
final JobProcessor processor;
@@ -697,10 +728,10 @@
if (nodeId == null)
return;
- if (_nodeId.equals(nodeId))
- return;
+ /*if (_nodeId.equals(nodeId))
+ return;*/
- _lastHeartBeat.put(nodeId, System.currentTimeMillis());
+ //_lastHeartBeat.put(nodeId, System.currentTimeMillis());
_knownNodes.add(nodeId);
}
@@ -780,10 +811,21 @@
boolean doUpgrade() {
__log.debug("UPGRADE started");
- final ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
- // Don't forget about self.
- knownNodes.add(_nodeId);
- Collections.sort(knownNodes);
+ final ArrayList<String> activeNodes;
+
+ // for cluster mode
+ if (_isClusterEnabled) {
+ if (_clusterManager.isMaster()) {
+ activeNodes = (ArrayList) _clusterManager.getActiveNodes();
+ } else activeNodes = null;
+ }
+ //for standalone ODE deployments
+ else {
+ activeNodes = new ArrayList<String>();
+ activeNodes.add(_nodeId);
+ }
+
+ Collections.sort(activeNodes);
// We're going to try to upgrade near future jobs using the db only.
// We assume that the distribution of the trailing digits in the
@@ -795,9 +837,9 @@
return execTransaction(new Callable<Boolean>() {
public Boolean call() throws Exception {
- int numNodes = knownNodes.size();
+ int numNodes = activeNodes.size();
for (int i = 0; i < numNodes; ++i) {
- String node = knownNodes.get(i);
+ String node = activeNodes.get(i);
_db.updateAssignToNode(node, i, numNodes, maxtime);
}
return true;
@@ -833,10 +875,15 @@
__log.debug("reassigned " + numrows + " jobs to self. ");
}
+ if(_isClusterEnabled) _staleNodes.remove(nodeId);
+
+ // If the stale node id is in _knownNodes, remove it.
+ _knownNodes.remove(nodeId);
+
// We can now forget about this node, if we see it again, it will be
// "new to us"
- _knownNodes.remove(nodeId);
- _lastHeartBeat.remove(nodeId);
+ //_knownNodes.remove(nodeId);
+ //_lastHeartBeat.remove(nodeId);
// Force a load-immediate to catch anything new from the recovered node.
doLoadImmediate();
@@ -911,7 +958,7 @@
boolean success = false;
try {
- success = doUpgrade();
+ success = doUpgrade();
} finally {
long future = System.currentTimeMillis() + (success ? (long) (_nearFutureInterval * .50) : 1000);
_nextUpgrade.set(future);
@@ -934,14 +981,40 @@
public void run() {
_todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
__log.debug("CHECK STALE NODES started");
- for (String nodeId : _knownNodes) {
+
+ ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
+
+ // for cluster mode
+ if (_isClusterEnabled) {
+ if (_clusterManager.isMaster()) {
+ ArrayList<String> memberList = (ArrayList) _clusterManager.getActiveNodes();
+
+                //find stale nodes: known nodes that are no longer in the active member list
+ knownNodes.removeAll(memberList);
+ if (knownNodes.size() != 0) {
+ for (String nodeId : knownNodes) {
+ _staleNodes.add(nodeId);
+ }
+ }
+ for (String nodeId : _staleNodes) {
+ recoverStaleNode(nodeId);
+ }
+ }
+ }
+            // for standalone ODE node
+ else {
+ for (String nodeId : knownNodes) {
+ if (!_nodeId.equals(nodeId)) recoverStaleNode(nodeId);
+ }
+ }
+ /*for (String nodeId : _knownNodes) {
Long lastSeen = _lastHeartBeat.get(nodeId);
if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
- && !_nodeId.equals(nodeId))
+ && !_nodeId.equals(nodeId))
{
recoverStaleNode(nodeId);
}
- }
+ }*/
}
}
diff --git a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
index 4c89ae9..10e86fc 100644
--- a/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
+++ b/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
@@ -19,7 +19,15 @@
package org.apache.ode.scheduler.simple;
-import java.util.*;
+import junit.framework.Assert;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
+import org.apache.ode.bpel.iapi.Scheduler;
+import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
+import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
+import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
import javax.transaction.RollbackException;
import javax.transaction.Status;
@@ -27,18 +35,9 @@
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
-import junit.framework.Assert;
-import junit.framework.TestCase;
-
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
-import org.apache.ode.bpel.iapi.Scheduler;
-import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
-import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
-import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.Properties;
public class SimpleSchedulerTest extends Assert implements JobProcessor {
@@ -210,10 +209,10 @@
_scheduler.setImmediateInterval(1000);
_scheduler.setStaleInterval(1000);
_scheduler.start();
- for (int i = 0; i < 40; ++i) {
- _scheduler.updateHeartBeat("n1");
+ /*for (int i = 0; i < 40; ++i) {
+ _scheduler.updateHeartBeat("n1");
Thread.sleep(100);
- }
+ }*/
_scheduler.stop();
Thread.sleep(1000);