/** | |
* Licensed to the Apache Software Foundation (ASF) under one or more | |
* contributor license agreements. See the NOTICE file distributed with | |
* this work for additional information regarding copyright ownership. | |
* The ASF licenses this file to You under the Apache License, Version 2.0 | |
* (the "License"); you may not use this file except in compliance with | |
* the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
namespace Kafka.Client.Producers | |
{ | |
using System; | |
using System.Collections.Generic; | |
using System.Globalization; | |
using System.Linq; | |
using System.Reflection; | |
using Kafka.Client.Cfg; | |
using Kafka.Client.Cluster; | |
using Kafka.Client.Messages; | |
using Kafka.Client.Producers.Async; | |
using Kafka.Client.Producers.Partitioning; | |
using Kafka.Client.Requests; | |
using Kafka.Client.Serialization; | |
using Kafka.Client.Utils; | |
using log4net; | |
/// <summary> | |
/// High-level Producer API that exposes all the producer functionality to the client | |
/// </summary> | |
/// <typeparam name="TKey">The type of the key.</typeparam> | |
/// <typeparam name="TData">The type of the data.</typeparam> | |
/// <remarks> | |
/// Provides serialization of data through a user-specified encoder, zookeeper based automatic broker discovery | |
/// and software load balancing through an optionally user-specified partitioner | |
/// </remarks> | |
public class Producer<TKey, TData> : KafkaClientBase, IProducer<TKey, TData> | |
where TKey : class | |
where TData : class | |
{ | |
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType); | |
private static readonly Random Randomizer = new Random(); | |
private readonly ProducerConfiguration config; | |
private readonly IProducerPool<TData> producerPool; | |
private readonly IPartitioner<TKey> partitioner; | |
private readonly bool populateProducerPool; | |
private readonly IBrokerPartitionInfo brokerPartitionInfo; | |
private volatile bool disposed; | |
private readonly object shuttingDownLock = new object(); | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class. | |
/// </summary> | |
/// <param name="config">The config object.</param> | |
/// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" /> | |
/// used to supply a custom partitioning strategy based on the message key.</param> | |
/// <param name="producerPool">Pool of producers, one per broker.</param> | |
/// <param name="populateProducerPool">if set to <c>true</c>, producers should be populated.</param> | |
/// <remarks> | |
/// Should be used for testing purpose only. | |
/// </remarks> | |
internal Producer( | |
ProducerConfiguration config, | |
IPartitioner<TKey> partitioner, | |
IProducerPool<TData> producerPool, | |
bool populateProducerPool = true) | |
{ | |
Guard.NotNull(config, "config"); | |
Guard.NotNull(producerPool, "producerPool"); | |
this.config = config; | |
this.partitioner = partitioner ?? new DefaultPartitioner<TKey>(); | |
this.populateProducerPool = populateProducerPool; | |
this.producerPool = producerPool; | |
if (this.config.IsZooKeeperEnabled) | |
{ | |
this.brokerPartitionInfo = new ZKBrokerPartitionInfo(this.config, this.Callback); | |
} | |
else | |
{ | |
this.brokerPartitionInfo = new ConfigBrokerPartitionInfo(this.config); | |
} | |
if (this.populateProducerPool) | |
{ | |
IDictionary<int, Broker> allBrokers = this.brokerPartitionInfo.GetAllBrokerInfo(); | |
foreach (var broker in allBrokers) | |
{ | |
this.producerPool.AddProducer( | |
new Broker(broker.Key, broker.Value.Host, broker.Value.Host, broker.Value.Port)); | |
} | |
} | |
} | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class. | |
/// </summary> | |
/// <param name="config">The config object.</param> | |
/// <remarks> | |
/// Can be used when all config parameters will be specified through the config object | |
/// and will be instantiated via reflection | |
/// </remarks> | |
public Producer(ProducerConfiguration config) | |
: this( | |
config, | |
ReflectionHelper.Instantiate<IPartitioner<TKey>>(config.PartitionerClass), | |
ProducerPool<TData>.CreatePool(config, ReflectionHelper.Instantiate<IEncoder<TData>>(config.SerializerClass))) | |
{ | |
Guard.NotNull(config, "config"); | |
} | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class. | |
/// </summary> | |
/// <param name="config">The config object.</param> | |
/// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" /> | |
/// used to supply a custom partitioning strategy based on the message key.</param> | |
/// <param name="encoder">The encoder that implements <see cref="IEncoder<TData>" /> | |
/// used to convert an object of type TData to <see cref="Message" />.</param> | |
/// <param name="callbackHandler">The callback handler that implements <see cref="ICallbackHandler" />, used | |
/// to supply callback invoked when sending asynchronous request is completed.</param> | |
/// <remarks> | |
/// Can be used to provide pre-instantiated objects for all config parameters | |
/// that would otherwise be instantiated via reflection. | |
/// </remarks> | |
public Producer( | |
ProducerConfiguration config, | |
IPartitioner<TKey> partitioner, | |
IEncoder<TData> encoder, | |
ICallbackHandler callbackHandler) | |
: this( | |
config, | |
partitioner, | |
ProducerPool<TData>.CreatePool(config, encoder, callbackHandler)) | |
{ | |
Guard.NotNull(config, "config"); | |
Guard.NotNull(encoder, "encoder"); | |
} | |
/// <summary> | |
/// Initializes a new instance of the <see cref="Producer<TKey, TData>"/> class. | |
/// </summary> | |
/// <param name="config">The config object.</param> | |
/// <param name="partitioner">The partitioner that implements <see cref="IPartitioner<TKey>" /> | |
/// used to supply a custom partitioning strategy based on the message key.</param> | |
/// <param name="encoder">The encoder that implements <see cref="IEncoder<TData>" /> | |
/// used to convert an object of type TData to <see cref="Message" />.</param> | |
/// <remarks> | |
/// Can be used to provide pre-instantiated objects for all config parameters | |
/// that would otherwise be instantiated via reflection. | |
/// </remarks> | |
public Producer( | |
ProducerConfiguration config, | |
IPartitioner<TKey> partitioner, | |
IEncoder<TData> encoder) | |
: this( | |
config, | |
partitioner, | |
ProducerPool<TData>.CreatePool(config, encoder, null)) | |
{ | |
Guard.NotNull(config, "config"); | |
Guard.NotNull(encoder, "encoder"); | |
} | |
/// <summary> | |
/// Sends the data to a multiple topics, partitioned by key, using either the | |
/// synchronous or the asynchronous producer. | |
/// </summary> | |
/// <param name="data">The producer data objects that encapsulate the topic, key and message data.</param> | |
public void Send(IEnumerable<ProducerData<TKey, TData>> data) | |
{ | |
Guard.NotNull(data, "data"); | |
Guard.Greater(data.Count(), 0, "data"); | |
this.EnsuresNotDisposed(); | |
var poolRequests = new List<ProducerPoolData<TData>>(); | |
foreach (var dataItem in data) | |
{ | |
Partition partition = this.GetPartition(dataItem); | |
var poolRequest = new ProducerPoolData<TData>(dataItem.Topic, partition, dataItem.Data); | |
poolRequests.Add(poolRequest); | |
} | |
this.producerPool.Send(poolRequests); | |
} | |
/// <summary> | |
/// Sends the data to a single topic, partitioned by key, using either the | |
/// synchronous or the asynchronous producer. | |
/// </summary> | |
/// <param name="data">The producer data object that encapsulates the topic, key and message data.</param> | |
public void Send(ProducerData<TKey, TData> data) | |
{ | |
Guard.NotNull(data, "data"); | |
Guard.NotNullNorEmpty(data.Topic, "data.Topic"); | |
Guard.NotNull(data.Data, "data.Data"); | |
Guard.Greater(data.Data.Count(), 0, "data.Data"); | |
this.EnsuresNotDisposed(); | |
this.Send(new[] { data }); | |
} | |
protected override void Dispose(bool disposing) | |
{ | |
if (!disposing) | |
{ | |
return; | |
} | |
if (this.disposed) | |
{ | |
return; | |
} | |
lock (this.shuttingDownLock) | |
{ | |
if (this.disposed) | |
{ | |
return; | |
} | |
this.disposed = true; | |
} | |
try | |
{ | |
if (this.brokerPartitionInfo != null) | |
{ | |
this.brokerPartitionInfo.Dispose(); | |
} | |
if (this.producerPool != null) | |
{ | |
this.producerPool.Dispose(); | |
} | |
} | |
catch (Exception exc) | |
{ | |
Logger.Warn("Ignoring unexpected errors on closing", exc); | |
} | |
} | |
/// <summary> | |
/// Callback to add a new producer to the producer pool. | |
/// Used by <see cref="ZKBrokerPartitionInfo" /> on registration of new broker in ZooKeeper | |
/// </summary> | |
/// <param name="bid">The broker Id.</param> | |
/// <param name="host">The broker host address.</param> | |
/// <param name="port">The broker port.</param> | |
private void Callback(int bid, string host, int port) | |
{ | |
Guard.NotNullNorEmpty(host, "host"); | |
Guard.Greater(port, 0, "port"); | |
if (this.populateProducerPool) | |
{ | |
this.producerPool.AddProducer(new Broker(bid, host, host, port)); | |
} | |
else | |
{ | |
Logger.Debug("Skipping the callback since populating producers is off"); | |
} | |
} | |
/// <summary> | |
/// Retrieves the partition id based on key using given partitioner or select random partition if key is null | |
/// </summary> | |
/// <param name="key">The partition key.</param> | |
/// <param name="numPartitions">The total number of available partitions.</param> | |
/// <returns>Partition Id</returns> | |
private int GetPartitionId(TKey key, int numPartitions) | |
{ | |
Guard.Greater(numPartitions, 0, "numPartitions"); | |
return key == null | |
? Randomizer.Next(numPartitions) | |
: this.partitioner.Partition(key, numPartitions); | |
} | |
/// <summary> | |
/// Gets the partition for topic. | |
/// </summary> | |
/// <param name="dataItem">The producer data object that encapsulates the topic, key and message data.</param> | |
/// <returns>Partition for topic</returns> | |
private Partition GetPartition(ProducerData<TKey, TData> dataItem) | |
{ | |
Logger.DebugFormat( | |
CultureInfo.CurrentCulture, | |
"Getting the number of broker partitions registered for topic: {0}", | |
dataItem.Topic); | |
SortedSet<Partition> brokerPartitions = this.brokerPartitionInfo.GetBrokerPartitionInfo(dataItem.Topic); | |
int totalNumPartitions = brokerPartitions.Count; | |
Logger.DebugFormat( | |
CultureInfo.CurrentCulture, | |
"Broker partitions registered for topic: {0} = {1}", | |
dataItem.Topic, | |
totalNumPartitions); | |
int partitionId = this.GetPartitionId(dataItem.Key, totalNumPartitions); | |
Partition brokerIdPartition = brokerPartitions.ToList()[partitionId]; | |
Broker brokerInfo = this.brokerPartitionInfo.GetBrokerInfo(brokerIdPartition.BrokerId); | |
if (this.config.IsZooKeeperEnabled) | |
{ | |
Logger.DebugFormat( | |
CultureInfo.CurrentCulture, | |
"Sending message to broker {0}:{1} on partition {2}", | |
brokerInfo.Host, | |
brokerInfo.Port, | |
brokerIdPartition.PartId); | |
return new Partition(brokerIdPartition.BrokerId, brokerIdPartition.PartId); | |
} | |
Logger.DebugFormat( | |
CultureInfo.CurrentCulture, | |
"Sending message to broker {0}:{1} on a randomly chosen partition", | |
brokerInfo.Host, | |
brokerInfo.Port); | |
return new Partition(brokerIdPartition.BrokerId, ProducerRequest.RandomPartition); | |
} | |
/// <summary> | |
/// Ensures that object was not disposed | |
/// </summary> | |
private void EnsuresNotDisposed() | |
{ | |
if (this.disposed) | |
{ | |
throw new ObjectDisposedException(this.GetType().Name); | |
} | |
} | |
} | |
} |