// blob: a1475de8a1ea059380fd3e29bce3ef63ac7f7218 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A very simple read-only ZooKeeper client without watcher support.
 * <p>
 * Every request ({@link #get(String)}, {@link #exists(String)}, {@link #list(String)}) is turned
 * into a task and put on an internal {@link DelayQueue}. A single background thread, started in
 * the constructor, takes tasks from that queue, lazily creates the {@link ZooKeeper} instance when
 * a task needs one, and closes it again once the queue has been idle for the keep-alive time and
 * no request is outstanding. Failed requests are re-queued with a delay until the retry budget is
 * used up.
 * <p>
 * The fields {@code zookeeper} and {@code pendingRequests} are only read and written from code
 * that runs on that background thread ({@link #run()} and the tasks it executes), which is why
 * they are neither volatile nor guarded by a lock.
 */
@InterfaceAudience.Private
public final class ReadOnlyZKClient implements Closeable {

  private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);

  /** Configuration key for the maximum number of retries of a single request. */
  public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";

  private static final int DEFAULT_RECOVERY_RETRY = 30;

  /** Configuration key for the delay, in milliseconds, before a failed request is retried. */
  public static final String RECOVERY_RETRY_INTERVAL_MILLIS =
    "zookeeper.recovery.retry.intervalmill";

  private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;

  /**
   * Configuration key for the idle time, in milliseconds, after which the ZooKeeper connection is
   * closed if no request is pending.
   */
  public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";

  private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;

  /** Result codes for which a request is failed immediately instead of being retried. */
  private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);

  private final String connectString;

  private final int sessionTimeoutMs;

  private final int maxRetries;

  private final int retryIntervalMs;

  private final int keepAliveTimeMs;

  /**
   * Unit of work processed by the background thread. The default implementation of every hook is
   * a no-op, so subclasses override only what they need.
   */
  private static abstract class Task implements Delayed {

    /**
     * The {@link System#nanoTime()} value at which this task becomes eligible to run. Initialised
     * to the creation time, i.e. the task is runnable immediately unless it is delayed later.
     */
    protected long time = System.nanoTime();

    /** Returns whether a ZooKeeper instance must be obtained before {@link #exec} is called. */
    public boolean needZk() {
      return false;
    }

    /**
     * Runs the task.
     * @param zk the ZooKeeper instance, or {@code null} when {@link #needZk()} is {@code false}
     */
    public void exec(ZooKeeper zk) {
    }

    /**
     * Called instead of {@link #exec} when the ZooKeeper instance could not be created.
     * @param e the failure raised while creating the ZooKeeper instance
     */
    public void connectFailed(Exception e) {
    }

    /**
     * Called for every task still queued when the client shuts down.
     * @param e the error to report to whoever is waiting on this task
     */
    public void closed(IOException e) {
    }

    /**
     * Orders tasks by their scheduled time; ties are broken by identity hash code so that two
     * distinct tasks scheduled for the same instant normally do not compare as equal.
     */
    @Override
    public int compareTo(Delayed o) {
      Task that = (Task) o;
      int c = Long.compare(time, that.time);
      if (c != 0) {
        return c;
      }
      return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
    }

    @Override
    public long getDelay(TimeUnit unit) {
      return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
  }

  /** Sentinel task; the background thread leaves its loop when it dequeues this instance. */
  private static final Task CLOSE = new Task() {
  };

  private final DelayQueue<Task> tasks = new DelayQueue<>();

  private final AtomicBoolean closed = new AtomicBoolean(false);

  // Package-private rather than private; NOTE(review): presumably so tests can inspect it.
  ZooKeeper zookeeper;

  /** Number of requests handed to ZooKeeper whose completion task has not been processed yet. */
  private int pendingRequests = 0;

  /** Returns a short identifier for this instance, used as a prefix in log messages. */
  private String getId() {
    return String.format("0x%08x", System.identityHashCode(this));
  }

  /**
   * Reads the connection settings from {@code conf} and starts the background thread. No
   * connection to ZooKeeper is made here; it is created when the first request is executed.
   * @param conf the configuration to read the quorum, session timeout, retry and keep-alive
   *          settings from
   */
  public ReadOnlyZKClient(Configuration conf) {
    // We might use a different ZK for client access
    String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
    if (clientZkQuorumServers != null) {
      this.connectString = clientZkQuorumServers;
    } else {
      this.connectString = ZKConfig.getZKQuorumServersString(conf);
    }
    this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
    this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
    this.retryIntervalMs =
      conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
    this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
    LOG.debug(
      "Connect {} to {} with session timeout={}ms, retries {}, " +
        "retry interval {}ms, keepAlive={}ms",
      getId(), connectString, sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
    Threads.setDaemonThreadRunning(new Thread(this::run),
      "ReadOnlyZKClient-" + connectString + "@" + getId());
  }

  /**
   * A request against a single znode path whose outcome is delivered through a
   * {@link CompletableFuture}. Handles retry bookkeeping and result-code interpretation; the
   * concrete ZooKeeper call is supplied by {@link #doExec(ZooKeeper)}.
   * @param <T> the type of the value the future is completed with
   */
  private abstract class ZKTask<T> extends Task {

    protected final String path;

    private final CompletableFuture<T> future;

    /** Human readable name of the operation, used in log messages only. */
    private final String operationType;

    /** Number of retries consumed so far. */
    private int retries;

    protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
      this.path = path;
      this.future = future;
      this.operationType = operationType;
    }

    /**
     * To be called from the ZooKeeper callback. The result is not processed on the calling thread;
     * a follow-up task is queued so that the handling runs on the background thread.
     * @param zk the ZooKeeper instance the request was issued on; closed on session expiry
     * @param rc the raw result code reported by ZooKeeper
     * @param ret the value reported by ZooKeeper
     * @param errorIfNoNode whether {@link Code#NONODE} fails the future ({@code true}) or
     *          completes it normally with {@code ret} ({@code false})
     */
    protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
      tasks.add(new Task() {

        @Override
        public void exec(ZooKeeper alwaysNull) {
          pendingRequests--;
          Code code = Code.get(rc);
          if (code == Code.OK) {
            future.complete(ret);
          } else if (code == Code.NONODE) {
            if (errorIfNoNode) {
              future.completeExceptionally(KeeperException.create(code, path));
            } else {
              future.complete(ret);
            }
          } else if (FAIL_FAST_CODES.contains(code)) {
            future.completeExceptionally(KeeperException.create(code, path));
          } else {
            if (code == Code.SESSIONEXPIRED) {
              LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
              try {
                zk.close();
              } catch (InterruptedException e) {
                // Do not lose the interrupt; keep going so the request is still retried or failed.
                Thread.currentThread().interrupt();
              }
            }
            if (ZKTask.this.delay(retryIntervalMs, maxRetries)) {
              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(),
                connectString, operationType, path, code, ZKTask.this.retries);
              tasks.add(ZKTask.this);
            } else {
              LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(),
                connectString, operationType, path, code, ZKTask.this.retries);
              future.completeExceptionally(KeeperException.create(code, path));
            }
          }
        }

        @Override
        public void closed(IOException e) {
          // It may happen that a request is succeeded and the onComplete has been called and pushed
          // us into the task queue, but before we get called a close is called and here we will
          // fail the request, although it is succeeded actually.
          // This is not a perfect solution but anyway, it is better than hang the requests for
          // ever, and also acceptable as if you close the zk client before actually getting the
          // response then a failure is always possible.
          future.completeExceptionally(e);
        }
      });
    }

    @Override
    public boolean needZk() {
      return true;
    }

    /** Issues the actual asynchronous ZooKeeper call; its callback must invoke onComplete. */
    protected abstract void doExec(ZooKeeper zk);

    @Override
    public final void exec(ZooKeeper zk) {
      pendingRequests++;
      doExec(zk);
    }

    /**
     * Consumes one retry and pushes the scheduled time of this task into the future.
     * @param intervalMs how long to wait before the task becomes runnable again, in milliseconds
     * @param maxRetries the retry budget
     * @return {@code false} if the budget is already used up, in which case nothing is changed
     */
    public boolean delay(long intervalMs, int maxRetries) {
      if (retries >= maxRetries) {
        return false;
      }
      retries++;
      time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
      return true;
    }

    @Override
    public void connectFailed(Exception e) {
      if (delay(retryIntervalMs, maxRetries)) {
        LOG.warn("{} to {} failed to connect to zk for {} of {}, retries = {}", getId(),
          connectString, operationType, path, retries, e);
        tasks.add(this);
      } else {
        LOG.warn("{} to {} failed to connect to zk for {} of {}, retries = {}, give up", getId(),
          connectString, operationType, path, retries, e);
        future.completeExceptionally(e);
      }
    }

    @Override
    public void closed(IOException e) {
      future.completeExceptionally(e);
    }
  }

  /**
   * Asynchronously reads the data of the znode at {@code path}.
   * @param path the znode path
   * @return a future completed with the data, or completed exceptionally if the node does not
   *         exist, the retries are used up, or this client is closed
   */
  public CompletableFuture<byte[]> get(String path) {
    if (closed.get()) {
      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
    }
    CompletableFuture<byte[]> future = new CompletableFuture<>();
    tasks.add(new ZKTask<byte[]>(path, future, "get") {

      @Override
      protected void doExec(ZooKeeper zk) {
        zk.getData(path, false,
          (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
      }
    });
    return future;
  }

  /**
   * Asynchronously fetches the {@link Stat} of the znode at {@code path}.
   * @param path the znode path
   * @return a future completed with the stat reported by ZooKeeper; unlike {@link #get(String)}
   *         a missing node does not fail the future but completes it with whatever the callback
   *         delivered (presumably {@code null} - confirm against the ZooKeeper API)
   */
  public CompletableFuture<Stat> exists(String path) {
    if (closed.get()) {
      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
    }
    CompletableFuture<Stat> future = new CompletableFuture<>();
    tasks.add(new ZKTask<Stat>(path, future, "exists") {

      @Override
      protected void doExec(ZooKeeper zk) {
        zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
      }
    });
    return future;
  }

  /**
   * Asynchronously lists the children of the znode at {@code path}.
   * @param path the znode path
   * @return a future completed with the child names, or completed exceptionally if the node does
   *         not exist, the retries are used up, or this client is closed
   */
  public CompletableFuture<List<String>> list(String path) {
    if (closed.get()) {
      return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
    }
    CompletableFuture<List<String>> future = new CompletableFuture<>();
    tasks.add(new ZKTask<List<String>>(path, future, "list") {

      @Override
      protected void doExec(ZooKeeper zk) {
        zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true),
          null);
      }
    });
    return future;
  }

  /** Closes the current ZooKeeper instance, if any, and forgets it. */
  private void closeZk() {
    if (zookeeper != null) {
      try {
        zookeeper.close();
      } catch (InterruptedException e) {
        // Restore the interrupt status instead of swallowing it; the reference is dropped anyway.
        Thread.currentThread().interrupt();
      }
      zookeeper = null;
    }
  }

  /**
   * Returns the current ZooKeeper instance, creating a new one if there is none or if the
   * existing one is no longer alive.
   */
  private ZooKeeper getZk() throws IOException {
    // may be closed when session expired
    if (zookeeper == null || !zookeeper.getState().isAlive()) {
      zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {});
    }
    return zookeeper;
  }

  /**
   * Body of the background thread: processes queued tasks until the {@link #CLOSE} sentinel is
   * dequeued, then closes the connection and fails every task that is still queued.
   */
  private void run() {
    for (;;) {
      Task task;
      try {
        task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
      } catch (InterruptedException e) {
        // Shutdown is signalled through the CLOSE task only, so an interrupt just restarts the wait.
        continue;
      }
      if (task == CLOSE) {
        break;
      }
      if (task == null) {
        // Nothing arrived within the keep-alive time; drop the connection unless a response is
        // still outstanding.
        if (pendingRequests == 0) {
          LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)",
            getId(), connectString, keepAliveTimeMs);
          closeZk();
        }
        continue;
      }
      if (!task.needZk()) {
        task.exec(null);
      } else {
        ZooKeeper zk;
        try {
          zk = getZk();
        } catch (Exception e) {
          // Catch everything, not only IOException: an unchecked exception from the ZooKeeper
          // constructor would otherwise terminate this thread and leave every future uncompleted.
          task.connectFailed(e);
          continue;
        }
        task.exec(zk);
      }
    }
    closeZk();
    DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
    Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
    tasks.clear();
  }

  /**
   * Marks this client as closed and asks the background thread to shut down. Only the first call
   * has an effect. Requests still queued when the background thread stops are failed with a
   * {@link DoNotRetryIOException}.
   */
  @Override
  public void close() {
    if (closed.compareAndSet(false, true)) {
      LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
      tasks.add(CLOSE);
    }
  }

  /** Returns the ZooKeeper connect string this client was configured with. */
  public String getConnectString() {
    return connectString;
  }
}