/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
using Kafka.Client.Cfg;
using Kafka.Client.Exceptions;
using Kafka.Client.Messages;
using Kafka.Client.Requests;
using Kafka.Client.Utils;
using log4net;
/// <summary>
/// The low-level consumer API for Kafka messages.
/// </summary>
/// <remarks>
/// Maintains a connection to a single broker and corresponds closely
/// to the network requests sent to the server.
/// It is also completely stateless.
/// </remarks>
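/// <example>
/// A minimal construction sketch. The broker address is illustrative, and the helper used to
/// obtain the <see cref="ConsumerConfiguration"/> is hypothetical, since its construction API
/// is not shown in this file:
/// <code>
/// ConsumerConfiguration config = GetConsumerConfiguration(); // hypothetical helper returning a configured instance
/// var consumer = new Consumer(config, "broker-host", 9092);
/// </code>
/// </example>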
public class Consumer : IConsumer
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly ConsumerConfiguration config;
private readonly string host;
private readonly int port;
/// <summary>
/// Initializes a new instance of the <see cref="Consumer"/> class.
/// </summary>
/// <param name="config">
/// The consumer configuration.
/// </param>
public Consumer(ConsumerConfiguration config)
{
Guard.NotNull(config, "config");
this.config = config;
this.host = config.Broker.Host;
this.port = config.Broker.Port;
}
/// <summary>
/// Initializes a new instance of the <see cref="Consumer"/> class.
/// </summary>
/// <param name="config">
/// The consumer configuration.
/// </param>
/// <param name="host"></param>
/// <param name="port"></param>
public Consumer(ConsumerConfiguration config, string host, int port)
{
Guard.NotNull(config, "config");
this.config = config;
this.host = host;
this.port = port;
}
/// <summary>
/// Fetch a set of messages from a topic.
/// </summary>
/// <param name="request">
/// Specifies the topic name, topic partition, starting byte offset, and maximum bytes to be fetched.
/// </param>
/// <returns>
/// A set of fetched messages.
/// </returns>
/// <remarks>
/// Offset is passed in on every request, allowing the user to maintain this metadata
/// however they choose.
/// </remarks>
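/// <example>
/// A usage sketch. The <see cref="FetchRequest"/> constructor shape (topic, partition, offset,
/// maxSize) and the <c>Messages</c> property on the result are assumptions, not definitions
/// from this file:
/// <code>
/// // consumer: an already constructed Consumer (see the class-level example)
/// var request = new FetchRequest("test", 0, 0L, 1024 * 1024); // assumed (topic, partition, offset, maxSize)
/// BufferedMessageSet messages = consumer.Fetch(request);
/// foreach (var message in messages.Messages)
/// {
///     // process each message; the caller tracks the next offset itself
/// }
/// </code>
/// </example>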
public BufferedMessageSet Fetch(FetchRequest request)
{
short tryCounter = 1;
while (tryCounter <= this.config.NumberOfTries)
{
try
{
using (var conn = new KafkaConnection(
this.host,
this.port,
this.config.BufferSize,
this.config.SocketTimeout))
{
conn.Write(request);
int size = conn.Reader.ReadInt32();
return BufferedMessageSet.ParseFrom(conn.Reader, size);
}
}
catch (Exception ex)
{
// if maximum number of tries reached
if (tryCounter == this.config.NumberOfTries)
{
throw;
}
tryCounter++;
Logger.InfoFormat(CultureInfo.CurrentCulture, "Fetch reconnect due to {0}", ex);
}
}
return null;
}
/// <summary>
/// Combine multiple fetch requests in one call.
/// </summary>
/// <param name="request">
/// The multi-fetch request wrapping the list of fetch requests.
/// </param>
/// <returns>
/// A list of sets of fetched messages.
/// </returns>
/// <remarks>
/// Offset is passed in on every request, allowing the user to maintain this metadata
/// however they choose.
/// </remarks>
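/// <example>
/// A usage sketch. The <see cref="MultiFetchRequest"/> and <see cref="FetchRequest"/>
/// constructor shapes are assumptions, not definitions from this file:
/// <code>
/// // consumer: an already constructed Consumer (see the class-level example)
/// var requests = new List&lt;FetchRequest&gt;
/// {
///     new FetchRequest("test", 0, 0L, 1024 * 1024), // assumed (topic, partition, offset, maxSize)
///     new FetchRequest("test", 1, 0L, 1024 * 1024)
/// };
/// var multiRequest = new MultiFetchRequest(requests); // assumed to wrap the list as ConsumerRequests
/// IList&lt;BufferedMessageSet&gt; sets = consumer.MultiFetch(multiRequest);
/// // one BufferedMessageSet is expected per fetch request, in the same order
/// </code>
/// </example>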
public IList<BufferedMessageSet> MultiFetch(MultiFetchRequest request)
{
var result = new List<BufferedMessageSet>();
short tryCounter = 1;
while (tryCounter <= this.config.NumberOfTries)
{
try
{
using (var conn = new KafkaConnection(
this.host,
this.port,
this.config.BufferSize,
this.config.SocketTimeout))
{
conn.Write(request);
int size = conn.Reader.ReadInt32();
return BufferedMessageSet.ParseMultiFrom(conn.Reader, size, request.ConsumerRequests.Count);
}
}
catch (Exception ex)
{
// if maximum number of tries reached
if (tryCounter == this.config.NumberOfTries)
{
throw;
}
tryCounter++;
Logger.InfoFormat(CultureInfo.CurrentCulture, "MultiFetch reconnect due to {0}", ex);
}
}
return result;
}
/// <summary>
/// Gets a list of valid offsets (up to maxSize) before the given time.
/// </summary>
/// <param name="request">
/// The offset request.
/// </param>
/// <returns>
/// The list of offsets, in descending order.
/// </returns>
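/// <example>
/// A usage sketch. The <see cref="OffsetRequest"/> constructor shape and the use of -1 to mean
/// "latest" are assumptions, not definitions from this file:
/// <code>
/// // consumer: an already constructed Consumer (see the class-level example)
/// var request = new OffsetRequest("test", 0, -1L, 10); // assumed (topic, partition, time, maxNumOffsets)
/// IList&lt;long&gt; offsets = consumer.GetOffsetsBefore(request);
/// long latestOffset = offsets[0]; // first entry is the largest, since the list is descending (assumes a non-empty result)
/// </code>
/// </example>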
public IList<long> GetOffsetsBefore(OffsetRequest request)
{
var result = new List<long>();
short tryCounter = 1;
while (tryCounter <= this.config.NumberOfTries)
{
try
{
using (var conn = new KafkaConnection(
this.host,
this.port,
this.config.BufferSize,
this.config.SocketTimeout))
{
conn.Write(request);
int size = conn.Reader.ReadInt32();
if (size == 0)
{
return result;
}
short errorCode = conn.Reader.ReadInt16();
if (errorCode != KafkaException.NoError)
{
throw new KafkaException(errorCode);
}
int count = conn.Reader.ReadInt32();
for (int i = 0; i < count; i++)
{
result.Add(conn.Reader.ReadInt64());
}
return result;
}
}
catch (Exception ex)
{
// if maximum number of tries reached
if (tryCounter == this.config.NumberOfTries)
{
throw;
}
tryCounter++;
Logger.InfoFormat(CultureInfo.CurrentCulture, "GetOffsetsBefore reconnect due to {0}", ex);
}
}
return result;
}
}
}