Cluster Enabled Simple Scheduler-3
diff --git a/Rakefile b/Rakefile
index 5475227..7c0fa67 100644
--- a/Rakefile
+++ b/Rakefile
@@ -208,7 +208,7 @@
desc "ODE Clustering"
define "clustering" do
- compile.with projects("bpel-api","bpel-store","scheduler-simple"),HAZELCAST, COMMONS.logging
+ compile.with projects("bpel-api","bpel-store"),HAZELCAST, COMMONS.logging
package :jar
end
diff --git a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
index 222fedd..b3f5d2f 100644
--- a/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
+++ b/axis2/src/main/java/org/apache/ode/axis2/ODEServer.java
@@ -31,6 +31,7 @@
import org.apache.ode.axis2.service.ManagementService;
import org.apache.ode.axis2.util.ClusterUrlTransformer;
import org.apache.ode.bpel.clapi.ClusterManager;
+import org.apache.ode.bpel.clapi.ClusterMemberListener;
import org.apache.ode.bpel.connector.BpelServerConnector;
import org.apache.ode.bpel.dao.BpelDAOConnectionFactory;
import org.apache.ode.bpel.engine.BpelServerImpl;
@@ -197,7 +198,9 @@
_store.loadAll();
if (_clusterManager != null) {
_clusterManager.registerClusterProcessStoreMessageListener();
- _clusterManager.registerClusterMemberListener(_scheduler);
+ if (_scheduler instanceof SimpleScheduler) {
+ _clusterManager.registerClusterMemberListener((ClusterMemberListener) _scheduler);
+ }
}
try {
@@ -527,10 +530,12 @@
}
protected Scheduler createScheduler() {
- String nodeId;
- if (isClusteringEnabled) nodeId = _clusterManager.getUuid();
- else nodeId = new GUID().toString();
- SimpleScheduler scheduler = new SimpleScheduler(nodeId, new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+ SimpleScheduler scheduler;
+ if (isClusteringEnabled) {
+ scheduler = new SimpleScheduler(_clusterManager.getUuid(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties(), isClusteringEnabled);
+ scheduler.setClusterManager(_clusterManager);
+ } else
+ scheduler = new SimpleScheduler(new GUID().toString(), new JdbcDelegate(_db.getDataSource()), _odeConfig.getProperties());
scheduler.setExecutorService(_executorService);
scheduler.setTransactionManager(_txMgr);
return scheduler;
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
index a00959a..07d3d8d 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterManager.java
@@ -59,9 +59,9 @@
/**
* Register Scheduler as ClusterMemberListener
- * @param scheduler
+ * @param listener
*/
- void registerClusterMemberListener(Object scheduler);
+ void registerClusterMemberListener(ClusterMemberListener listener);
/**
* Return deployment lock for cluster
diff --git a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
index 4225f7d..541ab9c 100644
--- a/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
+++ b/bpel-api/src/main/java/org/apache/ode/bpel/clapi/ClusterMemberListener.java
@@ -24,6 +24,6 @@
void memberRemoved(String nodeId);
- void memberElectedAsMaster();
+ void memberElectedAsMaster(String masterId);
}
\ No newline at end of file
diff --git a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
index cdda50e..00bdf7d 100644
--- a/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
+++ b/bpel-test/src/main/java/org/apache/ode/test/BPELTestAbstract.java
@@ -128,7 +128,7 @@
{
JdbcDelegate del = new JdbcDelegate(_dataSource);
- scheduler = new SimpleScheduler("node", del, props,false);
+ scheduler = new SimpleScheduler("node", del, props);
scheduler.setTransactionManager(_txManager);
_cf = new BpelDAOConnectionFactoryImpl(scheduler);
_server.setDaoConnectionFactory(_cf);
diff --git a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
index 971df3e..f68068a 100644
--- a/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
+++ b/clustering/src/main/java/org/apache/ode/clustering/hazelcast/HazelcastClusterImpl.java
@@ -30,7 +30,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.clapi.*;
-import org.apache.ode.scheduler.simple.SimpleScheduler;
/**
* This class implements necessary methods to build the cluster using hazelcast
@@ -47,7 +46,7 @@
private IMap<Long, Long> instance_lock_map;
private ITopic<ProcessStoreClusterEvent> clusterMessageTopic;
private ClusterProcessStore _clusterProcessStore;
- private SimpleScheduler _scheduler;
+ private ClusterMemberListener _listener;
private ClusterLock<String> _hazelcastDeploymentLock;
private ClusterLock<Long> _hazelcastInstanceLock;
@@ -78,6 +77,8 @@
uuid = localMember.getUuid();
__log.info("Registering HZ localMember ID " + nodeID);
+ markAsMaster();
+
deployment_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_DEPLOYMENT_LOCK);
instance_lock_map = _hazelcastInstance.getMap(HazelcastConstants.ODE_CLUSTER_PROCESS_INSTANCE_LOCK);
clusterMessageTopic = _hazelcastInstance.getTopic(HazelcastConstants.ODE_CLUSTER_MSG);
@@ -92,7 +93,7 @@
public void memberAdded(MembershipEvent membershipEvent) {
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Added " +nodeId);
- _scheduler.memberAdded(nodeId);
+ if(isMaster && _listener != null) _listener.memberAdded(nodeId);
}
@Override
@@ -100,7 +101,7 @@
String nodeId = membershipEvent.getMember().getUuid();
__log.info("Member Removed " + nodeId);
markAsMaster();
- _scheduler.memberRemoved(nodeId);
+ if(isMaster && _listener != null) _listener.memberRemoved(nodeId);
}
@Override
@@ -150,7 +151,7 @@
leader = _hazelcastInstance.getCluster().getMembers().iterator().next();
if (leader.localMember() && isMaster == false) {
isMaster = true;
- _scheduler.memberElectedAsMaster();
+ if(_listener != null) _listener.memberElectedAsMaster(uuid);
}
__log.info(isMaster);
}
@@ -171,10 +172,8 @@
clusterMessageTopic.addMessageListener(new ClusterMessageListener());
}
- public void registerClusterMemberListener(Object scheduler) {
- _scheduler = (SimpleScheduler) scheduler;
- markAsMaster();
- _scheduler.setClusterManager(this);
+ public void registerClusterMemberListener(ClusterMemberListener listener) {
+ _listener = listener;
}
public void shutdown() {
diff --git a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
index 0c1b296..c885d13 100644
--- a/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
+++ b/jbi/src/main/java/org/apache/ode/jbi/OdeLifeCycle.java
@@ -242,7 +242,7 @@
_ode._executorService = Executors.newCachedThreadPool();
else
_ode._executorService = Executors.newFixedThreadPool(_ode._config.getThreadPoolMaxSize());
- _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties(),false);
+ _ode._scheduler = new SimpleScheduler(new GUID().toString(),new JdbcDelegate(_ode._dataSource), _ode._config.getProperties());
_ode._scheduler.setJobProcessor(_ode._server);
_ode._scheduler.setExecutorService(_ode._executorService);
_ode._scheduler.setTransactionManager((TransactionManager) _ode.getContext().getTransactionManager());
diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
index a0dbf5a..df33ae0 100644
--- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
+++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
@@ -103,19 +103,14 @@
private boolean _isClusterEnabled;
- private String _masterId;
-
private ClusterManager _clusterManager;
- /** All the nodes which are taken from the database*/
- private CopyOnWriteArraySet<String> _dbNodes = new CopyOnWriteArraySet<String>();
+ /** All the nodes we know about */
+ private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet<String>();
/** All the stale nodes */
private CopyOnWriteArraySet<String> _staleNodes = new CopyOnWriteArraySet<String>();
- /** All the nodes when members are added to the cluster*/
- private CopyOnWriteArraySet<String> _clusterNodes = new CopyOnWriteArraySet<String>();
-
/** When we last heard from our nodes. */
//private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap<String, Long>();
@@ -146,6 +141,10 @@
private DateFormat debugDateFormatter = new SimpleDateFormat("HH:mm:ss,SSS");
+ public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
+ this(nodeId,del,conf,false);
+ }
+
public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf, boolean clusterState) {
_nodeId = nodeId;
_db = del;
@@ -469,25 +468,9 @@
_processedSinceLastLoadTask.clear();
_outstandingJobs.clear();
- _dbNodes.clear();
- _clusterNodes.clear();
+ _knownNodes.clear();
_staleNodes.clear();
- try {
- execTransaction(new Callable<Void>() {
-
- public Void call() throws Exception {
- _dbNodes.addAll(_db.getNodeIds());
- return null;
- }
-
- });
- } catch (Exception ex) {
- __log.error("Error retrieving node list.", ex);
- throw new ContextException("Error retrieving node list.", ex);
- }
- _clusterNodes.add(_nodeId);
-
long now = System.currentTimeMillis();
// Pretend we got a heartbeat...
@@ -496,11 +479,11 @@
// schedule immediate job loading for now!
_todo.enqueue(new LoadImmediateTask(now));
- // schedule check for stale nodes, make it random so that the nodes don't overlap.
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+ if(!_isClusterEnabled) enqueueTasksReadnodeIds();
- // do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
+ else {
+ if (_clusterManager.getIsMaster()) enqueueTasksReadnodeIds();
+ }
_todo.start();
_running = true;
@@ -529,7 +512,7 @@
}
public void memberAdded(String nodeId) {
- _clusterNodes.add(nodeId);
+ _knownNodes.add(nodeId);
}
public void memberRemoved(String nodeId) {
@@ -537,16 +520,16 @@
}
// Do enqueue CheckStaleNodes and UpgradeJobsTask after a new master is identified.
- public void memberElectedAsMaster() {
- _masterId = _nodeId;
- _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + randomMean(_staleInterval)));
- _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + randomMean(_immediateInterval)));
- _dbNodes.clear();
+ public void memberElectedAsMaster(String masterId) {
+ enqueueTasksReadnodeIds();
+ }
+
+ private void enqueueTasksReadnodeIds() {
try {
execTransaction(new Callable<Void>() {
public Void call() throws Exception {
- _dbNodes.addAll(_db.getNodeIds());
+ _knownNodes.addAll(_db.getNodeIds());
return null;
}
@@ -555,6 +538,19 @@
__log.error("Error retrieving node list.", ex);
throw new ContextException("Error retrieving node list.", ex);
}
+
+ //make double sure all the active nodes are included into _knownNodes
+ if(_isClusterEnabled) _knownNodes.addAll(_clusterManager.getActiveNodes());
+
+ else _knownNodes.add(_nodeId);
+
+ long now = System.currentTimeMillis();
+
+ // schedule check for stale nodes, make it random so that the nodes don't overlap.
+ _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+
+ // do the upgrade sometime (random) in the immediate interval.
+ _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
}
class RunJob implements Callable<Void> {
@@ -880,9 +876,13 @@
if(_isClusterEnabled) _staleNodes.remove(nodeId);
- // If the stale node id is in _clusterNodes or _dbNodes, remove it.
- _clusterNodes.remove(nodeId);
- _dbNodes.remove(nodeId);
+ // If the stale node id is in _knownNodes, remove it.
+ _knownNodes.remove(nodeId);
+
+ // We can now forget about this node, if we see it again, it will be
+ // "new to us"
+ //_knownNodes.remove(nodeId);
+ //_lastHeartBeat.remove(nodeId);
// Force a load-immediate to catch anything new from the recovered node.
doLoadImmediate();
@@ -981,9 +981,7 @@
_todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + _staleInterval));
__log.debug("CHECK STALE NODES started");
- ArrayList<String> knownNodes = new ArrayList<String>();
- knownNodes.addAll(_dbNodes);
- knownNodes.addAll(_clusterNodes);
+ ArrayList<String> knownNodes = new ArrayList<String>(_knownNodes);
// for cluster mode
if (_isClusterEnabled && _clusterManager.getIsMaster()) {
@@ -1006,6 +1004,14 @@
if (!nodeId.equals(_nodeId)) recoverStaleNode(nodeId);
}
}
+ /*for (String nodeId : _knownNodes) {
+ Long lastSeen = _lastHeartBeat.get(nodeId);
+ if ((lastSeen == null || (System.currentTimeMillis() - lastSeen) > _staleInterval)
+ && !_nodeId.equals(nodeId))
+ {
+ recoverStaleNode(nodeId);
+ }
+ }*/
}
}