blob: 9e6c2eca226a0c06ec24c7de914d261915709dc8 [file] [log] [blame]
package org.apache.helix.zookeeper.impl.client;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.HttpRoutingDataReader;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.ZkConnection;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements and supports all ZK operations defined in interface {@link RealmAwareZkClient},
* except for session-aware operations such as creating ephemeral nodes, for which
* an {@link UnsupportedOperationException} will be thrown.
* <p>
* It acts as a single ZK client but will automatically route read/write/change subscription
* requests to the corresponding ZkClient with the help of metadata store directory service.
* It could connect to multiple ZK addresses and maintain a {@link ZkClient} for each ZK address.
* <p>
* Note: each ZK realm has its own event queue to handle listeners, so listeners registered for
* different ZK realms may be invoked concurrently, while listeners of the same realm are handled
* in that realm's own queue. Keep this concurrency in mind when implementing listeners for
* different ZK realms: users should use thread-safe data structures in their change callbacks.
*/
public class FederatedZkClient implements RealmAwareZkClient {
  private static final Logger LOG = LoggerFactory.getLogger(FederatedZkClient.class);
  private static final String FEDERATED_ZK_CLIENT = FederatedZkClient.class.getSimpleName();
  private static final String DEDICATED_ZK_CLIENT_FACTORY =
      DedicatedZkClientFactory.class.getSimpleName();

  // Routing data used to resolve a ZK path to the ZK realm that owns it.
  private final MetadataStoreRoutingData _metadataStoreRoutingData;
  private final RealmAwareZkClient.RealmAwareZkConnectionConfig _connectionConfig;
  private final RealmAwareZkClient.RealmAwareZkClientConfig _clientConfig;

  // ZK realm -> ZkClient. Entries are created lazily by getZkClient() and removed by close().
  // The map instance also serves as the monitor that serializes client creation against close().
  private final Map<String, ZkClient> _zkRealmToZkClientMap;

  private volatile boolean _isClosed;

  // Serializer handed to every newly created ZkClient and pushed to existing ones on change.
  private PathBasedZkSerializer _pathBasedZkSerializer;

  // TODO: support capacity of ZkClient number in one FederatedZkClient and do garbage collection.

  /**
   * Creates a federated client. No per-realm ZkClient is created here; they are created on first
   * use of a path that routes to the realm.
   *
   * @param connectionConfig connection config; its MSDS endpoint, when non-empty, is passed to
   *                         {@link HttpRoutingDataReader} to obtain the routing data, otherwise
   *                         the reader's no-argument lookup is used
   * @param clientConfig client config; supplies the serializer, timeouts and monitoring settings
   *                     used for every per-realm ZkClient
   * @throws IllegalArgumentException if either config is null
   * @throws IOException declared for the routing data lookup
   * @throws InvalidRoutingDataException declared for the routing data lookup
   */
  public FederatedZkClient(RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig)
      throws IOException, InvalidRoutingDataException {
    if (connectionConfig == null) {
      throw new IllegalArgumentException("RealmAwareZkConnectionConfig cannot be null!");
    }
    if (clientConfig == null) {
      throw new IllegalArgumentException("RealmAwareZkClientConfig cannot be null!");
    }

    // Attempt to get MetadataStoreRoutingData
    String msdsEndpoint = connectionConfig.getMsdsEndpoint();
    if (msdsEndpoint == null || msdsEndpoint.isEmpty()) {
      _metadataStoreRoutingData = HttpRoutingDataReader.getMetadataStoreRoutingData();
    } else {
      _metadataStoreRoutingData = HttpRoutingDataReader.getMetadataStoreRoutingData(msdsEndpoint);
    }

    _isClosed = false;
    _connectionConfig = connectionConfig;
    _clientConfig = clientConfig;
    _pathBasedZkSerializer = clientConfig.getZkSerializer();
    _zkRealmToZkClientMap = new ConcurrentHashMap<>();
  }

  // ---------------------------------------------------------------------------------------------
  // Change subscriptions. Path-based subscriptions are routed to the ZkClient of the path's realm.
  // ---------------------------------------------------------------------------------------------

  @Override
  public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
    return getZkClient(path).subscribeChildChanges(path, listener);
  }

  @Override
  public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener,
      boolean skipWatchingNodeNotExist) {
    return getZkClient(path).subscribeChildChanges(path, listener, skipWatchingNodeNotExist);
  }

  @Override
  public void unsubscribeChildChanges(String path, IZkChildListener listener) {
    getZkClient(path).unsubscribeChildChanges(path, listener);
  }

  @Override
  public void subscribeDataChanges(String path, IZkDataListener listener) {
    getZkClient(path).subscribeDataChanges(path, listener);
  }

  @Override
  public boolean subscribeDataChanges(String path, IZkDataListener listener,
      boolean skipWatchingNodeNotExist) {
    return getZkClient(path).subscribeDataChanges(path, listener, skipWatchingNodeNotExist);
  }

  @Override
  public void unsubscribeDataChanges(String path, IZkDataListener listener) {
    getZkClient(path).unsubscribeDataChanges(path, listener);
  }

  /** Not supported: always throws {@link UnsupportedOperationException}. */
  @Override
  public void subscribeStateChanges(IZkStateListener listener) {
    throwUnsupportedOperationException();
  }

  /** Not supported: always throws {@link UnsupportedOperationException}. */
  @Override
  public void unsubscribeStateChanges(IZkStateListener listener) {
    throwUnsupportedOperationException();
  }

  /** Not supported: always throws {@link UnsupportedOperationException}. */
  @Override
  public void subscribeStateChanges(
      org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) {
    throwUnsupportedOperationException();
  }

  /** Not supported: always throws {@link UnsupportedOperationException}. */
  @Override
  public void unsubscribeStateChanges(
      org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) {
    throwUnsupportedOperationException();
  }

  /** Calls {@code unsubscribeAll()} on every ZkClient that has been created so far. */
  @Override
  public void unsubscribeAll() {
    _zkRealmToZkClientMap.values().forEach(ZkClient::unsubscribeAll);
  }

  // ---------------------------------------------------------------------------------------------
  // Node creation. All create variants funnel into the private five-argument create(), which
  // rejects ephemeral modes. Persistent creation with createParents goes straight to the client.
  // ---------------------------------------------------------------------------------------------

  @Override
  public void createPersistent(String path) {
    createPersistent(path, false);
  }

  @Override
  public void createPersistent(String path, boolean createParents) {
    createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE);
  }

  @Override
  public void createPersistent(String path, boolean createParents, List<ACL> acl) {
    getZkClient(path).createPersistent(path, createParents, acl);
  }

  @Override
  public void createPersistent(String path, Object data) {
    create(path, data, CreateMode.PERSISTENT);
  }

  @Override
  public void createPersistent(String path, Object data, List<ACL> acl) {
    create(path, data, acl, CreateMode.PERSISTENT);
  }

  @Override
  public String createPersistentSequential(String path, Object data) {
    return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL);
  }

  @Override
  public String createPersistentSequential(String path, Object data, List<ACL> acl) {
    return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);
  }

  // The ephemeral variants below all end in the private create(), which throws
  // UnsupportedOperationException for ephemeral create modes.

  @Override
  public void createEphemeral(String path) {
    create(path, null, CreateMode.EPHEMERAL);
  }

  @Override
  public void createEphemeral(String path, String sessionId) {
    createEphemeral(path, null, sessionId);
  }

  @Override
  public void createEphemeral(String path, List<ACL> acl) {
    create(path, null, acl, CreateMode.EPHEMERAL);
  }

  @Override
  public void createEphemeral(String path, List<ACL> acl, String sessionId) {
    create(path, null, acl, CreateMode.EPHEMERAL, sessionId);
  }

  @Override
  public String create(String path, Object data, CreateMode mode) {
    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
  }

  @Override
  public String create(String path, Object data, List<ACL> acl, CreateMode mode) {
    return create(path, data, acl, mode, null);
  }

  @Override
  public void createEphemeral(String path, Object data) {
    create(path, data, CreateMode.EPHEMERAL);
  }

  @Override
  public void createEphemeral(String path, Object data, String sessionId) {
    create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, sessionId);
  }

  @Override
  public void createEphemeral(String path, Object data, List<ACL> acl) {
    create(path, data, acl, CreateMode.EPHEMERAL);
  }

  @Override
  public void createEphemeral(String path, Object data, List<ACL> acl, String sessionId) {
    create(path, data, acl, CreateMode.EPHEMERAL, sessionId);
  }

  @Override
  public String createEphemeralSequential(String path, Object data) {
    return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL);
  }

  @Override
  public String createEphemeralSequential(String path, Object data, List<ACL> acl) {
    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL);
  }

  @Override
  public String createEphemeralSequential(String path, Object data, String sessionId) {
    return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
        sessionId);
  }

  @Override
  public String createEphemeralSequential(String path, Object data, List<ACL> acl,
      String sessionId) {
    return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, sessionId);
  }

  // ---------------------------------------------------------------------------------------------
  // Reads, writes and deletes. Each call is forwarded to the ZkClient of the path's realm.
  // ---------------------------------------------------------------------------------------------

  @Override
  public List<String> getChildren(String path) {
    return getZkClient(path).getChildren(path);
  }

  @Override
  public int countChildren(String path) {
    return getZkClient(path).countChildren(path);
  }

  @Override
  public boolean exists(String path) {
    return getZkClient(path).exists(path);
  }

  @Override
  public Stat getStat(String path) {
    return getZkClient(path).getStat(path);
  }

  @Override
  public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) {
    return getZkClient(path).waitUntilExists(path, timeUnit, time);
  }

  @Override
  public void deleteRecursively(String path) {
    getZkClient(path).deleteRecursively(path);
  }

  @Override
  public boolean delete(String path) {
    return getZkClient(path).delete(path);
  }

  @Override
  @SuppressWarnings("unchecked")
  public <T> T readData(String path) {
    return (T) readData(path, false);
  }

  @Override
  public <T> T readData(String path, boolean returnNullIfPathNotExists) {
    return getZkClient(path).readData(path, returnNullIfPathNotExists);
  }

  @Override
  public <T> T readData(String path, Stat stat) {
    return getZkClient(path).readData(path, stat);
  }

  @Override
  public <T> T readData(String path, Stat stat, boolean watch) {
    return getZkClient(path).readData(path, stat, watch);
  }

  /**
   * Reads the data and stat of {@code path} from the ZkClient of the path's realm.
   *
   * @param path ZK path to read
   * @param stat stat object passed through to the underlying client
   * @param returnNullIfPathNotExists passed through to the underlying client's
   *                                  {@code readDataAndStat}
   */
  @Override
  public <T> T readDataAndStat(String path, Stat stat, boolean returnNullIfPathNotExists) {
    // Must delegate to readDataAndStat, not readData(path, stat, boolean): the third argument of
    // that readData overload is the "watch" flag, so the null-on-missing flag would be
    // misinterpreted.
    return getZkClient(path).readDataAndStat(path, stat, returnNullIfPathNotExists);
  }

  /** Writes {@code object} to {@code path} with expected version -1. */
  @Override
  public void writeData(String path, Object object) {
    writeData(path, object, -1);
  }

  @Override
  public <T> void updateDataSerialized(String path, DataUpdater<T> updater) {
    getZkClient(path).updateDataSerialized(path, updater);
  }

  @Override
  public void writeData(String path, Object data, int expectedVersion) {
    writeDataReturnStat(path, data, expectedVersion);
  }

  @Override
  public Stat writeDataReturnStat(String path, Object data, int expectedVersion) {
    return getZkClient(path).writeDataReturnStat(path, data, expectedVersion);
  }

  /** Same as {@link #writeDataReturnStat(String, Object, int)}. */
  @Override
  public Stat writeDataGetStat(String path, Object data, int expectedVersion) {
    return writeDataReturnStat(path, data, expectedVersion);
  }

  // ---------------------------------------------------------------------------------------------
  // Async operations and watches, forwarded to the ZkClient of the path's realm.
  // Note: asyncCreate goes directly to the client and does not pass through the private create().
  // ---------------------------------------------------------------------------------------------

  @Override
  public void asyncCreate(String path, Object data, CreateMode mode,
      ZkAsyncCallbacks.CreateCallbackHandler cb) {
    getZkClient(path).asyncCreate(path, data, mode, cb);
  }

  @Override
  public void asyncSetData(String path, Object data, int version,
      ZkAsyncCallbacks.SetDataCallbackHandler cb) {
    getZkClient(path).asyncSetData(path, data, version, cb);
  }

  @Override
  public void asyncGetData(String path, ZkAsyncCallbacks.GetDataCallbackHandler cb) {
    getZkClient(path).asyncGetData(path, cb);
  }

  @Override
  public void asyncExists(String path, ZkAsyncCallbacks.ExistsCallbackHandler cb) {
    getZkClient(path).asyncExists(path, cb);
  }

  @Override
  public void asyncDelete(String path, ZkAsyncCallbacks.DeleteCallbackHandler cb) {
    getZkClient(path).asyncDelete(path, cb);
  }

  @Override
  public void watchForData(String path) {
    getZkClient(path).watchForData(path);
  }

  @Override
  public List<String> watchForChilds(String path) {
    return getZkClient(path).watchForChilds(path);
  }

  @Override
  public long getCreationTime(String path) {
    return getZkClient(path).getCreationTime(path);
  }

  // ---------------------------------------------------------------------------------------------
  // Operations that are not tied to a single path. These always throw
  // UnsupportedOperationException; the return statements only satisfy the compiler.
  // ---------------------------------------------------------------------------------------------

  @Override
  public List<OpResult> multi(Iterable<Op> ops) {
    throwUnsupportedOperationException();
    return null;
  }

  @Override
  public boolean waitUntilConnected(long time, TimeUnit timeUnit) {
    throwUnsupportedOperationException();
    return false;
  }

  @Override
  public String getServers() {
    throwUnsupportedOperationException();
    return null;
  }

  @Override
  public long getSessionId() {
    // Session-aware is unsupported.
    throwUnsupportedOperationException();
    return 0L;
  }

  /**
   * Marks this client closed, then closes and removes every per-realm ZkClient. Returns
   * immediately if the client is already marked closed.
   */
  @Override
  public void close() {
    if (isClosed()) {
      return;
    }

    // Set the flag before taking the lock so that getZkClient() starts rejecting calls right away.
    _isClosed = true;

    synchronized (_zkRealmToZkClientMap) {
      Iterator<Map.Entry<String, ZkClient>> iterator = _zkRealmToZkClientMap.entrySet().iterator();
      while (iterator.hasNext()) {
        Map.Entry<String, ZkClient> entry = iterator.next();
        String zkRealm = entry.getKey();
        ZkClient zkClient = entry.getValue();

        // Catch any exception from ZkClient's close() to avoid that there is leakage of
        // remaining unclosed ZkClient.
        try {
          zkClient.close();
        } catch (Exception e) {
          LOG.error("Exception thrown when closing ZkClient for ZkRealm: {}!", zkRealm, e);
        }

        iterator.remove();
      }
    }

    LOG.info("{} is successfully closed.", FEDERATED_ZK_CLIENT);
  }

  @Override
  public boolean isClosed() {
    return _isClosed;
  }

  // ---------------------------------------------------------------------------------------------
  // Serialization and configuration accessors.
  // ---------------------------------------------------------------------------------------------

  @Override
  public byte[] serialize(Object data, String path) {
    return getZkClient(path).serialize(data, path);
  }

  @Override
  public <T> T deserialize(byte[] data, String path) {
    return getZkClient(path).deserialize(data, path);
  }

  /**
   * Wraps {@code zkSerializer} in a {@link BasicZkSerializer}, stores it for future ZkClients and
   * applies it to every ZkClient created so far.
   */
  @Override
  public void setZkSerializer(ZkSerializer zkSerializer) {
    // Use a local so that all existing clients receive exactly the instance created here, even if
    // the field is reassigned while the clients are being updated.
    final PathBasedZkSerializer serializer = new BasicZkSerializer(zkSerializer);
    _pathBasedZkSerializer = serializer;
    _zkRealmToZkClientMap.values().forEach(zkClient -> zkClient.setZkSerializer(serializer));
  }

  /**
   * Stores {@code zkSerializer} for future ZkClients and applies it to every ZkClient created so
   * far.
   */
  @Override
  public void setZkSerializer(PathBasedZkSerializer zkSerializer) {
    _pathBasedZkSerializer = zkSerializer;
    _zkRealmToZkClientMap.values().forEach(zkClient -> zkClient.setZkSerializer(zkSerializer));
  }

  @Override
  public PathBasedZkSerializer getZkSerializer() {
    return _pathBasedZkSerializer;
  }

  @Override
  public RealmAwareZkConnectionConfig getRealmAwareZkConnectionConfig() {
    return _connectionConfig;
  }

  @Override
  public RealmAwareZkClientConfig getRealmAwareZkClientConfig() {
    return _clientConfig;
  }

  // ---------------------------------------------------------------------------------------------
  // Internal helpers.
  // ---------------------------------------------------------------------------------------------

  /**
   * Common entry point of all create variants.
   *
   * @param expectedSessionId accepted for signature parity with the session-aware variants; it is
   *                          not used because ephemeral modes are rejected before any creation
   * @throws UnsupportedOperationException if {@code mode} is ephemeral
   */
  private String create(final String path, final Object dataObject, final List<ACL> acl,
      final CreateMode mode, final String expectedSessionId) {
    if (mode.isEphemeral()) {
      throwUnsupportedOperationException();
    }

    // Create mode is not session-aware, so the node does not have to be created
    // by the expectedSessionId.
    return getZkClient(path).create(path, dataObject, acl, mode);
  }

  /**
   * Returns the ZkClient for the realm that {@code path} routes to, creating and caching it on
   * first use.
   *
   * @throws IllegalStateException if this client is closed
   * @throws NoSuchElementException if no realm can be resolved for {@code path}
   */
  private ZkClient getZkClient(String path) {
    // If FederatedZkClient is closed, should not return ZkClient.
    checkClosedState();

    String zkRealm = getZkRealm(path);

    // Use this zkClient reference to protect the returning zkClient from being null because of
    // race condition. Once we get the reference, even _zkRealmToZkClientMap is cleared by closed(),
    // this zkClient is not null which guarantees the returned value not null.
    ZkClient zkClient = _zkRealmToZkClientMap.get(zkRealm);
    if (zkClient == null) {
      // 1. Synchronized to avoid creating duplicate ZkClient for the same ZkRealm.
      // 2. Synchronized with close() to avoid creating new ZkClient when all ZkClients are
      // being closed and _zkRealmToZkClientMap is being cleared.
      synchronized (_zkRealmToZkClientMap) {
        // Because of potential race condition: thread B to get ZkClient could be blocked by this
        // synchronized, while thread A is executing closed() in its synchronized block. So thread B
        // could still enter this synchronized block once A completes executing closed() and
        // releases the synchronized lock.
        // Check closed state again to avoid creating a new ZkClient after FederatedZkClient
        // is already closed.
        checkClosedState();

        // Re-read under the lock: another thread may have created the client in the meantime.
        // A single get() suffices because the map never stores null values.
        zkClient = _zkRealmToZkClientMap.get(zkRealm);
        if (zkClient == null) {
          zkClient = createZkClient(zkRealm);
          _zkRealmToZkClientMap.put(zkRealm, zkClient);
        }
      }
    }

    return zkClient;
  }

  /**
   * Resolves the ZK realm for {@code path} from the routing data.
   *
   * @throws NoSuchElementException if the routing data lookup throws it, or returns a null or
   *                                empty realm
   */
  private String getZkRealm(String path) {
    String zkRealm;
    try {
      zkRealm = _metadataStoreRoutingData.getMetadataStoreRealm(path);
    } catch (NoSuchElementException ex) {
      // Same exception type and message as before, but keep the original exception as the cause
      // so the routing failure remains visible in stack traces.
      NoSuchElementException notFound =
          new NoSuchElementException("Cannot find ZK realm for the path: " + path);
      notFound.initCause(ex);
      throw notFound;
    }

    if (zkRealm == null || zkRealm.isEmpty()) {
      throw new NoSuchElementException("Cannot find ZK realm for the path: " + path);
    }

    return zkRealm;
  }

  /**
   * Builds a new ZkClient for the given address using the timeouts and monitoring settings of the
   * client config and the currently stored serializer. Callers pass the resolved ZK realm as the
   * address.
   */
  private ZkClient createZkClient(String zkAddress) {
    LOG.debug("Creating ZkClient for realm: {}.", zkAddress);
    return new ZkClient(new ZkConnection(zkAddress), (int) _clientConfig.getConnectInitTimeout(),
        _clientConfig.getOperationRetryTimeout(), _pathBasedZkSerializer,
        _clientConfig.getMonitorType(), _clientConfig.getMonitorKey(),
        _clientConfig.getMonitorInstanceName(), _clientConfig.isMonitorRootPathOnly());
  }

  /** Throws {@link IllegalStateException} if this client has been closed. */
  private void checkClosedState() {
    if (isClosed()) {
      throw new IllegalStateException(FEDERATED_ZK_CLIENT + " is closed!");
    }
  }

  /** Always throws {@link UnsupportedOperationException} with a hint to use a dedicated client. */
  private void throwUnsupportedOperationException() {
    throw new UnsupportedOperationException(
        "Session-aware operation is not supported by " + FEDERATED_ZK_CLIENT
            + ". Instead, please use " + DEDICATED_ZK_CLIENT_FACTORY
            + " to create a dedicated RealmAwareZkClient for this operation.");
  }
}