blob: a20c49456be41c345e1ad5f62959bee43ba76bf5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.zk;
import java.io.IOException;
import org.apache.hadoop.util.Progressable;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper provides only atomic operations. ZooKeeperExt provides additional
* non-atomic operations that are useful. It also provides wrappers to
* deal with ConnectionLossException. All methods of this class
* should be thread-safe.
*/
public class ZooKeeperExt {
/** Length of the ZK sequence number */
public static final int SEQUENCE_NUMBER_LENGTH = 10;
/** Internal logger */
private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
/** Internal ZooKeeper */
private final ZooKeeper zooKeeper;
/** Ensure we have progress */
private final Progressable progressable;
/** Number of max attempts to retry when failing due to connection loss */
private final int maxRetryAttempts;
/** Milliseconds to wait before trying again due to connection loss */
private final int retryWaitMsecs;
/**
* Constructor to connect to ZooKeeper without progress reporting.
* Delegates to the six-argument constructor, passing null as the
* progressable.
*
* @param connectString Comma separated host:port pairs, each corresponding
* to a zk server. e.g.
* "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
* chroot suffix is used the example would look
* like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
* where the client would be rooted at "/app/a" and all paths
* would be relative to this root - ie getting/setting/etc...
* "/foo/bar" would result in operations being run on
* "/app/a/foo/bar" (from the server perspective).
* @param sessionTimeout Session timeout in milliseconds
* @param maxRetryAttempts Max retry attempts during connection loss
* @param retryWaitMsecs Msecs to wait when retrying due to connection
* loss
* @param watcher A watcher object which will be notified of state changes,
* may also be notified for node events
* @throws IOException Propagated from the delegated constructor
*/
public ZooKeeperExt(String connectString,
int sessionTimeout,
int maxRetryAttempts,
int retryWaitMsecs,
Watcher watcher) throws IOException {
// A null progressable means the extended operations never report progress.
this(connectString, sessionTimeout, maxRetryAttempts,
retryWaitMsecs, watcher, null);
}
/**
 * Connects to ZooKeeper and records the retry policy and the optional
 * progress reporter used by the extended operations of this class.
 *
 * @param connectString Comma separated host:port pairs, each corresponding
 *        to a zk server, e.g.
 *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". With the optional
 *        chroot suffix, e.g.
 *        "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a", the client
 *        is rooted at "/app/a" and every path is relative to that root, so
 *        "/foo/bar" is served as "/app/a/foo/bar" from the server
 *        perspective.
 * @param sessionTimeout Session timeout in milliseconds
 * @param maxRetryAttempts Upper bound on the attempts made by the retrying
 *        operations when the connection is lost
 * @param retryWaitMsecs Milliseconds slept between two attempts
 * @param watcher A watcher object which will be notified of state changes,
 *        may also be notified for node events
 * @param progressable Called during the longer multi-step operations to
 *        report progress; may be null, in which case no progress is
 *        reported
 * @throws IOException If the underlying ZooKeeper client cannot be
 *         constructed
 */
public ZooKeeperExt(String connectString,
    int sessionTimeout,
    int maxRetryAttempts,
    int retryWaitMsecs,
    Watcher watcher,
    Progressable progressable) throws IOException {
  // Plain configuration first, then the client that may fail to construct.
  this.maxRetryAttempts = maxRetryAttempts;
  this.retryWaitMsecs = retryWaitMsecs;
  this.progressable = progressable;
  this.zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
}
/**
 * Creates a znode, optionally creating every missing ancestor first. The
 * recursive variant is a sequence of individual ZooKeeper calls and is
 * therefore not atomic. With recursive set to false this behaves like a
 * plain create() wrapped in the connection-loss retry loop.
 *
 * @param path path to create
 * @param data data to set on the final znode
 * @param acl acls on each znode created
 * @param createMode only affects the final znode; ancestors are always
 *        created as persistent znodes without data
 * @param recursive if true, creates all ancestors
 * @return Actual created path
 * @throws KeeperException If ZooKeeper reports an error other than a
 *         connection loss (for instance the znode already exists)
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 */
public String createExt(
    final String path,
    byte[] data,
    List<ACL> acl,
    CreateMode createMode,
    boolean recursive) throws KeeperException, InterruptedException {
  if (LOG.isDebugEnabled()) {
    LOG.debug("createExt: Creating path " + path);
  }

  int tries = 0;
  for (; tries < maxRetryAttempts; ++tries) {
    try {
      try {
        // Optimistic case: all ancestors are already there.
        return zooKeeper.create(path, data, acl, createMode);
      } catch (KeeperException.NoNodeException e) {
        if (!recursive) {
          // Without recursion a missing parent is the caller's problem.
          throw e;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("createExt: Cannot directly create node " + path);
        }
      }

      // Walk the path left to right and create each missing ancestor.
      int slash = path.indexOf("/", 1);
      while (slash != -1) {
        final String ancestor = path.substring(0, slash);
        try {
          if (progressable != null) {
            progressable.progress();
          }
          if (zooKeeper.exists(ancestor, false) == null) {
            zooKeeper.create(ancestor, null, acl, CreateMode.PERSISTENT);
          }
        } catch (KeeperException.NodeExistsException e) {
          // Somebody else created the ancestor between exists() and create().
          if (LOG.isDebugEnabled()) {
            LOG.debug("createExt: Znode " + ancestor +
                " already exists");
          }
        }
        slash = path.indexOf("/", slash + 1);
      }
      return zooKeeper.create(path, data, acl, createMode);
    } catch (KeeperException.ConnectionLossException e) {
      LOG.warn("createExt: Connection loss on attempt " + tries + ", " +
          "waiting " + retryWaitMsecs + " msecs before retrying.", e);
    }
    Thread.sleep(retryWaitMsecs);
  }
  throw new IllegalStateException("createExt: Failed to create " + path +
      " after " + tries + " tries!");
}
/**
 * Immutable holder for the outcome of createOrSetExt() and createOnceExt():
 * either the path of a znode that was created, or the Stat of a znode whose
 * data was set. Either component may be null.
 */
public static class PathStat {
  /** Path to created znode (if any) */
  private final String path;
  /** Stat from set znode (if any) */
  private final Stat stat;

  /**
   * Stores the results of a create-or-set style operation.
   *
   * @param path Path to created znode (or null)
   * @param stat Stat from set znode (or null if nothing was set)
   */
  public PathStat(String path, Stat stat) {
    this.path = path;
    this.stat = stat;
  }

  /**
   * Get the path of the created znode if it was created.
   *
   * @return Path of created znode or null if not created
   */
  public String getPath() {
    return path;
  }

  /**
   * Get the stat of the set znode if set
   *
   * @return Stat of set znode or null if not set
   */
  public Stat getStat() {
    return stat;
  }
}
/**
 * Creates a znode through createExt(); when the znode already exists, its
 * data is overwritten instead. The overwrite is a direct setData() call on
 * the wrapped client and is not covered by the connection-loss retry loop.
 *
 * @param path path to create
 * @param data data to set on the final znode
 * @param acl acls on each znode created
 * @param createMode only affects the final znode
 * @param recursive if true, creates all ancestors
 * @param version Version handed to setData() when the znode already exists
 * @return PathStat carrying the created path (stat is null) when the znode
 *         was created, or the Stat returned by setData() (path is null)
 *         when it already existed
 * @throws InterruptedException
 * @throws KeeperException
 */
public PathStat createOrSetExt(final String path,
    byte[] data,
    List<ACL> acl,
    CreateMode createMode,
    boolean recursive,
    int version) throws KeeperException, InterruptedException {
  try {
    String newPath = createExt(path, data, acl, createMode, recursive);
    return new PathStat(newPath, null);
  } catch (KeeperException.NodeExistsException e) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("createOrSet: Node exists on path " + path);
    }
    Stat updated = zooKeeper.setData(path, data, version);
    return new PathStat(null, updated);
  }
}
/**
 * Create a znode if there is no other znode there. An already existing
 * znode is left untouched: nothing is set on it.
 *
 * @param path path to create
 * @param data data to set on the final znode
 * @param acl acls on each znode created
 * @param createMode only affects the final znode
 * @param recursive if true, creates all ancestors
 * @return PathStat whose path is the created znode, or null when the znode
 *         already existed; its stat is always null because this method
 *         never sets data on an existing znode
 * @throws InterruptedException
 * @throws KeeperException
 */
public PathStat createOnceExt(final String path,
    byte[] data,
    List<ACL> acl,
    CreateMode createMode,
    boolean recursive) throws KeeperException, InterruptedException {
  String createdPath = null;
  try {
    createdPath = createExt(path, data, acl, createMode, recursive);
  } catch (KeeperException.NodeExistsException e) {
    // Expected when another client won the race; report "not created".
    if (LOG.isDebugEnabled()) {
      LOG.debug("createOnceExt: Node already exists on path " + path);
    }
  }
  return new PathStat(createdPath, null);
}
/**
 * Deletes a znode and, if requested, everything below it. The recursive
 * form issues one ZooKeeper call per znode and is therefore not atomic.
 *
 * @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
 * @param version expected version of the znode at path (-1 for all);
 *        descendants are always removed regardless of their version
 * @param recursive if true, remove all children, otherwise behave like a
 *        plain delete wrapped in the connection-loss retry loop
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws KeeperException If ZooKeeper reports an error other than a
 *         connection loss
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 */
public void deleteExt(final String path, int version, boolean recursive)
    throws InterruptedException, KeeperException {
  int tries = 0;
  for (; tries < maxRetryAttempts; ++tries) {
    try {
      try {
        // Optimistic case: the znode has no children.
        zooKeeper.delete(path, version);
        return;
      } catch (KeeperException.NotEmptyException e) {
        if (!recursive) {
          // Without recursion a non-empty znode is the caller's problem.
          throw e;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("deleteExt: Cannot directly remove node " + path);
        }
      }

      // Clear out the subtree first, then retry the znode itself.
      for (String childName : zooKeeper.getChildren(path, false)) {
        if (progressable != null) {
          progressable.progress();
        }
        deleteExt(path + "/" + childName, -1, true);
      }
      zooKeeper.delete(path, version);
      return;
    } catch (KeeperException.ConnectionLossException e) {
      LOG.warn("deleteExt: Connection loss on attempt " +
          tries + ", waiting " + retryWaitMsecs +
          " msecs before retrying.", e);
    }
    Thread.sleep(retryWaitMsecs);
  }
  throw new IllegalStateException("deleteExt: Failed to delete " + path +
      " after " + tries + " tries!");
}
/**
 * Looks up the stat of a znode, retrying when the connection is lost.
 * <p>
 * The watch flag is forwarded unchanged to the wrapped ZooKeeper client;
 * when it is true and the call succeeds, a watch is left on the node with
 * the given path.
 *
 * @param path the node path
 * @param watch whether need to watch this node
 * @return the stat of the node of the given path; return null if no such a
 *         node exists.
 * @throws KeeperException If the server signals an error other than a
 *         connection loss
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 */
public Stat exists(String path, boolean watch) throws KeeperException,
    InterruptedException {
  int tries = 0;
  for (; tries < maxRetryAttempts; ++tries) {
    try {
      return zooKeeper.exists(path, watch);
    } catch (KeeperException.ConnectionLossException e) {
      LOG.warn("exists: Connection loss on attempt " +
          tries + ", waiting " + retryWaitMsecs +
          " msecs before retrying.", e);
    }
    Thread.sleep(retryWaitMsecs);
  }
  throw new IllegalStateException("exists: Failed to check " + path +
      " after " + tries + " tries!");
}
/**
 * Looks up the stat of a znode with an explicit watcher, retrying when the
 * connection is lost.
 * <p>
 * The watcher is forwarded unchanged to the wrapped ZooKeeper client; when
 * it is non-null and the call succeeds, a watch is left on the node with
 * the given path.
 *
 * @param path the node path
 * @param watcher explicit watcher
 * @return the stat of the node of the given path; return null if no such a
 *         node exists.
 * @throws KeeperException If the server signals an error other than a
 *         connection loss
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws IllegalArgumentException if an invalid path is specified
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 */
public Stat exists(final String path, Watcher watcher)
    throws KeeperException, InterruptedException {
  int tries = 0;
  for (; tries < maxRetryAttempts; ++tries) {
    try {
      return zooKeeper.exists(path, watcher);
    } catch (KeeperException.ConnectionLossException e) {
      LOG.warn("exists: Connection loss on attempt " +
          tries + ", waiting " + retryWaitMsecs +
          " msecs before retrying.", e);
    }
    Thread.sleep(retryWaitMsecs);
  }
  throw new IllegalStateException("exists: Failed to check " + path +
      " after " + tries + " tries!");
}
/**
 * Reads the data of a znode with an explicit watcher, retrying when the
 * connection is lost.
 * <p>
 * The watcher and the stat holder are forwarded unchanged to the wrapped
 * ZooKeeper client; when the watcher is non-null and the call succeeds, a
 * watch is left on the node with the given path.
 * <p>
 * A KeeperException with error code KeeperException.NoNode will be thrown
 * if no node with the given path exists.
 *
 * @param path the given path
 * @param watcher explicit watcher
 * @param stat the stat of the node
 * @return the data of the node
 * @throws KeeperException If the server signals an error other than a
 *         connection loss
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws IllegalArgumentException if an invalid path is specified
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 */
public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException {
  int tries = 0;
  while (tries < maxRetryAttempts) {
    try {
      return zooKeeper.getData(path, watcher, stat);
    } catch (KeeperException.ConnectionLossException e) {
      final String warning = "getData: Connection loss on attempt " +
          tries + ", waiting " + retryWaitMsecs +
          " msecs before retrying.";
      LOG.warn(warning, e);
    }
    tries++;
    Thread.sleep(retryWaitMsecs);
  }
  throw new IllegalStateException("getData: Failed to get " + path +
      " after " + tries + " tries!");
}
/**
 * Reads the data of a znode, retrying when the connection is lost.
 * <p>
 * The watch flag and the stat holder are forwarded unchanged to the wrapped
 * ZooKeeper client; when the flag is true and the call succeeds, a watch is
 * left on the node with the given path.
 * <p>
 * A KeeperException with error code KeeperException.NoNode will be thrown
 * if no node with the given path exists.
 *
 * @param path the given path
 * @param watch whether need to watch this node
 * @param stat the stat of the node
 * @return the data of the node
 * @throws KeeperException If the server signals an error other than a
 *         connection loss
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 */
public byte[] getData(String path, boolean watch, Stat stat)
    throws KeeperException, InterruptedException {
  int tries = 0;
  while (tries < maxRetryAttempts) {
    try {
      return zooKeeper.getData(path, watch, stat);
    } catch (KeeperException.ConnectionLossException e) {
      final String warning = "getData: Connection loss on attempt " +
          tries + ", waiting " + retryWaitMsecs +
          " msecs before retrying.";
      LOG.warn(warning, e);
    }
    tries++;
    Thread.sleep(retryWaitMsecs);
  }
  throw new IllegalStateException("getData: Failed to get " + path +
      " after " + tries + " tries!");
}
/**
 * Get the children of the path with extensions, retrying when the
 * connection is lost.
 * Extension 1: Sort the children based on sequence number
 * Extension 2: Get the full path instead of relative path
 *
 * @param path path to znode
 * @param watch set the watch?
 * @param sequenceSorted sort by the sequence number, i.e. by the integer
 *        value of the last SEQUENCE_NUMBER_LENGTH characters of each child
 *        name; every child name must be longer than that
 * @param fullPath if true, get the fully znode path back
 * @return list of children
 * @throws InterruptedException If interrupted while calling ZooKeeper or
 *         while waiting between attempts
 * @throws KeeperException If ZooKeeper reports an error other than a
 *         connection loss
 * @throws IllegalStateException If no attempt succeeded within
 *         maxRetryAttempts tries
 * @throws RuntimeException If sequenceSorted is set and a compared child
 *         name is too short to carry a sequence number
 */
public List<String> getChildrenExt(
    final String path,
    boolean watch,
    boolean sequenceSorted,
    boolean fullPath) throws KeeperException, InterruptedException {
  int attempt = 0;
  while (attempt < maxRetryAttempts) {
    try {
      List<String> childList = zooKeeper.getChildren(path, watch);
      /* Sort children according to the sequence number, if desired */
      if (sequenceSorted) {
        Collections.sort(childList, new Comparator<String>() {
          @Override
          public int compare(String s1, String s2) {
            if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
                (s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
              throw new RuntimeException(
                  "getChildrenExt: Invalid length for sequence " +
                  " sorting > " +
                  SEQUENCE_NUMBER_LENGTH +
                  " for s1 (" +
                  s1.length() + ") or s2 (" + s2.length() + ")");
            }
            int s1sequenceNumber = Integer.parseInt(
                s1.substring(s1.length() - SEQUENCE_NUMBER_LENGTH));
            int s2sequenceNumber = Integer.parseInt(
                s2.substring(s2.length() - SEQUENCE_NUMBER_LENGTH));
            // Compare explicitly: subtracting the two values could overflow
            // and flip the sign if one of them were negative.
            if (s1sequenceNumber < s2sequenceNumber) {
              return -1;
            }
            return (s1sequenceNumber == s2sequenceNumber) ? 0 : 1;
          }
        });
      }
      if (fullPath) {
        List<String> fullChildList =
            new ArrayList<String>(childList.size());
        for (String child : childList) {
          fullChildList.add(path + "/" + child);
        }
        return fullChildList;
      }
      return childList;
    } catch (KeeperException.ConnectionLossException e) {
      LOG.warn("getChildrenExt: Connection loss on attempt " +
          attempt + ", waiting " + retryWaitMsecs +
          " msecs before retrying.", e);
    }
    ++attempt;
    Thread.sleep(retryWaitMsecs);
  }
  // Report the operation that actually failed (not createExt).
  throw new IllegalStateException(
      "getChildrenExt: Failed to get children of " + path +
      " after " + attempt + " tries!");
}
/**
* Close this client object by delegating to the wrapped ZooKeeper client's
* close(). Once the client is closed, its session becomes
* invalid. All the ephemeral nodes in the ZooKeeper server associated with
* the session will be removed. The watches left on those nodes (and on
* their parents) will be triggered.
* <p>
* Unlike the other operations of this class, no retry is attempted here.
*
* @throws InterruptedException Propagated from the wrapped client's close()
*/
public void close() throws InterruptedException {
zooKeeper.close();
}
}