// blob: 6c83d7574ce81fc7648502a42bf3851d03fd923a
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Threading;
using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.ZooKeeperIntegration;
using log4net;
/// <summary>
/// Owns the fetcher workers of a consumer: starts one <see cref="FetcherRunnable"/>
/// on its own thread per broker and shuts them all down on re-initialization or disposal.
/// </summary>
internal class Fetcher : IDisposable
{
    private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
    private readonly ConsumerConfiguration config;
    private readonly IZooKeeperClient zkClient;
    private readonly object shuttingDownLock = new object();

    /// <summary>
    /// Workers started by the last call to <see cref="InitConnections"/>; null when none are running.
    /// </summary>
    private FetcherRunnable[] fetcherWorkerObjects;
    private volatile bool disposed;

    /// <summary>
    /// Initializes a new instance of the <see cref="Fetcher"/> class.
    /// </summary>
    /// <param name="config">
    /// The consumer configuration.
    /// </param>
    /// <param name="zkClient">
    /// The wrapper above ZooKeeper client.
    /// </param>
    public Fetcher(ConsumerConfiguration config, IZooKeeperClient zkClient)
    {
        this.config = config;
        this.zkClient = zkClient;
    }

    /// <summary>
    /// Stops the current fetcher workers (if any), empties the given queues and starts
    /// a new fetcher worker thread for every broker referenced by <paramref name="topicInfos"/>.
    /// </summary>
    /// <param name="topicInfos">
    /// The topic infos to fetch; when null the running workers are stopped and nothing else happens.
    /// </param>
    /// <param name="cluster">
    /// The cluster used to resolve each broker id to a broker.
    /// </param>
    /// <param name="queuesToBeCleared">
    /// The queues to be cleared before new workers are started; null (or null entries) are skipped.
    /// </param>
    /// <exception cref="ObjectDisposedException">
    /// Thrown when this instance has already been disposed.
    /// </exception>
    public void InitConnections(IEnumerable<PartitionTopicInfo> topicInfos, Cluster cluster, IEnumerable<BlockingCollection<FetchedDataChunk>> queuesToBeCleared)
    {
        this.EnsuresNotDisposed();
        this.Shutdown();
        if (topicInfos == null)
        {
            return;
        }

        if (queuesToBeCleared != null)
        {
            foreach (var queueToBeCleared in queuesToBeCleared)
            {
                if (queueToBeCleared == null)
                {
                    continue;
                }

                //// TryTake never blocks, unlike a "Count > 0" check followed by Take(),
                //// which can wait forever if another thread empties the queue in between
                FetchedDataChunk discarded;
                while (queueToBeCleared.TryTake(out discarded))
                {
                }
            }
        }

        //// re-arrange by broker id
        var partitionTopicInfoMap = new Dictionary<int, List<PartitionTopicInfo>>();
        foreach (var topicInfo in topicInfos)
        {
            List<PartitionTopicInfo> infosForBroker;
            if (!partitionTopicInfoMap.TryGetValue(topicInfo.BrokerId, out infosForBroker))
            {
                infosForBroker = new List<PartitionTopicInfo>();
                partitionTopicInfoMap.Add(topicInfo.BrokerId, infosForBroker);
            }

            infosForBroker.Add(topicInfo);
        }

        //// open a new fetcher thread for each broker
        //// the array is published before it is filled so that workers already started
        //// can still be stopped by Shutdown() if a later iteration throws
        this.fetcherWorkerObjects = new FetcherRunnable[partitionTopicInfoMap.Count];
        int i = 0;
        foreach (KeyValuePair<int, List<PartitionTopicInfo>> item in partitionTopicInfoMap)
        {
            Broker broker = cluster.GetBroker(item.Key);
            var fetcherRunnable = new FetcherRunnable("FetcherRunnable-" + i, this.zkClient, this.config, broker, item.Value);
            var fetcherThread = new Thread(new ThreadStart(fetcherRunnable.Run));
            this.fetcherWorkerObjects[i] = fetcherRunnable;
            fetcherThread.Start();
            i++;
        }
    }

    /// <summary>
    /// Shuts down all fetcher workers. Only the first call has an effect; errors raised
    /// while shutting down are logged as warnings and not propagated.
    /// </summary>
    public void Dispose()
    {
        if (this.disposed)
        {
            return;
        }

        lock (this.shuttingDownLock)
        {
            if (this.disposed)
            {
                return;
            }

            this.disposed = true;
        }

        try
        {
            this.Shutdown();
        }
        catch (Exception exc)
        {
            Logger.Warn("Ignoring unexpected errors on closing", exc);
        }
    }

    /// <summary>
    /// Shuts down all fetch threads
    /// </summary>
    private void Shutdown()
    {
        FetcherRunnable[] workers = this.fetcherWorkerObjects;
        if (workers == null)
        {
            return;
        }

        foreach (FetcherRunnable fetcherRunnable in workers)
        {
            //// slots stay empty when InitConnections failed before filling the whole array
            if (fetcherRunnable != null)
            {
                fetcherRunnable.Shutdown();
            }
        }

        this.fetcherWorkerObjects = null;
    }

    /// <summary>
    /// Ensures that object was not disposed
    /// </summary>
    /// <exception cref="ObjectDisposedException">
    /// Thrown when <see cref="Dispose"/> has already been called.
    /// </exception>
    private void EnsuresNotDisposed()
    {
        if (this.disposed)
        {
            throw new ObjectDisposedException(this.GetType().Name);
        }
    }
}
}