blob: bc63f4f8a99b9d2b3d7d2e9db80e386cfb630021 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp.sync;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hama.Constants;
import org.apache.hama.bsp.BSPJobID;
import org.apache.hama.bsp.TaskAttemptID;
import org.apache.hama.zookeeper.QuorumPeer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* This client class abstracts the use of our zookeeper sync code.
*
*/
public class ZooKeeperSyncClientImpl extends ZKSyncClient implements
PeerSyncClient {
/*
* TODO maybe extract an abstract class and let the subclasses implement
* enter-/leaveBarrier so we can have multiple implementations, just like
* goldenorb.
*/
public static final Log LOG = LogFactory
.getLog(ZooKeeperSyncClientImpl.class);
private volatile Integer mutex = 0;
private String quorumServers;
private ZooKeeper zk;
private String bspRoot;
private InetSocketAddress peerAddress;
private int numBSPTasks;
// allPeers is lazily initialized
private String[] allPeers;
@Override
public void init(Configuration conf, BSPJobID jobId, TaskAttemptID taskId)
throws Exception {
quorumServers = QuorumPeer.getZKQuorumServersString(conf);
this.zk = new ZooKeeper(quorumServers, conf.getInt(
Constants.ZOOKEEPER_SESSION_TIMEOUT, 1200000), this);
bspRoot = conf.get(Constants.ZOOKEEPER_ROOT,
Constants.DEFAULT_ZOOKEEPER_ROOT);
String bindAddress = conf.get(Constants.PEER_HOST,
Constants.DEFAULT_PEER_HOST);
int bindPort = conf
.getInt(Constants.PEER_PORT, Constants.DEFAULT_PEER_PORT);
initialize(this.zk, bspRoot);
peerAddress = new InetSocketAddress(bindAddress, bindPort);
LOG.info("Start connecting to Zookeeper! At " + peerAddress);
numBSPTasks = conf.getInt("bsp.peers.num", 1);
}
@Override
public void enterBarrier(BSPJobID jobId, TaskAttemptID taskId, long superstep)
throws SyncException {
LOG.debug("[" + getPeerName() + "] enter the enterbarrier: " + superstep);
try {
synchronized (zk) {
final String pathToSuperstepZnode = constructKey(taskId.getJobID(),
"sync", "" + superstep);
writeNode(pathToSuperstepZnode, null, true, null);
BarrierWatcher barrierWatcher = new BarrierWatcher();
// this is really needed to register the barrier watcher, don't remove
// this line!
zk.exists(pathToSuperstepZnode + "/ready", barrierWatcher);
zk.create(getNodeName(taskId, superstep), null, Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL);
List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
int size = znodes.size(); // may contains ready
boolean hasReady = znodes.contains("ready");
if (hasReady) {
size--;
}
LOG.debug("===> at superstep :" + superstep + " current znode size: "
+ znodes.size() + " current znodes:" + znodes);
LOG.debug("enterBarrier() znode size within " + pathToSuperstepZnode
+ " is " + znodes.size() + ". Znodes include " + znodes);
if (size < numBSPTasks) {
while (!barrierWatcher.isComplete()) {
if (!hasReady) {
synchronized (mutex) {
mutex.wait(1000);
}
}
}
LOG.debug("2. at superstep: " + superstep + " after waiting ..."
+ taskId.toString());
} else {
LOG.debug("---> at superstep: " + superstep
+ " task that is creating /ready znode:" + taskId.toString());
writeNode(pathToSuperstepZnode + "/ready", null, false, null);
}
}
} catch (Exception e) {
throw new SyncException(e.toString());
}
}
@Override
public void leaveBarrier(final BSPJobID jobId, final TaskAttemptID taskId,
final long superstep) throws SyncException {
try {
// final String pathToSuperstepZnode = bspRoot + "/"
// + taskId.getJobID().toString() + "/" + superstep;
final String pathToSuperstepZnode = constructKey(taskId.getJobID(),
"sync", "" + superstep);
while (true) {
List<String> znodes = zk.getChildren(pathToSuperstepZnode, false);
LOG.debug("leaveBarrier() !!! checking znodes contnains /ready node or not: at superstep:"
+ superstep + " znode:" + znodes);
if (znodes.contains("ready")) {
znodes.remove("ready");
}
final int size = znodes.size();
LOG.debug("leaveBarrier() at superstep:" + superstep + " znode size: ("
+ size + ") znodes:" + znodes);
if (null == znodes || znodes.isEmpty())
return;
if (1 == size) {
try {
zk.delete(getNodeName(taskId, superstep), 0);
} catch (KeeperException.NoNodeException nne) {
LOG.debug(
"+++ (znode size is 1). Ignore because znode may disconnect.",
nne);
}
return;
}
Collections.sort(znodes);
final String lowest = znodes.get(0);
final String highest = znodes.get(size - 1);
synchronized (mutex) {
if (getNodeName(taskId, superstep).equals(
pathToSuperstepZnode + "/" + lowest)) {
Stat s = zk.exists(pathToSuperstepZnode + "/" + highest,
new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
LOG.debug("leaveBarrier() at superstep: " + superstep
+ " taskid:" + taskId.toString()
+ " highest notify lowest.");
mutex.notifyAll();
}
}
});
if (null != s) {
LOG.debug("leaveBarrier(): superstep:" + superstep + " taskid:"
+ taskId.toString() + " wait for higest notify.");
mutex.wait();
}
} else {
Stat s1 = zk.exists(getNodeName(taskId, superstep), false);
if (null != s1) {
try {
zk.delete(getNodeName(taskId, superstep), 0);
} catch (KeeperException.NoNodeException nne) {
LOG.debug("++++ Ignore because node may be dleted.", nne);
}
}
Stat s2 = zk.exists(pathToSuperstepZnode + "/" + lowest,
new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
LOG.debug("leaveBarrier() at superstep: " + superstep
+ " taskid:" + taskId.toString()
+ " lowest notify other nodes.");
mutex.notifyAll();
}
}
});
if (null != s2) {
LOG.debug("leaveBarrier(): superstep:" + superstep + " taskid:"
+ taskId.toString() + " wait for lowest notify.");
mutex.wait();
}
}
}
}
} catch (Exception e) {
throw new SyncException(e.getMessage());
}
}
@Override
public void register(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
int count = 0;
String jobRegisterKey = constructKey(jobId, "peers");
Stat stat = null;
while (stat != null) {
try {
stat = zk.exists(jobRegisterKey, false);
zk.create(jobRegisterKey, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
Thread.sleep(1000);
} catch (Exception e) {
LOG.debug(e); // ignore it.
}
count++;
// retry 10 times.
if (count > 9) {
throw new RuntimeException("can't create root node.");
}
}
registerTask(jobId, hostAddress, port, taskId);
}
/**
* Registers the task from outside, most of the time used by the groom which
* uses this at task spawn-time.
*
* @param jobId
* @param hostAddress
* @param port
* @param taskId
*/
public void registerTask(BSPJobID jobId, String hostAddress, long port,
TaskAttemptID taskId) {
// byte[] taskIdBytes = serializeTaskId(taskId);
String taskRegisterKey = constructKey(jobId, "peers", hostAddress + ":"
+ port);
writeNode(taskRegisterKey, taskId, false, null);
}
@Override
public String[] getAllPeerNames(BSPJobID jobID) {
if (allPeers == null) {
TreeMap<Integer, String> sortedMap = new TreeMap<Integer, String>();
try {
List<String> var = zk.getChildren(constructKey(jobID, "peers"), this);
allPeers = var.toArray(new String[var.size()]);
TreeMap<TaskAttemptID, String> taskAttemptSortedMap = new TreeMap<TaskAttemptID, String>();
for (String s : allPeers) {
byte[] data = zk.getData(constructKey(jobID, "peers", s), this, null);
TaskAttemptID thatTask = new TaskAttemptID();
boolean result = getValueFromBytes(data, thatTask);
if (result) {
taskAttemptSortedMap.put(thatTask, s);
}
}
for (Map.Entry<TaskAttemptID, String> entry : taskAttemptSortedMap
.entrySet()) {
TaskAttemptID thatTask = entry.getKey();
String s = entry.getValue();
LOG.debug("TASK mapping from zookeeper: " + thatTask + " ID:"
+ thatTask.getTaskID().getId() + " : " + s);
sortedMap.put(thatTask.getTaskID().getId(), s);
}
} catch (Exception e) {
LOG.error(e);
throw new RuntimeException("All peer names could not be retrieved!");
}
allPeers = new String[sortedMap.size()];
int count = 0;
for (Entry<Integer, String> entry : sortedMap.entrySet()) {
allPeers[count++] = entry.getValue();
LOG.debug("TASK mapping from zookeeper: " + entry.getKey() + " : "
+ entry.getValue() + " at index " + (count - 1));
}
}
return allPeers;
}
@Override
public void close() throws IOException {
try {
zk.close();
} catch (InterruptedException e) {
throw new IOException(e);
}
}
@Override
public void deregisterFromBarrier(BSPJobID jobId, TaskAttemptID taskId,
String hostAddress, long port) {
// TODO
throw new UnsupportedOperationException();
}
@Override
public void stopServer() {
// TODO
throw new UnsupportedOperationException();
}
@Override
public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}
/*
* UTILITY METHODS
*/
/**
* @return the string as host:port of this Peer
*/
public String getPeerName() {
return peerAddress.getHostName() + ":" + peerAddress.getPort();
}
/*
* INNER CLASSES
*/
private class BarrierWatcher implements Watcher {
private boolean complete = false;
boolean isComplete() {
return this.complete;
}
@Override
public void process(WatchedEvent event) {
this.complete = true;
synchronized (mutex) {
mutex.notifyAll();
}
}
}
}