blob: e16519958c67c2194f3824e5ad7d3656db624ede [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.store.HelixPropertyStore;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.PathUtils;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> {
private static final Logger LOG = LoggerFactory.getLogger(ZkCacheBaseDataAccessor.class);
protected WriteThroughCache<T> _wtCache;
protected ZkCallbackCache<T> _zkCache;
final ZkBaseDataAccessor<T> _baseAccessor;
// TODO: need to make sure no overlap between wtCachePaths and zkCachePaths
// TreeMap key is ordered by key string length, so more general (i.e. short) prefix
// comes first
final Map<String, Cache<T>> _cacheMap = new TreeMap<>((o1, o2) -> {
int len1 = o1.split("/").length;
int len2 = o2.split("/").length;
return len1 - len2;
});
final String _chrootPath;
final List<String> _wtCachePaths;
final List<String> _zkCachePaths;
final HelixGroupCommit<T> _groupCommit = new HelixGroupCommit<T>();
// fire listeners
private final ReentrantLock _eventLock = new ReentrantLock();
private ZkCacheEventThread _eventThread;
private RealmAwareZkClient _zkClient;
@Deprecated
public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, List<String> wtCachePaths) {
this(baseAccessor, null, wtCachePaths, null);
}
@Deprecated
public ZkCacheBaseDataAccessor(ZkBaseDataAccessor<T> baseAccessor, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths) {
_baseAccessor = baseAccessor;
if (chrootPath == null || chrootPath.equals("/")) {
_chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
_chrootPath = chrootPath;
}
_wtCachePaths = wtCachePaths;
_zkCachePaths = zkCachePaths;
start();
}
@Deprecated
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths) {
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, null, null,
ZkBaseDataAccessor.ZkClientType.SHARED);
}
@Deprecated
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey) {
this(zkAddress, serializer, chrootPath, wtCachePaths, zkCachePaths, monitorType, monitorkey,
ZkBaseDataAccessor.ZkClientType.SHARED);
}
@Deprecated
public ZkCacheBaseDataAccessor(String zkAddress, ZkSerializer serializer, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths, String monitorType, String monitorkey,
ZkBaseDataAccessor.ZkClientType zkClientType) {
_zkClient = ZkBaseDataAccessor.buildRealmAwareZkClientWithDefaultConfigs(
new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(serializer)
.setMonitorType(monitorType).setMonitorKey(monitorkey), zkAddress, zkClientType);
_baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
if (chrootPath == null || chrootPath.equals("/")) {
_chrootPath = null;
} else {
PathUtils.validatePath(chrootPath);
_chrootPath = chrootPath;
}
_wtCachePaths = wtCachePaths;
_zkCachePaths = zkCachePaths;
start();
}
private ZkCacheBaseDataAccessor(RealmAwareZkClient zkClient, String chrootPath,
List<String> wtCachePaths, List<String> zkCachePaths) {
_zkClient = zkClient;
_baseAccessor = new ZkBaseDataAccessor<>(_zkClient);
_chrootPath = chrootPath;
_wtCachePaths = wtCachePaths;
_zkCachePaths = zkCachePaths;
start();
}
private String prependChroot(String clientPath) {
PathUtils.validatePath(clientPath);
if (_chrootPath != null) {
// handle clientPath = "/"
if (clientPath.length() == 1) {
return _chrootPath;
}
return _chrootPath + clientPath;
} else {
return clientPath;
}
}
private List<String> prependChroot(List<String> clientPaths) {
List<String> serverPaths = new ArrayList<String>();
for (String clientPath : clientPaths) {
serverPaths.add(prependChroot(clientPath));
}
return serverPaths;
}
/**
* find the first path in paths that is a descendant
*/
private String firstCachePath(List<String> paths) {
for (String cachePath : _cacheMap.keySet()) {
for (String path : paths) {
if (path.startsWith(cachePath)) {
return path;
}
}
}
return null;
}
private Cache<T> getCache(String path) {
for (String cachePath : _cacheMap.keySet()) {
if (path.startsWith(cachePath)) {
return _cacheMap.get(cachePath);
}
}
return null;
}
private Cache<T> getCache(List<String> paths) {
Cache<T> cache = null;
for (String path : paths) {
for (String cachePath : _cacheMap.keySet()) {
if (cache == null && path.startsWith(cachePath)) {
cache = _cacheMap.get(cachePath);
} else if (cache != null && cache != _cacheMap.get(cachePath)) {
throw new IllegalArgumentException(
"Couldn't do cross-cache async operations. paths: " + paths);
}
}
}
return cache;
}
private void updateCache(Cache<T> cache, List<String> createPaths, boolean success,
String updatePath, T data, Stat stat) {
if (createPaths == null || createPaths.isEmpty()) {
if (success) {
cache.update(updatePath, data, stat);
}
} else {
String firstPath = firstCachePath(createPaths);
if (firstPath != null) {
cache.updateRecursive(firstPath);
}
}
}
@Override
public boolean create(String path, T data, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
if (cache != null) {
try {
cache.lockWrite();
ZkBaseDataAccessor<T>.AccessResult result =
_baseAccessor.doCreate(serverPath, data, options);
boolean success = (result._retCode == RetCode.OK);
updateCache(cache, result._pathCreated, success, serverPath, data, ZNode.ZERO_STAT);
return success;
} finally {
cache.unlockWrite();
}
}
// no cache
return _baseAccessor.create(serverPath, data, options);
}
@Override
public boolean set(String path, T data, int options) {
return set(path, data, -1, options);
}
@Override
public boolean set(String path, T data, int expectVersion, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
boolean success = false;
try {
if (cache != null) {
cache.lockWrite();
ZkBaseDataAccessor<T>.AccessResult result =
_baseAccessor.doSet(serverPath, data, expectVersion, options);
success = result._retCode == RetCode.OK;
updateCache(cache, result._pathCreated, success, serverPath, data, result._stat);
} else {
// no cache
success = _baseAccessor.set(serverPath, data, expectVersion, options);
}
} catch (Exception e) {
} finally {
if (cache != null) {
cache.unlockWrite();
}
}
return success;
}
@Override
public boolean update(String path, DataUpdater<T> updater, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
if (cache != null) {
try {
cache.lockWrite();
ZkBaseDataAccessor<T>.AccessResult result =
_baseAccessor.doUpdate(serverPath, updater, options);
boolean success = (result._retCode == RetCode.OK);
updateCache(cache, result._pathCreated, success, serverPath, result._updatedValue,
result._stat);
return success;
} finally {
cache.unlockWrite();
}
}
// no cache
return _groupCommit.commit(_baseAccessor, options, serverPath, updater);
// return _baseAccessor.update(serverPath, updater, options);
}
@Override
public boolean exists(String path, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
if (cache != null) {
boolean exists = cache.exists(serverPath);
if (exists) {
return true;
}
}
// if not exists in cache, always fall back to zk
return _baseAccessor.exists(serverPath, options);
}
@Override
public boolean remove(String path, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
if (cache != null) {
try {
cache.lockWrite();
boolean success = _baseAccessor.remove(serverPath, options);
if (success) {
cache.purgeRecursive(serverPath);
}
return success;
} finally {
cache.unlockWrite();
}
}
// no cache
return _baseAccessor.remove(serverPath, options);
}
@Override
public T get(String path, Stat stat, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
if (cache != null) {
T record = null;
ZNode znode = cache.get(serverPath);
if (znode != null) {
// TODO: shall return a deep copy instead of reference
record = ((T) znode.getData());
if (stat != null) {
DataTree.copyStat(znode.getStat(), stat);
}
return record;
} else {
// if cache miss, fall back to zk and update cache
try {
cache.lockWrite();
record = _baseAccessor
.get(serverPath, stat, options | AccessOption.THROW_EXCEPTION_IFNOTEXIST);
cache.update(serverPath, record, stat);
} catch (ZkNoNodeException e) {
if (AccessOption.isThrowExceptionIfNotExist(options)) {
throw e;
}
} finally {
cache.unlockWrite();
}
return record;
}
}
// no cache
return _baseAccessor.get(serverPath, stat, options);
}
@Override
public Stat getStat(String path, int options) {
String clientPath = path;
String serverPath = prependChroot(clientPath);
Cache<T> cache = getCache(serverPath);
if (cache != null) {
Stat stat = new Stat();
ZNode znode = cache.get(serverPath);
if (znode != null) {
return znode.getStat();
} else {
// if cache miss, fall back to zk and update cache
try {
cache.lockWrite();
T data = _baseAccessor.get(serverPath, stat, options);
cache.update(serverPath, data, stat);
} catch (ZkNoNodeException e) {
return null;
} finally {
cache.unlockWrite();
}
return stat;
}
}
// no cache
return _baseAccessor.getStat(serverPath, options);
}
@Override
public boolean[] createChildren(List<String> paths, List<T> records, int options) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
Cache<T> cache = getCache(serverPaths);
if (cache != null) {
try {
cache.lockWrite();
boolean[] needCreate = new boolean[size];
Arrays.fill(needCreate, true);
List<List<String>> pathsCreatedList =
new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
ZkAsyncCallbacks.CreateCallbackHandler[] createCbList =
_baseAccessor.create(serverPaths, records, needCreate, pathsCreatedList, options);
boolean[] success = new boolean[size];
for (int i = 0; i < size; i++) {
ZkAsyncCallbacks.CreateCallbackHandler cb = createCbList[i];
success[i] = (Code.get(cb.getRc()) == Code.OK);
updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
records.get(i), ZNode.ZERO_STAT);
}
return success;
} finally {
cache.unlockWrite();
}
}
// no cache
return _baseAccessor.createChildren(serverPaths, records, options);
}
@Override
public boolean[] setChildren(List<String> paths, List<T> records, int options) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
Cache<T> cache = getCache(serverPaths);
if (cache != null) {
try {
cache.lockWrite();
List<Stat> setStats = new ArrayList<Stat>();
List<List<String>> pathsCreatedList =
new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
boolean[] success =
_baseAccessor.set(serverPaths, records, pathsCreatedList, setStats, options);
for (int i = 0; i < size; i++) {
updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
records.get(i), setStats.get(i));
}
return success;
} finally {
cache.unlockWrite();
}
}
return _baseAccessor.setChildren(serverPaths, records, options);
}
@Override
public boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
Cache<T> cache = getCache(serverPaths);
if (cache != null) {
try {
cache.lockWrite();
List<Stat> setStats = new ArrayList<Stat>();
boolean[] success = new boolean[size];
List<List<String>> pathsCreatedList =
new ArrayList<List<String>>(Collections.<List<String>>nCopies(size, null));
List<T> updateData =
_baseAccessor.update(serverPaths, updaters, pathsCreatedList, setStats, options);
// System.out.println("updateChild: ");
// for (T data : updateData)
// {
// System.out.println(data);
// }
for (int i = 0; i < size; i++) {
success[i] = (updateData.get(i) != null);
updateCache(cache, pathsCreatedList.get(i), success[i], serverPaths.get(i),
updateData.get(i), setStats.get(i));
}
return success;
} finally {
cache.unlockWrite();
}
}
// no cache
return _baseAccessor.updateChildren(serverPaths, updaters, options);
}
// TODO: change to use async_exists
@Override
public boolean[] exists(List<String> paths, int options) {
final int size = paths.size();
boolean exists[] = new boolean[size];
for (int i = 0; i < size; i++) {
exists[i] = exists(paths.get(i), options);
}
return exists;
}
@Override
public boolean[] remove(List<String> paths, int options) {
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
Cache<T> cache = getCache(serverPaths);
if (cache != null) {
try {
cache.lockWrite();
boolean[] success = _baseAccessor.remove(serverPaths, options);
for (int i = 0; i < size; i++) {
if (success[i]) {
cache.purgeRecursive(serverPaths.get(i));
}
}
return success;
} finally {
cache.unlockWrite();
}
}
// no cache
return _baseAccessor.remove(serverPaths, options);
}
@Deprecated
@Override
public List<T> get(List<String> paths, List<Stat> stats, int options) {
return get(paths, stats, options, false);
}
@Override
public List<T> get(List<String> paths, List<Stat> stats, int options, boolean throwException)
throws HelixException {
if (paths == null || paths.isEmpty()) {
return Collections.emptyList();
}
final int size = paths.size();
List<String> serverPaths = prependChroot(paths);
List<T> records = new ArrayList<T>(Collections.<T>nCopies(size, null));
List<Stat> readStats = new ArrayList<Stat>(Collections.<Stat>nCopies(size, null));
boolean needRead = false;
boolean needReads[] = new boolean[size]; // init to false
Cache<T> cache = getCache(serverPaths);
if (cache != null) {
try {
cache.lockRead();
for (int i = 0; i < size; i++) {
ZNode zNode = cache.get(serverPaths.get(i));
if (zNode != null) {
// TODO: shall return a deep copy instead of reference
records.set(i, (T) zNode.getData());
readStats.set(i, zNode.getStat());
} else {
needRead = true;
needReads[i] = true;
}
}
} finally {
cache.unlockRead();
}
// cache miss, fall back to zk and update cache
if (needRead) {
cache.lockWrite();
try {
List<T> readRecords =
_baseAccessor.get(serverPaths, readStats, needReads, throwException);
for (int i = 0; i < size; i++) {
if (needReads[i]) {
records.set(i, readRecords.get(i));
cache.update(serverPaths.get(i), readRecords.get(i), readStats.get(i));
}
}
} finally {
cache.unlockWrite();
}
}
if (stats != null) {
stats.clear();
stats.addAll(readStats);
}
return records;
}
// no cache
return _baseAccessor.get(serverPaths, stats, options, throwException);
}
// TODO: add cache
@Override
public Stat[] getStats(List<String> paths, int options) {
List<String> serverPaths = prependChroot(paths);
return _baseAccessor.getStats(serverPaths, options);
}
@Override
public List<String> getChildNames(String parentPath, int options) {
String serverParentPath = prependChroot(parentPath);
Cache<T> cache = getCache(serverParentPath);
if (cache != null) {
// System.out.println("zk-cache");
ZNode znode = cache.get(serverParentPath);
if (znode != null && znode.getChildSet() != Collections.<String>emptySet()) {
// System.out.println("zk-cache-hit: " + parentPath);
List<String> childNames = new ArrayList<String>(znode.getChildSet());
Collections.sort(childNames);
return childNames;
} else {
// System.out.println("zk-cache-miss");
try {
cache.lockWrite();
List<String> childNames = _baseAccessor.getChildNames(serverParentPath, options);
// System.out.println("\t--" + childNames);
cache.addToParentChildSet(serverParentPath, childNames);
return childNames;
} finally {
cache.unlockWrite();
}
}
}
// no cache
return _baseAccessor.getChildNames(serverParentPath, options);
}
@Deprecated
@Override
public List<T> getChildren(String parentPath, List<Stat> stats, int options) {
return getChildren(parentPath, stats, options, false);
}
@Override
public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount,
int retryInterval) throws HelixException {
// TODO add retry logic according to retryCount and retryInterval input
return getChildren(parentPath, stats, options, true);
}
private List<T> getChildren(String parentPath, List<Stat> stats, int options, boolean throwException) {
List<String> childNames = getChildNames(parentPath, options);
if (childNames == null) {
return null;
}
List<String> paths = new ArrayList<>();
for (String childName : childNames) {
String path = parentPath.equals("/") ? "/" + childName : parentPath + "/" + childName;
paths.add(path);
}
return get(paths, stats, options, throwException);
}
@Override
public void subscribeDataChanges(String path, IZkDataListener listener) {
String serverPath = prependChroot(path);
_baseAccessor.subscribeDataChanges(serverPath, listener);
}
@Override
public void unsubscribeDataChanges(String path, IZkDataListener listener) {
String serverPath = prependChroot(path);
_baseAccessor.unsubscribeDataChanges(serverPath, listener);
}
@Override
public List<String> subscribeChildChanges(String path, IZkChildListener listener) {
String serverPath = prependChroot(path);
return _baseAccessor.subscribeChildChanges(serverPath, listener);
}
@Override
public void unsubscribeChildChanges(String path, IZkChildListener listener) {
String serverPath = prependChroot(path);
_baseAccessor.unsubscribeChildChanges(serverPath, listener);
}
@Override
public void subscribe(String parentPath, HelixPropertyListener listener) {
String serverPath = prependChroot(parentPath);
_zkCache.subscribe(serverPath, listener);
}
@Override
public void unsubscribe(String parentPath, HelixPropertyListener listener) {
String serverPath = prependChroot(parentPath);
_zkCache.unsubscribe(serverPath, listener);
}
@Override
public void start() {
LOG.info("START: Init ZkCacheBaseDataAccessor: " + _chrootPath + ", " + _wtCachePaths + ", "
+ _zkCachePaths);
// start event thread
try {
_eventLock.lockInterruptibly();
if (_eventThread != null) {
LOG.warn(_eventThread + " has already started");
} else {
if (_zkCachePaths == null || _zkCachePaths.isEmpty()) {
LOG.warn("ZkCachePaths is null or empty. Will not start ZkCacheEventThread");
} else {
LOG.debug("Starting ZkCacheEventThread...");
_eventThread = new ZkCacheEventThread("");
_eventThread.start();
}
}
} catch (InterruptedException e) {
// The InterruptedException may come from lockInterruptibly().
// If it fails to get the lock, throw exception so none of the initialization will be done.
throw new HelixException("Current thread is interrupted when acquiring lock. ", e);
} finally {
_eventLock.unlock();
}
LOG.debug("Start ZkCacheEventThread...done");
_wtCache = new WriteThroughCache<T>(_baseAccessor, _wtCachePaths);
_zkCache = new ZkCallbackCache<T>(_baseAccessor, _chrootPath, _zkCachePaths, _eventThread);
if (_wtCachePaths != null && !_wtCachePaths.isEmpty()) {
for (String path : _wtCachePaths) {
_cacheMap.put(path, _wtCache);
}
}
if (_zkCachePaths != null && !_zkCachePaths.isEmpty()) {
for (String path : _zkCachePaths) {
_cacheMap.put(path, _zkCache);
}
}
}
@Override
public void stop() {
try {
_eventLock.lockInterruptibly();
if (_zkClient != null) {
_zkClient.close();
_zkClient = null;
}
if (_eventThread == null) {
LOG.warn(_eventThread + " has already stopped");
return;
}
LOG.debug("Stopping ZkCacheEventThread...");
_eventThread.interrupt();
_eventThread.join(2000);
_eventThread = null;
} catch (InterruptedException e) {
LOG.error("Current thread is interrupted when stopping ZkCacheEventThread.");
} finally {
_eventLock.unlock();
}
LOG.debug("Stop ZkCacheEventThread...done");
}
@Override
public void reset() {
if (_wtCache != null) {
_wtCache.reset();
}
if (_zkCache != null) {
_zkCache.reset();
}
}
@Override
public void close() {
if (_zkClient != null) {
_zkClient.close();
}
}
@Override
public void finalize() {
close();
}
public static class Builder<T> extends GenericBaseDataAccessorBuilder<Builder<T>> {
/** ZkCacheBaseDataAccessor-specific parameters */
private String _chrootPath;
private List<String> _wtCachePaths;
private List<String> _zkCachePaths;
public Builder() {
}
public Builder<T> setChrootPath(String chrootPath) {
_chrootPath = chrootPath;
return this;
}
public Builder<T> setWtCachePaths(List<String> wtCachePaths) {
_wtCachePaths = wtCachePaths;
return this;
}
public Builder<T> setZkCachePaths(List<String> zkCachePaths) {
_zkCachePaths = zkCachePaths;
return this;
}
public ZkCacheBaseDataAccessor<T> build() {
validate();
return new ZkCacheBaseDataAccessor<>(
createZkClient(_realmMode, _realmAwareZkConnectionConfig, _realmAwareZkClientConfig,
_zkAddress), _chrootPath, _wtCachePaths, _zkCachePaths);
}
}
}