blob: 6c1a8b3ec889c7dc3488c51d1939fd6a2b87741b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Producers.Partitioning
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration;
using Kafka.Client.ZooKeeperIntegration.Events;
using Kafka.Client.ZooKeeperIntegration.Listeners;
using log4net;
using ZooKeeperNet;
/// <summary>
/// Fetch broker info like ID, host, port and number of partitions from ZooKeeper.
/// </summary>
/// <remarks>
/// Used when zookeeper based auto partition discovery is enabled.
/// The broker and topic dictionaries are created once and are handed to the
/// <see cref="BrokerTopicsListener"/> at construction time; they are therefore
/// updated in place (never replaced) so this class and the listener keep
/// working on the same instances.
/// </remarks>
internal class ZKBrokerPartitionInfo : IBrokerPartitionInfo, IZooKeeperStateListener
{
    private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

    private readonly Action<int, string, int> callback;
    private readonly IDictionary<int, Broker> brokers;
    private readonly IDictionary<string, SortedSet<Partition>> topicBrokerPartitions;
    private readonly IZooKeeperClient zkclient;
    private readonly BrokerTopicsListener brokerTopicsListener;
    private readonly object shuttingDownLock = new object();
    private volatile bool disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="ZKBrokerPartitionInfo"/> class
    /// without a broker callback.
    /// </summary>
    /// <param name="zkclient">The wrapper above ZooKeeper client.</param>
    public ZKBrokerPartitionInfo(IZooKeeperClient zkclient)
        : this(zkclient, null)
    {
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ZKBrokerPartitionInfo"/> class.
    /// </summary>
    /// <param name="config">The config.</param>
    /// <param name="callback">The callback handed to the broker/topic listener.</param>
    public ZKBrokerPartitionInfo(ProducerConfiguration config, Action<int, string, int> callback)
        : this(CreateZooKeeperClient(config), callback)
    {
    }

    /// <summary>
    /// Shared constructor: connects, loads the initial state from ZooKeeper and
    /// registers the listeners.
    /// </summary>
    /// <param name="zkclient">The wrapper above ZooKeeper client.</param>
    /// <param name="callback">The callback handed to the listener; may be null.</param>
    /// <remarks>
    /// The callback is stored before the listener is created, so the listener
    /// receives the caller's delegate instead of a not-yet-assigned field.
    /// </remarks>
    private ZKBrokerPartitionInfo(IZooKeeperClient zkclient, Action<int, string, int> callback)
    {
        Guard.NotNull(zkclient, "zkclient");

        this.callback = callback;
        this.zkclient = zkclient;
        this.zkclient.Connect();
        this.brokers = new ConcurrentDictionary<int, Broker>(this.ReadBrokersFromZooKeeper());
        this.topicBrokerPartitions = new ConcurrentDictionary<string, SortedSet<Partition>>(this.ReadTopicBrokerPartitionsFromZooKeeper());
        this.brokerTopicsListener = new BrokerTopicsListener(this.zkclient, this.topicBrokerPartitions, this.brokers, this.callback);
        this.RegisterListeners();
    }

    /// <summary>
    /// Gets a mapping from broker ID to the host and port for all brokers
    /// </summary>
    /// <returns>
    /// Mapping from broker ID to the host and port for all brokers
    /// </returns>
    /// <exception cref="ObjectDisposedException">The instance was disposed.</exception>
    public IDictionary<int, Broker> GetAllBrokerInfo()
    {
        this.EnsuresNotDisposed();
        return this.brokers;
    }

    /// <summary>
    /// Gets the (broker, partition) pairs known for the given topic.
    /// </summary>
    /// <param name="topic">The topic for which this information is to be returned</param>
    /// <returns>
    /// The sorted set of partitions for the topic. When nothing is known for the
    /// topic yet, partition 0 of every broker currently listed in ZooKeeper is
    /// stored for the topic and returned.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The instance was disposed.</exception>
    public SortedSet<Partition> GetBrokerPartitionInfo(string topic)
    {
        Guard.NotNullNorEmpty(topic, "topic");
        this.EnsuresNotDisposed();

        // Single lookup: a ContainsKey/indexer/Add sequence can throw when another
        // thread adds or removes the topic between the calls.
        SortedSet<Partition> knownPartitions;
        if (this.topicBrokerPartitions.TryGetValue(topic, out knownPartitions)
            && knownPartitions != null
            && knownPartitions.Count > 0)
        {
            return knownPartitions;
        }

        SortedSet<Partition> bootstrapped = this.BootstrapWithExistingBrokers(topic);
        this.topicBrokerPartitions[topic] = bootstrapped;
        return bootstrapped;
    }

    /// <summary>
    /// Gets the host and port information for the broker identified by the given broker ID
    /// </summary>
    /// <param name="brokerId">The broker ID.</param>
    /// <returns>
    /// The broker, or null when the ID is unknown
    /// </returns>
    /// <exception cref="ObjectDisposedException">The instance was disposed.</exception>
    public Broker GetBrokerInfo(int brokerId)
    {
        this.EnsuresNotDisposed();

        Broker broker;
        return this.brokers.TryGetValue(brokerId, out broker) ? broker : null;
    }

    /// <summary>
    /// Closes underlying connection to ZooKeeper
    /// </summary>
    /// <remarks>
    /// Safe to call more than once; only the first call disposes the client.
    /// Errors raised while disposing the client are logged and ignored.
    /// </remarks>
    public void Dispose()
    {
        if (this.disposed)
        {
            return;
        }

        lock (this.shuttingDownLock)
        {
            if (this.disposed)
            {
                return;
            }

            this.disposed = true;
        }

        try
        {
            if (this.zkclient != null)
            {
                this.zkclient.Dispose();
            }
        }
        catch (Exception exc)
        {
            Logger.Warn("Ignoring unexpected errors on closing", exc);
        }
    }

    /// <summary>
    /// Called when the ZooKeeper connection state has changed.
    /// </summary>
    /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperStateChangedEventArgs"/> instance containing the event data.</param>
    /// <remarks>
    /// Do nothing, since zkclient will do reconnect for us.
    /// </remarks>
    public void HandleStateChanged(ZooKeeperStateChangedEventArgs args)
    {
        Guard.NotNull(args, "args");
        Guard.Assert<ArgumentException>(() => args.State != KeeperState.Unknown);
        this.EnsuresNotDisposed();
        Logger.Debug("Handle state change: do nothing, since zkclient will do reconnect for us.");
    }

    /// <summary>
    /// Called after the ZooKeeper session has expired and a new session has been created.
    /// </summary>
    /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperSessionCreatedEventArgs"/> instance containing the event data.</param>
    /// <remarks>
    /// Reloads brokers and topic partitions from ZooKeeper, resets the listener state
    /// and subscribes the listener again on every known topic path.
    /// </remarks>
    public void HandleSessionCreated(ZooKeeperSessionCreatedEventArgs args)
    {
        Guard.NotNull(args, "args");
        this.EnsuresNotDisposed();
        Logger.Debug("ZK expired; release old list of broker partitions for topics ");
        this.Reset();
        this.brokerTopicsListener.ResetState();
        foreach (var topic in this.topicBrokerPartitions.Keys)
        {
            this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic, this.brokerTopicsListener);
        }
    }

    /// <summary>
    /// Builds the ZooKeeper client described by the producer configuration.
    /// </summary>
    /// <param name="config">The producer configuration.</param>
    /// <returns>A new, not yet connected, ZooKeeper client wrapper.</returns>
    private static IZooKeeperClient CreateZooKeeperClient(ProducerConfiguration config)
    {
        Guard.NotNull(config, "config");
        return new ZooKeeperClient(config.ZooKeeper.ZkConnect, config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer);
    }

    /// <summary>
    /// Makes <paramref name="target"/> hold exactly the entries of <paramref name="snapshot"/>
    /// without replacing the dictionary instance.
    /// </summary>
    /// <typeparam name="TKey">The key type.</typeparam>
    /// <typeparam name="TValue">The value type.</typeparam>
    /// <param name="target">The long-lived dictionary to update.</param>
    /// <param name="snapshot">The freshly loaded content.</param>
    private static void ReplaceContents<TKey, TValue>(IDictionary<TKey, TValue> target, IDictionary<TKey, TValue> snapshot)
    {
        // Copy the keys first so entries can be removed while walking them.
        foreach (TKey existingKey in new List<TKey>(target.Keys))
        {
            if (!snapshot.ContainsKey(existingKey))
            {
                target.Remove(existingKey);
            }
        }

        foreach (KeyValuePair<TKey, TValue> entry in snapshot)
        {
            target[entry.Key] = entry.Value;
        }
    }

    /// <summary>
    /// Lists the children of a ZooKeeper path, treating a null result as "no children".
    /// </summary>
    /// <param name="path">The parent path.</param>
    /// <returns>The child names; never null.</returns>
    /// <remarks>
    /// NOTE(review): assumes the client may return null when the parent node is
    /// missing - confirm against the client implementation. The fallback is harmless otherwise.
    /// </remarks>
    private IList<string> GetChildrenOrEmpty(string path)
    {
        return this.zkclient.GetChildrenParentMayNotExist(path) ?? new List<string>();
    }

    /// <summary>
    /// Reads all registered brokers from ZooKeeper.
    /// </summary>
    /// <returns>Mapping from broker ID to broker.</returns>
    /// <exception cref="FormatException">
    /// A broker node holds no data or data with fewer than three ':'-separated fields.
    /// </exception>
    private IDictionary<int, Broker> ReadBrokersFromZooKeeper()
    {
        var loaded = new Dictionary<int, Broker>();
        foreach (string brokerId in this.GetChildrenOrEmpty(ZooKeeperClient.DefaultBrokerIdsPath))
        {
            string path = ZooKeeperClient.DefaultBrokerIdsPath + "/" + brokerId;
            int id = int.Parse(brokerId, CultureInfo.InvariantCulture);
            var info = this.zkclient.ReadData<string>(path, null);

            // The first three ':'-separated fields are passed to the Broker constructor,
            // the third one parsed as the port number.
            string[] parts = info == null ? new string[0] : info.Split(':');
            if (parts.Length < 3)
            {
                throw new FormatException(string.Format(
                    CultureInfo.InvariantCulture,
                    "Broker registration at '{0}' is missing or malformed: '{1}'",
                    path,
                    info));
            }

            int port = int.Parse(parts[2], CultureInfo.InvariantCulture);
            loaded.Add(id, new Broker(id, parts[0], parts[1], port));
        }

        return loaded;
    }

    /// <summary>
    /// Reads the topic - broker's partitions mappings from ZooKeeper.
    /// </summary>
    /// <returns>
    /// For every topic, one <see cref="Partition"/> per (broker ID, partition index)
    /// with indexes running from 0 to the partition count stored for that broker.
    /// </returns>
    private IDictionary<string, SortedSet<Partition>> ReadTopicBrokerPartitionsFromZooKeeper()
    {
        var loaded = new Dictionary<string, SortedSet<Partition>>();
        this.zkclient.MakeSurePersistentPathExists(ZooKeeperClient.DefaultBrokerTopicsPath);
        foreach (string topic in this.GetChildrenOrEmpty(ZooKeeperClient.DefaultBrokerTopicsPath))
        {
            string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
            var partitions = new SortedSet<Partition>();
            foreach (string brokerId in this.GetChildrenOrEmpty(brokerTopicPath))
            {
                var partitionCountText = this.zkclient.ReadData<string>(brokerTopicPath + "/" + brokerId);

                // ZooKeeper data is machine-written: always parse culture-independently.
                int broker = int.Parse(brokerId, CultureInfo.InvariantCulture);
                int partitionCount = int.Parse(partitionCountText, CultureInfo.InvariantCulture);
                for (int partitionId = 0; partitionId < partitionCount; partitionId++)
                {
                    partitions.Add(new Partition(broker, partitionId));
                }
            }

            loaded.Add(topic, partitions);
        }

        return loaded;
    }

    /// <summary>
    /// Add the all available brokers with default one partition for new topic, so all of the brokers
    /// participate in hosting this topic
    /// </summary>
    /// <param name="topic">The new topic.</param>
    /// <returns>Default partitions for new topic</returns>
    /// <remarks>
    /// Since we do not have the information about number of partitions on these brokers, just assume
    /// a single partition: partition 0 of each broker is picked as a candidate.
    /// </remarks>
    private SortedSet<Partition> BootstrapWithExistingBrokers(string topic)
    {
        Logger.Debug("Currently, no brokers are registered under topic: " + topic);
        Logger.Debug("Bootstrapping topic: " + topic + " with available brokers in the cluster with default "
                     + "number of partitions = 1");

        var defaultPartitions = new SortedSet<Partition>();
        IList<string> allBrokers = this.GetChildrenOrEmpty(ZooKeeperClient.DefaultBrokerIdsPath);

        // Joining the lists is only worth the cost when the message is actually logged.
        if (Logger.IsDebugEnabled)
        {
            Logger.Debug("List of all brokers currently registered in zookeeper -> " + string.Join(", ", allBrokers));
        }

        foreach (var broker in allBrokers)
        {
            defaultPartitions.Add(new Partition(int.Parse(broker, CultureInfo.InvariantCulture), 0));
        }

        if (Logger.IsDebugEnabled)
        {
            Logger.Debug("Adding following broker id, partition id for NEW topic: " + topic + " -> " + string.Join(", ", defaultPartitions));
        }

        return defaultPartitions;
    }

    /// <summary>
    /// Registers the listeners under several path in ZooKeeper
    /// to keep related data structures updated.
    /// </summary>
    /// <remarks>
    /// Subscribes the broker/topic listener on the broker topics path, on one path per
    /// known topic below it and on the broker ids path, then subscribes this instance
    /// for connection state events.
    /// </remarks>
    private void RegisterListeners()
    {
        this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, this.brokerTopicsListener);
        Logger.Debug("Registering listener on path: " + ZooKeeperClient.DefaultBrokerTopicsPath);
        foreach (string topic in this.topicBrokerPartitions.Keys)
        {
            string path = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
            this.zkclient.Subscribe(path, this.brokerTopicsListener);
            Logger.Debug("Registering listener on path: " + path);
        }

        this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, this.brokerTopicsListener);
        Logger.Debug("Registering listener on path: " + ZooKeeperClient.DefaultBrokerIdsPath);
        this.zkclient.Subscribe(this);
        Logger.Debug("Registering listener on state changed event");
    }

    /// <summary>
    /// Reloads the related data structures from ZooKeeper
    /// </summary>
    /// <remarks>
    /// The fresh state is read completely before the live dictionaries are touched, so
    /// a failed read leaves the previous state intact and readers never observe a
    /// missing dictionary. The dictionaries are updated in place because the listener
    /// was constructed with these very instances.
    /// </remarks>
    private void Reset()
    {
        IDictionary<int, Broker> latestBrokers = this.ReadBrokersFromZooKeeper();
        IDictionary<string, SortedSet<Partition>> latestTopics = this.ReadTopicBrokerPartitionsFromZooKeeper();

        ReplaceContents(this.brokers, latestBrokers);
        ReplaceContents(this.topicBrokerPartitions, latestTopics);
    }

    /// <summary>
    /// Ensures that object was not disposed
    /// </summary>
    /// <exception cref="ObjectDisposedException">The instance was disposed.</exception>
    private void EnsuresNotDisposed()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException(this.GetType().Name);
        }
    }
}
}