blob: b16cc4087bd3ba1c3022f5f2bad7bed6c45e2003 [file] [log] [blame]
/**
* @@@ START COPYRIGHT @@@
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
* @@@ END COPYRIGHT @@@
*/
package org.trafodion.wms.zookeeper;
import java.io.*;
import java.util.concurrent.CountDownLatch;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;
import java.nio.charset.Charset;
import org.apache.hadoop.conf.Configuration;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooKeeper.States;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.trafodion.wms.util.WmsConfiguration;
import org.trafodion.wms.util.RetryCounter;
import org.trafodion.wms.util.RetryCounterFactory;
import org.trafodion.wms.Constants;
import org.trafodion.wms.util.Bytes;
public class ZkClient implements Watcher {
private static final Log LOG = LogFactory.getLog(ZkClient.class.getName());
private String path;
private List<String> children;
private static final Charset CHARSET = Charset.forName("UTF-8");
private Configuration conf;
private CountDownLatch connectedSignal = new CountDownLatch(1);
private String zkServers;
private int port;
private int sessionTimeout = 0;
private ZooKeeper zk = null;
private RetryCounterFactory retryCounterFactory;
private int maxRetries = 0;
private int retryIntervalMillis = 0;
private final String identifier = null;
private final byte[] id = null;
private String parentZnode;
// The metadata attached to each piece of data has the
// format:
// <magic> 1-byte constant
// <id length> 4-byte big-endian integer (length of next field)
// <id> identifier corresponding uniquely to this process
// It is prepended to the data supplied by the user.
// the magic number is to be backward compatible
private static final byte MAGIC =(byte) 0XFF;
private static final int MAGIC_SIZE = Bytes.SIZEOF_BYTE;
private static final int ID_LENGTH_OFFSET = MAGIC_SIZE;
private static final int ID_LENGTH_SIZE = Bytes.SIZEOF_INT;
private void init() {
this.parentZnode = conf.get(Constants.ZOOKEEPER_ZNODE_PARENT,Constants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
this.path = this.parentZnode + Constants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
this.port = conf.getInt(Constants.ZOOKEEPER_CLIENT_PORT, Constants.DEFAULT_ZOOKEEPER_CLIENT_PORT);
String[] servers = conf.getStrings(Constants.ZOOKEEPER_QUORUM,Constants.LOCALHOST);
StringBuffer hosts = new StringBuffer();
for (int i = 0; i < servers.length; ++i) {
if(i != 0 )
hosts.append(",");
hosts.append(servers[i]);
hosts.append(":");
hosts.append(port);
}
this.zkServers = hosts.toString();
this.sessionTimeout = conf.getInt(Constants.ZK_SESSION_TIMEOUT,Constants.DEFAULT_ZK_SESSION_TIMEOUT);
this.maxRetries = conf.getInt(Constants.ZK_RECOVERY_RETRY,Constants.DEFAULT_ZK_RECOVERY_RETRY);
this.retryIntervalMillis = conf.getInt(Constants.ZK_RECOVERY_RETRY_INTERVAL_MILLIS,Constants.DEFAULT_ZK_RECOVERY_RETRY_INTERVAL_MILLIS);
retryCounterFactory = new RetryCounterFactory(maxRetries, retryIntervalMillis);
LOG.debug("ZooKeeper Servers:" + zkServers + ",SessionTimeout:" + this.sessionTimeout + ",MaxRetries:" + maxRetries + ",RetryIntervalMillis:" + retryIntervalMillis);
}
public String getZkQuorum(){
return this.zkServers;
}
public ZkClient() {
this.conf = WmsConfiguration.create();
init();
}
public ZkClient(int sessionTimeout,int maxRetries,int retryIntervalMillis) {
this.conf = WmsConfiguration.create();
this.conf.setInt(Constants.ZK_SESSION_TIMEOUT,sessionTimeout);
this.conf.setInt(Constants.ZK_RECOVERY_RETRY,maxRetries);
this.conf.setInt(Constants.ZK_RECOVERY_RETRY_INTERVAL_MILLIS,retryIntervalMillis);
init();
}
public ZkClient(String zkhost, int zkport) {
this.conf = WmsConfiguration.create();
this.conf.setStrings(Constants.ZOOKEEPER_QUORUM,zkhost);
this.conf.setInt(Constants.ZOOKEEPER_CLIENT_PORT, zkport);
init();
}
public void connect() throws IOException, InterruptedException {
if(zk==null) {
this.zk = new ZooKeeper(zkServers, sessionTimeout, this);
//wait 3 seconds to connect
int retries=0;
while(this.zk.getState() != ZooKeeper.States.CONNECTED) {
LOG.debug("Zookeeper.State=" + this.zk.getState());
try {
Thread.sleep(1000L);//1 second
retries++;
} catch (InterruptedException ie) {}
if(retries > 3)
break;
}
if(this.zk.getState() != ZooKeeper.States.CONNECTED) {
this.zk.close();
this.zk=null;
LOG.error("Zookeeper.State [" + this.zk.getState() + "]");
throw new IOException("Cannot connect to Zookeeper");
}
LOG.debug("Zookeeper.State=" + this.zk.getState());
connectedSignal.await();
}
}
public void resetZk() throws IOException, InterruptedException {
zk = null;
}
public void close() throws InterruptedException {
if(zk != null)
zk.close();
this.sessionTimeout = 0;
zk = null;
}
public ZooKeeper getZk() {
return zk;
}
@Override
public void process(WatchedEvent event) {
if(event.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectedSignal.countDown();
}
}
public void create(String path, String value, boolean ephemeral)
throws KeeperException, InterruptedException {
Stat stat = zk.exists(path,false);
if(stat==null) {
if(ephemeral==true)
zk.create(path,value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL);
else
zk.create(path,value.getBytes(CHARSET),ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
} else {
zk.setData(path,value.getBytes(CHARSET), -1);
}
}
public String get(String path, Watcher watcher)
throws KeeperException, InterruptedException {
byte[] data = zk.getData(path, watcher, null/*stat*/);
return new String(data, CHARSET);
}
/**
* delete is an idempotent operation. Retry before throwing exception.
* This function will not throw NoNodeException if the path does not
* exist.
*/
public void delete(String path, int version)
throws InterruptedException, KeeperException {
RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
zk.delete(path, version);
return;
} catch (KeeperException e) {
switch (e.code()) {
case NONODE:
if (isRetry) {
LOG.info("Node " + path + " already deleted. Assuming that a " +
"previous attempt succeeded.");
return;
}
LOG.warn("Node " + path + " already deleted, and this is not a " +
"retry");
throw e;
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "delete");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
isRetry = true;
}
}
/**
* exists is an idempotent operation. Retry before throwing exception
* @return A Stat instance
*/
public Stat exists(String path, Watcher watcher)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return zk.exists(path, watcher);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "exists");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
/**
* exists is an idempotent operation. Retry before throwing exception
* @return A Stat instance
*/
public Stat exists(String path, boolean watch)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return zk.exists(path, watch);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "exists");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
private void retryOrThrow(RetryCounter retryCounter, KeeperException e,
String opName) throws KeeperException {
LOG.warn("Possibly transient ZooKeeper exception: " + e);
if (!retryCounter.shouldRetry()) {
LOG.error("ZooKeeper " + opName + " failed after "
+ retryCounter.getMaxRetries() + " retries");
throw e;
}
}
/**
* getChildren is an idempotent operation. Retry before throwing exception
* @return List of children znodes
*/
public List<String> getChildren(String path, Watcher watcher)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
return zk.getChildren(path, watcher);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "getChildren");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
/**
* getData is an idempotent operation. Retry before throwing exception
* @return Data
*/
public byte[] getData(String path, Watcher watcher, Stat stat)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
byte[] revData = zk.getData(path, watcher, stat);
return this.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "getData");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
/**
* getData is an idemnpotent operation. Retry before throwing exception
* @return Data
*/
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
while (true) {
try {
byte[] revData = zk.getData(path, watch, stat);
return this.removeMetaData(revData);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "getData");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
/**
* setData is NOT an idempotent operation. Retry may cause BadVersion Exception
* Adding an identifier field into the data to check whether
* badversion is caused by the result of previous correctly setData
* @return Stat instance
*/
public Stat setData(String path, byte[] data, int version)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
// byte[] newData = appendMetaData(data);
byte[] newData = data;
while (true) {
try {
return zk.setData(path, newData, version);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "setData");
break;
case BADVERSION:
// try to verify whether the previous setData success or not
try{
Stat stat = new Stat();
byte[] revData = zk.getData(path, false, stat);
if (Bytes.equals(revData, newData)) {
// the bad version is caused by previous successful setData
return stat;
}
} catch(KeeperException keeperException){
// the ZK is not reliable at this moment. just throwing exception
throw keeperException;
}
break;
// throw other exceptions and verified bad version exceptions
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
/**
* <p>
* NONSEQUENTIAL create is idempotent operation.
* Retry before throwing exceptions.
* But this function will not throw the NodeExist exception back to the
* application.
* </p>
* <p>
* But SEQUENTIAL is NOT idempotent operation. It is necessary to add
* identifier to the path to verify, whether the previous one is successful
* or not.
* </p>
*
* @return Path
*/
public String create(String path, byte[] data, List<ACL> acl,
CreateMode createMode)
throws KeeperException, InterruptedException {
// byte[] newData = appendMetaData(data);
byte[] newData = data;
switch (createMode) {
case EPHEMERAL:
case PERSISTENT:
return createNonSequential(path, newData, acl, createMode);
case EPHEMERAL_SEQUENTIAL:
case PERSISTENT_SEQUENTIAL:
return createSequential(path, newData, acl, createMode);
default:
throw new IllegalArgumentException("Unrecognized CreateMode: " +
createMode);
}
}
private String createNonSequential(String path, byte[] data, List<ACL> acl,
CreateMode createMode) throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
boolean isRetry = false; // False for first attempt, true for all retries.
while (true) {
try {
return zk.create(path, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case NODEEXISTS:
if (isRetry) {
// If the connection was lost, there is still a possibility that
// we have successfully created the node at our previous attempt,
// so we read the node and compare.
byte[] currentData = zk.getData(path, false, null);
if (currentData != null &&
Bytes.compareTo(currentData, data) == 0) {
// We successfully created a non-sequential node
return path;
}
LOG.error("Node " + path + " already exists with " +
Bytes.toStringBinary(currentData) + ", could not write " +
Bytes.toStringBinary(data));
throw e;
}
LOG.info("Node " + path + " already exists and this is not a " +
"retry");
throw e;
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "create");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
isRetry = true;
}
}
private String createSequential(String path, byte[] data,
List<ACL> acl, CreateMode createMode)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
boolean first = true;
//String newPath = path+this.identifier;
String newPath = path;
while (true) {
try {
if (!first) {
// Check if we succeeded on a previous attempt
String previousResult = findPreviousSequentialNode(newPath);
if (previousResult != null) {
return previousResult;
}
}
first = false;
return zk.create(newPath, data, acl, createMode);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "create");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
/**
* Convert Iterable of {@link ZKOp} we got into the ZooKeeper.Op
* instances to actually pass to multi (need to do this in order to appendMetaData).
*/
private Iterable<Op> prepareZKMulti(Iterable<Op> ops)
throws UnsupportedOperationException {
if(ops == null) return null;
List<Op> preparedOps = new LinkedList<Op>();
for (Op op : ops) {
if (op.getType() == ZooDefs.OpCode.create) {
CreateRequest create = (CreateRequest)op.toRequestRecord();
preparedOps.add(Op.create(create.getPath(), appendMetaData(create.getData()),
create.getAcl(), create.getFlags()));
} else if (op.getType() == ZooDefs.OpCode.delete) {
// no need to appendMetaData for delete
preparedOps.add(op);
} else if (op.getType() == ZooDefs.OpCode.setData) {
SetDataRequest setData = (SetDataRequest)op.toRequestRecord();
preparedOps.add(Op.setData(setData.getPath(), appendMetaData(setData.getData()),
setData.getVersion()));
} else {
throw new UnsupportedOperationException("Unexpected ZKOp type: " + op.getClass().getName());
}
}
return preparedOps;
}
/**
* Run multiple operations in a transactional manner. Retry before throwing exception
*/
public List<OpResult> multi(Iterable<Op> ops)
throws KeeperException, InterruptedException {
RetryCounter retryCounter = retryCounterFactory.create();
Iterable<Op> multiOps = prepareZKMulti(ops);
while (true) {
try {
return zk.multi(multiOps);
} catch (KeeperException e) {
switch (e.code()) {
case CONNECTIONLOSS:
case SESSIONEXPIRED:
case OPERATIONTIMEOUT:
retryOrThrow(retryCounter, e, "multi");
break;
default:
throw e;
}
}
retryCounter.sleepUntilNextRetry();
retryCounter.useRetry();
}
}
private String findPreviousSequentialNode(String path)
throws KeeperException, InterruptedException {
int lastSlashIdx = path.lastIndexOf('/');
assert(lastSlashIdx != -1);
String parent = path.substring(0, lastSlashIdx);
String nodePrefix = path.substring(lastSlashIdx+1);
List<String> nodes = zk.getChildren(parent, false);
List<String> matching = filterByPrefix(nodes, nodePrefix);
for (String node : matching) {
String nodePath = parent + "/" + node;
Stat stat = zk.exists(nodePath, false);
if (stat != null) {
return nodePath;
}
}
return null;
}
public byte[] removeMetaData(byte[] data) {
if(data == null || data.length == 0) {
return data;
}
// check the magic data; to be backward compatible
byte magic = data[0];
if(magic != MAGIC) {
return data;
}
int idLength = Bytes.toInt(data, ID_LENGTH_OFFSET);
int dataLength = data.length-MAGIC_SIZE-ID_LENGTH_SIZE-idLength;
int dataOffset = MAGIC_SIZE+ID_LENGTH_SIZE+idLength;
byte[] newData = new byte[dataLength];
System.arraycopy(data, dataOffset, newData, 0, dataLength);
return newData;
}
private byte[] appendMetaData(byte[] data) {
if(data == null || data.length == 0){
return data;
}
byte[] newData = new byte[MAGIC_SIZE+ID_LENGTH_SIZE+id.length+data.length];
int pos = 0;
pos = Bytes.putByte(newData, pos, MAGIC);
pos = Bytes.putInt(newData, pos, id.length);
pos = Bytes.putBytes(newData, pos, id, 0, id.length);
pos = Bytes.putBytes(newData, pos, data, 0, data.length);
return newData;
}
public long getSessionId() {
return zk.getSessionId();
}
public States getState() {
return zk.getState();
}
public ZooKeeper getZooKeeper() {
return zk;
}
public byte[] getSessionPasswd() {
return zk.getSessionPasswd();
}
public void sync(String path, AsyncCallback.VoidCallback cb, Object ctx) {
this.zk.sync(path, null, null);
}
/**
* Filters the given node list by the given prefixes.
* This method is all-inclusive--if any element in the node list starts
* with any of the given prefixes, then it is included in the result.
*
* @param nodes the nodes to filter
* @param prefixes the prefixes to include in the result
* @return list of every element that starts with one of the prefixes
*/
private static List<String> filterByPrefix(List<String> nodes,
String... prefixes) {
List<String> lockChildren = new ArrayList<String>();
for (String child : nodes){
for (String prefix : prefixes){
if (child.startsWith(prefix)){
lockChildren.add(child);
break;
}
}
}
return lockChildren;
}
public static void main(String [] args) throws Exception {
ZkClient zkc = new ZkClient();
}
}