blob: 35c1d4ccb8ef9c6c0609c0c51611a22101648fed [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Producers
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;
using System.Reflection;
using Kafka.Client.Cfg;
using Kafka.Client.Cluster;
using Kafka.Client.Messages;
using Kafka.Client.Producers.Async;
using Kafka.Client.Producers.Partitioning;
using Kafka.Client.Requests;
using Kafka.Client.Serialization;
using Kafka.Client.Utils;
using log4net;
/// <summary>
/// High-level Producer API that exposes all the producer functionality to the client
/// </summary>
/// <typeparam name="TKey">The type of the key.</typeparam>
/// <typeparam name="TData">The type of the data.</typeparam>
/// <remarks>
/// Provides serialization of data through a user-specified encoder, zookeeper based automatic broker discovery
/// and software load balancing through an optionally user-specified partitioner
/// </remarks>
public class Producer<TKey, TData> : KafkaClientBase, IProducer<TKey, TData>
where TKey : class
where TData : class
{
/// <summary>Logger for this class, obtained from log4net for the declaring type.</summary>
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
/// <summary>Shared random source used to pick a partition when a message has no key.</summary>
private static readonly Random Randomizer = new Random();
/// <summary>Producer configuration supplied at construction.</summary>
private readonly ProducerConfiguration config;
/// <summary>Pool that Send hands the partitioned requests to and that brokers are added to.</summary>
private readonly IProducerPool<TData> producerPool;
/// <summary>Partitioner applied to non-null keys; a default partitioner is used when none is supplied.</summary>
private readonly IPartitioner<TKey> partitioner;
/// <summary>When <c>false</c>, brokers are not added to the pool, neither at construction nor from the callback.</summary>
private readonly bool populateProducerPool;
/// <summary>Source of broker and partition metadata; ZooKeeper-backed or config-backed depending on the configuration.</summary>
private readonly IBrokerPartitionInfo brokerPartitionInfo;
/// <summary>Set to <c>true</c> by Dispose; checked before sending to reject use after disposal.</summary>
private volatile bool disposed;
/// <summary>Lock taken while flipping <see cref="disposed"/> so that only one caller performs the cleanup.</summary>
private readonly object shuttingDownLock = new object();
/// <summary>
/// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class
/// on top of an already constructed producer pool.
/// </summary>
/// <param name="config">The producer configuration; must not be null.</param>
/// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" />;
/// when null, a default partitioner is created instead.</param>
/// <param name="producerPool">Pool of producers, one per broker; must not be null.</param>
/// <param name="populateProducerPool">if set to <c>true</c>, every broker reported by the broker
/// metadata source is added to the pool.</param>
/// <remarks>
/// Should be used for testing purpose only.
/// </remarks>
internal Producer(
    ProducerConfiguration config,
    IPartitioner<TKey> partitioner,
    IProducerPool<TData> producerPool,
    bool populateProducerPool = true)
{
    Guard.NotNull(config, "config");
    Guard.NotNull(producerPool, "producerPool");

    this.config = config;
    if (partitioner == null)
    {
        this.partitioner = new DefaultPartitioner<TKey>();
    }
    else
    {
        this.partitioner = partitioner;
    }

    // The pool and the populate flag must be in place before the ZooKeeper-backed
    // metadata source is created, because it is handed this.Callback, which reads both.
    this.producerPool = producerPool;
    this.populateProducerPool = populateProducerPool;

    if (!this.config.IsZooKeeperEnabled)
    {
        this.brokerPartitionInfo = new ConfigBrokerPartitionInfo(this.config);
    }
    else
    {
        this.brokerPartitionInfo = new ZKBrokerPartitionInfo(this.config, this.Callback);
    }

    if (!this.populateProducerPool)
    {
        return;
    }

    // Register one producer per broker currently known to the metadata source.
    IDictionary<int, Broker> knownBrokers = this.brokerPartitionInfo.GetAllBrokerInfo();
    foreach (KeyValuePair<int, Broker> entry in knownBrokers)
    {
        Broker source = entry.Value;

        // The host is passed for both the second and the third Broker argument.
        this.producerPool.AddProducer(new Broker(entry.Key, source.Host, source.Host, source.Port));
    }
}
/// <summary>
/// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
/// </summary>
/// <param name="config">The config object; must not be null.</param>
/// <remarks>
/// Can be used when all config parameters will be specified through the config object
/// and will be instantiated via reflection. The partitioner is instantiated from
/// <c>config.PartitionerClass</c> and the encoder from <c>config.SerializerClass</c>.
/// </remarks>
public Producer(ProducerConfiguration config)
    : this(
        // Arguments are evaluated left to right, so the config is validated
        // before config.PartitionerClass / config.SerializerClass are read.
        RequireNotNull(config, "config"),
        ReflectionHelper.Instantiate<IPartitioner<TKey>>(config.PartitionerClass),
        ProducerPool<TData>.CreatePool(config, ReflectionHelper.Instantiate<IEncoder<TData>>(config.SerializerClass)))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
/// </summary>
/// <param name="config">The config object; must not be null.</param>
/// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" />
/// used to supply a custom partitioning strategy based on the message key.</param>
/// <param name="encoder">The encoder that implements <see cref="IEncoder&lt;TData&gt;" />
/// used to convert an object of type TData to <see cref="Message" />; must not be null.</param>
/// <param name="callbackHandler">The callback handler that implements <see cref="ICallbackHandler" />, used
/// to supply callback invoked when sending asynchronous request is completed.</param>
/// <remarks>
/// Can be used to provide pre-instantiated objects for all config parameters
/// that would otherwise be instantiated via reflection.
/// </remarks>
public Producer(
    ProducerConfiguration config,
    IPartitioner<TKey> partitioner,
    IEncoder<TData> encoder,
    ICallbackHandler callbackHandler)
    : this(
        RequireNotNull(config, "config"),
        partitioner,
        // The encoder is validated before the pool is built, so a null encoder
        // can no longer leave a constructed pool behind.
        ProducerPool<TData>.CreatePool(config, RequireNotNull(encoder, "encoder"), callbackHandler))
{
}

/// <summary>
/// Initializes a new instance of the <see cref="Producer&lt;TKey, TData&gt;"/> class.
/// </summary>
/// <param name="config">The config object; must not be null.</param>
/// <param name="partitioner">The partitioner that implements <see cref="IPartitioner&lt;TKey&gt;" />
/// used to supply a custom partitioning strategy based on the message key.</param>
/// <param name="encoder">The encoder that implements <see cref="IEncoder&lt;TData&gt;" />
/// used to convert an object of type TData to <see cref="Message" />; must not be null.</param>
/// <remarks>
/// Can be used to provide pre-instantiated objects for all config parameters
/// that would otherwise be instantiated via reflection. No callback handler is passed to the pool.
/// </remarks>
public Producer(
    ProducerConfiguration config,
    IPartitioner<TKey> partitioner,
    IEncoder<TData> encoder)
    : this(
        RequireNotNull(config, "config"),
        partitioner,
        ProducerPool<TData>.CreatePool(config, RequireNotNull(encoder, "encoder"), null))
{
}

/// <summary>
/// Runs the null guard on a constructor argument and hands the argument back,
/// so the check can be performed inside a constructor initializer.
/// </summary>
/// <typeparam name="T">The reference type of the argument.</typeparam>
/// <param name="value">The argument to validate.</param>
/// <param name="name">The argument name reported by the guard.</param>
/// <returns>The validated <paramref name="value"/>.</returns>
private static T RequireNotNull<T>(T value, string name)
    where T : class
{
    Guard.NotNull(value, name);
    return value;
}
/// <summary>
/// Sends the data to multiple topics, partitioned by key, using either the
/// synchronous or the asynchronous producer.
/// </summary>
/// <param name="data">The producer data objects that encapsulate the topic, key and message data.
/// Must not be null and must contain at least one item.</param>
/// <exception cref="ObjectDisposedException">The producer has already been disposed.</exception>
public void Send(IEnumerable<ProducerData<TKey, TData>> data)
{
    Guard.NotNull(data, "data");

    // Walk the input exactly once: counting and then iterating a lazy sequence would
    // evaluate it twice (or fail on a one-shot sequence). Collections are used as-is.
    ICollection<ProducerData<TKey, TData>> items =
        data as ICollection<ProducerData<TKey, TData>> ?? data.ToList();

    Guard.Greater(items.Count, 0, "data");
    this.EnsuresNotDisposed();

    // One pool request per input item, each bound to the partition chosen for it.
    var poolRequests = new List<ProducerPoolData<TData>>(items.Count);
    foreach (var dataItem in items)
    {
        Partition partition = this.GetPartition(dataItem);
        poolRequests.Add(new ProducerPoolData<TData>(dataItem.Topic, partition, dataItem.Data));
    }

    this.producerPool.Send(poolRequests);
}
/// <summary>
/// Sends one producer data object (a single topic with its key and messages),
/// using either the synchronous or the asynchronous producer.
/// </summary>
/// <param name="data">The producer data object that encapsulates the topic, key and message data.
/// Its topic must be non-empty and its data must contain at least one element.</param>
/// <exception cref="ObjectDisposedException">The producer has already been disposed.</exception>
public void Send(ProducerData<TKey, TData> data)
{
    Guard.NotNull(data, "data");
    Guard.NotNullNorEmpty(data.Topic, "data.Topic");
    Guard.NotNull(data.Data, "data.Data");

    int messageCount = data.Data.Count();
    Guard.Greater(messageCount, 0, "data.Data");

    this.EnsuresNotDisposed();

    // Delegate to the batch overload with a one-element batch.
    ProducerData<TKey, TData>[] singleItemBatch = { data };
    this.Send(singleItemBatch);
}
/// <summary>
/// Releases the broker metadata source and the producer pool.
/// </summary>
/// <param name="disposing">When <c>false</c> nothing is released and the method returns immediately.</param>
/// <remarks>
/// Only the first call performs the cleanup; later calls return without doing anything.
/// Errors raised while closing are logged as warnings and not rethrown.
/// </remarks>
protected override void Dispose(bool disposing)
{
    if (!disposing)
    {
        return;
    }

    if (this.disposed)
    {
        return;
    }

    // Re-check under the lock so that exactly one caller proceeds to the cleanup.
    lock (this.shuttingDownLock)
    {
        if (this.disposed)
        {
            return;
        }

        this.disposed = true;
    }

    // Each resource is closed in its own try block: a failure while closing the
    // broker metadata source must not prevent the producer pool from being closed.
    try
    {
        if (this.brokerPartitionInfo != null)
        {
            this.brokerPartitionInfo.Dispose();
        }
    }
    catch (Exception exc)
    {
        Logger.Warn("Ignoring unexpected errors on closing", exc);
    }

    try
    {
        if (this.producerPool != null)
        {
            this.producerPool.Dispose();
        }
    }
    catch (Exception exc)
    {
        Logger.Warn("Ignoring unexpected errors on closing", exc);
    }
}
/// <summary>
/// Adds a producer for a newly reported broker to the producer pool.
/// Passed to <see cref="ZKBrokerPartitionInfo" /> at construction so it can report new brokers.
/// </summary>
/// <param name="bid">The broker Id.</param>
/// <param name="host">The broker host address; must not be null or empty.</param>
/// <param name="port">The broker port; must be greater than zero.</param>
private void Callback(int bid, string host, int port)
{
    Guard.NotNullNorEmpty(host, "host");
    Guard.Greater(port, 0, "port");

    if (!this.populateProducerPool)
    {
        Logger.Debug("Skipping the callback since populating producers is off");
        return;
    }

    var reportedBroker = new Broker(bid, host, host, port);
    this.producerPool.AddProducer(reportedBroker);
}
/// <summary>
/// Retrieves the partition id based on key using given partitioner or select random partition if key is null
/// </summary>
/// <param name="key">The partition key; may be null.</param>
/// <param name="numPartitions">The total number of available partitions; must be greater than zero.</param>
/// <returns>The value returned by the partitioner for a non-null key, otherwise a random
/// value in the range [0, <paramref name="numPartitions"/>).</returns>
private int GetPartitionId(TKey key, int numPartitions)
{
    Guard.Greater(numPartitions, 0, "numPartitions");

    if (key != null)
    {
        return this.partitioner.Partition(key, numPartitions);
    }

    // Randomizer is a static instance shared by all producers, and System.Random
    // is not safe for concurrent use, so access to it is serialized.
    lock (Randomizer)
    {
        return Randomizer.Next(numPartitions);
    }
}
/// <summary>
/// Chooses the broker partition that a producer data item is sent to.
/// </summary>
/// <param name="dataItem">The producer data object that encapsulates the topic, key and message data.</param>
/// <returns>
/// A partition on the selected broker. With ZooKeeper enabled it carries the selected partition id;
/// otherwise it carries <c>ProducerRequest.RandomPartition</c>.
/// </returns>
private Partition GetPartition(ProducerData<TKey, TData> dataItem)
{
    Logger.DebugFormat(
        CultureInfo.CurrentCulture,
        "Getting the number of broker partitions registered for topic: {0}",
        dataItem.Topic);

    SortedSet<Partition> topicPartitions = this.brokerPartitionInfo.GetBrokerPartitionInfo(dataItem.Topic);
    int partitionCount = topicPartitions.Count;

    Logger.DebugFormat(
        CultureInfo.CurrentCulture,
        "Broker partitions registered for topic: {0} = {1}",
        dataItem.Topic,
        partitionCount);

    // The id returned for the key is used as an index into the sorted partitions.
    int selectedIndex = this.GetPartitionId(dataItem.Key, partitionCount);
    List<Partition> orderedPartitions = topicPartitions.ToList();
    Partition selected = orderedPartitions[selectedIndex];
    Broker targetBroker = this.brokerPartitionInfo.GetBrokerInfo(selected.BrokerId);

    if (!this.config.IsZooKeeperEnabled)
    {
        Logger.DebugFormat(
            CultureInfo.CurrentCulture,
            "Sending message to broker {0}:{1} on a randomly chosen partition",
            targetBroker.Host,
            targetBroker.Port);

        return new Partition(selected.BrokerId, ProducerRequest.RandomPartition);
    }

    Logger.DebugFormat(
        CultureInfo.CurrentCulture,
        "Sending message to broker {0}:{1} on partition {2}",
        targetBroker.Host,
        targetBroker.Port,
        selected.PartId);

    return new Partition(selected.BrokerId, selected.PartId);
}
/// <summary>
/// Throws if this producer has already been disposed; otherwise does nothing.
/// </summary>
/// <exception cref="ObjectDisposedException">The producer has been disposed.</exception>
private void EnsuresNotDisposed()
{
    if (!this.disposed)
    {
        return;
    }

    string typeName = this.GetType().Name;
    throw new ObjectDisposedException(typeName);
}
}
}