blob: 4398e8e3ad9c2d9856c04a4ecc0998d1ffbd7100 [file] [log] [blame]
package org.apache.helix.metaclient.impl.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.helix.metaclient.api.AsyncCallback;
import org.apache.helix.metaclient.api.ChildChangeListener;
import org.apache.helix.metaclient.api.ConnectStateChangeListener;
import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.DataUpdater;
import org.apache.helix.metaclient.api.DirectChildChangeListener;
import org.apache.helix.metaclient.api.DirectChildSubscribeResult;
import org.apache.helix.metaclient.api.MetaClientInterface;
import org.apache.helix.metaclient.api.Op;
import org.apache.helix.metaclient.api.OpResult;
import org.apache.helix.metaclient.exception.MetaClientException;
import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
import org.apache.helix.metaclient.impl.zk.adapter.ChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DataListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.DirectChildListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.StateChangeListenerAdapter;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientCreateCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientDeleteCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientExistCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientGetCallbackHandler;
import org.apache.helix.metaclient.impl.zk.adapter.ZkMetaClientSetCallbackHandler;
import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
import org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.convertZkEntryModeToMetaClientEntryMode;
import static org.apache.helix.metaclient.impl.zk.util.ZkMetaClientUtil.translateZkExceptionToMetaclientException;
public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(ZkMetaClient.class);
// Underlying Helix ZkClient that every operation in this class delegates to.
private final ZkClient _zkClient;
// Timeout passed to ZkClient#connect and to the ZkClient constructor.
private final long _initConnectionTimeout;
// Delay after a Disconnected event before the reconnect monitor checks the connection.
private final long _reconnectTimeout;
// After ZkClient gets disconnected from ZK server, it keeps retrying connection until connection
// is re-established or ZkClient is closed. We need a separate thread to monitor ZkClient
// reconnect and close ZkClient if it is not able to reconnect within user specified timeout.
private final ScheduledExecutorService _zkClientReconnectMonitor;
// Pending reconnect check, if any. Read and written only while holding _zkClientConnectionMutex.
private ScheduledFuture<?> _reconnectMonitorFuture;
// Assigned once in the constructor; final so the registered listener can never be swapped.
private final ReconnectStateChangeListener _reconnectStateChangeListener;
// Lock all activities related to ZkClient connection.
// Final: a lock reference that could be reassigned would defeat mutual exclusion.
private final ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
public ZkMetaClient(ZkMetaClientConfig config) {
_initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
_reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
// TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should
// Allow user to config reconnect policy
_zkClient = new ZkClient(
new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()),
(int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/,
config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(),
config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true);
_zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
_reconnectStateChangeListener = new ReconnectStateChangeListener();
}
/**
 * Creates a node at {@code key} with {@code data} using {@code EntryMode.PERSISTENT}.
 *
 * @param key  path of the node to create
 * @param data payload to store in the node
 * @throws MetaClientException if creation fails; an exception that is already a
 *         MetaClientException is rethrown as-is instead of being wrapped a second time
 */
@Override
public void create(String key, Object data) {
  try {
    create(key, data, EntryMode.PERSISTENT);
  } catch (MetaClientException e) {
    // The delegate already reports failures as MetaClientException; re-wrapping would bury
    // the real cause one level deeper.
    throw e;
  } catch (Exception e) {
    throw new MetaClientException(e);
  }
}
/**
 * Creates a node at {@code key} with {@code data} in the given entry mode.
 *
 * @param key  path of the node to create
 * @param data payload to store in the node
 * @param mode MetaClient entry mode, converted to the ZooKeeper create mode
 * @throws MetaClientException translated from any ZkException (same translation the other
 *         synchronous operations in this class use), or wrapping a KeeperException
 */
@Override
public void create(String key, Object data, MetaClientInterface.EntryMode mode) {
  try {
    _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode));
  } catch (ZkException e) {
    // Translate instead of wrapping generically so callers get the specific MetaClient
    // exception type, consistent with createWithTTL/set/delete.
    throw translateZkExceptionToMetaclientException(e);
  } catch (KeeperException e) {
    throw new MetaClientException(e);
  }
}
/**
 * Creates a node with a TTL by delegating to ZkClient#createPersistentWithTTL.
 *
 * @param key  path of the node to create
 * @param data payload to store in the node
 * @param ttl  time-to-live value handed to ZkClient unchanged
 * @throws MetaClientException translated from any ZkException raised by ZkClient
 */
@Override
public void createWithTTL(String key, T data, long ttl) {
  try {
    _zkClient.createPersistentWithTTL(key, data, ttl);
  } catch (ZkException zkFailure) {
    throw translateZkExceptionToMetaclientException(zkFailure);
  }
}
/**
 * Renews a TTL node by writing its current data back with the node's current version.
 *
 * @param key path of the node to renew
 * @throws MetaClientNoNodeException if no data can be read at {@code key}, or the node
 *         disappears between reading its data and reading its stat
 * @throws MetaClientException translated from any ZkException raised while reading the stat
 *         or writing the data
 */
@Override
public void renewTTLNode(String key) {
  T oldData = get(key);
  if (oldData == null) {
    throw new MetaClientNoNodeException("Node at " + key + " does not exist.");
  }
  org.apache.zookeeper.data.Stat stat;
  try {
    stat = _zkClient.getStat(key);
  } catch (ZkException e) {
    throw translateZkExceptionToMetaclientException(e);
  }
  // getStat returns null for a missing node (see exists()); the node may have been removed
  // since the read above, so guard against a NullPointerException.
  if (stat == null) {
    throw new MetaClientNoNodeException("Node at " + key + " does not exist.");
  }
  // NOTE(review): data and version still come from two separate reads, so a concurrent write
  // in between could be overwritten with the older data.
  set(key, oldData, stat.getVersion());
}
/**
 * Writes {@code data} to the node at {@code key} via ZkClient#writeData, passing the expected
 * version through.
 *
 * @param key     path of the node to write
 * @param data    new payload
 * @param version expected version handed to ZkClient
 * @throws MetaClientException translated from any ZkException raised by ZkClient
 */
@Override
public void set(String key, T data, int version) {
  try {
    _zkClient.writeData(key, data, version);
  } catch (ZkException zkFailure) {
    throw translateZkExceptionToMetaclientException(zkFailure);
  }
}
/**
 * Reads the current data and version of {@code key}, applies {@code updater}, and writes the
 * result back with the version that was read.
 *
 * @param key     path of the node to update
 * @param updater computes the new data from the current data
 * @return the data produced by {@code updater}
 * @throws MetaClientException translated from any ZkException raised during the read/update
 */
@Override
public T update(String key, DataUpdater<T> updater) {
  org.apache.zookeeper.data.Stat readStat = new org.apache.zookeeper.data.Stat();
  // TODO: add retry logic for ZkBadVersionException.
  try {
    T current = _zkClient.readData(key, readStat);
    T updated = updater.update(current);
    // Write with the version captured by the read above.
    set(key, updated, readStat.getVersion());
    return updated;
  } catch (ZkException zkFailure) {
    throw translateZkExceptionToMetaclientException(zkFailure);
  }
}
// TODO: Get Expiry Time in Stat
/**
 * Looks up the ZooKeeper stat of {@code key} and converts it to a MetaClient Stat.
 *
 * @param key path of the node to check
 * @return the converted stat, or {@code null} when ZkClient returns no stat for the key; the
 *         last Stat constructor argument is always -1 (see TODO above)
 * @throws MetaClientException translated from any ZkException raised by ZkClient
 */
@Override
public Stat exists(String key) {
  try {
    org.apache.zookeeper.data.Stat zkStat = _zkClient.getStat(key);
    return zkStat == null
        ? null
        : new Stat(convertZkEntryModeToMetaClientEntryMode(zkStat.getEphemeralOwner()),
            zkStat.getVersion(), zkStat.getCtime(), zkStat.getMtime(), -1);
  } catch (ZkException zkFailure) {
    throw translateZkExceptionToMetaclientException(zkFailure);
  }
}
/**
 * Reads the data stored at {@code key}.
 *
 * @param key path of the node to read
 * @return the node data; presumably {@code null} when the node does not exist (the
 *         {@code true} flag passed to ZkClient#readData) -- confirm against ZkClient
 * @throws MetaClientException translated from any ZkException raised by ZkClient, matching
 *         the error handling of the other synchronous operations in this class
 */
@Override
public T get(String key) {
  try {
    return _zkClient.readData(key, true);
  } catch (ZkException e) {
    throw translateZkExceptionToMetaclientException(e);
  }
}
/**
 * Returns the children of {@code key} as reported by ZkClient#getChildren.
 *
 * @param key path of the parent node
 * @return the list returned by ZkClient
 * @throws MetaClientException translated from any ZkException raised by ZkClient
 */
@Override
public List<String> getDirectChildrenKeys(String key) {
  try {
    List<String> childKeys = _zkClient.getChildren(key);
    return childKeys;
  } catch (ZkException zkFailure) {
    throw translateZkExceptionToMetaclientException(zkFailure);
  }
}
/**
 * Returns the number of children of {@code key} as reported by ZkClient#countChildren.
 *
 * @param key path of the parent node
 * @return the child count returned by ZkClient
 * @throws MetaClientException translated from any ZkException raised by ZkClient, matching
 *         the error handling of getDirectChildrenKeys
 */
@Override
public int countDirectChildren(String key) {
  try {
    return _zkClient.countChildren(key);
  } catch (ZkException e) {
    throw translateZkExceptionToMetaclientException(e);
  }
}
/**
 * Deletes the node at {@code key} via ZkClient#delete.
 *
 * @param key path of the node to delete
 * @return whatever ZkClient#delete returns
 * @throws MetaClientException translated from any ZkException raised by ZkClient
 */
@Override
public boolean delete(String key) {
  try {
    boolean deleted = _zkClient.delete(key);
    return deleted;
  } catch (ZkException zkFailure) {
    throw translateZkExceptionToMetaclientException(zkFailure);
  }
}
/**
 * Deletes {@code key} via ZkClient#deleteRecursively.
 * Exceptions thrown by ZkClient propagate to the caller untranslated.
 *
 * @param key path of the subtree root to delete
 * @return always {@code true} when the ZkClient call returns normally
 */
@Override
public boolean recursiveDelete(String key) {
  _zkClient.deleteRecursively(key);
  // Reaching this line means ZkClient did not throw.
  return true;
}
// In the current ZkClient, async CRUD operations auto-retry on connection loss or session
// mismatch using the existing retry handling logic in ZkClient (defined in ZkAsyncCallbacks).
// ZkClient executes async callbacks on the ZkClient main thread; retries are handled in a
// separate retry thread. In our first version of the implementation, we keep similar behavior:
// callbacks are executed in the ZkClient event thread, and the ZkClient retry logic is reused.
// It is highly recommended *NOT* to perform any blocking operation inside the callbacks.
// If you block the thread, the meta client won't process other events.
// Corresponding callbacks for each operation are invoked in order.
/**
 * Not supported by this implementation.
 *
 * @param poolSize ignored
 * @throws UnsupportedOperationException always
 */
@Override
public void setAsyncExecPoolSize(int poolSize) {
  String reason = "All async calls are executed in a single thread to maintain sequence.";
  throw new UnsupportedOperationException(reason);
}
/**
 * Asynchronously creates a node; the result is delivered to {@code cb} through a
 * ZkMetaClientCreateCallbackHandler.
 *
 * @param key  path of the node to create
 * @param data payload to store in the node
 * @param mode MetaClient entry mode, converted to the ZooKeeper create mode before the call
 * @param cb   callback wrapped by the handler
 * @throws MetaClientException if the entry mode conversion fails
 */
@Override
public void asyncCreate(String key, Object data, EntryMode mode, AsyncCallback.VoidCallback cb) {
  final CreateMode zkCreateMode;
  try {
    zkCreateMode = ZkMetaClientUtil.convertMetaClientMode(mode);
  } catch (ZkException | KeeperException conversionFailure) {
    throw new MetaClientException(conversionFailure);
  }
  ZkMetaClientCreateCallbackHandler handler = new ZkMetaClientCreateCallbackHandler(cb);
  _zkClient.asyncCreate(key, data, zkCreateMode, handler);
}
/**
 * Not implemented yet.
 *
 * @throws NotImplementedException always
 */
@Override
public void asyncUpdate(String key, DataUpdater<T> updater, AsyncCallback.DataCallback cb) {
  // TODO: Only Helix has potential use of this API as of now (ZkBaseDataAccessor.update()).
  // Will move the impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
  throw new NotImplementedException("Currently asyncUpdate is not supported in ZkMetaClient.");
}
/**
 * Asynchronously reads the data at {@code key}; the result is delivered to {@code cb}
 * through a ZkMetaClientGetCallbackHandler.
 *
 * @param key path of the node to read
 * @param cb  callback wrapped by the handler
 */
@Override
public void asyncGet(String key, AsyncCallback.DataCallback cb) {
  ZkMetaClientGetCallbackHandler handler = new ZkMetaClientGetCallbackHandler(cb);
  _zkClient.asyncGetData(key, handler);
}
/**
 * Not implemented yet.
 *
 * @throws NotImplementedException always
 */
@Override
public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) {
  // TODO: Only Helix has potential use of this API as of now (ZkBaseDataAccessor.getChildren()).
  // Will move the impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor.
  String message = "Currently asyncCountChildren is not supported in ZkMetaClient.";
  throw new NotImplementedException(message);
}
/**
 * Asynchronously checks {@code key}; the result is delivered to {@code cb} through a
 * ZkMetaClientExistCallbackHandler.
 *
 * @param key path of the node to check
 * @param cb  callback wrapped by the handler
 */
@Override
public void asyncExist(String key, AsyncCallback.StatCallback cb) {
  ZkMetaClientExistCallbackHandler handler = new ZkMetaClientExistCallbackHandler(cb);
  _zkClient.asyncExists(key, handler);
}
/**
 * Asynchronously deletes {@code key}; the result is delivered to {@code cb} through a
 * ZkMetaClientDeleteCallbackHandler.
 * NOTE(review): unlike the sibling async methods this one carries no @Override -- confirm
 * whether MetaClientInterface declares asyncDelete.
 *
 * @param key path of the node to delete
 * @param cb  callback wrapped by the handler
 */
public void asyncDelete(String key, AsyncCallback.VoidCallback cb) {
  ZkMetaClientDeleteCallbackHandler handler = new ZkMetaClientDeleteCallbackHandler(cb);
  _zkClient.asyncDelete(key, handler);
}
/**
 * Not implemented yet.
 *
 * @throws NotImplementedException always
 */
@Override
public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) {
  // TODO: There is no active use case for async transaction.
  String message = "Currently asyncTransaction is not supported in ZkMetaClient.";
  throw new NotImplementedException(message);
}
/**
 * Asynchronously writes {@code data} to {@code key}; the result is delivered to {@code cb}
 * through a ZkMetaClientSetCallbackHandler.
 *
 * @param key     path of the node to write
 * @param data    new payload
 * @param version expected version handed to ZkClient
 * @param cb      callback wrapped by the handler
 */
@Override
public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) {
  ZkMetaClientSetCallbackHandler handler = new ZkMetaClientSetCallbackHandler(cb);
  _zkClient.asyncSetData(key, data, version, handler);
}
/**
 * Connects the underlying ZkClient and registers the reconnect state listener, all while
 * holding the connection mutex.
 *
 * @throws MetaClientException translated from any ZkException raised while connecting
 */
@Override
public void connect() {
  // Acquire the lock BEFORE entering try: if lock() itself failed inside the try block, the
  // finally clause would call unlock() on a lock this thread does not hold.
  _zkClientConnectionMutex.lock();
  try {
    _zkClient.connect(_initConnectionTimeout, _zkClient);
    // Register _reconnectStateChangeListener as a state change listener to react to ZkClient
    // connect state change events. When ZkClient is disconnected from ZK, it keeps auto
    // reconnecting until ZkClient is closed or the connection is re-established.
    // We need to close ZkClient ourselves once the user-specified reconnect timeout elapses.
    _zkClient.subscribeStateChanges(_reconnectStateChangeListener);
  } catch (ZkException e) {
    throw translateZkExceptionToMetaclientException(e);
  } finally {
    _zkClientConnectionMutex.unlock();
  }
}
/**
 * Closes the ZkClient, cancels any pending reconnect check, and then shuts down the
 * reconnect monitor executor.
 */
@Override
public void disconnect() {
  final boolean cancelMonitor = true;
  final boolean closeClient = true;
  cleanUpAndClose(cancelMonitor, closeClient);
  _zkClientReconnectMonitor.shutdownNow();
}
/**
 * NOTE(review): placeholder -- always returns {@code null}; the actual connection state is
 * not reported here.
 *
 * @return always {@code null}
 */
@Override
public ConnectState getClientConnectionState() {
  return null;
}
/**
 * Subscribes {@code listener} to data changes of {@code key}.
 *
 * @param key                      path of the node to watch
 * @param listener                 listener wrapped in a DataListenerAdapter
 * @param skipWatchingNonExistNode when {@code true}, no listener is registered if the node
 *                                 does not currently exist
 * @return {@code false} if registration was skipped because the node does not exist,
 *         {@code true} once the listener has been registered
 */
@Override
public boolean subscribeDataChange(String key, DataChangeListener listener, boolean skipWatchingNonExistNode) {
  // Honor the flag the same way subscribeChildChanges does; previously it was silently ignored.
  if (skipWatchingNonExistNode && exists(key) == null) {
    return false;
  }
  _zkClient.subscribeDataChanges(key, new DataListenerAdapter(listener));
  return true;
}
/**
 * Subscribes {@code listener} to child changes of {@code key} and repackages ZkClient's
 * subscribe result.
 *
 * @param key                      path of the parent node
 * @param listener                 listener wrapped in a DirectChildListenerAdapter
 * @param skipWatchingNonExistNode forwarded to ZkClient unchanged
 * @return a result built from the children and the installed flag reported by ZkClient
 */
@Override
public DirectChildSubscribeResult subscribeDirectChildChange(String key,
    DirectChildChangeListener listener, boolean skipWatchingNonExistNode) {
  DirectChildListenerAdapter adapter = new DirectChildListenerAdapter(listener);
  ChildrenSubscribeResult zkResult =
      _zkClient.subscribeChildChanges(key, adapter, skipWatchingNonExistNode);
  List<String> children = zkResult.getChildren();
  return new DirectChildSubscribeResult(children, zkResult.isInstalled());
}
/**
 * Registers {@code listener}, wrapped in a StateChangeListenerAdapter, for ZkClient state
 * changes.
 *
 * @param listener connection state listener to register
 * @return always {@code true}
 */
@Override
public boolean subscribeStateChanges(ConnectStateChangeListener listener) {
  StateChangeListenerAdapter adapter = new StateChangeListenerAdapter(listener);
  _zkClient.subscribeStateChanges(adapter);
  return true;
}
/**
 * Registers {@code listener}, wrapped in a ChildListenerAdapter, as a persist recursive
 * listener on {@code key}.
 *
 * @param key                      path of the node to watch
 * @param listener                 listener to register
 * @param skipWatchingNonExistNode when {@code true}, nothing is registered if {@code key}
 *                                 does not currently exist
 * @return {@code false} if registration was skipped, {@code true} otherwise
 */
@Override
public boolean subscribeChildChanges(String key, ChildChangeListener listener, boolean skipWatchingNonExistNode) {
  // Short-circuit keeps the existence lookup from running unless the flag is set.
  boolean skipRegistration = skipWatchingNonExistNode && exists(key) == null;
  if (!skipRegistration) {
    ChildListenerAdapter adapter = new ChildListenerAdapter(listener);
    _zkClient.subscribePersistRecursiveListener(key, adapter);
    return true;
  }
  return false;
}
/**
 * Unsubscribes a data change listener from {@code key}. A fresh DataListenerAdapter is
 * created around {@code listener} and handed to ZkClient.
 * NOTE(review): presumably this relies on DataListenerAdapter equality to match the
 * previously registered adapter -- confirm.
 *
 * @param key      path of the watched node
 * @param listener listener to remove
 */
@Override
public void unsubscribeDataChange(String key, DataChangeListener listener) {
  DataListenerAdapter adapter = new DataListenerAdapter(listener);
  _zkClient.unsubscribeDataChanges(key, adapter);
}
/**
 * Unsubscribes a direct child change listener from {@code key}. A fresh
 * DirectChildListenerAdapter is created around {@code listener} and handed to ZkClient.
 *
 * @param key      path of the watched parent node
 * @param listener listener to remove
 */
@Override
public void unsubscribeDirectChildChange(String key, DirectChildChangeListener listener) {
  DirectChildListenerAdapter adapter = new DirectChildListenerAdapter(listener);
  _zkClient.unsubscribeChildChanges(key, adapter);
}
/**
 * Unsubscribes a persist recursive listener from {@code key}. A fresh ChildListenerAdapter
 * is created around {@code listener} and handed to ZkClient.
 *
 * @param key      path of the watched node
 * @param listener listener to remove
 */
@Override
public void unsubscribeChildChanges(String key, ChildChangeListener listener) {
  ChildListenerAdapter adapter = new ChildListenerAdapter(listener);
  _zkClient.unsubscribePersistRecursiveListener(key, adapter);
}
/**
 * Unsubscribes a connection state listener from the underlying ZkClient.
 * The original code called subscribeStateChanges here, which registered the listener a
 * second time instead of removing it.
 * NOTE(review): removal presumably relies on StateChangeListenerAdapter equality to match
 * the adapter created in subscribeStateChanges -- confirm.
 *
 * @param listener listener to remove
 */
@Override
public void unsubscribeConnectStateChanges(ConnectStateChangeListener listener) {
  _zkClient.unsubscribeStateChanges(new StateChangeListenerAdapter(listener));
}
/**
 * NOTE(review): placeholder -- performs no waiting and always returns {@code false}.
 *
 * @param key      unused
 * @param timeUnit unused
 * @param time     unused
 * @return always {@code false}
 */
@Override
public boolean waitUntilExists(String key, TimeUnit timeUnit, long time) {
  return false;
}
/**
 * NOTE(review): placeholder for batch create with entry modes -- creates nothing.
 *
 * @return always an empty array
 */
@Override
public boolean[] create(List<String> key, List<T> data, List<EntryMode> mode) {
  return new boolean[0];
}
/**
 * NOTE(review): placeholder for batch create -- creates nothing.
 *
 * @return always an empty array
 */
@Override
public boolean[] create(List<String> key, List<T> data) {
  return new boolean[0];
}
/**
 * NOTE(review): placeholder for batch delete -- deletes nothing.
 *
 * @return always an empty array
 */
@Override
public boolean[] delete(List<String> keys) {
  return new boolean[0];
}
/**
 * NOTE(review): placeholder for batch exists -- performs no lookup.
 *
 * @return always {@code null}
 */
@Override
public List<Stat> exists(List<String> keys) {
  return null;
}
/**
 * NOTE(review): placeholder for batch get -- reads nothing.
 *
 * @return always {@code null}
 */
@Override
public List<T> get(List<String> keys) {
  return null;
}
/**
 * NOTE(review): placeholder for batch update -- applies no updater.
 *
 * @return always {@code null}
 */
@Override
public List<T> update(List<String> keys, List<DataUpdater<T>> updater) {
  return null;
}
/**
 * NOTE(review): placeholder for batch set -- writes nothing.
 *
 * @return always an empty array
 */
@Override
public boolean[] set(List<String> keys, List<T> datas, List<Integer> version) {
  return new boolean[0];
}
/**
 * AutoCloseable entry point; identical to calling {@link #disconnect()}.
 */
@Override
public void close() {
  this.disconnect();
}
/**
 * Runs {@code ops} through ZkClient#multi.
 *
 * @param ops MetaClient operations to execute
 * @return the ZooKeeper results converted back to MetaClient OpResults
 */
@Override
public List<OpResult> transactionOP(Iterable<Op> ops) {
  // MetaClient ops -> Zk ops -> ZkClient#multi -> MetaClient results, in one expression.
  return ZkMetaClientUtil.zkOpResultToMetaClientOpResults(
      _zkClient.multi(ZkMetaClientUtil.metaClientOpsToZkOps(ops)));
}
/**
 * Serializes {@code data} by delegating to ZkClient#serialize.
 *
 * @param data object to serialize
 * @param path path forwarded to ZkClient
 * @return the bytes returned by ZkClient
 */
@Override
public byte[] serialize(T data, String path) {
  byte[] serialized = _zkClient.serialize(data, path);
  return serialized;
}
/**
 * Deserializes {@code bytes} by delegating to ZkClient#deserialize.
 *
 * @param bytes serialized payload
 * @param path  path forwarded to ZkClient
 * @return the object returned by ZkClient
 */
@Override
public T deserialize(byte[] bytes, String path) {
  T deserialized = _zkClient.deserialize(bytes, path);
  return deserialized;
}
/**
 * Clean-up routine used on connect state changes and when the MetaClient is closing.
 * Runs entirely under the connection mutex; the ZkClient is closed first (if requested),
 * then the pending reconnect check is cancelled (if requested).
 * @param cancel whether to cancel the scheduled reconnect monitor task, if one exists
 * @param close whether to close ZkClient, if it is not already closed
 */
private void cleanUpAndClose(boolean cancel, boolean close) {
  _zkClientConnectionMutex.lock();
  try {
    final boolean closeNow = close && !_zkClient.isClosed();
    if (closeNow) {
      _zkClient.close();
      // TODO: need to unsubscribe all persist watcher from ZK
      // Add this in ZkClient when persist watcher change is in
      // Also need to manually send CLOSED state change to state
      // change listener (in change adapter)
      LOG.info("ZkClient is closed");
    }
    final boolean cancelNow = cancel && _reconnectMonitorFuture != null;
    if (cancelNow) {
      _reconnectMonitorFuture.cancel(true);
      LOG.info("ZkClient reconnect monitor thread is canceled");
    }
  } finally {
    _zkClientConnectionMutex.unlock();
  }
}
/**
* MetaClient uses Helix ZkClient (@see org.apache.helix.zookeeper.impl.client.ZkClient) to connect
* to ZK. The current implementation of ZkClient auto-reconnects indefinitely. We use a monitor
* thread in ZkMetaClient to monitor reconnect status and close ZkClient when the client is still
* in disconnected state when it reaches the reconnect timeout.
*
*
* case 1: Start the monitor thread when ZkMetaClient gets a disconnected event, to check the
* connect state when the timeout is reached. If not re-connected when timed out, kill the
* monitor thread and close ZkClient.
* [MetaClient thread] ---------------------------------------------------------------
* ( When disconnected, schedule a event
* to check connect state after timeout)
* [Reconnect monitor thread] --------------------------------------
* ^ | not reconnected when timed out
* / |
* | disconnected event v
* [ZkClient] -------X---------------------------------------X zkClient.close()
* [ZkClient exp back | X X
* -off retry connection] |--------|--------------|--------------
*
*
* case 2: Start the monitor thread when ZkMetaClient gets a disconnected event, to check the
* connect state when the timeout is reached. If re-connected before timing out, cancel the
* delayed monitor task.
*
* [MetaClient thread] ---------------------------------------------------------------
* (cancel scheduled task when reconnected)
* [Reconnect monitor] ---------------------------------X
* ^ ^
* / /
* | disconnected event | reconnected event
* [ZkClient] -----X------------------------------------------------------
* [ZkClient exp back | X Y Reconnected before timed out
* -off retry connection] |--------| ---------------------|
*
*
* case 3: Start the monitor thread when ZkMetaClient gets a disconnected event, to check the
* connect state when the timeout is reached. If reconnecting fails with an error, cancel the
* delayed monitor task and close ZkClient.
* [MetaClient thread] ---------------------------------------------------------------
* (cancel scheduled task and close ZkClient when reconnected error)
* [Reconnect monitor] ----------------------------------X
* ^ ^ |
* / err / |
* | disconnected event | v close ZkClient
* [ZkClient] -----X-------------------------------X ---X
* [ZkClient exp back | X ^ Reconnect error
* -off retry connection] |--------| --------------------|
*
*/
private class ReconnectStateChangeListener implements IZkStateListener {
// Schedule a monitor to track ZkClient auto reconnect when Disconnected
// Cancel the monitor thread when connected.
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
if (state == Watcher.Event.KeeperState.Disconnected) { // ------case 1
// Expired. start a new event monitoring retry
_zkClientConnectionMutex.lockInterruptibly();
try {
if (_reconnectMonitorFuture == null || _reconnectMonitorFuture.isCancelled()
|| _reconnectMonitorFuture.isDone()) {
_reconnectMonitorFuture = _zkClientReconnectMonitor.schedule(() -> {
if (!_zkClient.getConnection().getZookeeperState().isConnected()) {
cleanUpAndClose(false, true);
}
}, _reconnectTimeout, TimeUnit.MILLISECONDS);
LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}",
_reconnectTimeout);
}
} finally {
_zkClientConnectionMutex.unlock();
}
} else if (state == Watcher.Event.KeeperState.SyncConnected
|| state == Watcher.Event.KeeperState.ConnectedReadOnly) { // ------ case 2
cleanUpAndClose(true, false);
LOG.info("ZkClient is SyncConnected, reconnect monitor thread is canceled (if any)");
}
}
// Cancel the monitor thread when connected.
@Override
public void handleNewSession(String sessionId) throws Exception { // ------ case 2
cleanUpAndClose(true, false);
LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
}
// Cancel the monitor thread and close ZkClient when connect error.
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception { // -- case 3
cleanUpAndClose(true, true);
LOG.info("New session initiated in ZkClient, reconnect monitor thread is canceled (if any)");
}
}
/**
 * Exposes the underlying ZkClient instance; intended for tests only.
 *
 * @return the ZkClient this MetaClient delegates to
 */
@VisibleForTesting
ZkClient getZkClient() {
  return this._zkClient;
}
}