blob: cc1e0c01cac2d78476db98f7c30263c77dd50dcd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.Consumers
{
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Globalization;
using System.Reflection;
using Kafka.Client.Exceptions;
using Kafka.Client.Messages;
using log4net;
/// <summary>
/// An iterator that blocks until a value can be read from the supplied queue.
/// </summary>
/// <remarks>
/// The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
/// </remarks>
internal class ConsumerIterator : IEnumerator<Message>
{
private static readonly ILog Logger = LogManager.GetLogger(MethodBase.GetCurrentMethod().DeclaringType);
private readonly BlockingCollection<FetchedDataChunk> channel;
private readonly int consumerTimeoutMs;
private PartitionTopicInfo currentTopicInfo;
private ConsumerIteratorState state = ConsumerIteratorState.NotReady;
private IEnumerator<MessageAndOffset> current;
private FetchedDataChunk currentDataChunk = null;
private Message nextItem;
private long consumedOffset = -1;
/// <summary>
/// Initializes a new instance of the <see cref="ConsumerIterator"/> class.
/// </summary>
/// <param name="channel">
/// The queue containing
/// </param>
/// <param name="consumerTimeoutMs">
/// The consumer timeout in ms.
/// </param>
public ConsumerIterator(BlockingCollection<FetchedDataChunk> channel, int consumerTimeoutMs)
{
this.channel = channel;
this.consumerTimeoutMs = consumerTimeoutMs;
}
/// <summary>
/// Gets the element in the collection at the current position of the enumerator.
/// </summary>
/// <returns>
/// The element in the collection at the current position of the enumerator.
/// </returns>
public Message Current
{
get
{
if (!MoveNext())
{
throw new NoSuchElementException();
}
state = ConsumerIteratorState.NotReady;
if (nextItem != null)
{
if (consumedOffset < 0)
{
throw new IllegalStateException(String.Format(CultureInfo.CurrentCulture, "Offset returned by the message set is invalid {0}.", consumedOffset));
}
currentTopicInfo.ResetConsumeOffset(consumedOffset);
if (Logger.IsDebugEnabled)
{
Logger.DebugFormat(CultureInfo.CurrentCulture, "Setting consumed offset to {0}", consumedOffset);
}
return nextItem;
}
throw new IllegalStateException("Expected item but none found.");
}
}
/// <summary>
/// Gets the current element in the collection.
/// </summary>
/// <returns>
/// The current element in the collection.
/// </returns>
object IEnumerator.Current
{
get { return this.Current; }
}
/// <summary>
/// Advances the enumerator to the next element of the collection.
/// </summary>
/// <returns>
/// true if the enumerator was successfully advanced to the next element; false if the enumerator has passed the end of the collection.
/// </returns>
public bool MoveNext()
{
if (state == ConsumerIteratorState.Failed)
{
throw new IllegalStateException("Iterator is in failed state");
}
switch (state)
{
case ConsumerIteratorState.Done:
return false;
case ConsumerIteratorState.Ready:
return true;
default:
return MaybeComputeNext();
}
}
/// <summary>
/// Resets the enumerator's state to NotReady.
/// </summary>
public void Reset()
{
state = ConsumerIteratorState.NotReady;
}
public void Dispose()
{
}
private bool MaybeComputeNext()
{
state = ConsumerIteratorState.Failed;
nextItem = this.MakeNext();
if (state == ConsumerIteratorState.Done)
{
return false;
}
state = ConsumerIteratorState.Ready;
return true;
}
private Message MakeNext()
{
if (current == null || !current.MoveNext())
{
if (consumerTimeoutMs < 0)
{
currentDataChunk = this.channel.Take();
}
else
{
bool done = channel.TryTake(out currentDataChunk, consumerTimeoutMs);
if (!done)
{
Logger.Debug("Consumer iterator timing out...");
throw new ConsumerTimeoutException();
}
}
if (currentDataChunk.Equals(ZookeeperConsumerConnector.ShutdownCommand))
{
Logger.Debug("Received the shutdown command");
channel.Add(currentDataChunk);
return this.AllDone();
}
currentTopicInfo = currentDataChunk.TopicInfo;
if (currentTopicInfo.GetConsumeOffset() != currentDataChunk.FetchOffset)
{
Logger.ErrorFormat(
CultureInfo.CurrentCulture,
"consumed offset: {0} doesn't match fetch offset: {1} for {2}; consumer may lose data",
currentTopicInfo.GetConsumeOffset(),
currentDataChunk.FetchOffset,
currentTopicInfo);
currentTopicInfo.ResetConsumeOffset(currentDataChunk.FetchOffset);
}
current = currentDataChunk.Messages.GetEnumerator();
current.MoveNext();
}
var item = current.Current;
consumedOffset = item.Offset;
return item.Message;
}
private Message AllDone()
{
this.state = ConsumerIteratorState.Done;
return null;
}
}
}