| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog.zk; |
| |
| import com.twitter.distributedlog.ZooKeeperClient; |
| import org.apache.bookkeeper.stats.Gauge; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.zookeeper.AsyncCallback; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.HashSet; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| /** |
| * Watcher Manager to manage watchers. |
| * <h3>Metrics</h3> |
| * <ul> |
| * <li> `total_watches`: total number of watches that managed by this watcher manager. |
| * <li> `num_child_watches`: number of paths that are watched for children changes by this watcher manager. |
| * </ul> |
| */ |
| public class ZKWatcherManager implements Watcher { |
| |
| static final Logger logger = LoggerFactory.getLogger(ZKWatcherManager.class); |
| |
| public static Builder newBuilder() { |
| return new Builder(); |
| } |
| |
| public static class Builder { |
| |
| private String _name; |
| private StatsLogger _statsLogger; |
| private ZooKeeperClient _zkc; |
| |
| public Builder name(String name) { |
| this._name = name; |
| return this; |
| } |
| |
| public Builder zkc(ZooKeeperClient zkc) { |
| this._zkc = zkc; |
| return this; |
| } |
| |
| public Builder statsLogger(StatsLogger statsLogger) { |
| this._statsLogger = statsLogger; |
| return this; |
| } |
| |
| public ZKWatcherManager build() { |
| return new ZKWatcherManager(_name, _zkc, _statsLogger); |
| } |
| } |
| |
| private final String name; |
| private final ZooKeeperClient zkc; |
| private final StatsLogger statsLogger; |
| // Gauges and their labels |
| private final Gauge<Number> totalWatchesGauge; |
| private static final String totalWatchesGauageLabel = "total_watches"; |
| private final Gauge<Number> numChildWatchesGauge; |
| private static final String numChildWatchesGauageLabel = "num_child_watches"; |
| |
| protected final ConcurrentMap<String, Set<Watcher>> childWatches; |
| protected final AtomicInteger allWatchesGauge; |
| |
| private ZKWatcherManager(String name, |
| ZooKeeperClient zkc, |
| StatsLogger statsLogger) { |
| this.name = name; |
| this.zkc = zkc; |
| this.statsLogger = statsLogger; |
| |
| // watches |
| this.childWatches = new ConcurrentHashMap<String, Set<Watcher>>(); |
| this.allWatchesGauge = new AtomicInteger(0); |
| |
| // stats |
| totalWatchesGauge = new Gauge<Number>() { |
| @Override |
| public Number getDefaultValue() { |
| return 0; |
| } |
| |
| @Override |
| public Number getSample() { |
| return allWatchesGauge.get(); |
| } |
| }; |
| this.statsLogger.registerGauge(totalWatchesGauageLabel, totalWatchesGauge); |
| |
| numChildWatchesGauge = new Gauge<Number>() { |
| @Override |
| public Number getDefaultValue() { |
| return 0; |
| } |
| |
| @Override |
| public Number getSample() { |
| return childWatches.size(); |
| } |
| }; |
| |
| this.statsLogger.registerGauge(numChildWatchesGauageLabel, numChildWatchesGauge); |
| } |
| |
| public Watcher registerChildWatcher(String path, Watcher watcher) { |
| Set<Watcher> watchers = childWatches.get(path); |
| if (null == watchers) { |
| Set<Watcher> newWatchers = new HashSet<Watcher>(); |
| Set<Watcher> oldWatchers = childWatches.putIfAbsent(path, newWatchers); |
| watchers = (null == oldWatchers) ? newWatchers : oldWatchers; |
| } |
| synchronized (watchers) { |
| if (childWatches.get(path) == watchers) { |
| if (watchers.add(watcher)) { |
| allWatchesGauge.incrementAndGet(); |
| } |
| } else { |
| logger.warn("Watcher set for path {} has been changed while registering child watcher {}.", |
| path, watcher); |
| } |
| } |
| return this; |
| } |
| |
| public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer) { |
| Set<Watcher> watchers = childWatches.get(path); |
| if (null == watchers) { |
| logger.warn("No watchers found on path {} while unregistering child watcher {}.", |
| path, watcher); |
| return; |
| } |
| synchronized (watchers) { |
| if (watchers.remove(watcher)) { |
| allWatchesGauge.decrementAndGet(); |
| } else { |
| logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path); |
| } |
| if (watchers.isEmpty()) { |
| // best-efforts to remove watches |
| try { |
| if (null != zkc && removeFromServer) { |
| zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() { |
| @Override |
| public void processResult(int rc, String path, Object ctx) { |
| if (KeeperException.Code.OK.intValue() == rc) { |
| logger.debug("Successfully removed children watches from {}", path); |
| } else { |
| logger.debug("Encountered exception on removing children watches from {}", |
| path, KeeperException.create(KeeperException.Code.get(rc))); |
| } |
| } |
| }, null); |
| } |
| } catch (InterruptedException e) { |
| logger.debug("Encountered exception on removing watches from {}", path, e); |
| } catch (ZooKeeperClient.ZooKeeperConnectionException e) { |
| logger.debug("Encountered exception on removing watches from {}", path, e); |
| } |
| childWatches.remove(path, watchers); |
| } |
| } |
| } |
| |
| public void unregisterGauges() { |
| this.statsLogger.unregisterGauge(totalWatchesGauageLabel, totalWatchesGauge); |
| this.statsLogger.unregisterGauge(numChildWatchesGauageLabel, numChildWatchesGauge); |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| switch (event.getType()) { |
| case None: |
| handleKeeperStateEvent(event); |
| break; |
| case NodeChildrenChanged: |
| handleChildWatchEvent(event); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| private void handleKeeperStateEvent(WatchedEvent event) { |
| Set<Watcher> savedAllWatches = new HashSet<Watcher>(allWatchesGauge.get()); |
| for (Set<Watcher> watcherSet : childWatches.values()) { |
| synchronized (watcherSet) { |
| savedAllWatches.addAll(watcherSet); |
| } |
| } |
| for (Watcher watcher : savedAllWatches) { |
| watcher.process(event); |
| } |
| } |
| |
| private void handleChildWatchEvent(WatchedEvent event) { |
| String path = event.getPath(); |
| if (null == path) { |
| logger.warn("Received zookeeper watch event with null path : {}", event); |
| return; |
| } |
| Set<Watcher> watchers = childWatches.get(path); |
| if (null == watchers) { |
| return; |
| } |
| Set<Watcher> watchersToFire; |
| synchronized (watchers) { |
| watchersToFire = new HashSet<Watcher>(watchers.size()); |
| watchersToFire.addAll(watchers); |
| } |
| for (Watcher watcher : watchersToFire) { |
| watcher.process(event); |
| } |
| } |
| } |