blob: 5b82242c2f7dce2577ec4371ed81a72ef5edb6d0 [file] [log] [blame]
package org.apache.helix.manager.zk;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.manager.zk.ZkCacheEventThread.ZkCacheEvent;
import org.apache.helix.store.HelixPropertyListener;
import org.apache.helix.store.zk.ZNode;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.data.Stat;
public class ZkCallbackCache<T> extends Cache<T> implements IZkChildListener, IZkDataListener,
IZkStateListener {
private static Logger LOG = LoggerFactory.getLogger(ZkCallbackCache.class);
final BaseDataAccessor<T> _accessor;
final String _chrootPath;
private final ZkCacheEventThread _eventThread;
private final Map<String, Set<HelixPropertyListener>> _listener;
public ZkCallbackCache(BaseDataAccessor<T> accessor, String chrootPath, List<String> paths,
ZkCacheEventThread eventThread) {
super();
_accessor = accessor;
_chrootPath = chrootPath;
_listener = new ConcurrentHashMap<>();
_eventThread = eventThread;
// init cache
// System.out.println("init cache: " + paths);
if (paths != null && !paths.isEmpty()) {
for (String path : paths) {
updateRecursive(path);
}
}
}
@Override
public void update(String path, T data, Stat stat) {
String parentPath = HelixUtil.getZkParentPath(path);
String childName = HelixUtil.getZkName(path);
addToParentChildSet(parentPath, childName);
ZNode znode = _cache.get(path);
if (znode == null) {
_cache.put(path, new ZNode(path, data, stat));
fireEvents(path, EventType.NodeCreated);
} else {
Stat oldStat = znode.getStat();
znode.setData(data);
znode.setStat(stat);
// System.out.println("\t\t--setData. path: " + path + ", data: " + data);
if (oldStat.getCzxid() != stat.getCzxid()) {
fireEvents(path, EventType.NodeDeleted);
fireEvents(path, EventType.NodeCreated);
} else if (oldStat.getVersion() != stat.getVersion()) {
// System.out.println("\t--fireNodeChanged: " + path + ", oldVersion: " +
// oldStat.getVersion() + ", newVersion: " + stat.getVersion());
fireEvents(path, EventType.NodeDataChanged);
}
}
}
// TODO: make readData async
@Override
public void updateRecursive(String path) {
if (path == null) {
return;
}
try {
_lock.writeLock().lock();
try {
// subscribe changes before read
_accessor.subscribeDataChanges(path, this);
// update this node
Stat stat = new Stat();
T readData = _accessor.get(path, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
update(path, readData, stat);
} catch (ZkNoNodeException e) {
// OK. znode not exists
// we still need to subscribe child change
}
// recursively update children nodes if not exists
// System.out.println("subcribeChildChange: " + path);
ZNode znode = _cache.get(path);
List<String> childNames = _accessor.subscribeChildChanges(path, this);
if (childNames != null && !childNames.isEmpty()) {
for (String childName : childNames) {
if (!znode.hasChild(childName)) {
String childPath = path + "/" + childName;
znode.addChild(childName);
updateRecursive(childPath);
}
}
}
} finally {
_lock.writeLock().unlock();
}
}
@Override
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
// System.out.println("handleChildChange: " + parentPath + ", " + currentChilds);
// this is invoked if subscribed for childChange and node gets deleted
if (currentChilds == null) {
return;
}
updateRecursive(parentPath);
}
@Override
public void handleDataChange(String dataPath, Object data) throws Exception {
// System.out.println("handleDataChange: " + dataPath);
try {
_lock.writeLock().lock();
// TODO: optimize it by get stat from callback
Stat stat = new Stat();
Object readData = _accessor.get(dataPath, stat, AccessOption.THROW_EXCEPTION_IFNOTEXIST);
ZNode znode = _cache.get(dataPath);
if (znode != null) {
Stat oldStat = znode.getStat();
// System.out.println("handleDataChange: " + dataPath + ", data: " + data);
// System.out.println("handleDataChange: " + dataPath + ", oldCzxid: " +
// oldStat.getCzxid() + ", newCzxid: " + stat.getCzxid()
// + ", oldVersion: " + oldStat.getVersion() + ", newVersion: " +
// stat.getVersion());
znode.setData(readData);
znode.setStat(stat);
// if create right after delete, and zkCallback comes after create
// no DataDelete() will be fired, instead will fire 2 DataChange()
// see ZkClient.fireDataChangedEvents()
if (oldStat.getCzxid() != stat.getCzxid()) {
fireEvents(dataPath, EventType.NodeDeleted);
fireEvents(dataPath, EventType.NodeCreated);
} else if (oldStat.getVersion() != stat.getVersion()) {
// System.out.println("\t--fireNodeChanged: " + dataPath + ", oldVersion: " +
// oldStat.getVersion() + ", newVersion: " + stat.getVersion());
fireEvents(dataPath, EventType.NodeDataChanged);
}
} else {
// we may see dataChange on child before childChange on parent
// in this case, let childChange update cache
}
} finally {
_lock.writeLock().unlock();
}
}
@Override
public void handleDataDeleted(String dataPath) throws Exception {
// System.out.println("handleDataDeleted: " + dataPath);
try {
_lock.writeLock().lock();
_accessor.unsubscribeDataChanges(dataPath, this);
_accessor.unsubscribeChildChanges(dataPath, this);
String parentPath = HelixUtil.getZkParentPath(dataPath);
String name = HelixUtil.getZkName(dataPath);
removeFromParentChildSet(parentPath, name);
_cache.remove(dataPath);
fireEvents(dataPath, EventType.NodeDeleted);
} finally {
_lock.writeLock().unlock();
}
}
@Override
public void handleStateChanged(KeeperState state) throws Exception {
// TODO Auto-generated method stub
}
@Override
public void handleNewSession() throws Exception {
// TODO Auto-generated method stub
}
public void subscribe(String path, HelixPropertyListener listener) {
synchronized (_listener) {
Set<HelixPropertyListener> listeners = _listener.get(path);
if (listeners == null) {
listeners = new CopyOnWriteArraySet<HelixPropertyListener>();
_listener.put(path, listeners);
}
listeners.add(listener);
}
}
public void unsubscribe(String path, HelixPropertyListener childListener) {
synchronized (_listener) {
final Set<HelixPropertyListener> listeners = _listener.get(path);
if (listeners != null) {
listeners.remove(childListener);
}
}
}
private void fireEvents(final String path, EventType type) {
String tmpPath = path;
final String clientPath =
(_chrootPath == null ? path : (_chrootPath.equals(path) ? "/" : path.substring(_chrootPath
.length())));
while (tmpPath != null) {
Set<HelixPropertyListener> listeners = _listener.get(tmpPath);
if (listeners != null && !listeners.isEmpty()) {
for (final HelixPropertyListener listener : listeners) {
try {
switch (type) {
case NodeDataChanged:
// listener.onDataChange(path);
_eventThread.send(new ZkCacheEvent("dataChange on " + path + " send to " + listener) {
@Override
public void run() throws Exception {
listener.onDataChange(clientPath);
}
});
break;
case NodeCreated:
// listener.onDataCreate(path);
_eventThread.send(new ZkCacheEvent("dataCreate on " + path + " send to " + listener) {
@Override
public void run() throws Exception {
listener.onDataCreate(clientPath);
}
});
break;
case NodeDeleted:
// listener.onDataDelete(path);
_eventThread.send(new ZkCacheEvent("dataDelete on " + path + " send to " + listener) {
@Override
public void run() throws Exception {
listener.onDataDelete(clientPath);
}
});
break;
default:
break;
}
} catch (Exception e) {
LOG.error("Exception in handle events.", e);
}
}
}
tmpPath = HelixUtil.getZkParentPath(tmpPath);
}
}
@Override
public void handleSessionEstablishmentError(Throwable var1) throws Exception {
}
}