blob: 50a8afc178cdca4a3f5177018fdbedb32eed7577 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.ZooKeeperIntegration.Listeners
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
using Kafka.Client.Cluster;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration.Events;
using log4net;
/// <summary>
/// Listens to new broker registrations under a particular topic, in zookeeper and
/// keeps the related data structures updated
/// </summary>
internal class BrokerTopicsListener : IZooKeeperChildListener
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly IDictionary<int, Broker> actualBrokerIdMap;
private readonly Action<int, string, int> callback;
private readonly IDictionary<string, SortedSet<Partition>> actualBrokerTopicsPartitionsMap;
private IDictionary<int, Broker> oldBrokerIdMap;
private IDictionary<string, SortedSet<Partition>> oldBrokerTopicsPartitionsMap;
private readonly IZooKeeperClient zkclient;
private readonly object syncLock = new object();
/// <summary>
/// Initializes a new instance of the <see cref="BrokerTopicsListener"/> class.
/// </summary>
/// <param name="zkclient">The wrapper on ZooKeeper client.</param>
/// <param name="actualBrokerTopicsPartitionsMap">The actual broker topics partitions map.</param>
/// <param name="actualBrokerIdMap">The actual broker id map.</param>
/// <param name="callback">The callback invoked after new broker is added.</param>
public BrokerTopicsListener(
IZooKeeperClient zkclient,
IDictionary<string, SortedSet<Partition>> actualBrokerTopicsPartitionsMap,
IDictionary<int, Broker> actualBrokerIdMap,
Action<int, string, int> callback)
{
this.zkclient = zkclient;
this.actualBrokerTopicsPartitionsMap = actualBrokerTopicsPartitionsMap;
this.actualBrokerIdMap = actualBrokerIdMap;
this.callback = callback;
this.oldBrokerIdMap = new Dictionary<int, Broker>(this.actualBrokerIdMap);
this.oldBrokerTopicsPartitionsMap = new Dictionary<string, SortedSet<Partition>>(this.actualBrokerTopicsPartitionsMap);
Logger.Debug("Creating broker topics listener to watch the following paths - \n"
+ "/broker/topics, /broker/topics/topic, /broker/ids");
Logger.Debug("Initialized this broker topics listener with initial mapping of broker id to "
+ "partition id per topic with " + this.oldBrokerTopicsPartitionsMap.ToMultiString(
x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
}
/// <summary>
/// Called when the children of the given path changed
/// </summary>
/// <param name="e">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data
/// as parent path and children (null if parent was deleted).
/// </param>
public void HandleChildChange(ZooKeeperChildChangedEventArgs e)
{
Guard.NotNull(e, "e");
Guard.NotNullNorEmpty(e.Path, "e.Path");
Guard.NotNull(e.Children, "e.Children");
lock (this.syncLock)
{
try
{
string path = e.Path;
IList<string> childs = e.Children;
Logger.Debug("Watcher fired for path: " + path);
switch (path)
{
case ZooKeeperClient.DefaultBrokerTopicsPath:
List<string> oldTopics = this.oldBrokerTopicsPartitionsMap.Keys.ToList();
List<string> newTopics = childs.Except(oldTopics).ToList();
Logger.Debug("List of topics was changed at " + e.Path);
Logger.Debug("Current topics -> " + e.Children.ToMultiString(","));
Logger.Debug("Old list of topics -> " + oldTopics.ToMultiString(","));
Logger.Debug("List of newly registered topics -> " + newTopics.ToMultiString(","));
foreach (var newTopic in newTopics)
{
string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + newTopic;
IList<string> brokerList = this.zkclient.GetChildrenParentMayNotExist(brokerTopicPath);
this.ProcessNewBrokerInExistingTopic(newTopic, brokerList);
this.zkclient.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath + "/" + newTopic, this);
}
break;
case ZooKeeperClient.DefaultBrokerIdsPath:
Logger.Debug("List of brokers changed in the Kafka cluster " + e.Path);
Logger.Debug("Currently registered list of brokers -> " + e.Children.ToMultiString(","));
this.ProcessBrokerChange(path, childs);
break;
default:
string[] parts = path.Split('/');
string topic = parts.Last();
if (parts.Length == 4 && parts[2] == "topics" && childs != null)
{
Logger.Debug("List of brokers changed at " + path);
Logger.Debug(
"Currently registered list of brokers for topic " + topic + " -> " +
childs.ToMultiString(","));
this.ProcessNewBrokerInExistingTopic(topic, childs);
}
break;
}
this.oldBrokerTopicsPartitionsMap = this.actualBrokerTopicsPartitionsMap;
this.oldBrokerIdMap = this.actualBrokerIdMap;
}
catch (Exception exc)
{
Logger.Debug("Error while handling " + e, exc);
}
}
}
/// <summary>
/// Resets the state of listener.
/// </summary>
public void ResetState()
{
Logger.Debug("Before reseting broker topic partitions state -> "
+ this.oldBrokerTopicsPartitionsMap.ToMultiString(
x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
this.oldBrokerTopicsPartitionsMap = actualBrokerTopicsPartitionsMap;
Logger.Debug("After reseting broker topic partitions state -> "
+ this.oldBrokerTopicsPartitionsMap.ToMultiString(
x => x.Key + " --> " + x.Value.ToMultiString(y => y.ToString(), ","), "; "));
Logger.Debug("Before reseting broker id map state -> "
+ this.oldBrokerIdMap.ToMultiString(", "));
this.oldBrokerIdMap = this.actualBrokerIdMap;
Logger.Debug("After reseting broker id map state -> "
+ this.oldBrokerIdMap.ToMultiString(", "));
}
/// <summary>
/// Generate the updated mapping of (brokerId, numPartitions) for the new list of brokers
/// registered under some topic.
/// </summary>
/// <param name="topic">The path of the topic under which the brokers have changed..</param>
/// <param name="childs">The list of changed brokers.</param>
private void ProcessNewBrokerInExistingTopic(string topic, IEnumerable<string> childs)
{
if (this.actualBrokerTopicsPartitionsMap.ContainsKey(topic))
{
Logger.Debug("Old list of brokers -> " + this.oldBrokerTopicsPartitionsMap[topic].ToMultiString(x => x.BrokerId.ToString(), ","));
}
var updatedBrokers = new SortedSet<int>(childs.Select(x => int.Parse(x, CultureInfo.InvariantCulture)));
string brokerTopicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
var sortedBrokerPartitions = new SortedDictionary<int, int>();
foreach (var bid in updatedBrokers)
{
var num = this.zkclient.ReadData<string>(brokerTopicPath + "/" + bid);
sortedBrokerPartitions.Add(bid, int.Parse(num, CultureInfo.InvariantCulture));
}
var updatedBrokerParts = new SortedSet<Partition>();
foreach (var bp in sortedBrokerPartitions)
{
for (int i = 0; i < bp.Value; i++)
{
var bidPid = new Partition(bp.Key, i);
updatedBrokerParts.Add(bidPid);
}
}
Logger.Debug("Currently registered list of brokers for topic " + topic + " -> " + childs.ToMultiString(", "));
SortedSet<Partition> mergedBrokerParts = updatedBrokerParts;
if (this.actualBrokerTopicsPartitionsMap.ContainsKey(topic))
{
SortedSet<Partition> oldBrokerParts = this.actualBrokerTopicsPartitionsMap[topic];
Logger.Debug(
"Unregistered list of brokers for topic " + topic + " -> " + oldBrokerParts.ToMultiString(", "));
foreach (var oldBrokerPart in oldBrokerParts)
{
mergedBrokerParts.Add(oldBrokerPart);
}
}
else
{
this.actualBrokerTopicsPartitionsMap.Add(topic, null);
}
this.actualBrokerTopicsPartitionsMap[topic] = new SortedSet<Partition>(mergedBrokerParts.Where(x => this.actualBrokerIdMap.ContainsKey(x.BrokerId)));
}
/// <summary>
/// Processes change in the broker lists.
/// </summary>
/// <param name="path">The parent path of brokers list.</param>
/// <param name="childs">The current brokers.</param>
private void ProcessBrokerChange(string path, IEnumerable<string> childs)
{
if (path != ZooKeeperClient.DefaultBrokerIdsPath)
{
return;
}
List<int> updatedBrokers = childs.Select(x => int.Parse(x, CultureInfo.InvariantCulture)).ToList();
List<int> oldBrokers = this.oldBrokerIdMap.Select(x => x.Key).ToList();
List<int> newBrokers = updatedBrokers.Except(oldBrokers).ToList();
Logger.Debug("List of newly registered brokers -> " + newBrokers.ToMultiString(","));
foreach (int bid in newBrokers)
{
string brokerInfo = this.zkclient.ReadData<string>(ZooKeeperClient.DefaultBrokerIdsPath + "/" + bid);
string[] brokerHost = brokerInfo.Split(':');
var port = int.Parse(brokerHost[2], CultureInfo.InvariantCulture);
this.actualBrokerIdMap.Add(bid, new Broker(bid, brokerHost[1], brokerHost[1], port));
if (this.callback != null)
{
Logger.Debug("Invoking the callback for broker: " + bid);
this.callback(bid, brokerHost[1], port);
}
}
List<int> deadBrokers = oldBrokers.Except(updatedBrokers).ToList();
Logger.Debug("Deleting broker ids for dead brokers -> " + deadBrokers.ToMultiString(","));
foreach (int bid in deadBrokers)
{
Logger.Debug("Deleting dead broker: " + bid);
this.actualBrokerIdMap.Remove(bid);
foreach (var topicMap in this.actualBrokerTopicsPartitionsMap)
{
int affected = topicMap.Value.RemoveWhere(x => x.BrokerId == bid);
if (affected > 0)
{
Logger.Debug("Removing dead broker " + bid + " for topic: " + topicMap.Key);
Logger.Debug("Actual list of mapped brokers is -> " + topicMap.Value.ToMultiString(x => x.ToString(), ","));
}
}
}
}
}
}