blob: d17335de00c9abf6aa13e78b7edcc3ddb61d3bc5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.ZooKeeperIntegration
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration.Events;
using Kafka.Client.ZooKeeperIntegration.Listeners;
using ZooKeeperNet;
internal partial class ZooKeeperClient
{
/// <summary>
/// Represents the method that will handle a ZooKeeper event
/// </summary>
/// <param name="args">
/// The event data passed to the handler.
/// </param>
/// <typeparam name="T">
/// Type of event data; constrained to <see cref="ZooKeeperEventArgs"/> and its subclasses
/// </typeparam>
public delegate void ZooKeeperEventHandler<T>(T args)
where T : ZooKeeperEventArgs;
/// <summary>
/// Occurs when ZooKeeper connection state changes
/// </summary>
/// <remarks>
/// Both accessors call EnsuresNotDisposed first and then modify the handler list while holding eventLock.
/// </remarks>
public event ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> StateChanged
{
add
{
this.EnsuresNotDisposed();
lock (this.eventLock)
{
// Remove before adding so that subscribing the same handler again does not register it twice.
this.stateChangedHandlers -= value;
this.stateChangedHandlers += value;
}
}
remove
{
this.EnsuresNotDisposed();
lock (this.eventLock)
{
this.stateChangedHandlers -= value;
}
}
}
/// <summary>
/// Occurs when ZooKeeper session re-creates
/// </summary>
/// <remarks>
/// Both accessors call EnsuresNotDisposed first and then modify the handler list while holding eventLock.
/// </remarks>
public event ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> SessionCreated
{
add
{
this.EnsuresNotDisposed();
lock (this.eventLock)
{
// Remove before adding so that subscribing the same handler again does not register it twice.
this.sessionCreatedHandlers -= value;
this.sessionCreatedHandlers += value;
}
}
remove
{
this.EnsuresNotDisposed();
lock (this.eventLock)
{
this.sessionCreatedHandlers -= value;
}
}
}
// Events waiting to be dispatched; filled by Enqueue and drained by Dequeue.
private readonly ConcurrentQueue<ZooKeeperEventArgs> eventsQueue = new ConcurrentQueue<ZooKeeperEventArgs>();
// Held while stateChangedHandlers / sessionCreatedHandlers are modified.
private readonly object eventLock = new object();
// Handlers behind the StateChanged event.
private ZooKeeperEventHandler<ZooKeeperStateChangedEventArgs> stateChangedHandlers;
// Handlers behind the SessionCreated event.
private ZooKeeperEventHandler<ZooKeeperSessionCreatedEventArgs> sessionCreatedHandlers;
// NOTE(review): not assigned or read in this part of the class; presumably the thread that runs
// RunEventWorker - confirm against the other partial class file.
private Thread eventWorker;
// Thread that most recently called Process; WatchForChilds refuses to run on it.
private Thread zooKeeperEventWorker;
// Child-change registrations keyed by znode path.
private readonly ConcurrentDictionary<string, ChildChangedEventItem> childChangedHandlers = new ConcurrentDictionary<string, ChildChangedEventItem>();
// Data-change registrations keyed by znode path.
private readonly ConcurrentDictionary<string, DataChangedEventItem> dataChangedHandlers = new ConcurrentDictionary<string, DataChangedEventItem>();
// Moment at which the event thread started waiting for events; null while it is not waiting.
private DateTime? idleTime;
/// <summary>
/// Gets time (in milliseconds) the event thread has been idle
/// </summary>
/// <remarks>
/// Used for testing purpose. Returns 0 when the event thread is not currently waiting for events.
/// </remarks>
public int IdleTime
{
    get
    {
        // Read the field once. The event thread resets it to null when it wakes up, so checking
        // HasValue and then reading Value from the field separately could throw
        // InvalidOperationException if the reset happens between the two reads.
        DateTime? idleSince = this.idleTime;
        return idleSince.HasValue
            ? Convert.ToInt32((DateTime.Now - idleSince.Value).TotalMilliseconds)
            : 0;
    }
}
/// <summary>
/// Processes ZooKeeper event
/// </summary>
/// <param name="e">
/// The event data. An event without a path is treated as a connection state change,
/// an event with a path as a znode change.
/// </param>
/// <remarks>
/// Requires installed watcher.
/// Records the calling thread as the ZooKeeper event thread, translates the event into queued
/// events, and pulses the state/znode/something-changed monitors so that waiting threads re-check.
/// Events received after shutdown was triggered are logged and ignored.
/// </remarks>
public void Process(WatchedEvent e)
{
this.EnsuresNotDisposed();
Logger.Debug("Received event: " + e);
// Remember which thread delivers watcher callbacks; WatchForChilds compares against it.
this.zooKeeperEventWorker = Thread.CurrentThread;
if (this.shutdownTriggered)
{
Logger.Debug("ignoring event '{" + e.Type + " | " + e.Path + "}' since shutdown triggered");
return;
}
// Classification: no path means a state change, a path means a znode change.
bool stateChanged = e.Path == null;
bool znodeChanged = e.Path != null;
bool dataChanged =
e.Type == EventType.NodeDataChanged
|| e.Type == EventType.NodeDeleted
|| e.Type == EventType.NodeCreated
|| e.Type == EventType.NodeChildrenChanged;
// All enqueueing happens under somethingChanged, the monitor the event thread waits on.
lock (this.somethingChanged)
{
try
{
if (stateChanged)
{
this.ProcessStateChange(e);
}
if (dataChanged)
{
this.ProcessDataOrChildChange(e);
}
}
finally
{
// Waiters are notified even if processing above threw.
if (stateChanged)
{
lock (this.stateChangedLock)
{
Monitor.PulseAll(this.stateChangedLock);
}
if (e.State == KeeperState.Expired)
{
lock (this.znodeChangedLock)
{
Monitor.PulseAll(this.znodeChangedLock);
}
// On session expiry, queue a change event for every registered path.
// NOTE(review): presumably so that listeners re-read state and watches get re-installed
// on the new session - confirm.
foreach (string path in this.childChangedHandlers.Keys)
{
this.Enqueue(new ZooKeeperChildChangedEventArgs(path));
}
foreach (string path in this.dataChangedHandlers.Keys)
{
this.Enqueue(new ZooKeeperDataChangedEventArgs(path));
}
}
}
if (znodeChanged)
{
lock (this.znodeChangedLock)
{
Monitor.PulseAll(this.znodeChangedLock);
}
}
}
// Wake the event thread waiting in PoolEventsQueue.
Monitor.PulseAll(this.somethingChanged);
}
}
/// <summary>
/// Registers a listener for ZooKeeper state change and session creation events
/// </summary>
/// <param name="listener">
/// The listener whose HandleStateChanged and HandleSessionCreated methods get attached; must not be null.
/// </param>
public void Subscribe(IZooKeeperStateListener listener)
{
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Attach to both events exposed by this class for state listeners.
    this.StateChanged += listener.HandleStateChanged;
    this.SessionCreated += listener.HandleSessionCreated;

    string listenerName = listener.GetType().Name;
    Logger.Debug("Subscribed state changes handler " + listenerName);
}
/// <summary>
/// Removes a listener from ZooKeeper state change and session creation events
/// </summary>
/// <param name="listener">
/// The listener whose HandleStateChanged and HandleSessionCreated methods get detached; must not be null.
/// </param>
public void Unsubscribe(IZooKeeperStateListener listener)
{
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Detach from both events exposed by this class for state listeners.
    this.StateChanged -= listener.HandleStateChanged;
    this.SessionCreated -= listener.HandleSessionCreated;

    string listenerName = listener.GetType().Name;
    Logger.Debug("Unsubscribed state changes handler " + listenerName);
}
/// <summary>
/// Registers a listener for child changes under the given path
/// </summary>
/// <param name="path">
/// The parent path; must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose HandleChildChange method gets attached; must not be null.
/// </param>
/// <remarks>
/// After the handler is registered, WatchForChilds is called for the path.
/// </remarks>
public void Subscribe(string path, IZooKeeperChildListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Used only when the path has no registration yet; otherwise the existing item is extended.
    var freshItem = new ChildChangedEventItem(Logger, listener.HandleChildChange);
    this.childChangedHandlers.AddOrUpdate(
        path,
        freshItem,
        (registeredPath, registeredItem) =>
        {
            registeredItem.ChildChanged += listener.HandleChildChange;
            return registeredItem;
        });

    this.WatchForChilds(path);

    string listenerName = listener.GetType().Name;
    Logger.Debug("Subscribed child changes handler " + listenerName + " for path: " + path);
}
/// <summary>
/// Un-subscribes listeners on ZooKeeper child changes under given path
/// </summary>
/// <param name="path">
/// The parent path; must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose HandleChildChange method gets detached; must not be null.
/// </param>
/// <remarks>
/// Does nothing apart from logging when no registration exists for the path.
/// </remarks>
public void Unsubscribe(string path, IZooKeeperChildListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Look the registration up instead of using AddOrUpdate: AddOrUpdate would insert an empty
    // item for a path that was never subscribed, and Process enqueues a child-changed event for
    // every key of this dictionary when the session expires.
    ChildChangedEventItem registeredItem;
    if (this.childChangedHandlers.TryGetValue(path, out registeredItem) && registeredItem != null)
    {
        registeredItem.ChildChanged -= listener.HandleChildChange;
    }

    Logger.Debug("Unsubscribed child changes handler " + listener.GetType().Name + " for path: " + path);
}
/// <summary>
/// Registers a listener for data changes and deletions of the given path
/// </summary>
/// <param name="path">
/// The znode path; must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose HandleDataChange and HandleDataDelete methods get attached; must not be null.
/// </param>
/// <remarks>
/// After the handlers are registered, WatchForData is called for the path.
/// </remarks>
public void Subscribe(string path, IZooKeeperDataListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Used only when the path has no registration yet; otherwise the existing item is extended.
    var freshItem = new DataChangedEventItem(Logger, listener.HandleDataChange, listener.HandleDataDelete);
    this.dataChangedHandlers.AddOrUpdate(
        path,
        freshItem,
        (registeredPath, registeredItem) =>
        {
            registeredItem.DataChanged += listener.HandleDataChange;
            registeredItem.DataDeleted += listener.HandleDataDelete;
            return registeredItem;
        });

    this.WatchForData(path);

    string listenerName = listener.GetType().Name;
    Logger.Debug("Subscribed data changes handler " + listenerName + " for path: " + path);
}
/// <summary>
/// Un-subscribes listeners on ZooKeeper data changes under given path
/// </summary>
/// <param name="path">
/// The znode path; must not be null or empty.
/// </param>
/// <param name="listener">
/// The listener whose HandleDataChange and HandleDataDelete methods get detached; must not be null.
/// </param>
/// <remarks>
/// Does nothing apart from logging when no registration exists for the path.
/// </remarks>
public void Unsubscribe(string path, IZooKeeperDataListener listener)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    Guard.Assert<ArgumentNullException>(() => listener != null);
    this.EnsuresNotDisposed();

    // Look the registration up instead of using AddOrUpdate: AddOrUpdate would insert an empty
    // item for a path that was never subscribed, and Process enqueues a data-changed event for
    // every key of this dictionary when the session expires.
    DataChangedEventItem registeredItem;
    if (this.dataChangedHandlers.TryGetValue(path, out registeredItem) && registeredItem != null)
    {
        registeredItem.DataChanged -= listener.HandleDataChange;
        registeredItem.DataDeleted -= listener.HandleDataDelete;
    }

    Logger.Debug("Unsubscribed data changes handler " + listener.GetType().Name + " for path: " + path);
}
/// <summary>
/// Removes every registered state, session, child and data handler
/// </summary>
/// <remarks>
/// All four handler collections are reset while holding the event lock.
/// </remarks>
public void UnsubscribeAll()
{
    this.EnsuresNotDisposed();

    lock (this.eventLock)
    {
        // Delegate-based handlers behind the StateChanged / SessionCreated events.
        this.stateChangedHandlers = null;
        this.sessionCreatedHandlers = null;

        // Per-path registrations.
        this.childChangedHandlers.Clear();
        this.dataChangedHandlers.Clear();
    }

    Logger.Debug("Unsubscribed all handlers");
}
/// <summary>
/// Installs a child watch for the given path.
/// </summary>
/// <param name="path">
/// The parent path; must not be null or empty.
/// </param>
/// <returns>
/// the current children of the path or null if the znode with the given path doesn't exist
/// </returns>
/// <exception cref="InvalidOperationException">
/// Thrown when called on the thread that delivers ZooKeeper watcher events.
/// </exception>
public IList<string> WatchForChilds(string path)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    this.EnsuresNotDisposed();

    bool onWatcherThread =
        this.zooKeeperEventWorker != null && Thread.CurrentThread == this.zooKeeperEventWorker;
    if (onWatcherThread)
    {
        throw new InvalidOperationException("Must not be done in the zookeeper event thread.");
    }

    return this.RetryUntilConnected(
        () =>
        {
            this.Exists(path);
            try
            {
                return this.GetChildren(path);
            }
            catch (KeeperException.NoNodeException)
            {
                // The znode is missing, so there are no children to report.
                return null;
            }
        });
}
/// <summary>
/// Installs a data watch for the given path.
/// </summary>
/// <param name="path">
/// The znode path; must not be null or empty.
/// </param>
/// <remarks>
/// Performs an Exists call with its second argument set to true, wrapped in RetryUntilConnected.
/// </remarks>
public void WatchForData(string path)
{
    Guard.Assert<ArgumentException>(() => !string.IsNullOrEmpty(path));
    this.EnsuresNotDisposed();

    this.RetryUntilConnected(() => this.Exists(path, true));
}
/// <summary>
/// Tells whether at least one child or data handler is registered for the path
/// </summary>
/// <param name="path">
/// The path.
/// </param>
/// <returns>
/// True when the child registration for the path has a non-zero Count, or the data
/// registration has a non-zero TotalCount; false otherwise
/// </returns>
private bool HasListeners(string path)
{
    ChildChangedEventItem childItem;
    if (this.childChangedHandlers.TryGetValue(path, out childItem)
        && childItem != null
        && childItem.Count > 0)
    {
        return true;
    }

    DataChangedEventItem dataItem;
    return this.dataChangedHandlers.TryGetValue(path, out dataItem)
        && dataItem != null
        && dataItem.TotalCount > 0;
}
/// <summary>
/// Entry point of the event thread
/// </summary>
/// <remarks>
/// Runs the event queue loop until the thread is interrupted; the interruption is logged and swallowed.
/// </remarks>
private void RunEventWorker()
{
    Logger.Debug("Starting ZooKeeper watcher event thread");

    try
    {
        // Loops forever; only an exception gets us out of here.
        this.PoolEventsQueue();
    }
    catch (ThreadInterruptedException)
    {
        Logger.Debug("Terminate ZooKeeper watcher event thread");
    }
}
/// <summary>
/// Polls ZooKeeper events from the events queue
/// </summary>
/// <remarks>
/// Dispatches queued events and, once the queue is empty, waits on the somethingChanged
/// monitor until it is pulsed. Never returns normally.
/// </remarks>
private void PoolEventsQueue()
{
    while (true)
    {
        while (!this.eventsQueue.IsEmpty)
        {
            this.Dequeue();
        }

        lock (this.somethingChanged)
        {
            // Re-check under the lock. Process enqueues and pulses while holding this same lock,
            // so an event that arrived after the drain loop above but before we acquired the lock
            // has already been pulsed; waiting now would leave it unprocessed until the next event.
            if (!this.eventsQueue.IsEmpty)
            {
                continue;
            }

            Logger.Debug("Awaiting events ...");
            this.idleTime = DateTime.Now;
            Monitor.Wait(this.somethingChanged);
            this.idleTime = null;
        }
    }
}
/// <summary>
/// Logs the event and appends it to the events queue
/// </summary>
/// <param name="e">
/// The event to queue.
/// </param>
private void Enqueue(ZooKeeperEventArgs e)
{
    string logLine = "New event queued: " + e;
    Logger.Debug(logLine);

    this.eventsQueue.Enqueue(e);
}
/// <summary>
/// Takes one event off the events queue and dispatches it to the matching On* method
/// </summary>
/// <remarks>
/// Does nothing when the queue yields no event. Any exception raised while handling the
/// event is logged as a warning and not propagated.
/// </remarks>
private void Dequeue()
{
    try
    {
        ZooKeeperEventArgs next;
        if (!this.eventsQueue.TryDequeue(out next) || next == null)
        {
            return;
        }

        Logger.Debug("Event dequeued: " + next);

        // Dispatch on the declared event type; the casts follow from that type.
        switch (next.Type)
        {
            case ZooKeeperEventTypes.StateChanged:
                this.OnStateChanged((ZooKeeperStateChangedEventArgs)next);
                break;

            case ZooKeeperEventTypes.SessionCreated:
                this.OnSessionCreated((ZooKeeperSessionCreatedEventArgs)next);
                break;

            case ZooKeeperEventTypes.ChildChanged:
                this.OnChildChanged((ZooKeeperChildChangedEventArgs)next);
                break;

            case ZooKeeperEventTypes.DataChanged:
                this.OnDataChanged((ZooKeeperDataChangedEventArgs)next);
                break;

            default:
                throw new InvalidOperationException("Not supported event type");
        }
    }
    catch (Exception exc)
    {
        Logger.Warn("Error handling event ", exc);
    }
}
/// <summary>
/// Processes a ZooKeeper connection state change
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Stores the new state, queues a state-changed event and, when the session has expired,
/// reconnects and queues a session-created event. Nothing is queued once shutdown was triggered.
/// </remarks>
private void ProcessStateChange(WatchedEvent e)
{
    var newState = e.State;
    Logger.Info("zookeeper state changed (" + newState + ")");

    lock (this.stateChangedLock)
    {
        this.currentState = newState;
    }

    if (this.shutdownTriggered)
    {
        return;
    }

    this.Enqueue(new ZooKeeperStateChangedEventArgs(newState));

    if (newState != KeeperState.Expired)
    {
        return;
    }

    // Expired session: reconnect with the current connection's settings, then announce the new session.
    this.Reconnect(this.connection.Servers, this.connection.SessionTimeout);
    this.Enqueue(ZooKeeperSessionCreatedEventArgs.Empty);
}
/// <summary>
/// Processes ZooKeeper child or data change events
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Node creation and deletion queue both a child-changed and a data-changed event for the
/// path; a children change queues only the former and a data change only the latter.
/// Nothing is queued once shutdown was triggered.
/// </remarks>
private void ProcessDataOrChildChange(WatchedEvent e)
{
    if (this.shutdownTriggered)
    {
        return;
    }

    var eventType = e.Type;
    bool createdOrDeleted = eventType == EventType.NodeCreated || eventType == EventType.NodeDeleted;

    if (createdOrDeleted || eventType == EventType.NodeChildrenChanged)
    {
        this.Enqueue(new ZooKeeperChildChangedEventArgs(e.Path));
    }

    if (createdOrDeleted || eventType == EventType.NodeDataChanged)
    {
        this.Enqueue(new ZooKeeperDataChangedEventArgs(e.Path));
    }
}
/// <summary>
/// Invokes subscribed handlers for the ZooKeeper state changed event
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Exceptions thrown while logging or invoking handlers are logged as errors and not propagated.
/// </remarks>
private void OnStateChanged(ZooKeeperStateChangedEventArgs e)
{
    try
    {
        // Work on a local copy of the delegate reference.
        var subscribers = this.stateChangedHandlers;
        if (subscribers != null)
        {
            foreach (var subscriber in subscribers.GetInvocationList())
            {
                Logger.Debug(e + " sent to " + subscriber.Target);
            }

            subscribers(e);
        }
    }
    catch (Exception exc)
    {
        Logger.Error("Failed to handle state changed event.", exc);
    }
}
/// <summary>
/// Invokes subscribed handlers for the ZooKeeper session re-created event
/// </summary>
/// <param name="e">
/// The event data.
/// </param>
/// <remarks>
/// Exceptions thrown by handlers are not caught here.
/// </remarks>
private void OnSessionCreated(ZooKeeperSessionCreatedEventArgs e)
{
    // Work on a local copy of the delegate reference.
    var subscribers = this.sessionCreatedHandlers;
    if (subscribers != null)
    {
        foreach (var subscriber in subscribers.GetInvocationList())
        {
            Logger.Debug(e + " sent to " + subscriber.Target);
        }

        subscribers(e);
    }
}
/// <summary>
/// Invokes subscribed handlers for the ZooKeeper child changes event
/// </summary>
/// <param name="e">
/// The event data; its Children property is filled in before the handlers run when the
/// children could be read.
/// </param>
/// <remarks>
/// Returns without doing anything when no handler is registered for the event's path.
/// </remarks>
private void OnChildChanged(ZooKeeperChildChangedEventArgs e)
{
    ChildChangedEventItem subscribers;
    bool registered = this.childChangedHandlers.TryGetValue(e.Path, out subscribers);
    if (!registered || subscribers == null || subscribers.Count == 0)
    {
        return;
    }

    this.Exists(e.Path);
    try
    {
        IList<string> currentChildren = this.GetChildren(e.Path);
        e.Children = currentChildren;
    }
    catch (KeeperException.NoNodeException)
    {
        // The znode is gone: handlers are still notified, with Children left as it was.
    }

    subscribers.OnChildChanged(e);
}
/// <summary>
/// Invokes subscribed handlers for the ZooKeeper data changes event
/// </summary>
/// <param name="e">
/// The event data; its Data property is set to the value read from the path before the
/// data-changed handlers run.
/// </param>
/// <remarks>
/// Returns without doing anything when no handler is registered for the event's path.
/// When a NoNodeException is raised anywhere in the read-and-notify sequence, the
/// data-deleted handlers are invoked instead.
/// </remarks>
private void OnDataChanged(ZooKeeperDataChangedEventArgs e)
{
    DataChangedEventItem subscribers;
    bool registered = this.dataChangedHandlers.TryGetValue(e.Path, out subscribers);
    if (!registered || subscribers == null || subscribers.TotalCount == 0)
    {
        return;
    }

    try
    {
        this.Exists(e.Path, true);
        e.Data = this.ReadData<string>(e.Path, null, true);
        subscribers.OnDataChanged(e);
    }
    catch (KeeperException.NoNodeException)
    {
        subscribers.OnDataDeleted(e);
    }
}
}
}