blob: 912d592186434f970a8707281937421dbc8086db [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.util.FailpointUtils;
import com.twitter.distributedlog.zk.ZKWatcherManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static com.google.common.base.Charsets.UTF_8;
/**
* ZooKeeper Client wrapper over {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}.
* It handles retries on session expires and provides a watcher manager {@link ZKWatcherManager}.
*
* <h3>Metrics</h3>
* <ul>
* <li> zookeeper operation stats are exposed under scope <code>zk</code> by
* {@link org.apache.bookkeeper.zookeeper.ZooKeeperClient}
* <li> stats on zookeeper watched events are exposed under scope <code>watcher</code> by
* {@link org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase}
* <li> stats about {@link ZKWatcherManager} are exposed under scope <code>watcher_manager</code>
* </ul>
*/
public class ZooKeeperClient {
public static interface Credentials {
Credentials NONE = new Credentials() {
@Override
public void authenticate(ZooKeeper zooKeeper) {
// noop
}
};
void authenticate(ZooKeeper zooKeeper);
}
public static class DigestCredentials implements Credentials {
String username;
String password;
public DigestCredentials(String username, String password) {
this.username = username;
this.password = password;
}
@Override
public void authenticate(ZooKeeper zooKeeper) {
zooKeeper.addAuthInfo("digest", String.format("%s:%s", username, password).getBytes(UTF_8));
}
}
public interface ZooKeeperSessionExpireNotifier {
void notifySessionExpired();
}
/**
* Indicates an error connecting to a zookeeper cluster.
*/
public static class ZooKeeperConnectionException extends IOException {
private static final long serialVersionUID = 6682391687004819361L;
public ZooKeeperConnectionException(String message) {
super(message);
}
public ZooKeeperConnectionException(String message, Throwable cause) {
super(message, cause);
}
}
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class.getName());
private final String name;
private final int sessionTimeoutMs;
private final int defaultConnectionTimeoutMs;
private final String zooKeeperServers;
// GuardedBy "this", but still volatile for tests, where we want to be able to see writes
// made from within long synchronized blocks.
private volatile ZooKeeper zooKeeper = null;
private final RetryPolicy retryPolicy;
private final StatsLogger statsLogger;
private final int retryThreadCount;
private final double requestRateLimit;
private final Credentials credentials;
private volatile boolean authenticated = false;
private Stopwatch disconnectedStopwatch = null;
private boolean closed = false;
final Set<Watcher> watchers = new CopyOnWriteArraySet<Watcher>();
// watcher manager to manage watchers
private final ZKWatcherManager watcherManager;
/**
* Creates an unconnected client that will lazily attempt to connect on the first call to
* {@link #get}. All successful connections will be authenticated with the given
* {@code credentials}.
*
* @param sessionTimeoutMs
* ZK session timeout in milliseconds
* @param connectionTimeoutMs
* ZK connection timeout in milliseconds
* @param zooKeeperServers
* the set of servers forming the ZK cluster
*/
ZooKeeperClient(int sessionTimeoutMs, int connectionTimeoutMs, String zooKeeperServers) {
this("default", sessionTimeoutMs, connectionTimeoutMs, zooKeeperServers, null, NullStatsLogger.INSTANCE, 1, 0,
Credentials.NONE);
}
ZooKeeperClient(String name,
int sessionTimeoutMs,
int connectionTimeoutMs,
String zooKeeperServers,
RetryPolicy retryPolicy,
StatsLogger statsLogger,
int retryThreadCount,
double requestRateLimit,
Credentials credentials) {
this.name = name;
this.sessionTimeoutMs = sessionTimeoutMs;
this.zooKeeperServers = zooKeeperServers;
this.defaultConnectionTimeoutMs = connectionTimeoutMs;
this.retryPolicy = retryPolicy;
this.statsLogger = statsLogger;
this.retryThreadCount = retryThreadCount;
this.requestRateLimit = requestRateLimit;
this.credentials = credentials;
this.watcherManager = ZKWatcherManager.newBuilder()
.name(name)
.statsLogger(statsLogger.scope("watcher_manager"))
.build();
}
public List<ACL> getDefaultACL() {
if (Credentials.NONE == credentials) {
return ZooDefs.Ids.OPEN_ACL_UNSAFE;
} else {
return DistributedLogConstants.EVERYONE_READ_CREATOR_ALL;
}
}
public ZKWatcherManager getWatcherManager() {
return watcherManager;
}
/**
* Returns the current active ZK connection or establishes a new one if none has yet been
* established or a previous connection was disconnected or had its session time out.
*
* @return a connected ZooKeeper client
* @throws ZooKeeperConnectionException if there was a problem connecting to the ZK cluster
* @throws InterruptedException if interrupted while waiting for a connection to be established
* @throws TimeoutException if a connection could not be established within the configured
* session timeout
*/
public synchronized ZooKeeper get()
throws ZooKeeperConnectionException, InterruptedException {
try {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_ZooKeeperConnectionLoss);
} catch (IOException ioe) {
throw new ZooKeeperConnectionException("Client " + name + " failed on establishing zookeeper connection", ioe);
}
// This indicates that the client was explictly closed
if (closed) {
throw new ZooKeeperConnectionException("Client " + name + " has already been closed");
}
// the underneath zookeeper is retryable zookeeper
if (zooKeeper != null && retryPolicy != null) {
if (zooKeeper.getState().equals(ZooKeeper.States.CONNECTED)) {
// the zookeeper client is connected
disconnectedStopwatch = null;
} else {
if (disconnectedStopwatch == null) {
disconnectedStopwatch = Stopwatch.createStarted();
} else {
long disconnectedMs = disconnectedStopwatch.elapsed(TimeUnit.MILLISECONDS);
if (disconnectedMs > defaultConnectionTimeoutMs) {
closeInternal();
authenticated = false;
}
}
}
}
if (zooKeeper == null) {
zooKeeper = buildZooKeeper();
disconnectedStopwatch = null;
}
// In case authenticate throws an exception, the caller can try to recover the client by
// calling get again.
if (!authenticated) {
credentials.authenticate(zooKeeper);
authenticated = true;
}
return zooKeeper;
}
private ZooKeeper buildZooKeeper()
throws ZooKeeperConnectionException, InterruptedException {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
switch (event.getType()) {
case None:
switch (event.getState()) {
case Expired:
if (null == retryPolicy) {
LOG.info("ZooKeeper {}' session expired. Event: {}", name, event);
closeInternal();
}
authenticated = false;
break;
case Disconnected:
if (null == retryPolicy) {
LOG.info("ZooKeeper {} is disconnected from zookeeper now," +
" but it is OK unless we received EXPIRED event.", name);
}
// Mark as not authenticated if expired or disconnected. In both cases
// we lose any attached auth info. Relying on Expired/Disconnected is
// sufficient since all Expired/Disconnected events are processed before
// all SyncConnected events, and the underlying member is not updated until
// SyncConnected is received.
authenticated = false;
break;
default:
break;
}
}
try {
for (Watcher watcher : watchers) {
try {
watcher.process(event);
} catch (Throwable t) {
LOG.warn("Encountered unexpected exception from watcher {} : ", watcher, t);
}
}
} catch (Throwable t) {
LOG.warn("Encountered unexpected exception when firing watched event {} : ", event, t);
}
}
};
Set<Watcher> watchers = new HashSet<Watcher>();
watchers.add(watcher);
ZooKeeper zk;
try {
RetryPolicy opRetryPolicy = null == retryPolicy ?
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) : retryPolicy;
RetryPolicy connectRetryPolicy = null == retryPolicy ?
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, 0) :
new BoundExponentialBackoffRetryPolicy(sessionTimeoutMs, sessionTimeoutMs, Integer.MAX_VALUE);
zk = org.apache.bookkeeper.zookeeper.ZooKeeperClient.newBuilder()
.connectString(zooKeeperServers)
.sessionTimeoutMs(sessionTimeoutMs)
.watchers(watchers)
.operationRetryPolicy(opRetryPolicy)
.connectRetryPolicy(connectRetryPolicy)
.statsLogger(statsLogger)
.retryThreadCount(retryThreadCount)
.requestRateLimit(requestRateLimit)
.build();
} catch (KeeperException e) {
throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
} catch (IOException e) {
throw new ZooKeeperConnectionException("Problem connecting to servers: " + zooKeeperServers, e);
}
return zk;
}
/**
* Clients that need to re-establish state after session expiration can register an
* {@code onExpired} command to execute.
*
* @param onExpired the {@code Command} to register
* @return the new {@link Watcher} which can later be passed to {@link #unregister} for
* removal.
*/
public Watcher registerExpirationHandler(final ZooKeeperSessionExpireNotifier onExpired) {
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.None && event.getState() == KeeperState.Expired) {
try {
onExpired.notifySessionExpired();
} catch (Exception exc) {
// do nothing
}
}
}
};
register(watcher);
return watcher;
}
/**
* Clients that need to register a top-level {@code Watcher} should do so using this method. The
* registered {@code watcher} will remain registered across re-connects and session expiration
* events.
*
* @param watcher the {@code Watcher to register}
*/
public void register(Watcher watcher) {
if (null != watcher) {
watchers.add(watcher);
}
}
/**
* Clients can attempt to unregister a top-level {@code Watcher} that has previously been
* registered.
*
* @param watcher the {@code Watcher} to unregister as a top-level, persistent watch
* @return whether the given {@code Watcher} was found and removed from the active set
*/
public boolean unregister(Watcher watcher) {
return null != watcher && watchers.remove(watcher);
}
/**
* Closes the current connection if any expiring the current ZooKeeper session. Any subsequent
* calls to this method will no-op until the next successful {@link #get}.
*/
public synchronized void closeInternal() {
if (zooKeeper != null) {
try {
LOG.info("Closing zookeeper client {}.", name);
zooKeeper.close();
LOG.info("Closed zookeeper client {}.", name);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted trying to close zooKeeper {} : ", name, e);
} finally {
zooKeeper = null;
}
}
}
/**
* Closes the the underlying zookeeper instance.
* Subsequent attempts to {@link #get} will fail
*/
public synchronized void close() {
if (closed) {
return;
}
LOG.info("Close zookeeper client {}.", name);
closeInternal();
closed = true;
}
}