blob: 753c536362a27721dcf33f9c4779affb731f2194 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration;
using Kafka.Client.ZooKeeperIntegration.Listeners;
using log4net;
/// <summary>
/// The consumer high-level API, that hides the details of brokers from the consumer.
/// It also maintains the state of what has been consumed (offsets are tracked in
/// ZooKeeper under the consumer group's offset directory).
/// </summary>
public class ZookeeperConsumerConnector : KafkaClientBase, IConsumerConnector
{
    private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);

    /// <summary>Maximum number of retries used during rebalancing.</summary>
    public static readonly int MaxNRetries = 4;

    /// <summary>
    /// Sentinel chunk pushed into every consumer queue on shutdown so that blocked
    /// stream iterators wake up and terminate.
    /// </summary>
    internal static readonly FetchedDataChunk ShutdownCommand = new FetchedDataChunk(null, null, -1);

    private readonly ConsumerConfiguration config;
    private IZooKeeperClient zkClient;

    // Guards the disposed flag transition so concurrent Dispose calls run the
    // shutdown sequence exactly once.
    private readonly object shuttingDownLock = new object();
    private readonly bool enableFetcher;
    private Fetcher fetcher;
    private readonly KafkaScheduler scheduler = new KafkaScheduler();

    // topic -> (partition -> consumption state). Read by CommitOffsets; populated
    // elsewhere (rebalance listener) — NOTE(review): population not visible in this file.
    private readonly IDictionary<string, IDictionary<Partition, PartitionTopicInfo>> topicRegistry = new ConcurrentDictionary<string, IDictionary<Partition, PartitionTopicInfo>>();

    // (topic, consumer thread id) -> queue feeding the corresponding KafkaMessageStream.
    private readonly IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>> queues = new Dictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>();

    // Serializes explicit rebalance triggers; also handed to the rebalancer listener.
    private readonly object syncLock = new object();
    private volatile bool disposed;

    /// <summary>
    /// Gets the consumer group ID.
    /// </summary>
    public string ConsumerGroup
    {
        get { return this.config.GroupId; }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="ZookeeperConsumerConnector"/> class.
    /// </summary>
    /// <param name="config">
    /// The consumer configuration. At the minimum, need to specify the group ID
    /// of the consumer and the ZooKeeper connection string.
    /// </param>
    /// <param name="enableFetcher">
    /// Indicates whether fetchers should be enabled
    /// </param>
    public ZookeeperConsumerConnector(ConsumerConfiguration config, bool enableFetcher)
    {
        this.config = config;
        this.enableFetcher = enableFetcher;
        this.ConnectZk();
        this.CreateFetcher();

        // Optionally commit consumed offsets to ZooKeeper on a fixed schedule.
        if (this.config.AutoCommit)
        {
            Logger.InfoFormat(CultureInfo.CurrentCulture, "starting auto committer every {0} ms", this.config.AutoCommitInterval);
            scheduler.ScheduleWithRate(this.AutoCommit, this.config.AutoCommitInterval, this.config.AutoCommitInterval);
        }
    }

    /// <summary>
    /// Commits the offsets of all messages consumed so far.
    /// </summary>
    /// <remarks>
    /// Best-effort: a failure to write one partition's offset is logged and the
    /// remaining partitions are still committed.
    /// </remarks>
    public void CommitOffsets()
    {
        this.EnsuresNotDisposed();
        if (this.zkClient == null)
        {
            return;
        }

        foreach (KeyValuePair<string, IDictionary<Partition, PartitionTopicInfo>> topic in topicRegistry)
        {
            var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic.Key);
            foreach (KeyValuePair<Partition, PartitionTopicInfo> partition in topic.Value)
            {
                var newOffset = partition.Value.GetConsumeOffset();
                try
                {
                    ZkUtils.UpdatePersistentPath(zkClient, topicDirs.ConsumerOffsetDir + "/" + partition.Value.Partition.Name, newOffset.ToString());
                }
                catch (Exception ex)
                {
                    // Swallow deliberately: offset commit is best-effort and will be retried
                    // on the next commit cycle.
                    Logger.WarnFormat(CultureInfo.CurrentCulture, "exception during CommitOffsets: {0}", ex);
                }

                if (Logger.IsDebugEnabled)
                {
                    // FIX: message previously read "Commited offset {0} for topic {1}" and
                    // passed the whole KeyValuePair as the topic argument.
                    Logger.DebugFormat(
                        CultureInfo.CurrentCulture,
                        "Committed offset {0} for topic {1} partition {2}",
                        newOffset,
                        topic.Key,
                        partition.Value.Partition.Name);
                }
            }
        }
    }

    /// <summary>
    /// Scheduler callback that commits offsets, logging (rather than propagating)
    /// any failure so the timer thread survives.
    /// </summary>
    public void AutoCommit()
    {
        this.EnsuresNotDisposed();
        try
        {
            this.CommitOffsets();
        }
        catch (Exception ex)
        {
            Logger.ErrorFormat(CultureInfo.CurrentCulture, "exception during AutoCommit: {0}", ex);
        }
    }

    /// <summary>
    /// Shuts down the connector: stops the scheduler and fetcher, drains and
    /// poisons all consumer queues, and closes the ZooKeeper client. Idempotent.
    /// </summary>
    /// <param name="disposing">False when called from a finalizer; cleanup is skipped.</param>
    protected override void Dispose(bool disposing)
    {
        if (!disposing)
        {
            return;
        }

        if (this.disposed)
        {
            return;
        }

        // Double-checked flag under the lock so only one caller performs shutdown.
        lock (this.shuttingDownLock)
        {
            if (this.disposed)
            {
                return;
            }

            Logger.Info("ZookeeperConsumerConnector shutting down");
            this.disposed = true;
        }

        try
        {
            if (this.scheduler != null)
            {
                this.scheduler.Dispose();
            }

            if (this.fetcher != null)
            {
                this.fetcher.Dispose();
            }

            this.SendShutdownToAllQueues();
            if (this.zkClient != null)
            {
                this.zkClient.Dispose();
            }
        }
        catch (Exception exc)
        {
            // Never throw from Dispose; shutdown errors are expected noise.
            Logger.Debug("Ignoring unexpected errors on shutting down", exc);
        }

        Logger.Info("ZookeeperConsumerConnector shut down completed");
    }

    /// <summary>
    /// Creates a list of message streams for each topic.
    /// </summary>
    /// <param name="topicCountDict">
    /// The map of topic on number of streams
    /// </param>
    /// <returns>
    /// The list of <see cref="KafkaMessageStream"/>, which are iterators over topic.
    /// </returns>
    /// <remarks>
    /// Explicitly triggers load balancing for this consumer
    /// </remarks>
    public IDictionary<string, IList<KafkaMessageStream>> CreateMessageStreams(IDictionary<string, int> topicCountDict)
    {
        this.EnsuresNotDisposed();
        return this.Consume(topicCountDict);
    }

    /// <summary>
    /// Creates and connects the ZooKeeper client from the configured connection string.
    /// </summary>
    private void ConnectZk()
    {
        Logger.InfoFormat(CultureInfo.CurrentCulture, "Connecting to zookeeper instance at {0}", this.config.ZooKeeper.ZkConnect);
        this.zkClient = new ZooKeeperClient(this.config.ZooKeeper.ZkConnect, this.config.ZooKeeper.ZkSessionTimeoutMs, ZooKeeperStringSerializer.Serializer);
        this.zkClient.Connect();
    }

    /// <summary>
    /// Creates the background fetcher, unless fetching was disabled at construction.
    /// </summary>
    private void CreateFetcher()
    {
        if (this.enableFetcher)
        {
            this.fetcher = new Fetcher(this.config, this.zkClient);
        }
    }

    /// <summary>
    /// Registers this consumer in ZooKeeper, wires rebalance/session listeners,
    /// builds one queue-backed stream per requested consumer thread per topic,
    /// and triggers an initial rebalance.
    /// </summary>
    /// <param name="topicCountDict">Map of topic name to requested stream count.</param>
    /// <returns>Map of topic name to its list of message streams.</returns>
    /// <exception cref="ArgumentNullException">When <paramref name="topicCountDict"/> is null.</exception>
    private IDictionary<string, IList<KafkaMessageStream>> Consume(IDictionary<string, int> topicCountDict)
    {
        Logger.Debug("entering consume");
        if (topicCountDict == null)
        {
            // FIX: previously thrown without a parameter name, which makes the
            // resulting exception message useless to callers.
            throw new ArgumentNullException("topicCountDict");
        }

        var dirs = new ZKGroupDirs(this.config.GroupId);
        var result = new Dictionary<string, IList<KafkaMessageStream>>();

        // FIX: the consumer id previously used DateTime.Now.Millisecond (0-999),
        // which collides for consumers started within the same second. Use the
        // hostname-timestamp-uuid scheme instead; uniqueness is required because
        // the id becomes an ephemeral ZK node name.
        string consumerUuid = string.Format(
            CultureInfo.InvariantCulture,
            "{0}-{1}-{2}",
            Environment.MachineName,
            DateTime.Now.Ticks,
            Guid.NewGuid().ToString("N").Substring(0, 8));
        string consumerIdString = this.config.GroupId + "_" + consumerUuid;
        var topicCount = new TopicCount(consumerIdString, topicCountDict);

        // listener to consumer and partition changes
        var loadBalancerListener = new ZKRebalancerListener(
            this.config,
            consumerIdString,
            this.topicRegistry,
            this.zkClient,
            this,
            queues,
            this.fetcher,
            this.syncLock);
        this.RegisterConsumerInZk(dirs, consumerIdString, topicCount);
        this.zkClient.Subscribe(dirs.ConsumerRegistryDir, loadBalancerListener);

        //// create a queue per topic per consumer thread
        var consumerThreadIdsPerTopicMap = topicCount.GetConsumerThreadIdsPerTopic();
        foreach (var topic in consumerThreadIdsPerTopicMap.Keys)
        {
            var streamList = new List<KafkaMessageStream>();
            foreach (string threadId in consumerThreadIdsPerTopicMap[topic])
            {
                var stream = new BlockingCollection<FetchedDataChunk>(new ConcurrentQueue<FetchedDataChunk>());
                this.queues.Add(new Tuple<string, string>(topic, threadId), stream);
                streamList.Add(new KafkaMessageStream(stream, this.config.Timeout));
            }

            result.Add(topic, streamList);
            Logger.DebugFormat(CultureInfo.CurrentCulture, "adding topic {0} and stream to map...", topic);

            // register on broker partition path changes
            string partitionPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + topic;
            this.zkClient.MakeSurePersistentPathExists(partitionPath);
            this.zkClient.Subscribe(partitionPath, loadBalancerListener);
        }

        //// register listener for session expired event
        this.zkClient.Subscribe(new ZKSessionExpireListener(dirs, consumerIdString, topicCount, loadBalancerListener, this));

        //// explicitly trigger load balancing for this consumer
        lock (this.syncLock)
        {
            loadBalancerListener.SyncedRebalance();
        }

        return result;
    }

    /// <summary>
    /// Drains every consumer queue and enqueues the <see cref="ShutdownCommand"/>
    /// sentinel so blocked stream consumers unblock and stop.
    /// </summary>
    private void SendShutdownToAllQueues()
    {
        foreach (var queue in this.queues)
        {
            Logger.Debug("Clearing up queue");

            //// clear the queue
            while (queue.Value.Count > 0)
            {
                queue.Value.Take();
            }

            queue.Value.Add(ShutdownCommand);
            Logger.Debug("Cleared queue and sent shutdown command");
        }
    }

    /// <summary>
    /// Registers the consumer's topic subscription under its ephemeral node in the
    /// group's consumer registry directory in ZooKeeper.
    /// </summary>
    internal void RegisterConsumerInZk(ZKGroupDirs dirs, string consumerIdString, TopicCount topicCount)
    {
        this.EnsuresNotDisposed();
        Logger.InfoFormat(CultureInfo.CurrentCulture, "begin registering consumer {0} in ZK", consumerIdString);
        ZkUtils.CreateEphemeralPathExpectConflict(this.zkClient, dirs.ConsumerRegistryDir + "/" + consumerIdString, topicCount.ToJsonString());
        Logger.InfoFormat(CultureInfo.CurrentCulture, "end registering consumer {0} in ZK", consumerIdString);
    }

    /// <summary>
    /// Ensures that object was not disposed
    /// </summary>
    /// <exception cref="ObjectDisposedException">When the connector has been disposed.</exception>
    private void EnsuresNotDisposed()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException(this.GetType().Name);
        }
    }
}
}