| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zookeeper.server; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicLong; |
| import org.apache.jute.InputArchive; |
| import org.apache.jute.OutputArchive; |
| import org.apache.jute.Record; |
| import org.apache.zookeeper.DigestWatcher; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.Code; |
| import org.apache.zookeeper.KeeperException.NoNodeException; |
| import org.apache.zookeeper.KeeperException.NodeExistsException; |
| import org.apache.zookeeper.Quotas; |
| import org.apache.zookeeper.StatsTrack; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.Watcher.Event.KeeperState; |
| import org.apache.zookeeper.Watcher.WatcherType; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooDefs.OpCode; |
| import org.apache.zookeeper.audit.AuditConstants; |
| import org.apache.zookeeper.audit.AuditEvent.Result; |
| import org.apache.zookeeper.audit.ZKAuditProvider; |
| import org.apache.zookeeper.common.PathTrie; |
| import org.apache.zookeeper.common.PathUtils; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.apache.zookeeper.data.StatPersisted; |
| import org.apache.zookeeper.server.watch.IWatchManager; |
| import org.apache.zookeeper.server.watch.WatchManagerFactory; |
| import org.apache.zookeeper.server.watch.WatcherMode; |
| import org.apache.zookeeper.server.watch.WatcherOrBitSet; |
| import org.apache.zookeeper.server.watch.WatchesPathReport; |
| import org.apache.zookeeper.server.watch.WatchesReport; |
| import org.apache.zookeeper.server.watch.WatchesSummary; |
| import org.apache.zookeeper.txn.CheckVersionTxn; |
| import org.apache.zookeeper.txn.CloseSessionTxn; |
| import org.apache.zookeeper.txn.CreateContainerTxn; |
| import org.apache.zookeeper.txn.CreateTTLTxn; |
| import org.apache.zookeeper.txn.CreateTxn; |
| import org.apache.zookeeper.txn.DeleteTxn; |
| import org.apache.zookeeper.txn.ErrorTxn; |
| import org.apache.zookeeper.txn.MultiTxn; |
| import org.apache.zookeeper.txn.SetACLTxn; |
| import org.apache.zookeeper.txn.SetDataTxn; |
| import org.apache.zookeeper.txn.Txn; |
| import org.apache.zookeeper.txn.TxnDigest; |
| import org.apache.zookeeper.txn.TxnHeader; |
| import org.apache.zookeeper.util.ServiceUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class maintains the tree data structure. It doesn't have any networking |
| * or client connection code in it so that it can be tested in a standalone way. |
| * |
| * <p>The tree maintains two parallel data structures: a hashtable that maps from |
 * full paths to DataNodes and a tree of DataNodes. All accesses to a path are
| * through the hashtable. The tree is traversed only when serializing to disk. |
| */ |
| public class DataTree { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(DataTree.class); |
| |
| private final RateLogger RATE_LOGGER = new RateLogger(LOG, 15 * 60 * 1000); |
| |
| /** |
| * This map provides a fast lookup to the data nodes. The tree is the |
| * source of truth and is where all the locking occurs |
| */ |
| private final NodeHashMap nodes; |
| |
| private IWatchManager dataWatches; |
| |
| private IWatchManager childWatches; |
| |
| /** cached total size of paths and data for all DataNodes */ |
| private final AtomicLong nodeDataSize = new AtomicLong(0); |
| |
| /** the root of zookeeper tree */ |
| private static final String rootZookeeper = "/"; |
| |
/** the zookeeper node that acts as the management and status node **/
| private static final String procZookeeper = Quotas.procZookeeper; |
| |
| /** this will be the string that's stored as a child of root */ |
| private static final String procChildZookeeper = procZookeeper.substring(1); |
| |
| /** |
| * the zookeeper quota node that acts as the quota management node for |
| * zookeeper |
| */ |
| private static final String quotaZookeeper = Quotas.quotaZookeeper; |
| |
| /** this will be the string that's stored as a child of /zookeeper */ |
| private static final String quotaChildZookeeper = quotaZookeeper.substring(procZookeeper.length() + 1); |
| |
| /** |
| * the zookeeper config node that acts as the config management node for |
| * zookeeper |
| */ |
| private static final String configZookeeper = ZooDefs.CONFIG_NODE; |
| |
| /** this will be the string that's stored as a child of /zookeeper */ |
| private static final String configChildZookeeper = configZookeeper.substring(procZookeeper.length() + 1); |
| |
| /** |
| * the path trie that keeps track of the quota nodes in this datatree |
| */ |
| private final PathTrie pTrie = new PathTrie(); |
| |
| /** |
| * over-the-wire size of znode stat. Counting the fields of Stat class |
| */ |
| public static final int STAT_OVERHEAD_BYTES = (6 * 8) + (5 * 4); |
| |
| /** |
| * This hashtable lists the paths of the ephemeral nodes of a session. |
| */ |
| private final Map<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<>(); |
| |
| /** |
| * This set contains the paths of all container nodes |
| */ |
| private final Set<String> containers = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| |
| /** |
| * This set contains the paths of all ttl nodes |
| */ |
| private final Set<String> ttls = Collections.newSetFromMap(new ConcurrentHashMap<>()); |
| |
| private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache(); |
| |
| // The maximum number of tree digests that we will keep in our history |
| public static final int DIGEST_LOG_LIMIT = 1024; |
| |
| // Dump digest every 128 txns, in hex it's 80, which will make it easier |
| // to align and compare between servers. |
| public static final int DIGEST_LOG_INTERVAL = 128; |
| |
| // If this is not null, we are actively looking for a target zxid that we |
| // want to validate the digest for |
| private ZxidDigest digestFromLoadedSnapshot; |
| |
| // The digest associated with the highest zxid in the data tree. |
| private volatile ZxidDigest lastProcessedZxidDigest; |
| |
| private boolean firstMismatchTxn = true; |
| |
| // Will be notified when digest mismatch event triggered. |
| private final List<DigestWatcher> digestWatchers = new ArrayList<>(); |
| |
| // The historical digests list. |
| private final LinkedList<ZxidDigest> digestLog = new LinkedList<>(); |
| |
| private final DigestCalculator digestCalculator; |
| |
| @SuppressWarnings("unchecked") |
| public Set<String> getEphemerals(long sessionId) { |
| HashSet<String> ret = ephemerals.get(sessionId); |
| if (ret == null) { |
| return new HashSet<>(); |
| } |
| synchronized (ret) { |
| return (HashSet<String>) ret.clone(); |
| } |
| } |
| |
| public Set<String> getContainers() { |
| return new HashSet<>(containers); |
| } |
| |
| public Set<String> getTtls() { |
| return new HashSet<>(ttls); |
| } |
| |
    /**
     * Return the ids of all sessions that currently own ephemeral nodes.
     *
     * <p>Note: this is a live key-set view backed by the ephemerals map,
     * not a snapshot copy.
     */
    public Collection<Long> getSessions() {
        return ephemerals.keySet();
    }
| |
    /**
     * Look up the DataNode stored at the given full path.
     *
     * @param path full path of the node
     * @return the node, or null if no node exists at that path
     */
    public DataNode getNode(String path) {
        return nodes.get(path);
    }
| |
    /**
     * Total number of entries in the nodes map. Note the root is stored under
     * both the "" and "/" keys (see the constructor), so both aliases count.
     */
    public int getNodeCount() {
        return nodes.size();
    }
| |
    /** Combined count of registered data watches and child watches. */
    public int getWatchCount() {
        return dataWatches.size() + childWatches.size();
    }
| |
| public int getEphemeralsCount() { |
| int result = 0; |
| for (HashSet<String> set : ephemerals.values()) { |
| result += set.size(); |
| } |
| return result; |
| } |
| |
| /** |
| * Get the size of the nodes based on path and data length. |
| * |
| * @return size of the data |
| */ |
| public long approximateDataSize() { |
| long result = 0; |
| for (Map.Entry<String, DataNode> entry : nodes.entrySet()) { |
| DataNode value = entry.getValue(); |
| synchronized (value) { |
| result += getNodeSize(entry.getKey(), value.data); |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Get the size of the node based on path and data length. |
| */ |
| private static long getNodeSize(String path, byte[] data) { |
| return (path == null ? 0 : path.length()) + (data == null ? 0 : data.length); |
| } |
| |
    /**
     * Cached total of path+data sizes, maintained incrementally on
     * create/delete/setData; cheaper than recomputing approximateDataSize().
     */
    public long cachedApproximateDataSize() {
        return nodeDataSize.get();
    }
| |
| /** |
| * This is a pointer to the root of the DataTree. It is the source of truth, |
| * but we usually use the nodes hashmap to find nodes in the tree. |
| */ |
| private DataNode root = new DataNode(new byte[0], -1L, new StatPersisted()); |
| |
| /** |
| * create a /zookeeper filesystem that is the proc filesystem of zookeeper |
| */ |
| private final DataNode procDataNode = new DataNode(new byte[0], -1L, new StatPersisted()); |
| |
| /** |
| * create a /zookeeper/quota node for maintaining quota properties for |
| * zookeeper |
| */ |
| private final DataNode quotaDataNode = new DataNode(new byte[0], -1L, new StatPersisted()); |
| |
    /** Build a DataTree with a default DigestCalculator. */
    public DataTree() {
        this(new DigestCalculator());
    }
| |
    /**
     * Build the tree with the given digest calculator, pre-populating the
     * reserved /zookeeper hierarchy (proc, quota, config nodes) and creating
     * the watch managers. Initialization order matters: nodes must exist
     * before addConfigNode() and approximateDataSize() run.
     */
    DataTree(DigestCalculator digestCalculator) {
        this.digestCalculator = digestCalculator;
        nodes = new NodeHashMapImpl(digestCalculator);

        // rather than fight it, let root have an alias
        nodes.put("", root);
        // "/" is put without digest — presumably so the aliased root is only
        // counted once in the tree digest; TODO confirm against NodeHashMap
        nodes.putWithoutDigest(rootZookeeper, root);

        // add the proc node and quota node
        root.addChild(procChildZookeeper);
        nodes.put(procZookeeper, procDataNode);

        procDataNode.addChild(quotaChildZookeeper);
        nodes.put(quotaZookeeper, quotaDataNode);

        addConfigNode();

        // seed the cached size with a full scan of the freshly-built tree
        nodeDataSize.set(approximateDataSize());
        try {
            dataWatches = WatchManagerFactory.createWatchManager();
            childWatches = WatchManagerFactory.createWatchManager();
        } catch (Exception e) {
            // watch managers are essential; a server without them must not run
            LOG.error("Unexpected exception when creating WatchManager, exiting abnormally", e);
            ServiceUtils.requestSystemExit(ExitCode.UNEXPECTED_ERROR.getValue());
        }
    }
| |
    /**
     * create a /zookeeper/config node for maintaining the configuration (membership and quorum system) info for
     * zookeeper
     */
    public void addConfigNode() {
        DataNode zookeeperZnode = nodes.get(procZookeeper);
        if (zookeeperZnode != null) { // should always be the case
            zookeeperZnode.addChild(configChildZookeeper);
        } else {
            // the /zookeeper node is created in the constructor before this
            // method is called, so a miss indicates internal corruption
            assert false : "There's no /zookeeper znode - this should never happen.";
        }

        nodes.put(configZookeeper, new DataNode(new byte[0], -1L, new StatPersisted()));
        try {
            // Reconfig node is access controlled by default (ZOOKEEPER-2014).
            setACL(configZookeeper, ZooDefs.Ids.READ_ACL_UNSAFE, -1);
        } catch (NoNodeException e) {
            // cannot happen: the node was put into the map on the line above
            assert false : "There's no " + configZookeeper + " znode - this should never happen.";
        }
    }
| |
| /** |
| * is the path one of the special paths owned by zookeeper. |
| * |
| * @param path |
| * the path to be checked |
| * @return true if a special path. false if not. |
| */ |
| boolean isSpecialPath(String path) { |
| return rootZookeeper.equals(path) |
| || procZookeeper.equals(path) |
| || quotaZookeeper.equals(path) |
| || configZookeeper.equals(path); |
| } |
| |
| public static void copyStatPersisted(StatPersisted from, StatPersisted to) { |
| to.setAversion(from.getAversion()); |
| to.setCtime(from.getCtime()); |
| to.setCversion(from.getCversion()); |
| to.setCzxid(from.getCzxid()); |
| to.setMtime(from.getMtime()); |
| to.setMzxid(from.getMzxid()); |
| to.setPzxid(from.getPzxid()); |
| to.setVersion(from.getVersion()); |
| to.setEphemeralOwner(from.getEphemeralOwner()); |
| } |
| |
| public static void copyStat(Stat from, Stat to) { |
| to.setAversion(from.getAversion()); |
| to.setCtime(from.getCtime()); |
| to.setCversion(from.getCversion()); |
| to.setCzxid(from.getCzxid()); |
| to.setMtime(from.getMtime()); |
| to.setMzxid(from.getMzxid()); |
| to.setPzxid(from.getPzxid()); |
| to.setVersion(from.getVersion()); |
| to.setEphemeralOwner(from.getEphemeralOwner()); |
| to.setDataLength(from.getDataLength()); |
| to.setNumChildren(from.getNumChildren()); |
| } |
| |
    /**
     * update the count/bytes of this stat data node
     *
     * @param lastPrefix
     *            the path of the node that has a quota.
     * @param bytesDiff
     *            the diff to be added to number of bytes
     * @param countDiff
     *            the diff to be added to the count
     */
    public void updateQuotaStat(String lastPrefix, long bytesDiff, int countDiff) {
        String statNodePath = Quotas.statPath(lastPrefix);
        DataNode statNode = nodes.get(statNodePath);

        if (statNode == null) {
            // should not happen: a quota prefix implies its stat node exists.
            // Log-and-return keeps a corrupt quota entry from failing the txn.
            LOG.error("Missing node for stat {}", statNodePath);
            return;
        }

        // lock the stat node for the whole read-modify-write so concurrent
        // quota updates do not lose increments
        synchronized (statNode) {
            StatsTrack updatedStat = new StatsTrack(statNode.data);
            updatedStat.setCount(updatedStat.getCount() + countDiff);
            updatedStat.setBytes(updatedStat.getBytes() + bytesDiff);
            statNode.data = updatedStat.getStatsBytes();
        }
    }
| |
    /**
     * Add a new node to the DataTree.
     * @param path
     *            Path for the new node.
     * @param data
     *            Data to store in the node.
     * @param acl
     *            Node acls
     * @param ephemeralOwner
     *            the session id that owns this node. -1 indicates this is not
     *            an ephemeral node.
     * @param zxid
     *            Transaction ID
     * @param time
     *            creation time, in milliseconds
     * @throws NodeExistsException
     * @throws NoNodeException
     */
    public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time) throws NoNodeException, NodeExistsException {
        // convenience overload: callers that don't need the output Stat
        createNode(path, data, acl, ephemeralOwner, parentCVersion, zxid, time, null);
    }
| |
    /**
     * Add a new node to the DataTree.
     * @param path
     *            Path for the new node.
     * @param data
     *            Data to store in the node.
     * @param acl
     *            Node acls
     * @param ephemeralOwner
     *            the session id that owns this node. -1 indicates this is not
     *            an ephemeral node.
     * @param zxid
     *            Transaction ID
     * @param time
     *            creation time, in milliseconds
     * @param outputStat
     *            A Stat object to store Stat output results into.
     * @throws NodeExistsException
     *             if the parent already has a child with this name
     * @throws NoNodeException
     *             if the parent path does not exist
     */
    public void createNode(final String path, byte[] data, List<ACL> acl, long ephemeralOwner, int parentCVersion, long zxid, long time, Stat outputStat) throws NoNodeException, NodeExistsException {
        // split the path into parent and child name once, up front
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = createStat(zxid, time, ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new NoNodeException();
        }
        synchronized (parent) {
            // Add the ACL to ACL cache first, to avoid the ACL not being
            // created race condition during fuzzy snapshot sync.
            //
            // This is the simplest fix, which may add ACL reference count
            // again if it's already counted in the ACL map of fuzzy
            // snapshot, which might also happen for deleteNode txn, but
            // at least it won't cause the ACL not exist issue.
            //
            // Later we can audit and delete all non-referenced ACLs from
            // ACL map when loading the snapshot/txns from disk, like what
            // we did for the global sessions.
            Long acls = aclCache.convertAcls(acl);

            Set<String> children = parent.getChildren();
            if (children.contains(childName)) {
                throw new NodeExistsException();
            }

            // preChange/postChange bracket the parent mutation for the digest
            nodes.preChange(parentName, parent);
            if (parentCVersion == -1) {
                // -1 means "not supplied by the txn": bump the live cversion
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }
            // There is possibility that we'll replay txns for a node which
            // was created and then deleted in the fuzzy range, and it's not
            // exist in the snapshot, so replay the creation might revert the
            // cversion and pzxid, need to check and only update when it's
            // larger.
            if (parentCVersion > parent.stat.getCversion()) {
                parent.stat.setCversion(parentCVersion);
                parent.stat.setPzxid(zxid);
            }
            DataNode child = new DataNode(data, acls, stat);
            parent.addChild(childName);
            nodes.postChange(parentName, parent);
            nodeDataSize.addAndGet(getNodeSize(path, child.data));
            nodes.put(path, child);
            // register the node in the right ownership bucket:
            // container, TTL, or per-session ephemeral set
            EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.add(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.add(path);
            } else if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.computeIfAbsent(ephemeralOwner, k -> new HashSet<>());
                synchronized (list) {
                    list.add(path);
                }
            }
            if (outputStat != null) {
                child.copyStat(outputStat);
            }
        }
        // now check if its one of the zookeeper node child
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if it's the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(Quotas.trimQuotaPath(parentName));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(Quotas.trimQuotaPath(parentName));
            }
        }

        // charge this node against the nearest enclosing quota, if any
        String lastPrefix = getMaxPrefixWithQuota(path);
        long bytes = data == null ? 0 : data.length;
        // also check to update the quotas for this node
        if (lastPrefix != null) { // ok we have some match and need to update
            updateQuotaStat(lastPrefix, bytes, 1);
        }
        updateWriteStat(path, bytes);
        // fire watches outside the parent lock
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
    }
| |
    /**
     * remove the path from the datatree
     *
     * @param path
     *            the path to of the node to be deleted
     * @param zxid
     *            the current zxid
     * @throws NoNodeException
     *             if the parent or the node itself does not exist
     */
    public void deleteNode(String path, long zxid) throws NoNodeException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);

        // The child might already be deleted during taking fuzzy snapshot,
        // but we still need to update the pzxid here before throw exception
        // for no such child
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new NoNodeException();
        }
        synchronized (parent) {
            nodes.preChange(parentName, parent);
            parent.removeChild(childName);
            // Only update pzxid when the zxid is larger than the current pzxid,
            // otherwise we might override some higher pzxid set by a CreateTxn,
            // which could cause the cversion and pzxid inconsistent
            if (zxid > parent.stat.getPzxid()) {
                parent.stat.setPzxid(zxid);
            }
            nodes.postChange(parentName, parent);
        }

        DataNode node = nodes.get(path);
        if (node == null) {
            // parent's pzxid was still advanced above — intentional, see
            // the fuzzy-snapshot comment
            throw new NoNodeException();
        }
        nodes.remove(path);
        synchronized (node) {
            // release the ACL reference and subtract the cached size
            aclCache.removeUsage(node.acl);
            nodeDataSize.addAndGet(-getNodeSize(path, node.data));
        }

        // Synchronized to sync the containers and ttls change, probably
        // only need to sync on containers and ttls, will update it in a
        // separate patch.
        synchronized (parent) {
            long owner = node.stat.getEphemeralOwner();
            EphemeralType ephemeralType = EphemeralType.get(owner);
            if (ephemeralType == EphemeralType.CONTAINER) {
                containers.remove(path);
            } else if (ephemeralType == EphemeralType.TTL) {
                ttls.remove(path);
            } else if (owner != 0) {
                Set<String> nodes = ephemerals.get(owner);
                if (nodes != null) {
                    synchronized (nodes) {
                        nodes.remove(path);
                    }
                }
            }
        }

        // NOTE(review): this checks procZookeeper while createNode's limit-node
        // handling checks quotaZookeeper — quotaZookeeper is under
        // procZookeeper so the broader prefix still matches; confirm intent.
        if (parentName.startsWith(procZookeeper) && Quotas.limitNode.equals(childName)) {
            // delete the node in the trie.
            // we need to update the trie as well
            pTrie.deletePath(Quotas.trimQuotaPath(parentName));
        }

        // also check to update the quotas for this node
        String lastPrefix = getMaxPrefixWithQuota(path);
        if (lastPrefix != null) {
            // ok we have some match and need to update
            long bytes;
            synchronized (node) {
                bytes = (node.data == null ? 0 : -(node.data.length));
            }
            updateQuotaStat(lastPrefix, bytes, -1);
        }

        updateWriteStat(path, 0L);

        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "dataWatches.triggerWatch " + path);
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "childWatches.triggerWatch " + parentName);
        }

        // pass the already-fired watcher set so a watcher registered for both
        // data and child events is not notified twice for the same deletion
        WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
        childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
        childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
    }
| |
    /**
     * Set the data on a node and update its mtime/mzxid/version.
     *
     * @param path    full path of the node
     * @param data    new data (may be null)
     * @param version new data version to record
     * @param zxid    txn zxid, recorded as mzxid
     * @param time    txn time, recorded as mtime
     * @return the node's Stat after the update
     * @throws NoNodeException if no node exists at the path
     */
    public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        }
        byte[] lastData;
        synchronized (n) {
            // capture old data for the quota/size deltas computed below
            lastData = n.data;
            nodes.preChange(path, n);
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
            nodes.postChange(path, n);
        }

        // first do a quota check if the path is in a quota subtree.
        String lastPrefix = getMaxPrefixWithQuota(path);
        long bytesDiff = (data == null ? 0 : data.length) - (lastData == null ? 0 : lastData.length);
        // now update if the path is in a quota subtree.
        long dataBytes = data == null ? 0 : data.length;
        if (lastPrefix != null) {
            // count stays the same, only the byte total moves
            updateQuotaStat(lastPrefix, bytesDiff, 0);
        }
        nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));

        updateWriteStat(path, dataBytes);
        dataWatches.triggerWatch(path, EventType.NodeDataChanged);
        return s;
    }
| |
| /** |
| * If there is a quota set, return the appropriate prefix for that quota |
| * Else return null |
| * @param path The ZK path to check for quota |
| * @return Max quota prefix, or null if none |
| */ |
| public String getMaxPrefixWithQuota(String path) { |
| // do nothing for the root. |
| // we are not keeping a quota on the zookeeper |
| // root node for now. |
| String lastPrefix = pTrie.findMaxPrefix(path); |
| |
| if (rootZookeeper.equals(lastPrefix) || lastPrefix.isEmpty()) { |
| return null; |
| } else { |
| return lastPrefix; |
| } |
| } |
| |
| public void addWatch(String basePath, Watcher watcher, int mode) { |
| WatcherMode watcherMode = WatcherMode.fromZooDef(mode); |
| dataWatches.addWatch(basePath, watcher, watcherMode); |
| if (watcherMode != WatcherMode.PERSISTENT_RECURSIVE) { |
| childWatches.addWatch(basePath, watcher, watcherMode); |
| } |
| } |
| |
    /**
     * Read a node's data, filling {@code stat} and optionally registering a
     * data watch.
     *
     * @param path    full path of the node
     * @param stat    out-parameter filled with the node's current stat
     * @param watcher if non-null, registered as a data watch on the path
     * @return the node's data (may be null)
     * @throws NoNodeException if no node exists at the path
     */
    public byte[] getData(String path, Stat stat, Watcher watcher) throws NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        }
        byte[] data;
        synchronized (n) {
            n.copyStat(stat);
            // watch is added inside the lock so the stat and the watch
            // registration are consistent with the same node state
            if (watcher != null) {
                dataWatches.addWatch(path, watcher);
            }
            data = n.data;
        }
        updateReadStat(path, data == null ? 0 : data.length);
        return data;
    }
| |
    /**
     * Return the stat of a node, optionally registering a data watch.
     *
     * <p>Note the watch is added before the existence check, so a watch is
     * registered even when the node does not exist (exists-watch semantics).
     *
     * @param path    full path of the node
     * @param watcher if non-null, registered as a data watch on the path
     * @return the node's current Stat
     * @throws NoNodeException if no node exists at the path
     */
    public Stat statNode(String path, Watcher watcher) throws NoNodeException {
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        }
        Stat stat = new Stat();
        synchronized (n) {
            n.copyStat(stat);
        }
        updateReadStat(path, 0L);
        return stat;
    }
| |
| public List<String> getChildren(String path, Stat stat, Watcher watcher) throws NoNodeException { |
| DataNode n = nodes.get(path); |
| if (n == null) { |
| throw new NoNodeException(); |
| } |
| List<String> children; |
| synchronized (n) { |
| if (stat != null) { |
| n.copyStat(stat); |
| } |
| children = new ArrayList<>(n.getChildren()); |
| |
| if (watcher != null) { |
| childWatches.addWatch(path, watcher); |
| } |
| } |
| |
| int bytes = 0; |
| for (String child : children) { |
| bytes += child.length(); |
| } |
| updateReadStat(path, bytes); |
| |
| return children; |
| } |
| |
    /**
     * Count all descendants of a path (not just direct children).
     *
     * @param path full path; "/" counts every node in the tree
     * @return the number of nodes strictly below {@code path}
     */
    public int getAllChildrenNumber(String path) {
        // cull out these two keys:"", "/"
        if ("/".equals(path)) {
            return nodes.size() - 2;
        }

        // descendants are exactly the keys with "path/" as a prefix
        return (int) nodes.entrySet().parallelStream().filter(entry -> entry.getKey().startsWith(path + "/")).count();
    }
| |
    /**
     * Replace the ACL on a node and bump its aversion.
     *
     * @param path    full path of the node
     * @param acl     new ACL list
     * @param version new aversion to record
     * @return the node's Stat after the update
     * @throws NoNodeException if no node exists at the path
     */
    public Stat setACL(String path, List<ACL> acl, int version) throws NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        }
        synchronized (n) {
            Stat stat = new Stat();
            // drop the reference to the old ACL entry before installing the
            // new one, so the ACL cache's ref-counts stay balanced
            aclCache.removeUsage(n.acl);
            nodes.preChange(path, n);
            n.stat.setAversion(version);
            n.acl = aclCache.convertAcls(acl);
            n.copyStat(stat);
            nodes.postChange(path, n);
            return stat;
        }
    }
| |
    /**
     * Read a node's ACL, optionally filling {@code stat}.
     *
     * @param path full path of the node
     * @param stat if non-null, filled with the node's current stat
     * @return a fresh copy of the node's ACL list
     * @throws NoNodeException if no node exists at the path
     */
    public List<ACL> getACL(String path, Stat stat) throws NoNodeException {
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new NoNodeException();
        }
        synchronized (n) {
            if (stat != null) {
                n.copyStat(stat);
            }
            // copy so callers cannot mutate the cached ACL list
            return new ArrayList<>(aclCache.convertLong(n.acl));
        }
    }
| |
    /**
     * Resolve a node's ACL reference through the cache. Unlike the path-based
     * overload, this returns the cache's list directly, not a copy.
     */
    public List<ACL> getACL(DataNode node) {
        synchronized (node) {
            return aclCache.convertLong(node.acl);
        }
    }
| |
    /** Number of distinct ACL entries currently interned in the ACL cache. */
    public int aclCacheSize() {
        return aclCache.size();
    }
| |
| public static class ProcessTxnResult { |
| |
| public long clientId; |
| |
| public int cxid; |
| |
| public long zxid; |
| |
| public int err; |
| |
| public int type; |
| |
| public String path; |
| |
| public Stat stat; |
| |
| public List<ProcessTxnResult> multiResult; |
| |
| /** |
| * Equality is defined as the clientId and the cxid being the same. This |
| * allows us to use hash tables to track completion of transactions. |
| * |
| * @see java.lang.Object#equals(java.lang.Object) |
| */ |
| @Override |
| public boolean equals(Object o) { |
| if (o instanceof ProcessTxnResult) { |
| ProcessTxnResult other = (ProcessTxnResult) o; |
| return other.clientId == clientId && other.cxid == cxid; |
| } |
| return false; |
| } |
| |
| /** |
| * See equals() to find the rationale for how this hashcode is generated. |
| * |
| * @see ProcessTxnResult#equals(Object) |
| * @see java.lang.Object#hashCode() |
| */ |
| @Override |
| public int hashCode() { |
| return (int) ((clientId ^ cxid) % Integer.MAX_VALUE); |
| } |
| |
| } |
| |
| public volatile long lastProcessedZxid = 0; |
| |
    /**
     * Apply a transaction, then check the resulting tree digest against the
     * digest carried in the txn via compareDigest.
     */
    public ProcessTxnResult processTxn(TxnHeader header, Record txn, TxnDigest digest) {
        ProcessTxnResult result = processTxn(header, txn);
        compareDigest(header, txn, digest);
        return result;
    }
| |
    /** Apply a transaction as a top-level (non-sub) txn. */
    public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
        return this.processTxn(header, txn, false);
    }
| |
| public ProcessTxnResult processTxn(TxnHeader header, Record txn, boolean isSubTxn) { |
| ProcessTxnResult rc = new ProcessTxnResult(); |
| |
| try { |
| rc.clientId = header.getClientId(); |
| rc.cxid = header.getCxid(); |
| rc.zxid = header.getZxid(); |
| rc.type = header.getType(); |
| rc.err = 0; |
| rc.multiResult = null; |
| switch (header.getType()) { |
| case OpCode.create: |
| CreateTxn createTxn = (CreateTxn) txn; |
| rc.path = createTxn.getPath(); |
| createNode( |
| createTxn.getPath(), |
| createTxn.getData(), |
| createTxn.getAcl(), |
| createTxn.getEphemeral() ? header.getClientId() : 0, |
| createTxn.getParentCVersion(), |
| header.getZxid(), |
| header.getTime(), |
| null); |
| break; |
| case OpCode.create2: |
| CreateTxn create2Txn = (CreateTxn) txn; |
| rc.path = create2Txn.getPath(); |
| Stat stat = new Stat(); |
| createNode( |
| create2Txn.getPath(), |
| create2Txn.getData(), |
| create2Txn.getAcl(), |
| create2Txn.getEphemeral() ? header.getClientId() : 0, |
| create2Txn.getParentCVersion(), |
| header.getZxid(), |
| header.getTime(), |
| stat); |
| rc.stat = stat; |
| break; |
| case OpCode.createTTL: |
| CreateTTLTxn createTtlTxn = (CreateTTLTxn) txn; |
| rc.path = createTtlTxn.getPath(); |
| stat = new Stat(); |
| createNode( |
| createTtlTxn.getPath(), |
| createTtlTxn.getData(), |
| createTtlTxn.getAcl(), |
| EphemeralType.TTL.toEphemeralOwner(createTtlTxn.getTtl()), |
| createTtlTxn.getParentCVersion(), |
| header.getZxid(), |
| header.getTime(), |
| stat); |
| rc.stat = stat; |
| break; |
| case OpCode.createContainer: |
| CreateContainerTxn createContainerTxn = (CreateContainerTxn) txn; |
| rc.path = createContainerTxn.getPath(); |
| stat = new Stat(); |
| createNode( |
| createContainerTxn.getPath(), |
| createContainerTxn.getData(), |
| createContainerTxn.getAcl(), |
| EphemeralType.CONTAINER_EPHEMERAL_OWNER, |
| createContainerTxn.getParentCVersion(), |
| header.getZxid(), |
| header.getTime(), |
| stat); |
| rc.stat = stat; |
| break; |
| case OpCode.delete: |
| case OpCode.deleteContainer: |
| DeleteTxn deleteTxn = (DeleteTxn) txn; |
| rc.path = deleteTxn.getPath(); |
| deleteNode(deleteTxn.getPath(), header.getZxid()); |
| break; |
| case OpCode.reconfig: |
| case OpCode.setData: |
| SetDataTxn setDataTxn = (SetDataTxn) txn; |
| rc.path = setDataTxn.getPath(); |
| rc.stat = setData( |
| setDataTxn.getPath(), |
| setDataTxn.getData(), |
| setDataTxn.getVersion(), |
| header.getZxid(), |
| header.getTime()); |
| break; |
| case OpCode.setACL: |
| SetACLTxn setACLTxn = (SetACLTxn) txn; |
| rc.path = setACLTxn.getPath(); |
| rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(), setACLTxn.getVersion()); |
| break; |
| case OpCode.closeSession: |
| long sessionId = header.getClientId(); |
| if (txn != null) { |
| killSession(sessionId, header.getZxid(), |
| ephemerals.remove(sessionId), |
| ((CloseSessionTxn) txn).getPaths2Delete()); |
| } else { |
| killSession(sessionId, header.getZxid()); |
| } |
| break; |
| case OpCode.error: |
| ErrorTxn errTxn = (ErrorTxn) txn; |
| rc.err = errTxn.getErr(); |
| break; |
| case OpCode.check: |
| CheckVersionTxn checkTxn = (CheckVersionTxn) txn; |
| rc.path = checkTxn.getPath(); |
| break; |
| case OpCode.multi: |
| MultiTxn multiTxn = (MultiTxn) txn; |
| List<Txn> txns = multiTxn.getTxns(); |
| rc.multiResult = new ArrayList<>(); |
| boolean failed = false; |
| for (Txn subtxn : txns) { |
| if (subtxn.getType() == OpCode.error) { |
| failed = true; |
| break; |
| } |
| } |
| |
| boolean post_failed = false; |
| for (Txn subtxn : txns) { |
| ByteBuffer bb = ByteBuffer.wrap(subtxn.getData()); |
| Record record; |
| switch (subtxn.getType()) { |
| case OpCode.create: |
| record = new CreateTxn(); |
| break; |
| case OpCode.createTTL: |
| record = new CreateTTLTxn(); |
| break; |
| case OpCode.createContainer: |
| record = new CreateContainerTxn(); |
| break; |
| case OpCode.delete: |
| case OpCode.deleteContainer: |
| record = new DeleteTxn(); |
| break; |
| case OpCode.setData: |
| record = new SetDataTxn(); |
| break; |
| case OpCode.error: |
| record = new ErrorTxn(); |
| post_failed = true; |
| break; |
| case OpCode.check: |
| record = new CheckVersionTxn(); |
| break; |
| default: |
| throw new IOException("Invalid type of op: " + subtxn.getType()); |
| } |
| |
| assert record != null; |
| |
| ByteBufferInputStream.byteBuffer2Record(bb, record); |
| |
| if (failed && subtxn.getType() != OpCode.error) { |
| int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() : Code.OK.intValue(); |
| |
| subtxn.setType(OpCode.error); |
| record = new ErrorTxn(ec); |
| } |
| |
| assert !failed || (subtxn.getType() == OpCode.error); |
| |
| TxnHeader subHdr = new TxnHeader( |
| header.getClientId(), |
| header.getCxid(), |
| header.getZxid(), |
| header.getTime(), |
| subtxn.getType()); |
| ProcessTxnResult subRc = processTxn(subHdr, record, true); |
| rc.multiResult.add(subRc); |
| if (subRc.err != 0 && rc.err == 0) { |
| rc.err = subRc.err; |
| } |
| } |
| break; |
| } |
| } catch (KeeperException e) { |
| LOG.debug("Failed: {}:{}", header, txn, e); |
| rc.err = e.code().intValue(); |
| } catch (IOException e) { |
| LOG.debug("Failed: {}:{}", header, txn, e); |
| } |
| |
| /* |
| * Snapshots are taken lazily. When serializing a node, it's data |
| * and children copied in a synchronization block on that node, |
| * which means newly created node won't be in the snapshot, so |
| * we won't have mismatched cversion and pzxid when replaying the |
| * createNode txn. |
| * |
| * But there is a tricky scenario that if the child is deleted due |
| * to session close and re-created in a different global session |
| * after that the parent is serialized, then when replay the txn |
| * because the node belongs to a different session, replay the |
| * closeSession txn won't delete it anymore, and we'll get NODEEXISTS |
| * error when replay the createNode txn. In this case, we need to |
| * update the cversion and pzxid to the new value. |
| * |
| * Note, such failures on DT should be seen only during |
| * restore. |
| */ |
| if (header.getType() == OpCode.create && rc.err == Code.NODEEXISTS.intValue()) { |
| LOG.debug("Adjusting parent cversion for Txn: {} path: {} err: {}", header.getType(), rc.path, rc.err); |
| int lastSlash = rc.path.lastIndexOf('/'); |
| String parentName = rc.path.substring(0, lastSlash); |
| CreateTxn cTxn = (CreateTxn) txn; |
| try { |
| setCversionPzxid(parentName, cTxn.getParentCVersion(), header.getZxid()); |
| } catch (NoNodeException e) { |
| LOG.error("Failed to set parent cversion for: {}", parentName, e); |
| rc.err = e.code().intValue(); |
| } |
| } else if (rc.err != Code.OK.intValue()) { |
| LOG.debug("Ignoring processTxn failure hdr: {} : error: {}", header.getType(), rc.err); |
| } |
| |
| /* |
| * Things we can only update after the whole txn is applied to data |
| * tree. |
| * |
| * If we update the lastProcessedZxid with the first sub txn in multi |
| * and there is a snapshot in progress, it's possible that the zxid |
| * associated with the snapshot only include partial of the multi op. |
| * |
| * When loading snapshot, it will only load the txns after the zxid |
| * associated with snapshot file, which could cause data inconsistency |
| * due to missing sub txns. |
| * |
| * To avoid this, we only update the lastProcessedZxid when the whole |
| * multi-op txn is applied to DataTree. |
| */ |
| if (!isSubTxn) { |
| /* |
| * A snapshot might be in progress while we are modifying the data |
| * tree. If we set lastProcessedZxid prior to making corresponding |
| * change to the tree, then the zxid associated with the snapshot |
| * file will be ahead of its contents. Thus, while restoring from |
| * the snapshot, the restore method will not apply the transaction |
| * for zxid associated with the snapshot file, since the restore |
| * method assumes that transaction to be present in the snapshot. |
| * |
| * To avoid this, we first apply the transaction and then modify |
| * lastProcessedZxid. During restore, we correctly handle the |
| * case where the snapshot contains data ahead of the zxid associated |
| * with the file. |
| */ |
| if (rc.zxid > lastProcessedZxid) { |
| lastProcessedZxid = rc.zxid; |
| } |
| |
| if (digestFromLoadedSnapshot != null) { |
| compareSnapshotDigests(rc.zxid); |
| } else { |
| // only start recording digest when we're not in fuzzy state |
| logZxidDigest(rc.zxid, getTreeDigest()); |
| } |
| } |
| |
| return rc; |
| } |
| |
| void killSession(long session, long zxid) { |
| // The list is already removed from the ephemerals, |
| // so we do not have to worry about synchronizing on |
| // the list. This is only called from FinalRequestProcessor |
| // so there is no need for synchronization. The list is not |
| // changed here. Only create and delete change the list which |
| // are again called from FinalRequestProcessor in sequence. |
| killSession(session, zxid, ephemerals.remove(session), null); |
| } |
| |
| void killSession(long session, long zxid, Set<String> paths2DeleteLocal, |
| List<String> paths2DeleteInTxn) { |
| if (paths2DeleteInTxn != null) { |
| deleteNodes(session, zxid, paths2DeleteInTxn); |
| } |
| |
| if (paths2DeleteLocal == null) { |
| return; |
| } |
| |
| if (paths2DeleteInTxn != null) { |
| // explicitly check and remove to avoid potential performance |
| // issue when using removeAll |
| for (String path: paths2DeleteInTxn) { |
| paths2DeleteLocal.remove(path); |
| } |
| if (!paths2DeleteLocal.isEmpty()) { |
| LOG.warn( |
| "Unexpected extra paths under session {} which are not in txn 0x{}", |
| paths2DeleteLocal, |
| Long.toHexString(zxid)); |
| } |
| } |
| |
| deleteNodes(session, zxid, paths2DeleteLocal); |
| } |
| |
| void deleteNodes(long session, long zxid, Iterable<String> paths2Delete) { |
| for (String path : paths2Delete) { |
| boolean deleted = false; |
| String sessionHex = "0x" + Long.toHexString(session); |
| try { |
| deleteNode(path, zxid); |
| deleted = true; |
| LOG.debug("Deleting ephemeral node {} for session {}", path, sessionHex); |
| } catch (NoNodeException e) { |
| LOG.warn( |
| "Ignoring NoNodeException for path {} while removing ephemeral for dead session {}", |
| path, sessionHex); |
| } |
| if (ZKAuditProvider.isAuditEnabled()) { |
| if (deleted) { |
| ZKAuditProvider.log(ZKAuditProvider.getZKUser(), |
| AuditConstants.OP_DEL_EZNODE_EXP, path, null, null, |
| sessionHex, null, Result.SUCCESS); |
| } else { |
| ZKAuditProvider.log(ZKAuditProvider.getZKUser(), |
| AuditConstants.OP_DEL_EZNODE_EXP, path, null, null, |
| sessionHex, null, Result.FAILURE); |
| } |
| } |
| } |
| } |
| |
| /** |
| * An encapsulation class for return value |
| */ |
| private static class Counts { |
| long bytes; |
| int count; |
| } |
| |
| /** |
| * this method gets the count of nodes and the bytes under a subtree |
| * |
| * @param path the path to be used |
| * @param counts the int count |
| */ |
| private void getCounts(String path, Counts counts) { |
| DataNode node = getNode(path); |
| if (node == null) { |
| return; |
| } |
| String[] children; |
| int len; |
| synchronized (node) { |
| children = node.getChildren().toArray(new String[0]); |
| len = (node.data == null ? 0 : node.data.length); |
| } |
| // add itself |
| counts.count += 1; |
| counts.bytes += len; |
| for (String child : children) { |
| getCounts(path + "/" + child, counts); |
| } |
| } |
| |
| /** |
| * update the quota for the given path |
| * |
| * @param path the path to be used |
| */ |
| private void updateQuotaForPath(String path) { |
| Counts c = new Counts(); |
| getCounts(path, c); |
| StatsTrack statsTrack = new StatsTrack(); |
| statsTrack.setBytes(c.bytes); |
| statsTrack.setCount(c.count); |
| String statPath = Quotas.statPath(path); |
| DataNode node = getNode(statPath); |
| // it should exist |
| if (node == null) { |
| LOG.warn("Missing quota stat node {}", statPath); |
| return; |
| } |
| synchronized (node) { |
| nodes.preChange(statPath, node); |
| node.data = statsTrack.getStatsBytes(); |
| nodes.postChange(statPath, node); |
| } |
| } |
| |
| /** |
| * this method traverses the quota path and update the path trie and sets |
| * |
| * @param path the path to be used |
| */ |
| private void traverseNode(String path) { |
| DataNode node = getNode(path); |
| String[] children; |
| synchronized (node) { |
| children = node.getChildren().toArray(new String[0]); |
| } |
| if (children.length == 0) { |
| // this node does not have a child |
| // is the leaf node |
| // check if it's the leaf node |
| String endString = "/" + Quotas.limitNode; |
| if (path.endsWith(endString)) { |
| // ok this is the limit node |
| // get the real node and update |
| // the count and the bytes |
| String realPath = path.substring(Quotas.quotaZookeeper.length(), path.indexOf(endString)); |
| updateQuotaForPath(realPath); |
| this.pTrie.addPath(realPath); |
| } |
| return; |
| } |
| for (String child : children) { |
| traverseNode(path + "/" + child); |
| } |
| } |
| |
| /** |
| * this method sets up the path trie and sets up stats for quota nodes |
| */ |
| private void setupQuota() { |
| String quotaPath = Quotas.quotaZookeeper; |
| DataNode node = getNode(quotaPath); |
| if (node == null) { |
| return; |
| } |
| traverseNode(quotaPath); |
| } |
| |
| /** |
| * this method uses a stringbuilder to create a new path for children. This |
| * is faster than string appends ( str1 + str2). |
| * |
| * @param oa OutputArchive to write to. |
| * @param path a string builder. |
| * @throws IOException |
| */ |
| void serializeNode(OutputArchive oa, StringBuilder path) throws IOException { |
| String pathString = path.toString(); |
| DataNode node = getNode(pathString); |
| if (node == null) { |
| return; |
| } |
| String[] children; |
| DataNode nodeCopy; |
| synchronized (node) { |
| StatPersisted statCopy = new StatPersisted(); |
| copyStatPersisted(node.stat, statCopy); |
| //we do not need to make a copy of node.data because the contents |
| //are never changed |
| nodeCopy = new DataNode(node.data, node.acl, statCopy); |
| children = node.getChildren().toArray(new String[0]); |
| } |
| serializeNodeData(oa, pathString, nodeCopy); |
| path.append('/'); |
| int off = path.length(); |
| for (String child : children) { |
| // Since this is single buffer being reused, we need to truncate the previous bytes of string. |
| path.delete(off, Integer.MAX_VALUE); |
| path.append(child); |
| serializeNode(oa, path); |
| } |
| } |
| |
    // visible for test
    /**
     * Writes one (path, node) pair to the snapshot stream.
     *
     * @param oa   archive to write to
     * @param path absolute path of the node
     * @param node node record to serialize
     * @throws IOException if the archive write fails
     */
    public void serializeNodeData(OutputArchive oa, String path, DataNode node) throws IOException {
        oa.writeString(path, "path");
        oa.writeRecord(node, "node");
    }
| |
    /**
     * Serializes the reference-counted ACL cache to the archive. Must be
     * written before the nodes, matching the read order in deserialize.
     *
     * @param oa archive to write to
     * @throws IOException if the archive write fails
     */
    public void serializeAcls(OutputArchive oa) throws IOException {
        aclCache.serialize(oa);
    }
| |
    /**
     * Serializes every node in the tree, terminated by a "/" marker.
     *
     * @param oa archive to write to
     * @throws IOException if the archive write fails
     */
    public void serializeNodes(OutputArchive oa) throws IOException {
        serializeNode(oa, new StringBuilder());
        // / marks end of stream
        // we need to check if clear had been called in between the snapshot.
        if (root != null) {
            oa.writeString("/", "path");
        }
    }
| |
    /**
     * Serializes the whole data tree: ACL cache first, then all nodes.
     *
     * @param oa  archive to write to
     * @param tag tag label (unused by this implementation)
     * @throws IOException if the archive write fails
     */
    public void serialize(OutputArchive oa, String tag) throws IOException {
        serializeAcls(oa);
        serializeNodes(oa);
    }
| |
    /**
     * Rebuilds the data tree from a snapshot stream: reads the ACL cache,
     * then (path, node) pairs until the terminating "/" marker, wiring each
     * node to its parent and indexing containers, TTL nodes and ephemerals.
     * Finally recomputes sizes, quotas, and drops unreferenced ACLs.
     *
     * @param ia  archive to read from
     * @param tag tag label (unused by this implementation)
     * @throws IOException if the stream is malformed (e.g. a child appears
     *                     before its parent)
     */
    public void deserialize(InputArchive ia, String tag) throws IOException {
        aclCache.deserialize(ia);
        // reset all derived state before loading
        nodes.clear();
        pTrie.clear();
        nodeDataSize.set(0);
        String path = ia.readString("path");
        // "/" is the end-of-stream sentinel written by serializeNodes
        while (!"/".equals(path)) {
            DataNode node = new DataNode();
            ia.readRecord(node, "node");
            nodes.put(path, node);
            synchronized (node) {
                aclCache.addUsage(node.acl);
            }
            int lastSlash = path.lastIndexOf('/');
            if (lastSlash == -1) {
                // the root node is serialized under the empty path ""
                root = node;
            } else {
                String parentPath = path.substring(0, lastSlash);
                DataNode parent = nodes.get(parentPath);
                if (parent == null) {
                    throw new IOException(
                        "Invalid Datatree, unable to find parent " + parentPath + " of path " + path);
                }
                parent.addChild(path.substring(lastSlash + 1));
                long owner = node.stat.getEphemeralOwner();
                EphemeralType ephemeralType = EphemeralType.get(owner);
                // classify the node so the appropriate maintenance index tracks it
                if (ephemeralType == EphemeralType.CONTAINER) {
                    containers.add(path);
                } else if (ephemeralType == EphemeralType.TTL) {
                    ttls.add(path);
                } else if (owner != 0) {
                    HashSet<String> list = ephemerals.computeIfAbsent(owner, k -> new HashSet<>());
                    list.add(path);
                }
            }
            path = ia.readString("path");
        }
        // have counted digest for root node with "", ignore here to avoid
        // counting twice for root node
        nodes.putWithoutDigest("/", root);

        nodeDataSize.set(approximateDataSize());

        // we are done with deserializing the datatree
        // update the quotas - create path trie
        // and also update the stat nodes
        setupQuota();

        aclCache.purgeUnused();
    }
| |
| /** |
| * Summary of the watches on the datatree. |
| * @param writer the output to write to |
| */ |
| public synchronized void dumpWatchesSummary(PrintWriter writer) { |
| writer.print(dataWatches.toString()); |
| } |
| |
| /** |
| * Write a text dump of all the watches on the datatree. |
| * Warning, this is expensive, use sparingly! |
| * @param writer the output to write to |
| */ |
| public synchronized void dumpWatches(PrintWriter writer, boolean byPath) { |
| dataWatches.dumpWatches(writer, byPath); |
| } |
| |
| /** |
| * Returns a watch report. |
| * |
| * @return watch report |
| * @see WatchesReport |
| */ |
| public synchronized WatchesReport getWatches() { |
| return dataWatches.getWatches(); |
| } |
| |
| /** |
| * Returns a watch report by path. |
| * |
| * @return watch report |
| * @see WatchesPathReport |
| */ |
| public synchronized WatchesPathReport getWatchesByPath() { |
| return dataWatches.getWatchesByPath(); |
| } |
| |
| /** |
| * Returns a watch summary. |
| * |
| * @return watch summary |
| * @see WatchesSummary |
| */ |
| public synchronized WatchesSummary getWatchesSummary() { |
| return dataWatches.getWatchesSummary(); |
| } |
| |
| /** |
| * Write a text dump of all the ephemerals in the datatree. |
| * @param writer the output to write to |
| */ |
| public void dumpEphemerals(PrintWriter writer) { |
| writer.println("Sessions with Ephemerals (" + ephemerals.keySet().size() + "):"); |
| for (Entry<Long, HashSet<String>> entry : ephemerals.entrySet()) { |
| writer.print("0x" + Long.toHexString(entry.getKey())); |
| writer.println(":"); |
| Set<String> tmp = entry.getValue(); |
| if (tmp != null) { |
| synchronized (tmp) { |
| for (String path : tmp) { |
| writer.println("\t" + path); |
| } |
| } |
| } |
| } |
| } |
| |
    /**
     * Shuts down both the data and child watch managers, releasing any
     * resources they hold. Called when the server is shutting down.
     */
    public void shutdownWatcher() {
        dataWatches.shutdown();
        childWatches.shutdown();
    }
| |
| /** |
| * Returns a mapping of session ID to ephemeral znodes. |
| * |
| * @return map of session ID to sets of ephemeral znodes |
| */ |
| public Map<Long, Set<String>> getEphemerals() { |
| Map<Long, Set<String>> ephemeralsCopy = new HashMap<>(); |
| for (Entry<Long, HashSet<String>> e : ephemerals.entrySet()) { |
| synchronized (e.getValue()) { |
| ephemeralsCopy.put(e.getKey(), new HashSet<>(e.getValue())); |
| } |
| } |
| return ephemeralsCopy; |
| } |
| |
    /**
     * Removes all watches registered by the given watcher (typically a
     * closing connection) from both watch managers.
     *
     * @param watcher the watcher whose registrations should be dropped
     */
    public void removeCnxn(Watcher watcher) {
        dataWatches.removeWatcher(watcher);
        childWatches.removeWatcher(watcher);
    }
| |
| public void setWatches(long relativeZxid, List<String> dataWatches, List<String> existWatches, List<String> childWatches, |
| List<String> persistentWatches, List<String> persistentRecursiveWatches, Watcher watcher) { |
| for (String path : dataWatches) { |
| DataNode node = getNode(path); |
| if (node == null) { |
| watcher.process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)); |
| } else if (node.stat.getMzxid() > relativeZxid) { |
| watcher.process(new WatchedEvent(EventType.NodeDataChanged, KeeperState.SyncConnected, path)); |
| } else { |
| this.dataWatches.addWatch(path, watcher); |
| } |
| } |
| for (String path : existWatches) { |
| DataNode node = getNode(path); |
| if (node != null) { |
| watcher.process(new WatchedEvent(EventType.NodeCreated, KeeperState.SyncConnected, path)); |
| } else { |
| this.dataWatches.addWatch(path, watcher); |
| } |
| } |
| for (String path : childWatches) { |
| DataNode node = getNode(path); |
| if (node == null) { |
| watcher.process(new WatchedEvent(EventType.NodeDeleted, KeeperState.SyncConnected, path)); |
| } else if (node.stat.getPzxid() > relativeZxid) { |
| watcher.process(new WatchedEvent(EventType.NodeChildrenChanged, KeeperState.SyncConnected, path)); |
| } else { |
| this.childWatches.addWatch(path, watcher); |
| } |
| } |
| for (String path : persistentWatches) { |
| this.childWatches.addWatch(path, watcher, WatcherMode.PERSISTENT); |
| this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT); |
| } |
| for (String path : persistentRecursiveWatches) { |
| this.dataWatches.addWatch(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); |
| } |
| } |
| |
| /** |
| * This method sets the Cversion and Pzxid for the specified node to the |
| * values passed as arguments. The values are modified only if newCversion |
| * is greater than the current Cversion. A NoNodeException is thrown if |
| * a znode for the specified path is not found. |
| * |
| * @param path |
| * Full path to the znode whose Cversion needs to be modified. |
| * A "/" at the end of the path is ignored. |
| * @param newCversion |
| * Value to be assigned to Cversion |
| * @param zxid |
| * Value to be assigned to Pzxid |
| * @throws NoNodeException |
| * If znode not found. |
| **/ |
| public void setCversionPzxid(String path, int newCversion, long zxid) throws NoNodeException { |
| if (path.endsWith("/")) { |
| path = path.substring(0, path.length() - 1); |
| } |
| DataNode node = nodes.get(path); |
| if (node == null) { |
| throw new NoNodeException(path); |
| } |
| synchronized (node) { |
| if (newCversion == -1) { |
| newCversion = node.stat.getCversion() + 1; |
| } |
| if (newCversion > node.stat.getCversion()) { |
| nodes.preChange(path, node); |
| node.stat.setCversion(newCversion); |
| node.stat.setPzxid(zxid); |
| nodes.postChange(path, node); |
| } |
| } |
| } |
| |
| public boolean containsWatcher(String path, WatcherType type, Watcher watcher) { |
| boolean containsWatcher = false; |
| switch (type) { |
| case Children: |
| containsWatcher = this.childWatches.containsWatcher(path, watcher); |
| break; |
| case Data: |
| containsWatcher = this.dataWatches.containsWatcher(path, watcher); |
| break; |
| case Any: |
| if (this.childWatches.containsWatcher(path, watcher)) { |
| containsWatcher = true; |
| } |
| if (this.dataWatches.containsWatcher(path, watcher)) { |
| containsWatcher = true; |
| } |
| break; |
| } |
| return containsWatcher; |
| } |
| |
| public boolean removeWatch(String path, WatcherType type, Watcher watcher) { |
| boolean removed = false; |
| switch (type) { |
| case Children: |
| removed = this.childWatches.removeWatcher(path, watcher); |
| break; |
| case Data: |
| removed = this.dataWatches.removeWatcher(path, watcher); |
| break; |
| case Any: |
| if (this.childWatches.removeWatcher(path, watcher)) { |
| removed = true; |
| } |
| if (this.dataWatches.removeWatcher(path, watcher)) { |
| removed = true; |
| } |
| break; |
| } |
| return removed; |
| } |
| |
    // visible for testing
    /**
     * @return the reference-counted ACL cache backing this tree
     */
    public ReferenceCountedACLCache getReferenceCountedAclCache() {
        return aclCache;
    }
| |
| private void updateReadStat(String path, long bytes) { |
| final String namespace = PathUtils.getTopNamespace(path); |
| if (namespace == null) { |
| return; |
| } |
| long totalBytes = path.length() + bytes + STAT_OVERHEAD_BYTES; |
| ServerMetrics.getMetrics().READ_PER_NAMESPACE.add(namespace, totalBytes); |
| } |
| |
| private void updateWriteStat(String path, long bytes) { |
| final String namespace = PathUtils.getTopNamespace(path); |
| if (namespace == null) { |
| return; |
| } |
| ServerMetrics.getMetrics().WRITE_PER_NAMESPACE.add(namespace, path.length() + bytes); |
| } |
| |
| /** |
| * Add the digest to the historical list, and update the latest zxid digest. |
| */ |
| private void logZxidDigest(long zxid, long digest) { |
| ZxidDigest zxidDigest = new ZxidDigest(zxid, digestCalculator.getDigestVersion(), digest); |
| lastProcessedZxidDigest = zxidDigest; |
| if (zxidDigest.zxid % DIGEST_LOG_INTERVAL == 0) { |
| synchronized (digestLog) { |
| digestLog.add(zxidDigest); |
| if (digestLog.size() > DIGEST_LOG_LIMIT) { |
| digestLog.poll(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Serializing the digest to snapshot, this is done after the data tree |
| * is being serialized, so when we replay the txns, and it hits this zxid |
| * we know we should be in a non-fuzzy state, and have the same digest. |
| * |
| * @param oa the output stream to write to |
| * @return true if the digest is serialized successfully |
| */ |
| public boolean serializeZxidDigest(OutputArchive oa) throws IOException { |
| if (!ZooKeeperServer.isDigestEnabled()) { |
| return false; |
| } |
| |
| ZxidDigest zxidDigest = lastProcessedZxidDigest; |
| if (zxidDigest == null) { |
| // write an empty digest |
| zxidDigest = new ZxidDigest(); |
| } |
| zxidDigest.serialize(oa); |
| return true; |
| } |
| |
| /** |
| * Deserializing the zxid digest from the input stream and update the |
| * digestFromLoadedSnapshot. |
| * |
| * @param ia the input stream to read from |
| * @param startZxidOfSnapshot the zxid of snapshot file |
| * @return the true if it deserialized successfully |
| */ |
| public boolean deserializeZxidDigest(InputArchive ia, long startZxidOfSnapshot) throws IOException { |
| if (!ZooKeeperServer.isDigestEnabled()) { |
| return false; |
| } |
| |
| try { |
| ZxidDigest zxidDigest = new ZxidDigest(); |
| zxidDigest.deserialize(ia); |
| if (zxidDigest.zxid > 0) { |
| digestFromLoadedSnapshot = zxidDigest; |
| LOG.info("The digest in the snapshot has digest version of {}, " |
| + "with zxid as 0x{}, and digest value as {}", |
| digestFromLoadedSnapshot.digestVersion, |
| Long.toHexString(digestFromLoadedSnapshot.zxid), |
| digestFromLoadedSnapshot.digest); |
| } else { |
| digestFromLoadedSnapshot = null; |
| LOG.info("The digest value is empty in snapshot"); |
| } |
| |
| // There is possibility that the start zxid of a snapshot might |
| // be larger than the digest zxid in snapshot. |
| // |
| // Known cases: |
| // |
| // The new leader set the last processed zxid to be the new |
| // epoch + 0, which is not mapping to any txn, and it uses |
| // this to take snapshot, which is possible if we don't |
| // clean database before switching to LOOKING. In this case |
| // the currentZxidDigest will be the zxid of last epoch, and |
| // it's smaller than the zxid of the snapshot file. |
| // |
| // It's safe to reset the targetZxidDigest to null and start |
| // to compare digest when replaying the first txn, since it's |
| // a non-fuzzy snapshot. |
| if (digestFromLoadedSnapshot != null && digestFromLoadedSnapshot.zxid < startZxidOfSnapshot) { |
| LOG.info("The zxid of snapshot digest 0x{} is smaller " |
| + "than the known snapshot highest zxid, the snapshot " |
| + "started with zxid 0x{}. It will be invalid to use " |
| + "this snapshot digest associated with this zxid, will " |
| + "ignore comparing it.", Long.toHexString(digestFromLoadedSnapshot.zxid), |
| Long.toHexString(startZxidOfSnapshot)); |
| digestFromLoadedSnapshot = null; |
| } |
| |
| return true; |
| } catch (EOFException e) { |
| LOG.warn("Got EOF exception while reading the digest, likely due to the reading an older snapshot."); |
| return false; |
| } |
| } |
| |
| /** |
| * Serializes the lastProcessedZxid so we can get it from snapshot instead the snapshot file name. |
| * This is needed for performing snapshot and restore via admin server commands. |
| * |
| * @param oa the output stream to write to |
| * @return true if the lastProcessedZxid is serialized successfully, otherwise false |
| * @throws IOException if there is an I/O error |
| */ |
| public boolean serializeLastProcessedZxid(final OutputArchive oa) throws IOException { |
| if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) { |
| return false; |
| } |
| oa.writeLong(lastProcessedZxid, "lastZxid"); |
| return true; |
| } |
| |
| /** |
| * Deserializes the lastProcessedZxid from the input stream and updates the lastProcessedZxid field. |
| * |
| * @param ia the input stream to read from |
| * @return true if lastProcessedZxid is deserialized successfully, otherwise false |
| * @throws IOException if there is an I/O error |
| */ |
| public boolean deserializeLastProcessedZxid(final InputArchive ia) throws IOException { |
| if (!ZooKeeperServer.isSerializeLastProcessedZxidEnabled()) { |
| return false; |
| } |
| try { |
| lastProcessedZxid = ia.readLong("lastZxid"); |
| } catch (final EOFException e) { |
| LOG.warn("Got EOFException while reading the last processed zxid, likely due to reading an older snapshot."); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Compares the actual tree's digest with that in the snapshot. |
| * Resets digestFromLoadedSnapshot after comparison. |
| * |
| * @param zxid zxid |
| */ |
| public void compareSnapshotDigests(long zxid) { |
| if (zxid == digestFromLoadedSnapshot.zxid) { |
| if (digestCalculator.getDigestVersion() != digestFromLoadedSnapshot.digestVersion) { |
| LOG.info( |
| "Digest version changed, local: {}, new: {}, skip comparing digest now.", |
| digestFromLoadedSnapshot.digestVersion, |
| digestCalculator.getDigestVersion()); |
| digestFromLoadedSnapshot = null; |
| return; |
| } |
| if (getTreeDigest() != digestFromLoadedSnapshot.getDigest()) { |
| reportDigestMismatch(zxid); |
| } |
| digestFromLoadedSnapshot = null; |
| } else if (digestFromLoadedSnapshot.zxid != 0 && zxid > digestFromLoadedSnapshot.zxid) { |
| RATE_LOGGER.rateLimitLog( |
| "The txn 0x{} of snapshot digest does not exist.", |
| Long.toHexString(digestFromLoadedSnapshot.zxid)); |
| } |
| } |
| |
| /** |
| * Compares the digest of the tree with the digest present in transaction digest. |
| * If there is any error, logs and alerts the watchers. |
| * |
| * @param header transaction header being applied |
| * @param txn transaction |
| * @param digest transaction digest |
| * |
| * @return false if digest in the txn doesn't match what we have now in the data tree |
| */ |
| public boolean compareDigest(TxnHeader header, Record txn, TxnDigest digest) { |
| long zxid = header.getZxid(); |
| |
| if (!ZooKeeperServer.isDigestEnabled() || digest == null) { |
| return true; |
| } |
| // do not compare digest if we're still in fuzzy state |
| if (digestFromLoadedSnapshot != null) { |
| return true; |
| } |
| // do not compare digest if there is digest version change |
| if (digestCalculator.getDigestVersion() != digest.getVersion()) { |
| RATE_LOGGER.rateLimitLog("Digest version not the same on zxid.", String.valueOf(zxid)); |
| return true; |
| } |
| |
| long logDigest = digest.getTreeDigest(); |
| long actualDigest = getTreeDigest(); |
| if (logDigest != actualDigest) { |
| reportDigestMismatch(zxid); |
| LOG.debug("Digest in log: {}, actual tree: {}", logDigest, actualDigest); |
| if (firstMismatchTxn) { |
| LOG.error( |
| "First digest mismatch on txn: {}, {}, expected digest is {}, actual digest is {}, ", |
| header, txn, digest, actualDigest); |
| firstMismatchTxn = false; |
| } |
| return false; |
| } else { |
| RATE_LOGGER.flush(); |
| LOG.debug( |
| "Digests are matching for Zxid: {}, Digest in log and actual tree: {}", |
| Long.toHexString(zxid), logDigest); |
| return true; |
| } |
| } |
| |
| /** |
| * Reports any mismatch in the transaction digest. |
| * @param zxid zxid for which the error is being reported. |
| */ |
| public void reportDigestMismatch(long zxid) { |
| ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT.add(1); |
| RATE_LOGGER.rateLimitLog("Digests are not matching. Value is Zxid.", String.valueOf(zxid)); |
| |
| for (DigestWatcher watcher : digestWatchers) { |
| watcher.process(zxid); |
| } |
| } |
| |
    /**
     * @return the current digest of the whole data tree
     */
    public long getTreeDigest() {
        return nodes.getDigest();
    }
| |
    /**
     * @return the digest recorded at the last processed zxid, or null if
     *         none has been recorded yet
     */
    public ZxidDigest getLastProcessedZxidDigest() {
        return lastProcessedZxidDigest;
    }
| |
    /**
     * @return the digest loaded from the snapshot that is still pending
     *         comparison, or null once compared (or never loaded)
     */
    public ZxidDigest getDigestFromLoadedSnapshot() {
        return digestFromLoadedSnapshot;
    }
| |
| /** |
| * Add digest mismatch event handler. |
| * |
| * @param digestWatcher the handler to add |
| */ |
| public void addDigestWatcher(DigestWatcher digestWatcher) { |
| digestWatchers.add(digestWatcher); |
| } |
| |
| /** |
| * Return all the digests in the historical digest list. |
| */ |
| public List<ZxidDigest> getDigestLog() { |
| synchronized (digestLog) { |
| // Return a copy of current digest log |
| return new LinkedList<>(digestLog); |
| } |
| } |
| |
| /** |
| * A helper class to maintain the digest meta associated with specific zxid. |
| */ |
| public class ZxidDigest { |
| |
| long zxid; |
| // the digest value associated with this zxid |
| long digest; |
| // the version when the digest was calculated |
| int digestVersion; |
| |
| ZxidDigest() { |
| this(0, digestCalculator.getDigestVersion(), 0); |
| } |
| |
| ZxidDigest(long zxid, int digestVersion, long digest) { |
| this.zxid = zxid; |
| this.digestVersion = digestVersion; |
| this.digest = digest; |
| } |
| |
| public void serialize(OutputArchive oa) throws IOException { |
| oa.writeLong(zxid, "zxid"); |
| oa.writeInt(digestVersion, "digestVersion"); |
| oa.writeLong(digest, "digest"); |
| } |
| |
| public void deserialize(InputArchive ia) throws IOException { |
| zxid = ia.readLong("zxid"); |
| digestVersion = ia.readInt("digestVersion"); |
| // the old version is using hex string as the digest |
| if (digestVersion < 2) { |
| String d = ia.readString("digest"); |
| if (d != null) { |
| digest = Long.parseLong(d, 16); |
| } |
| } else { |
| digest = ia.readLong("digest"); |
| } |
| } |
| |
| public long getZxid() { |
| return zxid; |
| } |
| |
| public int getDigestVersion() { |
| return digestVersion; |
| } |
| |
| public long getDigest() { |
| return digest; |
| } |
| |
| } |
| |
| /** |
| * Create a node stat from the given params. |
| * |
| * @param zxid the zxid associated with the txn |
| * @param time the time when the txn is created |
| * @param ephemeralOwner the owner if the node is an ephemeral |
| * @return the stat |
| */ |
| public static StatPersisted createStat(long zxid, long time, long ephemeralOwner) { |
| StatPersisted stat = new StatPersisted(); |
| stat.setCtime(time); |
| stat.setMtime(time); |
| stat.setCzxid(zxid); |
| stat.setMzxid(zxid); |
| stat.setPzxid(zxid); |
| stat.setVersion(0); |
| stat.setAversion(0); |
| stat.setEphemeralOwner(ephemeralOwner); |
| return stat; |
| } |
| } |