blob: 587108e0eb624060a604f5fe71a80d8cc2a551ac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.giraph.zk;
import java.io.IOException;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
/**
* ZooKeeper provides only atomic operations. ZooKeeperExt provides additional
* non-atomic operations that are useful.
*/
public class ZooKeeperExt extends ZooKeeper {
/** Internal logger */
private static final Logger LOG = Logger.getLogger(ZooKeeperExt.class);
/** Length of the ZK sequence number */
private static final int SEQUENCE_NUMBER_LENGTH = 10;
/**
* Constructor to connect to ZooKeeper
*
* @param connectString Comma separated host:port pairs, each corresponding
* to a zk server. e.g.
* "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If the optional
* chroot suffix is used the example would look
* like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
* where the client would be rooted at "/app/a" and all paths
* would be relative to this root - ie getting/setting/etc...
* "/foo/bar" would result in operations being run on
* "/app/a/foo/bar" (from the server perspective).
* @param sessionTimeout Session timeout in milliseconds
* @param watcher A watcher object which will be notified of state changes,
* may also be notified for node events
* @throws IOException
*/
public ZooKeeperExt(String connectString,
int sessionTimeout,
Watcher watcher) throws IOException {
super(connectString, sessionTimeout, watcher);
}
/**
* Provides a possibility of a creating a path consisting of more than one
* znode (not atomic). If recursive is false, operates exactly the
* same as create().
*
* @param path path to create
* @param data data to set on the final znode
* @param acl acls on each znode created
* @param createMode only affects the final znode
* @param recursive if true, creates all ancestors
* @return Actual created path
* @throws KeeperException
* @throws InterruptedException
*/
public String createExt(
final String path,
byte data[],
List<ACL> acl,
CreateMode createMode,
boolean recursive) throws KeeperException, InterruptedException {
if (LOG.isDebugEnabled()) {
LOG.debug("createExt: Creating path " + path);
}
if (!recursive) {
return create(path, data, acl, createMode);
}
try {
return create(path, data, acl, createMode);
} catch (KeeperException.NoNodeException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("createExt: Cannot directly create node " + path);
}
}
int pos = path.indexOf("/", 1);
for (; pos != -1; pos = path.indexOf("/", pos + 1)) {
try {
create(
path.substring(0, pos), null, acl, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("createExt: Znode " + path.substring(0, pos) +
" already exists");
}
}
}
return create(path, data, acl, createMode);
}
/**
* Data structure for handling the output of createOrSet()
*/
public class PathStat {
private String path;
private Stat stat;
/**
* Put in results from createOrSet()
*
* @param path Path to created znode (or null)
* @param stat Stat from set znode (if set)
*/
public PathStat(String path, Stat stat) {
this.path = path;
this.stat = stat;
}
/**
* Get the path of the created znode if it was created.
*
* @return Path of created znode or null if not created
*/
public String getPath() {
return path;
}
/**
* Get the stat of the set znode if set
*
* @return Stat of set znode or null if not set
*/
public Stat getStat() {
return stat;
}
}
/**
* Create a znode. Set the znode if the created znode already exists.
*
* @param path path to create
* @param data data to set on the final znode
* @param acl acls on each znode created
* @param createMode only affects the final znode
* @param recursive if true, creates all ancestors
* @return Path of created znode or Stat of set znode
* @throws InterruptedException
* @throws KeeperException
*/
public PathStat createOrSetExt(final String path,
byte data[],
List<ACL> acl,
CreateMode createMode,
boolean recursive,
int version)
throws KeeperException, InterruptedException {
String createdPath = null;
Stat setStat = null;
try {
createdPath = createExt(path, data, acl, createMode, recursive);
} catch (KeeperException.NodeExistsException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("createOrSet: Node exists on path " + path);
}
setStat = setData(path, data, version);
}
return new PathStat(createdPath, setStat);
}
/**
* Create a znode if there is no other znode there
*
* @param path path to create
* @param data data to set on the final znode
* @param acl acls on each znode created
* @param createMode only affects the final znode
* @param recursive if true, creates all ancestors
* @return Path of created znode or Stat of set znode
* @throws InterruptedException
* @throws KeeperException
*/
public PathStat createOnceExt(final String path,
byte data[],
List<ACL> acl,
CreateMode createMode,
boolean recursive)
throws KeeperException, InterruptedException {
String createdPath = null;
Stat setStat = null;
try {
createdPath = createExt(path, data, acl, createMode, recursive);
} catch (KeeperException.NodeExistsException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("createOnceExt: Node already exists on path " + path);
}
}
return new PathStat(createdPath, setStat);
}
/**
* Delete a path recursively. When the deletion is recursive, it is a
* non-atomic operation, hence, not part of ZooKeeper.
* @param path path to remove (i.e. /tmp will remove /tmp/1 and /tmp/2)
* @param version expected version (-1 for all)
* @param recursive if true, remove all children, otherwise behave like
* remove()
* @throws InterruptedException
* @throws KeeperException
*/
public void deleteExt(final String path, int version, boolean recursive)
throws InterruptedException, KeeperException {
if (!recursive) {
delete(path, version);
return;
}
try {
delete(path, version);
return;
} catch (KeeperException.NotEmptyException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("deleteExt: Cannot directly remove node " + path);
}
}
List<String> childList = getChildren(path, false);
for (String child : childList) {
deleteExt(path + "/" + child, -1, true);
}
delete(path, version);
}
/**
* Get the children of the path with extensions.
* Extension 1: Sort the children based on sequence number
* Extension 2: Get the full path instead of relative path
*
* @param path path to znode
* @param watch set the watch?
* @param sequenceSorted sort by the sequence number
* @param fullPath if true, get the fully znode path back
* @return list of children
* @throws InterruptedException
* @throws KeeperException
*/
public List<String> getChildrenExt(
final String path,
boolean watch,
boolean sequenceSorted,
boolean fullPath)
throws KeeperException, InterruptedException {
List<String> childList = getChildren(path, watch);
/* Sort children according to the sequence number, if desired */
if (sequenceSorted) {
Collections.sort(childList,
new Comparator<String>() {
public int compare(String s1, String s2) {
if ((s1.length() <= SEQUENCE_NUMBER_LENGTH) ||
(s2.length() <= SEQUENCE_NUMBER_LENGTH)) {
throw new RuntimeException(
"getChildrenExt: Invalid length for sequence " +
" sorting > " +
SEQUENCE_NUMBER_LENGTH +
" for s1 (" +
s1.length() + ") or s2 (" + s2.length() + ")");
}
int s1sequenceNumber = Integer.parseInt(
s1.substring(s1.length() -
SEQUENCE_NUMBER_LENGTH));
int s2sequenceNumber = Integer.parseInt(
s2.substring(s2.length() -
SEQUENCE_NUMBER_LENGTH));
return s1sequenceNumber - s2sequenceNumber;
}
}
);
}
if (fullPath) {
List<String> fullChildList = new ArrayList<String>();
for (String child : childList) {
fullChildList.add(path + "/" + child);
}
return fullChildList;
}
return childList;
}
}