| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.zookeeper.server.watch; |
| |
| import java.io.PrintWriter; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.Watcher.Event.KeeperState; |
| import org.apache.zookeeper.server.ServerCnxn; |
| import org.apache.zookeeper.server.ServerMetrics; |
| import org.apache.zookeeper.server.ZooTrace; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * This class manages watches. It allows watches to be associated with a string |
| * and removes watchers and their watches in addition to managing triggers. |
| */ |
| public class WatchManager implements IWatchManager { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class); |
| |
| private final Map<String, Set<Watcher>> watchTable = new HashMap<>(); |
| |
| private final Map<Watcher, Map<String, WatchStats>> watch2Paths = new HashMap<>(); |
| |
| private int recursiveWatchQty = 0; |
| |
| @Override |
| public synchronized int size() { |
| int result = 0; |
| for (Set<Watcher> watches : watchTable.values()) { |
| result += watches.size(); |
| } |
| return result; |
| } |
| |
| private boolean isDeadWatcher(Watcher watcher) { |
| return watcher instanceof ServerCnxn && ((ServerCnxn) watcher).isStale(); |
| } |
| |
| @Override |
| public boolean addWatch(String path, Watcher watcher) { |
| return addWatch(path, watcher, WatcherMode.DEFAULT_WATCHER_MODE); |
| } |
| |
| @Override |
| public synchronized boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) { |
| if (isDeadWatcher(watcher)) { |
| LOG.debug("Ignoring addWatch with closed cnxn"); |
| return false; |
| } |
| |
| Set<Watcher> list = watchTable.get(path); |
| if (list == null) { |
| // don't waste memory if there are few watches on a node |
| // rehash when the 4th entry is added, doubling size thereafter |
| // seems like a good compromise |
| list = new HashSet<>(4); |
| watchTable.put(path, list); |
| } |
| list.add(watcher); |
| |
| Map<String, WatchStats> paths = watch2Paths.get(watcher); |
| if (paths == null) { |
| // cnxns typically have many watches, so use default cap here |
| paths = new HashMap<>(); |
| watch2Paths.put(watcher, paths); |
| } |
| |
| WatchStats stats = paths.getOrDefault(path, WatchStats.NONE); |
| WatchStats newStats = stats.addMode(watcherMode); |
| |
| if (newStats != stats) { |
| paths.put(path, newStats); |
| if (watcherMode.isRecursive()) { |
| ++recursiveWatchQty; |
| } |
| return true; |
| } |
| |
| return false; |
| } |
| |
| @Override |
| public synchronized void removeWatcher(Watcher watcher) { |
| Map<String, WatchStats> paths = watch2Paths.remove(watcher); |
| if (paths == null) { |
| return; |
| } |
| for (String p : paths.keySet()) { |
| Set<Watcher> list = watchTable.get(p); |
| if (list != null) { |
| list.remove(watcher); |
| if (list.isEmpty()) { |
| watchTable.remove(p); |
| } |
| } |
| } |
| for (WatchStats stats : paths.values()) { |
| if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { |
| --recursiveWatchQty; |
| } |
| } |
| } |
| |
| @Override |
| public WatcherOrBitSet triggerWatch(String path, EventType type) { |
| return triggerWatch(path, type, null); |
| } |
| |
| @Override |
| public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { |
| WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); |
| Set<Watcher> watchers = new HashSet<>(); |
| synchronized (this) { |
| PathParentIterator pathParentIterator = getPathParentIterator(path); |
| for (String localPath : pathParentIterator.asIterable()) { |
| Set<Watcher> thisWatchers = watchTable.get(localPath); |
| if (thisWatchers == null || thisWatchers.isEmpty()) { |
| continue; |
| } |
| Iterator<Watcher> iterator = thisWatchers.iterator(); |
| while (iterator.hasNext()) { |
| Watcher watcher = iterator.next(); |
| Map<String, WatchStats> paths = watch2Paths.getOrDefault(watcher, Collections.emptyMap()); |
| WatchStats stats = paths.get(localPath); |
| if (stats == null) { |
| LOG.warn("inconsistent watch table for watcher {}, {} not in path list", watcher, localPath); |
| continue; |
| } |
| if (!pathParentIterator.atParentPath()) { |
| watchers.add(watcher); |
| WatchStats newStats = stats.removeMode(WatcherMode.STANDARD); |
| if (newStats == WatchStats.NONE) { |
| iterator.remove(); |
| paths.remove(localPath); |
| } else if (newStats != stats) { |
| paths.put(localPath, newStats); |
| } |
| } else if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { |
| watchers.add(watcher); |
| } |
| } |
| if (thisWatchers.isEmpty()) { |
| watchTable.remove(localPath); |
| } |
| } |
| } |
| if (watchers.isEmpty()) { |
| if (LOG.isTraceEnabled()) { |
| ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK, "No watchers for " + path); |
| } |
| return null; |
| } |
| |
| for (Watcher w : watchers) { |
| if (supress != null && supress.contains(w)) { |
| continue; |
| } |
| w.process(e); |
| } |
| |
| switch (type) { |
| case NodeCreated: |
| ServerMetrics.getMetrics().NODE_CREATED_WATCHER.add(watchers.size()); |
| break; |
| |
| case NodeDeleted: |
| ServerMetrics.getMetrics().NODE_DELETED_WATCHER.add(watchers.size()); |
| break; |
| |
| case NodeDataChanged: |
| ServerMetrics.getMetrics().NODE_CHANGED_WATCHER.add(watchers.size()); |
| break; |
| |
| case NodeChildrenChanged: |
| ServerMetrics.getMetrics().NODE_CHILDREN_WATCHER.add(watchers.size()); |
| break; |
| default: |
| // Other types not logged. |
| break; |
| } |
| |
| return new WatcherOrBitSet(watchers); |
| } |
| |
| @Override |
| public synchronized String toString() { |
| StringBuilder sb = new StringBuilder(); |
| |
| sb.append(watch2Paths.size()).append(" connections watching ").append(watchTable.size()).append(" paths\n"); |
| |
| int total = 0; |
| for (Map<String, WatchStats> paths : watch2Paths.values()) { |
| total += paths.size(); |
| } |
| sb.append("Total watches:").append(total); |
| |
| return sb.toString(); |
| } |
| |
| @Override |
| public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) { |
| if (byPath) { |
| for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) { |
| pwriter.println(e.getKey()); |
| for (Watcher w : e.getValue()) { |
| pwriter.print("\t0x"); |
| pwriter.print(Long.toHexString(((ServerCnxn) w).getSessionId())); |
| pwriter.print("\n"); |
| } |
| } |
| } else { |
| for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) { |
| pwriter.print("0x"); |
| pwriter.println(Long.toHexString(((ServerCnxn) e.getKey()).getSessionId())); |
| for (String path : e.getValue().keySet()) { |
| pwriter.print("\t"); |
| pwriter.println(path); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public synchronized boolean containsWatcher(String path, Watcher watcher) { |
| Set<Watcher> list = watchTable.get(path); |
| return list != null && list.contains(watcher); |
| } |
| |
| @Override |
| public synchronized boolean removeWatcher(String path, Watcher watcher) { |
| Map<String, WatchStats> paths = watch2Paths.get(watcher); |
| if (paths == null) { |
| return false; |
| } |
| |
| WatchStats stats = paths.remove(path); |
| if (stats == null) { |
| return false; |
| } |
| if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { |
| --recursiveWatchQty; |
| } |
| |
| Set<Watcher> list = watchTable.get(path); |
| if (list == null || !list.remove(watcher)) { |
| LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher); |
| return false; |
| } |
| |
| if (list.isEmpty()) { |
| watchTable.remove(path); |
| } |
| |
| return true; |
| } |
| |
| // VisibleForTesting |
| Map<Watcher, Map<String, WatchStats>> getWatch2Paths() { |
| return watch2Paths; |
| } |
| |
| @Override |
| public synchronized WatchesReport getWatches() { |
| Map<Long, Set<String>> id2paths = new HashMap<>(); |
| for (Entry<Watcher, Map<String, WatchStats>> e : watch2Paths.entrySet()) { |
| Long id = ((ServerCnxn) e.getKey()).getSessionId(); |
| Set<String> paths = new HashSet<>(e.getValue().keySet()); |
| id2paths.put(id, paths); |
| } |
| return new WatchesReport(id2paths); |
| } |
| |
| @Override |
| public synchronized WatchesPathReport getWatchesByPath() { |
| Map<String, Set<Long>> path2ids = new HashMap<>(); |
| for (Entry<String, Set<Watcher>> e : watchTable.entrySet()) { |
| Set<Long> ids = new HashSet<>(e.getValue().size()); |
| path2ids.put(e.getKey(), ids); |
| for (Watcher watcher : e.getValue()) { |
| ids.add(((ServerCnxn) watcher).getSessionId()); |
| } |
| } |
| return new WatchesPathReport(path2ids); |
| } |
| |
| @Override |
| public synchronized WatchesSummary getWatchesSummary() { |
| int totalWatches = 0; |
| for (Map<String, WatchStats> paths : watch2Paths.values()) { |
| totalWatches += paths.size(); |
| } |
| return new WatchesSummary(watch2Paths.size(), watchTable.size(), totalWatches); |
| } |
| |
| @Override |
| public void shutdown() { /* do nothing */ } |
| |
| // VisibleForTesting |
| synchronized int getRecursiveWatchQty() { |
| return recursiveWatchQty; |
| } |
| |
| private PathParentIterator getPathParentIterator(String path) { |
| if (getRecursiveWatchQty() == 0) { |
| return PathParentIterator.forPathOnly(path); |
| } |
| return PathParentIterator.forAll(path); |
| } |
| } |