blob: c99fd311843c1ac8807bc59e61da1a5915162d89 [file] [log] [blame]
/**
* @@@ START COPYRIGHT @@@
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @@@ END COPYRIGHT @@@
**/
package org.trafodion.wms.server;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutionException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.trafodion.wms.Constants;
import org.trafodion.wms.server.ServerManager;
public class ServerLeaderElection {
private static final Log LOG = LogFactory.getLog(ServerLeaderElection.class);
private ServerManager sm = null;
private String nodePath;
private String parentZnode;
private boolean isLeader = false;
private ExecutorService es = null;
private Future future = null;
public ServerLeaderElection(ServerManager sm) throws IOException, InterruptedException, KeeperException {
this.sm = sm;
this.parentZnode = sm.getZKParentZnode();
setNodePath(sm.getZkClient().create(parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_LEADER + "/" + ":" + sm.getHostName() + ":" + sm.getInstance() + ":", new byte[0]/*no data yet*/,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL));
elect();
}
public void setNodePath(String nodePath) {
this.nodePath = nodePath;
}
public boolean isLeader() {
return isLeader;
}
// If curent leader is deleted, the immediate follower becomes leader.
// If in between follower is deleted, link is broken, call this method again to re-establish that link.
private void elect() throws IOException, InterruptedException, KeeperException {
List<String> nodes = sm.getZkClient().getChildren(parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_LEADER, new ElectionNodeWatcher());
Collections.sort(nodes);
for(int i=0; i < nodes.size(); i++) {
String nodePath = parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_LEADER + "/" + nodes.get(i);
if (nodePath.equals(this.nodePath)) {
if (i == 0) {
//I'm the first node and therefore the leader.
LOG.info("I'm the Leader [" + nodePath + "]");
if(nodes.size() > 1){
String previousNodePath = parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_LEADER + "/" + nodes.get(nodes.size() - 1);
LOG.info("Watching [" + previousNodePath + "]");
sm.getZkClient().exists(previousNodePath, new IndividualNodeWatcher());
}
isLeader=true;
} else {
//I'm a follower so avoid herd effect by setting watch on previous node.
LOG.info("I'm a follower [" + nodePath + "]");
String previousNodePath = parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_LEADER + "/" + nodes.get(i - 1);
LOG.info("Watching [" + previousNodePath + "]");
sm.getZkClient().exists(previousNodePath, new IndividualNodeWatcher());
isLeader=false;
}
break;
}
}
}
private class ElectionNodeWatcher implements Watcher {
//watches /LEADER node's children changes.
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeChildrenChanged) {
LOG.info("Node changed [" + event.getPath() + "], re-electing new leader.");
try {
elect();
} catch (IOException e) {
LOG.error(e);
} catch (InterruptedException e) {
LOG.error(e);
} catch (KeeperException e) {
LOG.error(e);
}
}
}
}
private class IndividualNodeWatcher implements Watcher {
//watches /LEADER/ node's deleted changes.
public void process(WatchedEvent event) {
if(event.getType() == Event.EventType.NodeDeleted) {
LOG.info("Node deleted [" + event.getPath() + "], re-electing new leader.");
try {
/*
if(isLeader){
if(es == null)
es = Executors.newSingleThreadExecutor();
if(future == null){
//first cycle of restart service
LOG.info("Starting first server restart service cycle");
future = es.submit(new ServerRestart(sm.getZkClient()));
} else {
//check to see if last cycle finished ok
//If so, start a new one
if(future.get() == null){
LOG.info("Starting new server restart service cycle");
future = es.submit(new ServerRestart(sm.getZkClient()));
} else {
LOG.info("Previous restart service cycle is running");
}
}
}
*/
elect();
/*
Scanner s = new Scanner(event.getPath());
s.useDelimiter(":");
s.next();//skip path
String hostName=s.next();
s.close();
Stat stat = zkc.exists(parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_RESTART + "/" + ":" + hostName + ":",false);
if(stat == null) {
sm.getZkClient().create(Constants.DEFAULT_ZOOKEEPER_ZNODE_SERVERS_RESTART + "/" + ":" + hostName + ":", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
*/
} catch (Exception e) {
LOG.error(e);
}
}
}
}
}