| /** | |
| * Licensed to the Apache Software Foundation (ASF) under one or more | |
| * contributor license agreements. See the NOTICE file distributed with | |
| * this work for additional information regarding copyright ownership. | |
| * The ASF licenses this file to You under the Apache License, Version 2.0 | |
| * (the "License"); you may not use this file except in compliance with | |
| * the License. You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| namespace Kafka.Client.ZooKeeperIntegration | |
| { | |
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Collections.Generic; | |
| using System.Threading; | |
| using Kafka.Client.Utils; | |
| using Kafka.Client.ZooKeeperIntegration.Events; | |
| using Kafka.Client.ZooKeeperIntegration.Listeners; | |
| using ZooKeeperNet; | |
| internal partial class ZooKeeperClient | |
| { | |
/// <summary>
/// Callback signature used to deliver ZooKeeper client events to subscribers.
/// </summary>
/// <typeparam name="T">
/// Concrete event payload type; must derive from <see cref="ZooKeeperEventArgs"/>.
/// </typeparam>
/// <param name="args">
/// Payload describing the event being delivered.
/// </param>
public delegate void ZooKeeperEventHandler<T>(T args) where T : ZooKeeperEventArgs;
/// <summary>
/// Raised when the ZooKeeper connection state changes.
/// </summary>
/// <remarks>
/// Both accessors verify the client is not disposed and mutate the handler
/// list under <c>eventLock</c>. Adding a handler first removes an existing
/// registration of that same handler, so re-subscribing does not duplicate it.
/// </remarks>
public event ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> StateChanged
{
    add
    {
        this.EnsuresNotDisposed();
        lock (this.eventLock)
        {
            // Remove-then-append keeps at most one registration per handler.
            var withoutHandler = this.stateChangedHandlers - value;
            this.stateChangedHandlers = withoutHandler + value;
        }
    }

    remove
    {
        this.EnsuresNotDisposed();
        lock (this.eventLock)
        {
            this.stateChangedHandlers = this.stateChangedHandlers - value;
        }
    }
}
/// <summary>
/// Raised when the ZooKeeper session has been re-created.
/// </summary>
/// <remarks>
/// Both accessors verify the client is not disposed and mutate the handler
/// list under <c>eventLock</c>. Adding a handler first removes an existing
/// registration of that same handler, so re-subscribing does not duplicate it.
/// </remarks>
public event ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> SessionCreated
{
    add
    {
        this.EnsuresNotDisposed();
        lock (this.eventLock)
        {
            // Remove-then-append keeps at most one registration per handler.
            var withoutHandler = this.sessionCreatedHandlers - value;
            this.sessionCreatedHandlers = withoutHandler + value;
        }
    }

    remove
    {
        this.EnsuresNotDisposed();
        lock (this.eventLock)
        {
            this.sessionCreatedHandlers = this.sessionCreatedHandlers - value;
        }
    }
}
// Events waiting to be dispatched to subscribers (filled by Enqueue, drained by Dequeue).
private readonly ConcurrentQueue<ZooKeeperEventArgs> eventsQueue =
    new ConcurrentQueue<ZooKeeperEventArgs>();

// Guards changes to the state/session handler delegates below.
private readonly object eventLock = new object();

// Subscribers of the StateChanged event.
private ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> stateChangedHandlers;

// Subscribers of the SessionCreated event.
private ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> sessionCreatedHandlers;

// NOTE(review): not assigned in this part of the class; presumably the thread
// that runs RunEventWorker - confirm in the other partial definition.
private Thread eventWorker;

// Thread on which Process was last invoked; WatchForChilds refuses to run on it.
private Thread zooKeeperEventWorker;

// Child-change subscriptions keyed by znode path.
private readonly ConcurrentDictionary<string, ChildChangedEventItem> childChangedHandlers =
    new ConcurrentDictionary<string, ChildChangedEventItem>();

// Data-change / data-delete subscriptions keyed by znode path.
private readonly ConcurrentDictionary<string, DataChangedEventItem> dataChangedHandlers =
    new ConcurrentDictionary<string, DataChangedEventItem>();

// Moment the event thread started waiting for work; null while it is not waiting.
private DateTime? idleTime;
/// <summary>
/// Gets the time (in milliseconds) the event thread has been idle.
/// </summary>
/// <value>
/// Milliseconds elapsed since the event thread started waiting for events,
/// or 0 when it is not currently waiting.
/// </value>
/// <remarks>
/// Used for testing purposes.
/// </remarks>
public int IdleTime
{
    get
    {
        // Read the field once. The event thread resets it to null when it wakes
        // up, so checking HasValue and then reading Value from the field again
        // could throw InvalidOperationException if the reset happened in between.
        DateTime? idleSince = this.idleTime;
        return idleSince.HasValue
            ? Convert.ToInt32((DateTime.Now - idleSince.Value).TotalMilliseconds)
            : 0;
    }
}
/// <summary>
/// Processes an event received from ZooKeeper.
/// </summary>
/// <param name="e">
/// The event data. An event without a path is treated as a connection state
/// change; an event with a path is treated as a znode change.
/// </param>
/// <remarks>
/// Requires installed watcher. The calling thread is remembered as the
/// ZooKeeper event thread. Events arriving after shutdown was triggered are
/// logged and ignored.
/// </remarks>
public void Process(WatchedEvent e)
{
    this.EnsuresNotDisposed();
    Logger.Debug("Received event: " + e);
    this.zooKeeperEventWorker = Thread.CurrentThread;
    if (this.shutdownTriggered)
    {
        Logger.Debug("ignoring event '{" + e.Type + " | " + e.Path + "}' since shutdown triggered");
        return;
    }

    bool stateChanged = e.Path == null;
    bool znodeChanged = e.Path != null;
    bool dataChanged =
        e.Type == EventType.NodeDataChanged
        || e.Type == EventType.NodeDeleted
        || e.Type == EventType.NodeCreated
        || e.Type == EventType.NodeChildrenChanged;

    lock (this.somethingChanged)
    {
        try
        {
            try
            {
                if (stateChanged)
                {
                    this.ProcessStateChange(e);
                }

                if (dataChanged)
                {
                    this.ProcessDataOrChildChange(e);
                }
            }
            finally
            {
                if (stateChanged)
                {
                    // Wake threads waiting for a connection state change.
                    lock (this.stateChangedLock)
                    {
                        Monitor.PulseAll(this.stateChangedLock);
                    }

                    if (e.State == KeeperState.Expired)
                    {
                        lock (this.znodeChangedLock)
                        {
                            Monitor.PulseAll(this.znodeChangedLock);
                        }

                        // Session expired: queue a change event for every
                        // subscribed path so that listeners get re-evaluated.
                        foreach (string path in this.childChangedHandlers.Keys)
                        {
                            this.Enqueue(new ZooKeeperChildChangedEventArgs(path));
                        }

                        foreach (string path in this.dataChangedHandlers.Keys)
                        {
                            this.Enqueue(new ZooKeeperDataChangedEventArgs(path));
                        }
                    }
                }

                if (znodeChanged)
                {
                    lock (this.znodeChangedLock)
                    {
                        Monitor.PulseAll(this.znodeChangedLock);
                    }
                }
            }
        }
        finally
        {
            // Always wake the event thread, even when processing threw:
            // events may already have been queued and would otherwise sit
            // in the queue until some later, unrelated event arrives.
            Monitor.PulseAll(this.somethingChanged);
        }
    }
}
/// <summary>
/// Registers a listener for ZooKeeper connection state and session events.
/// </summary>
/// <param name="listener">
/// The listener whose <c>HandleStateChanged</c> and <c>HandleSessionCreated</c>
/// methods are attached to <see cref="StateChanged"/> and
/// <see cref="SessionCreated"/> respectively. Must not be null.
/// </param>
public void Subscribe(IZooKeeperStateListener listener)
{
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    this.StateChanged += listener.HandleStateChanged;
    this.SessionCreated += listener.HandleSessionCreated;

    string listenerName = listener.GetType().Name;
    Logger.Debug("Subscribed state changes handler " + listenerName);
}
/// <summary>
/// Removes a listener from ZooKeeper connection state and session events.
/// </summary>
/// <param name="listener">
/// The listener whose <c>HandleStateChanged</c> and <c>HandleSessionCreated</c>
/// methods are detached from <see cref="StateChanged"/> and
/// <see cref="SessionCreated"/> respectively. Must not be null.
/// </param>
public void Unsubscribe(IZooKeeperStateListener listener)
{
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    this.StateChanged -= listener.HandleStateChanged;
    this.SessionCreated -= listener.HandleSessionCreated;

    string listenerName = listener.GetType().Name;
    Logger.Debug("Unsubscribed state changes handler " + listenerName);
}
/// <summary>
/// Registers a listener for child changes under the given path.
/// </summary>
/// <param name="path">
/// The parent path. Must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose <c>HandleChildChange</c> method is attached. Must not be null.
/// </param>
/// <remarks>
/// After the handler is stored, <see cref="WatchForChilds"/> is called for the path.
/// </remarks>
public void Subscribe(string path, IZooKeeperChildListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Used only when the path has no entry yet; otherwise the handler is
    // attached to the entry that is already stored.
    var firstItem = new ChildChangedEventItem(Logger, listener.HandleChildChange);
    this.childChangedHandlers.AddOrUpdate(
        path,
        firstItem,
        (key, existing) =>
        {
            existing.ChildChanged += listener.HandleChildChange;
            return existing;
        });

    this.WatchForChilds(path);
    Logger.Debug("Subscribed child changes handler " + listener.GetType().Name + " for path: " + path);
}
/// <summary>
/// Removes a listener for child changes under the given path.
/// </summary>
/// <param name="path">
/// The parent path. Must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose <c>HandleChildChange</c> method is detached. Must not be null.
/// </param>
/// <remarks>
/// When nothing is registered for the path, only the debug message is written.
/// </remarks>
public void Unsubscribe(string path, IZooKeeperChildListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Look the entry up instead of AddOrUpdate: unsubscribing from a path that
    // was never subscribed must not leave an empty entry behind, because every
    // stored path gets a change event queued when the session expires.
    ChildChangedEventItem item;
    if (this.childChangedHandlers.TryGetValue(path, out item))
    {
        item.ChildChanged -= listener.HandleChildChange;
    }

    Logger.Debug("Unsubscribed child changes handler " + listener.GetType().Name + " for path: " + path);
}
/// <summary>
/// Registers a listener for data changes and deletions at the given path.
/// </summary>
/// <param name="path">
/// The znode path. Must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose <c>HandleDataChange</c> and <c>HandleDataDelete</c>
/// methods are attached. Must not be null.
/// </param>
/// <remarks>
/// After the handlers are stored, <see cref="WatchForData"/> is called for the path.
/// </remarks>
public void Subscribe(string path, IZooKeeperDataListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Used only when the path has no entry yet; otherwise both handlers are
    // attached to the entry that is already stored.
    var firstItem = new DataChangedEventItem(Logger, listener.HandleDataChange, listener.HandleDataDelete);
    this.dataChangedHandlers.AddOrUpdate(
        path,
        firstItem,
        (key, existing) =>
        {
            existing.DataChanged += listener.HandleDataChange;
            existing.DataDeleted += listener.HandleDataDelete;
            return existing;
        });

    this.WatchForData(path);
    Logger.Debug("Subscribed data changes handler " + listener.GetType().Name + " for path: " + path);
}
/// <summary>
/// Removes a listener for data changes and deletions at the given path.
/// </summary>
/// <param name="path">
/// The znode path. Must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose <c>HandleDataChange</c> and <c>HandleDataDelete</c>
/// methods are detached. Must not be null.
/// </param>
/// <remarks>
/// When nothing is registered for the path, only the debug message is written.
/// </remarks>
public void Unsubscribe(string path, IZooKeeperDataListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Look the entry up instead of AddOrUpdate: unsubscribing from a path that
    // was never subscribed must not leave an empty entry behind, because every
    // stored path gets a change event queued when the session expires.
    DataChangedEventItem item;
    if (this.dataChangedHandlers.TryGetValue(path, out item))
    {
        item.DataChanged -= listener.HandleDataChange;
        item.DataDeleted -= listener.HandleDataDelete;
    }

    Logger.Debug("Unsubscribed data changes handler " + listener.GetType().Name + " for path: " + path);
}
/// <summary>
/// Removes every registered listener: state, session, child and data.
/// </summary>
public void UnsubscribeAll()
{
    this.EnsuresNotDisposed();

    lock (this.eventLock)
    {
        // Drop the delegate-based subscriptions ...
        this.stateChangedHandlers = null;
        this.sessionCreatedHandlers = null;

        // ... and the per-path subscriptions.
        this.childChangedHandlers.Clear();
        this.dataChangedHandlers.Clear();
    }

    Logger.Debug("Unsubscribed all handlers");
}
/// <summary>
/// Installs a child watch for the given path.
/// </summary>
/// <param name="path">
/// The parent path. Must not be null or empty.
/// </param>
/// <returns>
/// The current children of the path, or null if the znode with the given path doesn't exist.
/// </returns>
/// <exception cref="InvalidOperationException">
/// Thrown when called from the thread that delivers ZooKeeper events.
/// </exception>
public IList<string> WatchForChilds(string path)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    this.EnsuresNotDisposed();

    bool onZooKeeperEventThread =
        this.zooKeeperEventWorker != null
        && Thread.CurrentThread == this.zooKeeperEventWorker;
    if (onZooKeeperEventThread)
    {
        throw new InvalidOperationException("Must not be done in the zookeeper event thread.");
    }

    return this.RetryUntilConnected(
        () =>
        {
            // NOTE(review): Exists is called before reading the children,
            // presumably so a watch is registered even when the node is
            // missing - confirm against the Exists implementation.
            this.Exists(path);
            try
            {
                return this.GetChildren(path);
            }
            catch (KeeperException.NoNodeException)
            {
                // The node does not exist: report "no children" as null.
                return null;
            }
        });
}
/// <summary>
/// Installs a data watch for the given path.
/// </summary>
/// <param name="path">
/// The znode path. Must not be null or empty.
/// </param>
/// <remarks>
/// Performs an existence check with the watch flag set, retried through
/// <c>RetryUntilConnected</c>.
/// </remarks>
public void WatchForData(string path)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    this.EnsuresNotDisposed();

    this.RetryUntilConnected(() => this.Exists(path, true));
}
/// <summary>
/// Checks whether any child or data listeners are registered for a path.
/// </summary>
/// <param name="path">
/// The path.
/// </param>
/// <returns>
/// True when the path has at least one child handler or at least one data
/// handler; otherwise false.
/// </returns>
private bool HasListeners(string path)
{
    ChildChangedEventItem childItem;
    bool hasChildListeners =
        this.childChangedHandlers.TryGetValue(path, out childItem)
        && childItem != null
        && childItem.Count > 0;
    if (hasChildListeners)
    {
        return true;
    }

    DataChangedEventItem dataItem;
    return this.dataChangedHandlers.TryGetValue(path, out dataItem)
        && dataItem != null
        && dataItem.TotalCount > 0;
}
/// <summary>
/// Entry point of the event thread.
/// </summary>
/// <remarks>
/// Runs the queue polling loop until the thread is interrupted; the
/// interruption is logged and treated as a normal termination.
/// </remarks>
private void RunEventWorker()
{
    Logger.Debug("Starting ZooKeeper watcher event thread");

    try
    {
        this.PoolEventsQueue();
    }
    catch (ThreadInterruptedException)
    {
        // Interrupt is the expected way to stop this thread.
        Logger.Debug("Terminate ZooKeeper watcher event thread");
    }
}
/// <summary>
/// Polls ZooKeeper events from the events queue.
/// </summary>
/// <remarks>
/// Drains the queue, then waits on <c>somethingChanged</c> until it is pulsed.
/// The loop never returns normally; it ends only when an exception (such as a
/// thread interruption during the wait) propagates to the caller.
/// </remarks>
private void PoolEventsQueue()
{
    while (true)
    {
        // Handlers run outside the lock so they do not block event producers.
        while (!this.eventsQueue.IsEmpty)
        {
            this.Dequeue();
        }

        lock (this.somethingChanged)
        {
            // Re-check under the lock: an event may have been queued and the
            // pulse sent after the drain loop finished but before this lock
            // was taken. Waiting now would miss that pulse and leave the
            // event unprocessed until another one arrives.
            if (!this.eventsQueue.IsEmpty)
            {
                continue;
            }

            Logger.Debug("Awaiting events ...");
            this.idleTime = DateTime.Now;
            Monitor.Wait(this.somethingChanged);
            this.idleTime = null;
        }
    }
}
/// <summary>
/// Adds a new event to the events queue.
/// </summary>
/// <param name="e">
/// The event to queue for later dispatch.
/// </param>
private void Enqueue(ZooKeeperEventArgs e)
{
    string message = "New event queued: " + e;
    Logger.Debug(message);

    this.eventsQueue.Enqueue(e);
}
/// <summary>
/// Takes one event from the events queue and invokes the subscribed handlers.
/// </summary>
/// <remarks>
/// Does nothing when the queue is empty or the dequeued item is null. Any
/// exception raised while dispatching, including the one thrown for an
/// unsupported event type, is logged as a warning and not rethrown.
/// </remarks>
private void Dequeue()
{
    try
    {
        ZooKeeperEventArgs queued;
        if (!this.eventsQueue.TryDequeue(out queued) || queued == null)
        {
            return;
        }

        Logger.Debug("Event dequeued: " + queued);

        var eventType = queued.Type;
        if (eventType == ZooKeeperEventTypes.StateChanged)
        {
            this.OnStateChanged((ZooKeeperStateChangedEventArgs)queued);
        }
        else if (eventType == ZooKeeperEventTypes.SessionCreated)
        {
            this.OnSessionCreated((ZooKeeperSessionCreatedEventArgs)queued);
        }
        else if (eventType == ZooKeeperEventTypes.ChildChanged)
        {
            this.OnChildChanged((ZooKeeperChildChangedEventArgs)queued);
        }
        else if (eventType == ZooKeeperEventTypes.DataChanged)
        {
            this.OnDataChanged((ZooKeeperDataChangedEventArgs)queued);
        }
        else
        {
            throw new InvalidOperationException("Not supported event type");
        }
    }
    catch (Exception exc)
    {
        Logger.Warn("Error handling event ", exc);
    }
}
/// <summary>
/// Processes a ZooKeeper connection state change event.
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Records the new state, then (unless shutdown was triggered) queues a
/// state-changed event. On session expiry it also reconnects using the
/// current connection's servers and session timeout and queues a
/// session-created event.
/// </remarks>
private void ProcessStateChange(WatchedEvent e)
{
    Logger.Info("zookeeper state changed (" + e.State + ")");

    lock (this.stateChangedLock)
    {
        this.currentState = e.State;
    }

    if (this.shutdownTriggered)
    {
        return;
    }

    this.Enqueue(new ZooKeeperStateChangedEventArgs(e.State));

    if (e.State != KeeperState.Expired)
    {
        return;
    }

    // Expired session: open a new one and tell subscribers about it.
    this.Reconnect(this.connection.Servers, this.connection.SessionTimeout);
    this.Enqueue(ZooKeeperSessionCreatedEventArgs.Empty);
}
/// <summary>
/// Processes ZooKeeper child or data change events.
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Node creation and deletion queue both a child-changed and a data-changed
/// event for the path; a children change queues only the former and a data
/// change only the latter. Nothing is queued once shutdown was triggered.
/// </remarks>
private void ProcessDataOrChildChange(WatchedEvent e)
{
    if (this.shutdownTriggered)
    {
        return;
    }

    bool createdOrDeleted =
        e.Type == EventType.NodeCreated
        || e.Type == EventType.NodeDeleted;

    if (createdOrDeleted || e.Type == EventType.NodeChildrenChanged)
    {
        this.Enqueue(new ZooKeeperChildChangedEventArgs(e.Path));
    }

    if (createdOrDeleted || e.Type == EventType.NodeDataChanged)
    {
        this.Enqueue(new ZooKeeperDataChangedEventArgs(e.Path));
    }
}
/// <summary>
/// Invokes the subscribed handlers for a ZooKeeper state change event.
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Exceptions thrown by handlers are logged as errors and not rethrown.
/// </remarks>
private void OnStateChanged(ZooKeeperStateChangedEventArgs e)
{
    try
    {
        // Work on a snapshot so later (un)subscriptions do not affect this dispatch.
        var subscribers = this.stateChangedHandlers;
        if (subscribers != null)
        {
            foreach (var subscriber in subscribers.GetInvocationList())
            {
                Logger.Debug(e + " sent to " + subscriber.Target);
            }

            subscribers(e);
        }
    }
    catch (Exception exc)
    {
        Logger.Error("Failed to handle state changed event.", exc);
    }
}
/// <summary>
/// Invokes the subscribed handlers for a ZooKeeper session re-creation event.
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
private void OnSessionCreated(ZooKeeperSessionCreatedEventArgs e)
{
    // Work on a snapshot so later (un)subscriptions do not affect this dispatch.
    var subscribers = this.sessionCreatedHandlers;
    if (subscribers != null)
    {
        foreach (var subscriber in subscribers.GetInvocationList())
        {
            Logger.Debug(e + " sent to " + subscriber.Target);
        }

        subscribers(e);
    }
}
/// <summary>
/// Invokes the subscribed handlers for a ZooKeeper child change event.
/// </summary>
/// <param name="e">
/// The event data. Its <c>Children</c> property is filled with the current
/// children of the path before the handlers are called, unless the node no
/// longer exists.
/// </param>
private void OnChildChanged(ZooKeeperChildChangedEventArgs e)
{
    ChildChangedEventItem item;
    bool registered = this.childChangedHandlers.TryGetValue(e.Path, out item);
    if (!registered || item == null || item.Count == 0)
    {
        return;
    }

    this.Exists(e.Path);
    try
    {
        e.Children = this.GetChildren(e.Path);
    }
    catch (KeeperException.NoNodeException)
    {
        // Node is gone: handlers are still notified, with Children left unset.
    }

    item.OnChildChanged(e);
}
/// <summary>
/// Invokes the subscribed handlers for a ZooKeeper data change event.
/// </summary>
/// <param name="e">
/// The event data. Its <c>Data</c> property is filled with the node's current
/// content before the data-changed handlers are called.
/// </param>
/// <remarks>
/// When a "no node" error is raised inside the guarded section, the
/// data-deleted handlers are called instead.
/// </remarks>
private void OnDataChanged(ZooKeeperDataChangedEventArgs e)
{
    DataChangedEventItem item;
    bool registered = this.dataChangedHandlers.TryGetValue(e.Path, out item);
    if (!registered || item == null || item.TotalCount == 0)
    {
        return;
    }

    try
    {
        this.Exists(e.Path, true);
        e.Data = this.ReadData<string>(e.Path, null, true);
        item.OnDataChanged(e);
    }
    catch (KeeperException.NoNodeException)
    {
        item.OnDataDeleted(e);
    }
}
| } | |
| } |