blob: db0b138a8b73dc0b5a306b603f077803bb96b58d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.ZooKeeperIntegration.Listeners
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
using System.Threading;
using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Consumers;
using Kafka.Client.Exceptions;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration.Events;
using log4net;
using ZooKeeperNet;
/// <summary>
/// Listens for changes to the children of the consumer-group registry znode and,
/// when membership or topic/partition layout changes, redistributes partition
/// ownership across the consumers in the group.
/// </summary>
internal class ZKRebalancerListener : IZooKeeperChildListener
{
    private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

    // Snapshots taken after the last successful rebalance; compared against fresh
    // ZooKeeper state to decide whether another rebalance is actually required.
    private IDictionary<string, IList<string>> oldPartitionsPerTopicMap = new Dictionary<string, IList<string>>();
    private IDictionary<string, IList<string>> oldConsumersPerTopicMap = new Dictionary<string, IList<string>>();

    // topic -> (partition -> fetch state) registry shared with the connector.
    private readonly IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry;

    // (topic, consumerThreadId) -> chunk queue consumed by the client threads.
    private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues;

    private readonly string consumerIdString;
    private readonly object syncLock;
    private readonly ConsumerConfiguration config;
    private readonly IZooKeeperClient zkClient;
    private readonly ZKGroupDirs dirs;
    private readonly Fetcher fetcher;
    private readonly ZookeeperConsumerConnector zkConsumerConnector;

    internal ZKRebalancerListener(
        ConsumerConfiguration config,
        string consumerIdString,
        IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry,
        IZooKeeperClient zkClient,
        ZookeeperConsumerConnector zkConsumerConnector,
        IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues,
        Fetcher fetcher,
        object syncLock)
    {
        this.syncLock = syncLock;
        this.consumerIdString = consumerIdString;
        this.config = config;
        this.topicRegistry = topicRegistry;
        this.zkClient = zkClient;
        this.dirs = new ZKGroupDirs(config.GroupId);
        this.zkConsumerConnector = zkConsumerConnector;
        this.queues = queues;
        this.fetcher = fetcher;
    }

    /// <summary>
    /// Runs a rebalance under the shared synchronization lock, retrying up to
    /// <see cref="ZookeeperConsumerConnector.MaxNRetries"/> times. Between attempts all
    /// partition ownership is released and local state reset so the next attempt starts clean.
    /// </summary>
    /// <exception cref="ZKRebalancerException">Thrown when every retry fails.</exception>
    public void SyncedRebalance()
    {
        lock (this.syncLock)
        {
            for (int i = 0; i < ZookeeperConsumerConnector.MaxNRetries; i++)
            {
                Logger.InfoFormat(CultureInfo.CurrentCulture, "begin rebalancing consumer {0} try #{1}", consumerIdString, i);
                bool done = false;
                try
                {
                    done = this.Rebalance();
                }
                catch (Exception ex)
                {
                    // ZooKeeper state (cluster membership, topic layout) may change mid-read
                    // while a rebalance is in flight; swallow and retry rather than fail.
                    Logger.InfoFormat(CultureInfo.CurrentCulture, "exception during rebalance {0}", ex);
                }

                Logger.InfoFormat(CultureInfo.CurrentCulture, "end rebalancing consumer {0} try #{1}", consumerIdString, i);
                if (done)
                {
                    return;
                }

                //// release all partitions, reset state and retry
                this.ReleasePartitionOwnership();
                this.ResetState();
                Thread.Sleep(config.ZooKeeper.ZkSyncTimeMs);
            }
        }

        throw new ZKRebalancerException(string.Format(CultureInfo.CurrentCulture, "{0} can't rebalance after {1} retries", this.consumerIdString, ZookeeperConsumerConnector.MaxNRetries));
    }

    /// <summary>
    /// Called when the children of the given path changed
    /// </summary>
    /// <param name="args">The <see cref="Kafka.Client.ZooKeeperIntegration.Events.ZooKeeperChildChangedEventArgs"/> instance containing the event data
    /// as parent path and children (null if parent was deleted).
    /// </param>
    /// <remarks>
    /// http://zookeeper.wiki.sourceforge.net/ZooKeeperWatches
    /// </remarks>
    public void HandleChildChange(ZooKeeperChildChangedEventArgs args)
    {
        Guard.NotNull(args, "args");
        Guard.NotNullNorEmpty(args.Path, "args.Path");
        // NOTE(review): per the remarks above, Children is null when the parent znode was
        // deleted; this guard will then throw instead of rebalancing — confirm intended.
        Guard.NotNull(args.Children, "args.Children");
        SyncedRebalance();
    }

    /// <summary>
    /// Resets the state of listener.
    /// </summary>
    public void ResetState()
    {
        this.topicRegistry.Clear();
        this.oldConsumersPerTopicMap.Clear();
        this.oldPartitionsPerTopicMap.Clear();
    }

    /// <summary>
    /// Performs a single rebalance attempt: computes this consumer's share of each relevant
    /// topic's partitions (range assignment), claims ownership in ZooKeeper, and points the
    /// fetcher at the newly-owned partitions.
    /// </summary>
    /// <returns>true when the rebalance completed (or was unnecessary); false when a partition
    /// ownership claim conflicted and the caller should retry.</returns>
    private bool Rebalance()
    {
        var myTopicThreadIdsMap = this.GetTopicCount(this.consumerIdString).GetConsumerThreadIdsPerTopic();
        var cluster = new Cluster(zkClient);
        var consumersPerTopicMap = this.GetConsumersPerTopic(this.config.GroupId);
        var partitionsPerTopicMap = ZkUtils.GetPartitionsForTopics(this.zkClient, myTopicThreadIdsMap.Keys);
        var relevantTopicThreadIdsMap = GetRelevantTopicMap(
            myTopicThreadIdsMap,
            partitionsPerTopicMap,
            this.oldPartitionsPerTopicMap,
            consumersPerTopicMap,
            this.oldConsumersPerTopicMap);
        if (relevantTopicThreadIdsMap.Count <= 0)
        {
            Logger.InfoFormat(CultureInfo.CurrentCulture, "Consumer {0} with {1} doesn't need to rebalance.", this.consumerIdString, consumersPerTopicMap);
            return true;
        }

        // Flush progress before giving up any partitions so the next owner resumes correctly.
        Logger.Info("Committing all offsets");
        this.zkConsumerConnector.CommitOffsets();

        Logger.Info("Releasing partition ownership");
        this.ReleasePartitionOwnership();

        var queuesToBeCleared = new List<BlockingCollection<FetchedDataChunk>>();
        foreach (var item in relevantTopicThreadIdsMap)
        {
            this.topicRegistry.Remove(item.Key);
            this.topicRegistry.Add(item.Key, new Dictionary<Partition, PartitionTopicInfo>());

            var topicDirs = new ZKGroupTopicDirs(config.GroupId, item.Key);
            var curConsumers = consumersPerTopicMap[item.Key];
            var curPartitions = new List<string>(partitionsPerTopicMap[item.Key]);

            // Range assignment: each consumer gets a contiguous slice of the partition list;
            // the first (count % consumers) consumers receive one extra partition.
            var numberOfPartsPerConsumer = curPartitions.Count / curConsumers.Count;
            var numberOfConsumersWithExtraPart = curPartitions.Count % curConsumers.Count;

            Logger.InfoFormat(
                CultureInfo.CurrentCulture,
                "Consumer {0} rebalancing the following partitions: {1} for topic {2} with consumers: {3}",
                this.consumerIdString,
                string.Join(",", curPartitions),
                item.Key,
                string.Join(",", curConsumers));

            foreach (string consumerThreadId in item.Value)
            {
                var myConsumerPosition = curConsumers.IndexOf(consumerThreadId);
                if (myConsumerPosition < 0)
                {
                    continue;
                }

                var startPart = (numberOfPartsPerConsumer * myConsumerPosition) +
                                Math.Min(myConsumerPosition, numberOfConsumersWithExtraPart);
                var numberOfParts = numberOfPartsPerConsumer + (myConsumerPosition + 1 > numberOfConsumersWithExtraPart ? 0 : 1);
                if (numberOfParts <= 0)
                {
                    Logger.WarnFormat(CultureInfo.CurrentCulture, "No broker partitions consumed by consumer thread {0} for topic {1}", consumerThreadId, item.Key);
                }
                else
                {
                    for (int i = startPart; i < startPart + numberOfParts; i++)
                    {
                        var partition = curPartitions[i];
                        Logger.InfoFormat(CultureInfo.CurrentCulture, "{0} attempting to claim partition {1}", consumerThreadId, partition);
                        if (!this.ProcessPartition(topicDirs, partition, item.Key, consumerThreadId))
                        {
                            // Previous owner has not released the partition yet; abort this
                            // attempt and let SyncedRebalance retry after a delay.
                            return false;
                        }
                    }

                    queuesToBeCleared.Add(queues[new Tuple<string, string>(item.Key, consumerThreadId)]);
                }
            }
        }

        this.UpdateFetcher(cluster, queuesToBeCleared);
        this.oldPartitionsPerTopicMap = partitionsPerTopicMap;
        this.oldConsumersPerTopicMap = consumersPerTopicMap;
        return true;
    }

    /// <summary>
    /// Restarts the fetcher against the complete set of partitions currently registered,
    /// clearing the given queues of stale chunks first.
    /// </summary>
    private void UpdateFetcher(Cluster cluster, IEnumerable<BlockingCollection<FetchedDataChunk>> queuesToBeCleared)
    {
        var allPartitionInfos = new List<PartitionTopicInfo>();
        foreach (var item in this.topicRegistry.Values)
        {
            foreach (var partitionTopicInfo in item.Values)
            {
                allPartitionInfos.Add(partitionTopicInfo);
            }
        }

        Logger.InfoFormat(
            CultureInfo.CurrentCulture,
            "Consumer {0} selected partitions: {1}",
            this.consumerIdString,
            string.Join(",", allPartitionInfos.OrderBy(x => x.Partition.Name).Select(y => y.Partition.Name)));
        if (this.fetcher != null)
        {
            this.fetcher.InitConnections(allPartitionInfos, cluster, queuesToBeCleared);
        }
    }

    /// <summary>
    /// Attempts to claim ownership of a partition by creating an ephemeral owner znode,
    /// then registers the partition's fetch state locally.
    /// </summary>
    /// <returns>false when another consumer still owns the partition (claim conflict).</returns>
    private bool ProcessPartition(ZKGroupTopicDirs topicDirs, string partition, string topic, string consumerThreadId)
    {
        var partitionOwnerPath = topicDirs.ConsumerOwnerDir + "/" + partition;
        try
        {
            ZkUtils.CreateEphemeralPathExpectConflict(zkClient, partitionOwnerPath, consumerThreadId);
        }
        catch (KeeperException.NodeExistsException)
        {
            //// The node hasn't been deleted by the original owner. So wait a bit and retry.
            Logger.InfoFormat(CultureInfo.CurrentCulture, "waiting for the partition ownership to be deleted: {0}", partition);
            return false;
        }

        AddPartitionTopicInfo(topicDirs, partition, topic, consumerThreadId);
        return true;
    }

    /// <summary>
    /// Reads the committed offset for the partition from ZooKeeper (0 when absent) and adds a
    /// <see cref="PartitionTopicInfo"/> for it to the topic registry.
    /// </summary>
    private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partitionString, string topic, string consumerThreadId)
    {
        var partition = Partition.ParseFrom(partitionString);
        var partTopicInfoMap = this.topicRegistry[topic];
        var znode = topicDirs.ConsumerOffsetDir + "/" + partition.Name;
        var offsetString = this.zkClient.ReadData<string>(znode, true);

        // No committed offset yet means start from 0.
        long offset = string.IsNullOrEmpty(offsetString) ? 0 : long.Parse(offsetString, CultureInfo.InvariantCulture);
        var queue = this.queues[new Tuple<string, string>(topic, consumerThreadId)];
        var partTopicInfo = new PartitionTopicInfo(
            topic,
            partition.BrokerId,
            partition,
            queue,
            offset,
            offset,
            this.config.FetchSize);
        partTopicInfoMap.Add(partition, partTopicInfo);
        if (Logger.IsDebugEnabled)
        {
            Logger.DebugFormat(CultureInfo.CurrentCulture, "{0} selected new offset {1}", partTopicInfo, offset);
        }
    }

    /// <summary>
    /// Deletes this consumer's ownership znodes for every partition currently registered,
    /// allowing other consumers to claim them.
    /// </summary>
    private void ReleasePartitionOwnership()
    {
        foreach (KeyValuePair<string, IDictionary<Partition, PartitionTopicInfo>> item in topicRegistry)
        {
            var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, item.Key);
            foreach (var partition in item.Value.Keys)
            {
                string znode = topicDirs.ConsumerOwnerDir + "/" + partition.Name;
                ZkUtils.DeletePath(zkClient, znode);
                if (Logger.IsDebugEnabled)
                {
                    Logger.DebugFormat(CultureInfo.CurrentCulture, "Consumer {0} releasing {1}", this.consumerIdString, znode);
                }
            }
        }
    }

    /// <summary>
    /// Reads the given consumer's topic subscription (JSON) from its registry znode.
    /// </summary>
    private TopicCount GetTopicCount(string consumerId)
    {
        var topicCountJson = this.zkClient.ReadData<string>(this.dirs.ConsumerRegistryDir + "/" + consumerId);
        return TopicCount.ConstructTopicCount(consumerId, topicCountJson);
    }

    /// <summary>
    /// Builds a topic -> sorted consumer-thread-id list map for every consumer registered in
    /// the group. Sorting must be identical on every consumer so that all members compute the
    /// same partition assignment independently.
    /// </summary>
    private IDictionary<string, IList<string>> GetConsumersPerTopic(string group)
    {
        var consumers = this.zkClient.GetChildrenParentMayNotExist(this.dirs.ConsumerRegistryDir);
        var consumersPerTopicMap = new Dictionary<string, IList<string>>();
        foreach (var consumer in consumers)
        {
            TopicCount topicCount = GetTopicCount(consumer);
            foreach (KeyValuePair<string, IList<string>> consumerThread in topicCount.GetConsumerThreadIdsPerTopic())
            {
                foreach (string consumerThreadId in consumerThread.Value)
                {
                    if (!consumersPerTopicMap.ContainsKey(consumerThread.Key))
                    {
                        consumersPerTopicMap.Add(consumerThread.Key, new List<string> { consumerThreadId });
                    }
                    else
                    {
                        consumersPerTopicMap[consumerThread.Key].Add(consumerThreadId);
                    }
                }
            }
        }

        foreach (KeyValuePair<string, IList<string>> item in consumersPerTopicMap)
        {
            // Sort the stored list in place. The previous code called
            // item.Value.ToList().Sort(), which sorted a throw-away copy and left
            // the lists unordered, breaking deterministic assignment.
            ((List<string>)item.Value).Sort();
        }

        return consumersPerTopicMap;
    }

    /// <summary>
    /// Returns the subset of this consumer's topics whose partition list or consumer list has
    /// changed since the previous rebalance; only those topics need reassignment.
    /// </summary>
    private static IDictionary<string, IList<string>> GetRelevantTopicMap(
        IDictionary<string, IList<string>> myTopicThreadIdsMap,
        IDictionary<string, IList<string>> newPartMap,
        IDictionary<string, IList<string>> oldPartMap,
        IDictionary<string, IList<string>> newConsumerMap,
        IDictionary<string, IList<string>> oldConsumerMap)
    {
        var relevantTopicThreadIdsMap = new Dictionary<string, IList<string>>();
        foreach (var myMap in myTopicThreadIdsMap)
        {
            var oldPartValue = oldPartMap.ContainsKey(myMap.Key) ? oldPartMap[myMap.Key] : null;
            var newPartValue = newPartMap.ContainsKey(myMap.Key) ? newPartMap[myMap.Key] : null;
            var oldConsumerValue = oldConsumerMap.ContainsKey(myMap.Key) ? oldConsumerMap[myMap.Key] : null;
            var newConsumerValue = newConsumerMap.ContainsKey(myMap.Key) ? newConsumerMap[myMap.Key] : null;

            // Compare list CONTENTS, not references. The previous code used !=, which
            // compares IList references and is effectively always true for freshly
            // fetched lists, defeating the "no change, skip rebalance" short-circuit.
            if (ListChanged(oldPartValue, newPartValue) || ListChanged(oldConsumerValue, newConsumerValue))
            {
                relevantTopicThreadIdsMap.Add(myMap.Key, myMap.Value);
            }
        }

        return relevantTopicThreadIdsMap;
    }

    /// <summary>
    /// True when the two lists differ by content (element-wise, order-sensitive);
    /// a null on only one side counts as a change.
    /// </summary>
    private static bool ListChanged(IList<string> oldValue, IList<string> newValue)
    {
        if (ReferenceEquals(oldValue, newValue))
        {
            return false;
        }

        if (oldValue == null || newValue == null)
        {
            return true;
        }

        return !oldValue.SequenceEqual(newValue);
    }
}
}