blob: 3d97e6f7b896abadbce88c1d79e4cc3289597f7f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.zookeeper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.ZooWorker.ZooCallable;
import org.apache.zookeeper.AsyncCallback.ACLCallback;
import org.apache.zookeeper.AsyncCallback.Children2Callback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
import org.apache.zookeeper.AsyncCallback.MultiCallback;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Provide a zookeeper client to handle session expire.
*/
public class ZooKeeperClient extends ZooKeeper implements Watcher, AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ZooKeeperClient.class);
private static final int DEFAULT_RETRY_EXECUTOR_THREAD_COUNT = 1;
// ZooKeeper client connection variables
private final String connectString;
private final int sessionTimeoutMs;
private final boolean allowReadOnlyMode;
// state for the zookeeper client
private final AtomicReference<ZooKeeper> zk = new AtomicReference<ZooKeeper>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final ZooKeeperWatcherBase watcherManager;
private final ScheduledExecutorService retryExecutor;
private final ExecutorService connectExecutor;
// rate limiter
private final RateLimiter rateLimiter;
// retry polices
private final RetryPolicy connectRetryPolicy;
private final RetryPolicy operationRetryPolicy;
// Stats Logger
private final OpStatsLogger createStats;
private final OpStatsLogger getStats;
private final OpStatsLogger setStats;
private final OpStatsLogger deleteStats;
private final OpStatsLogger getChildrenStats;
private final OpStatsLogger existsStats;
private final OpStatsLogger multiStats;
private final OpStatsLogger getACLStats;
private final OpStatsLogger setACLStats;
private final OpStatsLogger syncStats;
private final OpStatsLogger createClientStats;
private final Callable<ZooKeeper> clientCreator = new Callable<ZooKeeper>() {
@Override
public ZooKeeper call() throws Exception {
try {
return ZooWorker.syncCallWithRetries(null, new ZooCallable<ZooKeeper>() {
@Override
public ZooKeeper call() throws KeeperException, InterruptedException {
logger.info("Reconnecting zookeeper {}.", connectString);
// close the previous one
closeZkHandle();
ZooKeeper newZk;
try {
newZk = createZooKeeper();
} catch (IOException ie) {
logger.error("Failed to create zookeeper instance to " + connectString, ie);
throw KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
}
waitForConnection();
zk.set(newZk);
logger.info("ZooKeeper session {} is created to {}.",
Long.toHexString(newZk.getSessionId()), connectString);
return newZk;
}
@Override
public String toString() {
return String.format("ZooKeeper Client Creator (%s)", connectString);
}
}, connectRetryPolicy, rateLimiter, createClientStats);
} catch (Exception e) {
logger.error("Gave up reconnecting to ZooKeeper : ", e);
Runtime.getRuntime().exit(-1);
return null;
}
}
};
@VisibleForTesting
static ZooKeeperClient createConnectedZooKeeperClient(
String connectString, int sessionTimeoutMs, Set<Watcher> childWatchers,
RetryPolicy operationRetryPolicy)
throws KeeperException, InterruptedException, IOException {
return ZooKeeperClient.newBuilder()
.connectString(connectString)
.sessionTimeoutMs(sessionTimeoutMs)
.watchers(childWatchers)
.operationRetryPolicy(operationRetryPolicy)
.build();
}
/**
* A builder to build retryable zookeeper client.
*/
public static class Builder {
String connectString = null;
int sessionTimeoutMs = 10000;
Set<Watcher> watchers = null;
RetryPolicy connectRetryPolicy = null;
RetryPolicy operationRetryPolicy = null;
StatsLogger statsLogger = NullStatsLogger.INSTANCE;
int retryExecThreadCount = DEFAULT_RETRY_EXECUTOR_THREAD_COUNT;
double requestRateLimit = 0;
boolean allowReadOnlyMode = false;
private Builder() {}
public Builder connectString(String connectString) {
this.connectString = connectString;
return this;
}
public Builder sessionTimeoutMs(int sessionTimeoutMs) {
this.sessionTimeoutMs = sessionTimeoutMs;
return this;
}
public Builder watchers(Set<Watcher> watchers) {
this.watchers = watchers;
return this;
}
public Builder connectRetryPolicy(RetryPolicy retryPolicy) {
this.connectRetryPolicy = retryPolicy;
return this;
}
public Builder operationRetryPolicy(RetryPolicy retryPolicy) {
this.operationRetryPolicy = retryPolicy;
return this;
}
public Builder statsLogger(StatsLogger statsLogger) {
this.statsLogger = statsLogger;
return this;
}
public Builder requestRateLimit(double requestRateLimit) {
this.requestRateLimit = requestRateLimit;
return this;
}
public Builder retryThreadCount(int numThreads) {
this.retryExecThreadCount = numThreads;
return this;
}
public Builder allowReadOnlyMode(boolean allowReadOnlyMode) {
this.allowReadOnlyMode = allowReadOnlyMode;
return this;
}
public ZooKeeperClient build() throws IOException, KeeperException, InterruptedException {
checkNotNull(connectString);
checkArgument(sessionTimeoutMs > 0);
checkNotNull(statsLogger);
checkArgument(retryExecThreadCount > 0);
if (null == connectRetryPolicy) {
connectRetryPolicy =
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
}
if (null == operationRetryPolicy) {
operationRetryPolicy =
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0);
}
// Create a watcher manager
StatsLogger watcherStatsLogger = statsLogger.scope("watcher");
ZooKeeperWatcherBase watcherManager =
null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) :
new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger);
ZooKeeperClient client = new ZooKeeperClient(
connectString,
sessionTimeoutMs,
watcherManager,
connectRetryPolicy,
operationRetryPolicy,
statsLogger,
retryExecThreadCount,
requestRateLimit,
allowReadOnlyMode
);
// Wait for connection to be established.
try {
watcherManager.waitForConnection();
} catch (KeeperException ke) {
client.close();
throw ke;
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
client.close();
throw ie;
}
return client;
}
}
public static Builder newBuilder() {
return new Builder();
}
protected ZooKeeperClient(String connectString,
int sessionTimeoutMs,
ZooKeeperWatcherBase watcherManager,
RetryPolicy connectRetryPolicy,
RetryPolicy operationRetryPolicy,
StatsLogger statsLogger,
int retryExecThreadCount,
double rate,
boolean allowReadOnlyMode) throws IOException {
super(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
this.connectString = connectString;
this.sessionTimeoutMs = sessionTimeoutMs;
this.allowReadOnlyMode = allowReadOnlyMode;
this.watcherManager = watcherManager;
this.connectRetryPolicy = connectRetryPolicy;
this.operationRetryPolicy = operationRetryPolicy;
this.rateLimiter = rate > 0 ? RateLimiter.create(rate) : null;
this.retryExecutor =
Executors.newScheduledThreadPool(retryExecThreadCount,
new ThreadFactoryBuilder().setNameFormat("ZKC-retry-executor-%d").build());
this.connectExecutor =
Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("ZKC-connect-executor-%d").build());
// added itself to the watcher
watcherManager.addChildWatcher(this);
// Stats
StatsLogger scopedStatsLogger = statsLogger.scope("zk");
createClientStats = scopedStatsLogger.getOpStatsLogger("create_client");
createStats = scopedStatsLogger.getOpStatsLogger("create");
getStats = scopedStatsLogger.getOpStatsLogger("get_data");
setStats = scopedStatsLogger.getOpStatsLogger("set_data");
deleteStats = scopedStatsLogger.getOpStatsLogger("delete");
getChildrenStats = scopedStatsLogger.getOpStatsLogger("get_children");
existsStats = scopedStatsLogger.getOpStatsLogger("exists");
multiStats = scopedStatsLogger.getOpStatsLogger("multi");
getACLStats = scopedStatsLogger.getOpStatsLogger("get_acl");
setACLStats = scopedStatsLogger.getOpStatsLogger("set_acl");
syncStats = scopedStatsLogger.getOpStatsLogger("sync");
}
@Override
public void close() throws InterruptedException {
closed.set(true);
connectExecutor.shutdown();
retryExecutor.shutdown();
closeZkHandle();
}
private void closeZkHandle() throws InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
super.close();
} else {
zkHandle.close();
}
}
public void waitForConnection() throws KeeperException, InterruptedException {
watcherManager.waitForConnection();
}
protected ZooKeeper createZooKeeper() throws IOException {
return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, allowReadOnlyMode);
}
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.None
&& event.getState() == KeeperState.Expired) {
onExpired();
}
}
private void onExpired() {
if (closed.get()) {
// we don't schedule any tries if the client is closed.
return;
}
logger.info("ZooKeeper session {} is expired from {}.",
Long.toHexString(getSessionId()), connectString);
try {
connectExecutor.submit(clientCreator);
} catch (RejectedExecutionException ree) {
if (!closed.get()) {
logger.error("ZooKeeper reconnect task is rejected : ", ree);
}
} catch (Exception t) {
logger.error("Failed to submit zookeeper reconnect task due to runtime exception : ", t);
}
}
/**
* A runnable that retries zookeeper operations.
*/
abstract static class ZkRetryRunnable implements Runnable {
final ZooWorker worker;
final RateLimiter rateLimiter;
final Runnable that;
ZkRetryRunnable(RetryPolicy retryPolicy,
RateLimiter rateLimiter,
OpStatsLogger statsLogger) {
this.worker = new ZooWorker(retryPolicy, statsLogger);
this.rateLimiter = rateLimiter;
that = this;
}
@Override
public void run() {
if (null != rateLimiter) {
rateLimiter.acquire();
}
zkRun();
}
abstract void zkRun();
}
// inherits from ZooKeeper client for all operations
@Override
public long getSessionId() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return super.getSessionId();
}
return zkHandle.getSessionId();
}
@Override
public byte[] getSessionPasswd() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return super.getSessionPasswd();
}
return zkHandle.getSessionPasswd();
}
@Override
public int getSessionTimeout() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return super.getSessionTimeout();
}
return zkHandle.getSessionTimeout();
}
@Override
public void addAuthInfo(String scheme, byte[] auth) {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
super.addAuthInfo(scheme, auth);
return;
}
zkHandle.addAuthInfo(scheme, auth);
}
private void backOffAndRetry(Runnable r, long nextRetryWaitTimeMs) {
try {
retryExecutor.schedule(r, nextRetryWaitTimeMs, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException ree) {
if (!closed.get()) {
logger.error("ZooKeeper Operation {} is rejected : ", r, ree);
}
}
}
private boolean allowRetry(ZooWorker worker, int rc) {
return worker.allowRetry(rc) && !closed.get();
}
@Override
public synchronized void register(Watcher watcher) {
watcherManager.addChildWatcher(watcher);
}
@Override
public List<OpResult> multi(final Iterable<Op> ops) throws InterruptedException, KeeperException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<OpResult>>() {
@Override
public String toString() {
return "multi";
}
@Override
public List<OpResult> call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.multi(ops);
}
return zkHandle.multi(ops);
}
}, operationRetryPolicy, rateLimiter, multiStats);
}
@Override
public void multi(final Iterable<Op> ops,
final MultiCallback cb,
final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
final MultiCallback multiCb = new MultiCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<OpResult> results) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, results);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.multi(ops, multiCb, worker);
} else {
zkHandle.multi(ops, multiCb, worker);
}
}
@Override
public String toString() {
return "multi";
}
};
// execute it immediately
proc.run();
}
@Override
@Deprecated
public Transaction transaction() {
// since there is no reference about which client that the transaction could use
// so just use ZooKeeper instance directly.
// you'd better to use {@link #multi}.
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return super.transaction();
}
return zkHandle.transaction();
}
@Override
public List<ACL> getACL(final String path, final Stat stat) throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<ACL>>() {
@Override
public String toString() {
return String.format("getACL (%s, stat = %s)", path, stat);
}
@Override
public List<ACL> call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getACL(path, stat);
}
return zkHandle.getACL(path, stat);
}
}, operationRetryPolicy, rateLimiter, getACLStats);
}
@Override
public void getACL(final String path, final Stat stat, final ACLCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getACLStats) {
final ACLCallback aclCb = new ACLCallback() {
@Override
public void processResult(int rc, String path, Object ctx, List<ACL> acl, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, acl, stat);
}
}
};
@Override
public String toString() {
return String.format("getACL (%s, stat = %s)", path, stat);
}
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getACL(path, stat, aclCb, worker);
} else {
zkHandle.getACL(path, stat, aclCb, worker);
}
}
};
// execute it immediately
proc.run();
}
@Override
public Stat setACL(final String path, final List<ACL> acl, final int version)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
@Override
public String toString() {
return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
}
@Override
public Stat call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.setACL(path, acl, version);
}
return zkHandle.setACL(path, acl, version);
}
}, operationRetryPolicy, rateLimiter, setACLStats);
}
@Override
public void setACL(final String path, final List<ACL> acl, final int version,
final StatCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setACLStats) {
final StatCallback stCb = new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, stat);
}
}
};
@Override
public String toString() {
return String.format("setACL (%s, acl = %s, version = %d)", path, acl, version);
}
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.setACL(path, acl, version, stCb, worker);
} else {
zkHandle.setACL(path, acl, version, stCb, worker);
}
}
};
// execute it immediately
proc.run();
}
@Override
public void sync(final String path, final VoidCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, syncStats) {
final VoidCallback vCb = new VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context);
}
}
};
@Override
public String toString() {
return String.format("sync (%s)", path);
}
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.sync(path, vCb, worker);
} else {
zkHandle.sync(path, vCb, worker);
}
}
};
// execute it immediately
proc.run();
}
@Override
public States getState() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getState();
} else {
return zkHandle.getState();
}
}
@Override
public String toString() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.toString();
} else {
return zkHandle.toString();
}
}
@Override
public String create(final String path, final byte[] data,
final List<ACL> acl, final CreateMode createMode)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<String>() {
@Override
public String call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.create(path, data, acl, createMode);
}
return zkHandle.create(path, data, acl, createMode);
}
@Override
public String toString() {
return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
}
}, operationRetryPolicy, rateLimiter, createStats);
}
@Override
public void create(final String path, final byte[] data, final List<ACL> acl,
final CreateMode createMode, final StringCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, createStats) {
final StringCallback createCb = new StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, name);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.create(path, data, acl, createMode, createCb, worker);
} else {
zkHandle.create(path, data, acl, createMode, createCb, worker);
}
}
@Override
public String toString() {
return String.format("create (%s, acl = %s, mode = %s)", path, acl, createMode);
}
};
// execute it immediately
proc.run();
}
@Override
public void delete(final String path, final int version) throws KeeperException, InterruptedException {
ZooWorker.syncCallWithRetries(this, new ZooCallable<Void>() {
@Override
public Void call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.delete(path, version);
} else {
zkHandle.delete(path, version);
}
return null;
}
@Override
public String toString() {
return String.format("delete (%s, version = %d)", path, version);
}
}, operationRetryPolicy, rateLimiter, deleteStats);
}
@Override
public void delete(final String path, final int version, final VoidCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, deleteStats) {
final VoidCallback deleteCb = new VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.delete(path, version, deleteCb, worker);
} else {
zkHandle.delete(path, version, deleteCb, worker);
}
}
@Override
public String toString() {
return String.format("delete (%s, version = %d)", path, version);
}
};
// execute it immediately
proc.run();
}
@Override
public Stat exists(final String path, final Watcher watcher) throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
@Override
public Stat call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.exists(path, watcher);
}
return zkHandle.exists(path, watcher);
}
@Override
public String toString() {
return String.format("exists (%s, watcher = %s)", path, watcher);
}
}, operationRetryPolicy, rateLimiter, existsStats);
}
@Override
public Stat exists(final String path, final boolean watch) throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
@Override
public Stat call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.exists(path, watch);
}
return zkHandle.exists(path, watch);
}
@Override
public String toString() {
return String.format("exists (%s, watcher = %s)", path, watch);
}
}, operationRetryPolicy, rateLimiter, existsStats);
}
@Override
public void exists(final String path, final Watcher watcher, final StatCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) {
final StatCallback stCb = new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.exists(path, watcher, stCb, worker);
} else {
zkHandle.exists(path, watcher, stCb, worker);
}
}
@Override
public String toString() {
return String.format("exists (%s, watcher = %s)", path, watcher);
}
};
// execute it immediately
proc.run();
}
@Override
public void exists(final String path, final boolean watch, final StatCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, existsStats) {
final StatCallback stCb = new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.exists(path, watch, stCb, worker);
} else {
zkHandle.exists(path, watch, stCb, worker);
}
}
@Override
public String toString() {
return String.format("exists (%s, watcher = %s)", path, watch);
}
};
// execute it immediately
proc.run();
}
@Override
public byte[] getData(final String path, final Watcher watcher, final Stat stat)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() {
@Override
public byte[] call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getData(path, watcher, stat);
}
return zkHandle.getData(path, watcher, stat);
}
@Override
public String toString() {
return String.format("getData (%s, watcher = %s)", path, watcher);
}
}, operationRetryPolicy, rateLimiter, getStats);
}
@Override
public byte[] getData(final String path, final boolean watch, final Stat stat)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<byte[]>() {
@Override
public byte[] call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getData(path, watch, stat);
}
return zkHandle.getData(path, watch, stat);
}
@Override
public String toString() {
return String.format("getData (%s, watcher = %s)", path, watch);
}
}, operationRetryPolicy, rateLimiter, getStats);
}
@Override
public void getData(final String path, final Watcher watcher, final DataCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) {
final DataCallback dataCb = new DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, data, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getData(path, watcher, dataCb, worker);
} else {
zkHandle.getData(path, watcher, dataCb, worker);
}
}
@Override
public String toString() {
return String.format("getData (%s, watcher = %s)", path, watcher);
}
};
// execute it immediately
proc.run();
}
@Override
public void getData(final String path, final boolean watch, final DataCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getStats) {
final DataCallback dataCb = new DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, data, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getData(path, watch, dataCb, worker);
} else {
zkHandle.getData(path, watch, dataCb, worker);
}
}
@Override
public String toString() {
return String.format("getData (%s, watcher = %s)", path, watch);
}
};
// execute it immediately
proc.run();
}
@Override
public Stat setData(final String path, final byte[] data, final int version)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<Stat>() {
@Override
public Stat call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.setData(path, data, version);
}
return zkHandle.setData(path, data, version);
}
@Override
public String toString() {
return String.format("setData (%s, version = %d)", path, version);
}
}, operationRetryPolicy, rateLimiter, setStats);
}
@Override
public void setData(final String path, final byte[] data, final int version,
final StatCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) {
final StatCallback stCb = new StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.setData(path, data, version, stCb, worker);
} else {
zkHandle.setData(path, data, version, stCb, worker);
}
}
@Override
public String toString() {
return String.format("setData (%s, version = %d)", path, version);
}
};
// execute it immediately
proc.run();
}
@Override
public List<String> getChildren(final String path, final Watcher watcher, final Stat stat)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
@Override
public List<String> call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getChildren(path, watcher, stat);
}
return zkHandle.getChildren(path, watcher, stat);
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watcher);
}
}, operationRetryPolicy, rateLimiter, getChildrenStats);
}
@Override
public List<String> getChildren(final String path, final boolean watch, final Stat stat)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
@Override
public List<String> call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getChildren(path, watch, stat);
}
return zkHandle.getChildren(path, watch, stat);
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watch);
}
}, operationRetryPolicy, rateLimiter, getChildrenStats);
}
@Override
public void getChildren(final String path, final Watcher watcher,
final Children2Callback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
final Children2Callback childCb = new Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx,
List<String> children, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, children, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
} else {
zkHandle.getChildren(path, watcher, childCb, worker);
}
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watcher);
}
};
// execute it immediately
proc.run();
}
@Override
public void getChildren(final String path, final boolean watch, final Children2Callback cb,
final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
final Children2Callback childCb = new Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx,
List<String> children, Stat stat) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, children, stat);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getChildren(path, watch, childCb, worker);
} else {
zkHandle.getChildren(path, watch, childCb, worker);
}
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watch);
}
};
// execute it immediately
proc.run();
}
@Override
public List<String> getChildren(final String path, final Watcher watcher)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
@Override
public List<String> call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getChildren(path, watcher);
}
return zkHandle.getChildren(path, watcher);
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watcher);
}
}, operationRetryPolicy, rateLimiter, getChildrenStats);
}
@Override
public List<String> getChildren(final String path, final boolean watch)
throws KeeperException, InterruptedException {
return ZooWorker.syncCallWithRetries(this, new ZooCallable<List<String>>() {
@Override
public List<String> call() throws KeeperException, InterruptedException {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
return ZooKeeperClient.super.getChildren(path, watch);
}
return zkHandle.getChildren(path, watch);
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watch);
}
}, operationRetryPolicy, rateLimiter, getChildrenStats);
}
@Override
public void getChildren(final String path, final Watcher watcher,
final ChildrenCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
final ChildrenCallback childCb = new ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx,
List<String> children) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, children);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getChildren(path, watcher, childCb, worker);
} else {
zkHandle.getChildren(path, watcher, childCb, worker);
}
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watcher);
}
};
// execute it immediately
proc.run();
}
@Override
public void getChildren(final String path, final boolean watch,
final ChildrenCallback cb, final Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, getChildrenStats) {
final ChildrenCallback childCb = new ChildrenCallback() {
@Override
public void processResult(int rc, String path, Object ctx,
List<String> children) {
ZooWorker worker = (ZooWorker) ctx;
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
cb.processResult(rc, path, context, children);
}
}
};
@Override
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
ZooKeeperClient.super.getChildren(path, watch, childCb, worker);
} else {
zkHandle.getChildren(path, watch, childCb, worker);
}
}
@Override
public String toString() {
return String.format("getChildren (%s, watcher = %s)", path, watch);
}
};
// execute it immediately
proc.run();
}
}