blob: f2345f64552b51d9f9c3253424db6c1f323c1f50 [file] [log] [blame]
package org.apache.helix.zookeeper.api.client;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.SerializableSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
public interface RealmAwareZkClient {
int DEFAULT_OPERATION_TIMEOUT = Integer.MAX_VALUE;
int DEFAULT_CONNECTION_TIMEOUT = 60 * 1000;
int DEFAULT_SESSION_TIMEOUT = 30 * 1000;
// listener subscription
List<String> subscribeChildChanges(String path, IZkChildListener listener);
void unsubscribeChildChanges(String path, IZkChildListener listener);
void subscribeDataChanges(String path, IZkDataListener listener);
void unsubscribeDataChanges(String path, IZkDataListener listener);
/*
* This is for backwards compatibility.
*
* TODO: remove below default implementation when getting rid of I0Itec in the new zk client.
*/
default void subscribeStateChanges(final IZkStateListener listener) {
subscribeStateChanges(new HelixZkClient.I0ItecIZkStateListenerHelixImpl(listener));
}
/*
* This is for backwards compatibility.
*
* TODO: remove below default implementation when getting rid of I0Itec in the new zk client.
*/
default void unsubscribeStateChanges(IZkStateListener listener) {
unsubscribeStateChanges(new HelixZkClient.I0ItecIZkStateListenerHelixImpl(listener));
}
/**
* Subscribes state changes for a
* {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener} listener.
* @deprecated
* This is deprecated. It is kept for backwards compatibility. Please use
* {@link #subscribeStateChanges(IZkStateListener)}.
* @param listener {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}
* listener
*/
@Deprecated
void subscribeStateChanges(
final org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener);
/**
* Unsubscribes state changes for a
* {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener} listener.
* @deprecated
* This is deprecated. It is kept for backwards compatibility. Please use
* {@link #unsubscribeStateChanges(IZkStateListener)}.
* @param listener {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}
* listener
*/
@Deprecated
void unsubscribeStateChanges(
org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener);
void unsubscribeAll();
// data access
void createPersistent(String path);
void createPersistent(String path, boolean createParents);
void createPersistent(String path, boolean createParents, List<ACL> acl);
void createPersistent(String path, Object data);
void createPersistent(String path, Object data, List<ACL> acl);
String createPersistentSequential(String path, Object data);
String createPersistentSequential(String path, Object data, List<ACL> acl);
void createEphemeral(final String path);
void createEphemeral(final String path, final String sessionId);
void createEphemeral(final String path, final List<ACL> acl);
void createEphemeral(final String path, final List<ACL> acl, final String sessionId);
String create(final String path, Object data, final CreateMode mode);
String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode);
void createEphemeral(final String path, final Object data);
void createEphemeral(final String path, final Object data, final String sessionId);
void createEphemeral(final String path, final Object data, final List<ACL> acl);
void createEphemeral(final String path, final Object data, final List<ACL> acl,
final String sessionId);
String createEphemeralSequential(final String path, final Object data);
String createEphemeralSequential(final String path, final Object data, final List<ACL> acl);
String createEphemeralSequential(final String path, final Object data, final String sessionId);
String createEphemeralSequential(final String path, final Object data, final List<ACL> acl,
final String sessionId);
List<String> getChildren(String path);
int countChildren(String path);
boolean exists(final String path);
Stat getStat(final String path);
boolean waitUntilExists(String path, TimeUnit timeUnit, long time);
void deleteRecursively(String path);
boolean delete(final String path);
<T extends Object> T readData(String path);
<T extends Object> T readData(String path, boolean returnNullIfPathNotExists);
<T extends Object> T readData(String path, Stat stat);
<T extends Object> T readData(final String path, final Stat stat, final boolean watch);
<T extends Object> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists);
void writeData(String path, Object object);
<T extends Object> void updateDataSerialized(String path, DataUpdater<T> updater);
void writeData(final String path, Object datat, final int expectedVersion);
Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion);
Stat writeDataGetStat(final String path, Object datat, final int expectedVersion);
void asyncCreate(final String path, Object datat, final CreateMode mode,
final ZkAsyncCallbacks.CreateCallbackHandler cb);
void asyncSetData(final String path, Object datat, final int version,
final ZkAsyncCallbacks.SetDataCallbackHandler cb);
void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb);
void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb);
void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb);
void watchForData(final String path);
List<String> watchForChilds(final String path);
long getCreationTime(String path);
List<OpResult> multi(final Iterable<Op> ops);
// ZK state control
boolean waitUntilConnected(long time, TimeUnit timeUnit);
/**
* Waits for SyncConnected state and returns a valid session ID(non-zero). The implementation of
* this method should wait for SyncConnected state and ZK session to be established, and should
* guarantee the established session's ID is returned before keeper state changes.
*
* Please note: this default implementation may have race condition issue and return an unexpected
* session ID that is zero or another new session's ID. The default implementation is for backward
* compatibility purpose.
*
* @param timeout Max waiting time for connecting to ZK server.
* @param timeUnit Time unit for the timeout.
* @return A valid ZK session ID which is non-zero.
*/
default long waitForEstablishedSession(long timeout, TimeUnit timeUnit) {
if (!waitUntilConnected(timeout, timeUnit)) {
throw new ZkTimeoutException(
"Failed to get established session because connecting to ZK server has timed out in "
+ timeout + " " + timeUnit);
}
return getSessionId();
}
String getServers();
long getSessionId();
void close();
boolean isClosed();
// other
byte[] serialize(Object data, String path);
<T extends Object> T deserialize(byte[] data, String path);
void setZkSerializer(ZkSerializer zkSerializer);
void setZkSerializer(PathBasedZkSerializer zkSerializer);
PathBasedZkSerializer getZkSerializer();
/**
* A class that wraps a default implementation of
* {@link IZkStateListener}, which means this listener
* runs the methods of {@link IZkStateListener}.
* This is for backward compatibility and to avoid breaking the original implementation of
* {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}.
*/
class I0ItecIZkStateListenerHelixImpl
implements org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener {
private IZkStateListener _listener;
I0ItecIZkStateListenerHelixImpl(IZkStateListener listener) {
_listener = listener;
}
@Override
public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
_listener.handleStateChanged(keeperState);
}
@Override
public void handleNewSession() throws Exception {
/*
* org.apache.helix.manager.zk.zookeeper.IZkStateListener does not have handleNewSession(),
* so null is passed into handleNewSession(sessionId).
*/
_listener.handleNewSession(null);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
_listener.handleSessionEstablishmentError(error);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof HelixZkClient.I0ItecIZkStateListenerHelixImpl)) {
return false;
}
if (_listener == null) {
return false;
}
HelixZkClient.I0ItecIZkStateListenerHelixImpl
defaultListener = (HelixZkClient.I0ItecIZkStateListenerHelixImpl) obj;
return _listener.equals(defaultListener._listener);
}
@Override
public int hashCode() {
/*
* The original listener's hashcode helps find the wrapped listener with the same original
* listener. This is helpful in unsubscribeStateChanges(listener).
*/
return _listener.hashCode();
}
}
/**
* Configuration for creating a new ZkConnection.
*/
class ZkConnectionConfig {
// Connection configs
private final String _zkServers;
private int _sessionTimeout = HelixZkClient.DEFAULT_SESSION_TIMEOUT;
public ZkConnectionConfig(String zkServers) {
_zkServers = zkServers;
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof HelixZkClient.ZkConnectionConfig)) {
return false;
}
HelixZkClient.ZkConnectionConfig configObj = (HelixZkClient.ZkConnectionConfig) obj;
return (_zkServers == null && configObj._zkServers == null ||
_zkServers != null && _zkServers.equals(configObj._zkServers)) &&
_sessionTimeout == configObj._sessionTimeout;
}
@Override
public int hashCode() {
return _sessionTimeout * 31 + _zkServers.hashCode();
}
@Override
public String toString() {
return (_zkServers + "_" + _sessionTimeout).replaceAll("[\\W]", "_");
}
public HelixZkClient.ZkConnectionConfig setSessionTimeout(Integer sessionTimeout) {
this._sessionTimeout = sessionTimeout;
return this;
}
public String getZkServers() {
return _zkServers;
}
public int getSessionTimeout() {
return _sessionTimeout;
}
}
/**
* Configuration for creating a new RealmAwareZkClient with serializer and monitor.
*/
class ZkClientConfig {
// For client to init the connection
private long _connectInitTimeout = HelixZkClient.DEFAULT_CONNECTION_TIMEOUT;
// Data access configs
private long _operationRetryTimeout = HelixZkClient.DEFAULT_OPERATION_TIMEOUT;
// Others
private PathBasedZkSerializer _zkSerializer;
// Monitoring
private String _monitorType;
private String _monitorKey;
private String _monitorInstanceName = null;
private boolean _monitorRootPathOnly = true;
public HelixZkClient.ZkClientConfig setZkSerializer(PathBasedZkSerializer zkSerializer) {
this._zkSerializer = zkSerializer;
return this;
}
public HelixZkClient.ZkClientConfig setZkSerializer(ZkSerializer zkSerializer) {
this._zkSerializer = new BasicZkSerializer(zkSerializer);
return this;
}
/**
* Used as part of the MBean ObjectName. This item is required for enabling monitoring.
*
* @param monitorType
*/
public HelixZkClient.ZkClientConfig setMonitorType(String monitorType) {
this._monitorType = monitorType;
return this;
}
/**
* Used as part of the MBean ObjectName. This item is required for enabling monitoring.
*
* @param monitorKey
*/
public HelixZkClient.ZkClientConfig setMonitorKey(String monitorKey) {
this._monitorKey = monitorKey;
return this;
}
/**
* Used as part of the MBean ObjectName. This item is optional.
*
* @param instanceName
*/
public HelixZkClient.ZkClientConfig setMonitorInstanceName(String instanceName) {
this._monitorInstanceName = instanceName;
return this;
}
public HelixZkClient.ZkClientConfig setMonitorRootPathOnly(Boolean monitorRootPathOnly) {
this._monitorRootPathOnly = monitorRootPathOnly;
return this;
}
public HelixZkClient.ZkClientConfig setOperationRetryTimeout(Long operationRetryTimeout) {
this._operationRetryTimeout = operationRetryTimeout;
return this;
}
public HelixZkClient.ZkClientConfig setConnectInitTimeout(long _connectInitTimeout) {
this._connectInitTimeout = _connectInitTimeout;
return this;
}
public PathBasedZkSerializer getZkSerializer() {
if (_zkSerializer == null) {
_zkSerializer = new BasicZkSerializer(new SerializableSerializer());
}
return _zkSerializer;
}
public long getOperationRetryTimeout() {
return _operationRetryTimeout;
}
public String getMonitorType() {
return _monitorType;
}
public String getMonitorKey() {
return _monitorKey;
}
public String getMonitorInstanceName() {
return _monitorInstanceName;
}
public boolean isMonitorRootPathOnly() {
return _monitorRootPathOnly;
}
public long getConnectInitTimeout() {
return _connectInitTimeout;
}
}
}