blob: 27ecc40dbcb823799c0661d17bc2b3933797f4fb [file] [log] [blame]
package org.apache.helix.controller;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.io.FileFilter;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Generic class that will read the data given the root path.
*/
public class HierarchicalDataHolder<T> {
private static final Logger logger = LoggerFactory.getLogger(HierarchicalDataHolder.class.getName());
AtomicReference<Node<T>> root;
/**
* currentVersion, gets updated when data is read from original source
*/
AtomicLong currentVersion;
private final HelixZkClient _zkClient;
private final String _rootPath;
private final FileFilter _filter;
public HierarchicalDataHolder(HelixZkClient client, String rootPath, FileFilter filter) {
this._zkClient = client;
this._rootPath = rootPath;
this._filter = filter;
// Node<T> initialValue = new Node<T>();
root = new AtomicReference<>();
currentVersion = new AtomicLong(1);
refreshData();
}
public long getVersion() {
return currentVersion.get();
}
public boolean refreshData() {
Node<T> newRoot = new Node<T>();
boolean dataChanged = refreshRecursively(root.get(), newRoot, _rootPath);
if (dataChanged) {
currentVersion.getAndIncrement();
root.set(newRoot);
return true;
} else {
return false;
}
}
// private void refreshRecursively(Node<T> oldRoot, Stat oldStat, Node<T>
// newRoot,Stat newStat, String path)
private boolean refreshRecursively(Node<T> oldRoot, Node<T> newRoot, String path) {
boolean dataChanged = false;
Stat newStat = _zkClient.getStat(path);
Stat oldStat = (oldRoot != null) ? oldRoot.stat : null;
newRoot.name = path;
if (newStat != null) {
if (oldStat == null) {
newRoot.stat = newStat;
newRoot.data = _zkClient.<T> readData(path, true);
dataChanged = true;
} else if (newStat.equals(oldStat)) {
newRoot.stat = oldStat;
newRoot.data = oldRoot.data;
} else {
dataChanged = true;
newRoot.stat = newStat;
newRoot.data = _zkClient.<T> readData(path, true);
}
if (newStat.getNumChildren() > 0) {
List<String> children = _zkClient.getChildren(path);
for (String child : children) {
String newPath = path + "/" + child;
Node<T> oldChild =
(oldRoot != null && oldRoot.children != null) ? oldRoot.children.get(child) : null;
if (newRoot.children == null) {
newRoot.children = new ConcurrentHashMap<>();
}
if (!newRoot.children.contains(child)) {
newRoot.children.put(child, new Node<T>());
}
Node<T> newChild = newRoot.children.get(child);
boolean childChanged = refreshRecursively(oldChild, newChild, newPath);
dataChanged = dataChanged || childChanged;
}
}
} else {
logger.info(path + " does not exist");
}
return dataChanged;
}
static class Node<T> {
String name;
Stat stat;
T data;
ConcurrentHashMap<String, Node<T>> children;
}
public void print() {
logger.info("START " + _rootPath);
LinkedList<Node<T>> stack = new LinkedList<HierarchicalDataHolder.Node<T>>();
stack.push(root.get());
while (!stack.isEmpty()) {
Node<T> pop = stack.pop();
if (pop != null) {
logger.info("name:" + pop.name);
logger.info("\tdata:" + pop.data);
if (pop.children != null) {
for (Node<T> child : pop.children.values()) {
stack.push(child);
}
}
}
}
logger.info("END " + _rootPath);
}
}