blob: 633a5ac32bba30dab524d6ca40d92433e8954415 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.ZooKeeperIntegration
{
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using Kafka.Client.Exceptions;
using Kafka.Client.Utils;
using log4net;
using Org.Apache.Zookeeper.Data;
using ZooKeeperNet;
/// <summary>
/// Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on nodes in ZooKeeper
/// </summary>
internal partial class ZooKeeperClient : IZooKeeperClient
{
private const int DefaultConnectionTimeout = int.MaxValue;
public const string DefaultConsumersPath = "/consumers";
public const string DefaultBrokerIdsPath = "/brokers/ids";
public const string DefaultBrokerTopicsPath = "/brokers/topics";
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private IZooKeeperConnection connection;
private bool shutdownTriggered;
private KeeperState currentState;
private readonly IZooKeeperSerializer serializer;
private readonly object stateChangedLock = new object();
private readonly object znodeChangedLock = new object();
private readonly object somethingChanged = new object();
private readonly object shuttingDownLock = new object();
private volatile bool disposed;
private readonly int connectionTimeout;
/// <summary>
/// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
/// </summary>
/// <param name="connection">
/// The connection to ZooKeeper.
/// </param>
/// <param name="serializer">
/// The given serializer.
/// </param>
/// <param name="connectionTimeout">
/// The connection timeout (in miliseconds). Default is infinitive.
/// </param>
/// <remarks>
/// Default serializer is string UTF-8 serializer
/// </remarks>
public ZooKeeperClient(
IZooKeeperConnection connection,
IZooKeeperSerializer serializer,
int connectionTimeout = DefaultConnectionTimeout)
{
this.serializer = serializer;
this.connection = connection;
this.connectionTimeout = connectionTimeout;
}
/// <summary>
/// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
/// </summary>
/// <param name="servers">
/// The list of ZooKeeper servers.
/// </param>
/// <param name="sessionTimeout">
/// The session timeout (in miliseconds).
/// </param>
/// <param name="serializer">
/// The given serializer.
/// </param>
/// <remarks>
/// Default serializer is string UTF-8 serializer.
/// It is recommended to use quite large sessions timeouts for ZooKeeper.
/// </remarks>
public ZooKeeperClient(string servers, int sessionTimeout, IZooKeeperSerializer serializer)
: this(new ZooKeeperConnection(servers, sessionTimeout), serializer)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ZooKeeperClient"/> class.
/// </summary>
/// <param name="servers">
/// The list of ZooKeeper servers.
/// </param>
/// <param name="sessionTimeout">
/// The session timeout (in miliseconds).
/// </param>
/// <param name="serializer">
/// The given serializer.
/// </param>
/// <param name="connectionTimeout">
/// The connection timeout (in miliseconds).
/// </param>
/// <remarks>
/// Default serializer is string UTF-8 serializer.
/// It is recommended to use quite large sessions timeouts for ZooKeeper.
/// </remarks>
public ZooKeeperClient(
string servers,
int sessionTimeout,
IZooKeeperSerializer serializer,
int connectionTimeout)
: this(new ZooKeeperConnection(servers, sessionTimeout), serializer, connectionTimeout)
{
}
/// <summary>
/// Connects to ZooKeeper server within given time period and installs watcher in ZooKeeper
/// </summary>
/// <remarks>
/// Also, starts background thread for event handling
/// </remarks>
public void Connect()
{
this.EnsuresNotDisposed();
bool started = false;
try
{
this.shutdownTriggered = false;
this.eventWorker = new Thread(this.RunEventWorker) { IsBackground = true };
this.eventWorker.Name = "ZooKeeperkWatcher-EventThread-" + this.eventWorker.ManagedThreadId + "-" + this.connection.Servers;
this.eventWorker.Start();
this.connection.Connect(this);
Logger.Debug("Awaiting connection to Zookeeper server");
if (!this.WaitUntilConnected(this.connectionTimeout))
{
throw new ZooKeeperException(
"Unable to connect to zookeeper server within timeout: " + this.connection.SessionTimeout);
}
started = true;
Logger.Debug("Connection to Zookeeper server established");
}
catch (ThreadInterruptedException)
{
throw new InvalidOperationException(
"Not connected with zookeeper server yet. Current state is " + this.connection.ClientState);
}
finally
{
if (!started)
{
this.Disconnect();
}
}
}
/// <summary>
/// Closes current connection to ZooKeeper
/// </summary>
/// <remarks>
/// Also, stops background thread
/// </remarks>
public void Disconnect()
{
Logger.Debug("Closing ZooKeeperClient...");
this.shutdownTriggered = true;
this.eventWorker.Interrupt();
this.eventWorker.Join(2000);
this.connection.Dispose();
this.connection = null;
}
/// <summary>
/// Re-connect to ZooKeeper server when session expired
/// </summary>
/// <param name="servers">
/// The servers.
/// </param>
/// <param name="connectionTimeout">
/// The connection timeout.
/// </param>
public void Reconnect(string servers, int connectionTimeout)
{
this.EnsuresNotDisposed();
Logger.Debug("Reconnecting");
this.connection.Dispose();
this.connection = new ZooKeeperConnection(servers, connectionTimeout);
this.connection.Connect(this);
Logger.Debug("Reconnected");
}
/// <summary>
/// Waits untill ZooKeeper connection is established
/// </summary>
/// <param name="connectionTimeout">
/// The connection timeout.
/// </param>
/// <returns>
/// Status
/// </returns>
public bool WaitUntilConnected(int connectionTimeout)
{
Guard.Greater(connectionTimeout, 0, "connectionTimeout");
this.EnsuresNotDisposed();
if (this.eventWorker != null && this.eventWorker == Thread.CurrentThread)
{
throw new InvalidOperationException("Must not be done in the ZooKeeper event thread.");
}
Logger.Debug("Waiting for keeper state: " + KeeperState.SyncConnected);
bool stillWaiting = true;
lock (this.stateChangedLock)
{
while (this.currentState != KeeperState.SyncConnected)
{
if (!stillWaiting)
{
return false;
}
stillWaiting = Monitor.Wait(this.stateChangedLock, connectionTimeout);
}
Logger.Debug("State is " + this.currentState);
}
return true;
}
/// <summary>
/// Retries given delegate until connections is established
/// </summary>
/// <param name="callback">
/// The delegate to invoke.
/// </param>
/// <typeparam name="T">
/// Type of data returned by delegate
/// </typeparam>
/// <returns>
/// data returned by delegate
/// </returns>
public T RetryUntilConnected<T>(Func<T> callback)
{
Guard.NotNull(callback, "callback");
this.EnsuresNotDisposed();
if (this.zooKeeperEventWorker != null && this.zooKeeperEventWorker == Thread.CurrentThread)
{
throw new InvalidOperationException("Must not be done in the zookeeper event thread");
}
while (true)
{
try
{
return callback();
}
catch (KeeperException.ConnectionLossException)
{
Thread.Yield();
this.WaitUntilConnected(this.connection.SessionTimeout);
}
catch (KeeperException.SessionExpiredException)
{
Thread.Yield();
this.WaitUntilConnected(this.connection.SessionTimeout);
}
}
}
/// <summary>
/// Checks whether znode for a given path exists
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <returns>
/// Result of check
/// </returns>
/// <remarks>
/// Will reinstall watcher in ZooKeeper if any listener for given path exists
/// </remarks>
public bool Exists(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
bool hasListeners = this.HasListeners(path);
return this.Exists(path, hasListeners);
}
/// <summary>
/// Checks whether znode for a given path exists.
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="watch">
/// Indicates whether should reinstall watcher in ZooKeeper.
/// </param>
/// <returns>
/// Result of check
/// </returns>
public bool Exists(string path, bool watch)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.RetryUntilConnected(
() => this.connection.Exists(path, watch));
}
/// <summary>
/// Gets all children for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <returns>
/// Children
/// </returns>
/// <remarks>
/// Will reinstall watcher in ZooKeeper if any listener for given path exists
/// </remarks>
public IList<string> GetChildren(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
bool hasListeners = this.HasListeners(path);
return this.GetChildren(path, hasListeners);
}
/// <summary>
/// Gets all children for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="watch">
/// Indicates whether should reinstall watcher in ZooKeeper.
/// </param>
/// <returns>
/// Children
/// </returns>
public IList<string> GetChildren(string path, bool watch)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.RetryUntilConnected(
() => this.connection.GetChildren(path, watch));
}
/// <summary>
/// Counts number of children for a given path.
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <returns>
/// Number of children
/// </returns>
/// <remarks>
/// Will reinstall watcher in ZooKeeper if any listener for given path exists.
/// Returns 0 if path does not exist
/// </remarks>
public int CountChildren(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
{
return this.GetChildren(path).Count;
}
catch (KeeperException.NoNodeException)
{
return 0;
}
}
/// <summary>
/// Fetches data from a given path in ZooKeeper
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="stats">
/// The statistics.
/// </param>
/// <param name="watch">
/// Indicates whether should reinstall watcher in ZooKeeper.
/// </param>
/// <typeparam name="T">
/// Expected type of data
/// </typeparam>
/// <returns>
/// Data
/// </returns>
/// <remarks>
/// Uses given serializer to deserialize data
/// Use null for stats
/// </remarks>
public T ReadData<T>(string path, Stat stats, bool watch)
where T : class
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
byte[] bytes = this.RetryUntilConnected(
() => this.connection.ReadData(path, stats, watch));
return this.serializer.Deserialize(bytes) as T;
}
/// <summary>
/// Fetches data from a given path in ZooKeeper
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="stats">
/// The statistics.
/// </param>
/// <typeparam name="T">
/// Expected type of data
/// </typeparam>
/// <returns>
/// Data
/// </returns>
/// <remarks>
/// Uses given serializer to deserialize data.
/// Will reinstall watcher in ZooKeeper if any listener for given path exists.
/// Use null for stats
/// </remarks>
public T ReadData<T>(string path, Stat stats) where T : class
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
bool hasListeners = this.HasListeners(path);
return this.ReadData<T>(path, null, hasListeners);
}
/// <summary>
/// Writes data for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
public void WriteData(string path, object data)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.WriteData(path, data, -1);
}
/// <summary>
/// Writes data for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
/// <param name="expectedVersion">
/// Expected version of data
/// </param>
/// <remarks>
/// Use -1 for expected version
/// </remarks>
public void WriteData(string path, object data, int expectedVersion)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
byte[] bytes = this.serializer.Serialize(data);
this.RetryUntilConnected(
() =>
{
this.connection.WriteData(path, bytes, expectedVersion);
return null as object;
});
}
/// <summary>
/// Deletes znode for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <returns>
/// Status
/// </returns>
public bool Delete(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.RetryUntilConnected(
() =>
{
try
{
this.connection.Delete(path);
return true;
}
catch (KeeperException.NoNodeException)
{
return false;
}
});
}
/// <summary>
/// Deletes znode and his children for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <returns>
/// Status
/// </returns>
public bool DeleteRecursive(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
IList<string> children;
try
{
children = this.GetChildren(path, false);
}
catch (KeeperException.NoNodeException)
{
return true;
}
foreach (var child in children)
{
if (!this.DeleteRecursive(path + "/" + child))
{
return false;
}
}
return this.Delete(path);
}
/// <summary>
/// Creates persistent znode and all intermediate znodes (if do not exist) for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
public void MakeSurePersistentPathExists(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
if (!this.Exists(path))
{
this.CreatePersistent(path, true);
}
}
/// <summary>
/// Fetches children for a given path
/// </summary>
/// <param name="path">
/// The path.
/// </param>
/// <returns>
/// Children or null, if znode does not exist
/// </returns>
public IList<string> GetChildrenParentMayNotExist(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
{
return this.GetChildren(path);
}
catch (KeeperException.NoNodeException)
{
return null;
}
}
/// <summary>
/// Fetches data from a given path in ZooKeeper
/// </summary>
/// <typeparam name="T">
/// Expected type of data
/// </typeparam>
/// <param name="path">
/// The given path.
/// </param>
/// <returns>
/// Data or null, if znode does not exist
/// </returns>
public T ReadData<T>(string path)
where T : class
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.ReadData<T>(path, false);
}
/// <summary>
/// Closes connection to ZooKeeper
/// </summary>
public void Dispose()
{
if (this.disposed)
{
return;
}
lock (this.shuttingDownLock)
{
if (this.disposed)
{
return;
}
this.disposed = true;
}
try
{
this.Disconnect();
}
catch (ThreadInterruptedException)
{
}
catch (Exception exc)
{
Logger.Debug("Ignoring unexpected errors on closing ZooKeeperClient", exc);
}
Logger.Debug("Closing ZooKeeperClient... done");
}
/// <summary>
/// Creates a persistent znode for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="createParents">
/// Indicates, whether should create missing intermediate znodes
/// </param>
/// <remarks>
/// Persistent znodes won't disappear after session close
/// </remarks>
public void CreatePersistent(string path, bool createParents)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
{
this.Create(path, null, CreateMode.Persistent);
}
catch (KeeperException.NodeExistsException)
{
if (!createParents)
{
throw;
}
}
catch (KeeperException.NoNodeException)
{
if (!createParents)
{
throw;
}
string parentDir = path.Substring(0, path.LastIndexOf('/'));
this.CreatePersistent(parentDir, true);
this.CreatePersistent(path, true);
}
}
/// <summary>
/// Creates a persistent znode for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <remarks>
/// Persistent znodes won't disappear after session close
/// Doesn't re-create missing intermediate znodes
/// </remarks>
public void CreatePersistent(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.CreatePersistent(path, false);
}
/// <summary>
/// Creates a persistent znode for a given path and writes data into it
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
/// <remarks>
/// Persistent znodes won't disappear after session close
/// Doesn't re-create missing intermediate znodes
/// </remarks>
public void CreatePersistent(string path, object data)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.Create(path, data, CreateMode.Persistent);
}
/// <summary>
/// Creates a sequential, persistent znode for a given path and writes data into it
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
/// <remarks>
/// Persistent znodes won't disappear after session close
/// Doesn't re-create missing intermediate znodes
/// </remarks>
/// <returns>
/// The created znode's path
/// </returns>
public string CreatePersistentSequential(string path, object data)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.Create(path, data, CreateMode.PersistentSequential);
}
/// <summary>
/// Helper method to create znode
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
/// <param name="mode">
/// The create mode.
/// </param>
/// <returns>
/// The created znode's path
/// </returns>
private string Create(string path, object data, CreateMode mode)
{
if (path == null)
{
throw new ArgumentNullException("Path must not be null");
}
byte[] bytes = data == null ? null : this.serializer.Serialize(data);
return this.RetryUntilConnected(() =>
this.connection.Create(path, bytes, mode));
}
/// <summary>
/// Creates a ephemeral znode for a given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <remarks>
/// Ephemeral znodes will disappear after session close
/// </remarks>
public void CreateEphemeral(string path)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.Create(path, null, CreateMode.Ephemeral);
}
/// <summary>
/// Creates a ephemeral znode for a given path and writes data into it
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
/// <remarks>
/// Ephemeral znodes will disappear after session close
/// </remarks>
public void CreateEphemeral(string path, object data)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
this.Create(path, data, CreateMode.Ephemeral);
}
/// <summary>
/// Creates a ephemeral, sequential znode for a given path and writes data into it
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="data">
/// The data to write.
/// </param>
/// <remarks>
/// Ephemeral znodes will disappear after session close
/// </remarks>
/// <returns>
/// Created znode's path
/// </returns>
public string CreateEphemeralSequential(string path, object data)
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
return this.Create(path, data, CreateMode.EphemeralSequential);
}
/// <summary>
/// Fetches data for given path
/// </summary>
/// <param name="path">
/// The given path.
/// </param>
/// <param name="returnNullIfPathNotExists">
/// Indicates, whether should return null or throw exception when
/// znode doesn't exist
/// </param>
/// <typeparam name="T">
/// Expected type of data
/// </typeparam>
/// <returns>
/// Data
/// </returns>
public T ReadData<T>(string path, bool returnNullIfPathNotExists)
where T : class
{
Guard.NotNullNorEmpty(path, "path");
this.EnsuresNotDisposed();
try
{
return this.ReadData<T>(path, null);
}
catch (KeeperException.NoNodeException)
{
if (!returnNullIfPathNotExists)
{
throw;
}
return null;
}
}
/// <summary>
/// Ensures that object wasn't disposed
/// </summary>
private void EnsuresNotDisposed()
{
if (this.disposed)
{
throw new ObjectDisposedException(this.GetType().Name);
}
}
}
}