blob: f2d05e4260ca62d42bd9a7af5747cac2d25ac4b3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.IntegrationTests
{
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Reflection;
using System.Text;
using System.Threading;
using Kafka.Client.Consumers;
using Kafka.Client.Exceptions;
using Kafka.Client.Messages;
using Kafka.Client.Producers.Sync;
using Kafka.Client.Requests;
using NUnit.Framework;
[TestFixture]
public class ConsumerTests : IntegrationFixtureBase
{
/// <summary>
/// Smoke test: builds a <see cref="ZookeeperConsumerConnector"/> from the fixture's
/// ZooKeeper-based consumer configuration and disposes it straight away.
/// The test passes when neither construction nor disposal throws.
/// </summary>
[Test]
public void ConsumerConnectorIsCreatedConnectsDisconnectsAndShutsDown()
{
    // Nothing is consumed here; leaving the using scope disposes the connector.
    using (new ZookeeperConsumerConnector(this.ZooKeeperBasedConsumerConfig, true))
    {
    }
}
/// <summary>
/// Sends two messages to partition 0 of the current test topic in a single
/// producer request, then reads the topic back through a ZooKeeper-based
/// consumer connector and checks that exactly those two messages are returned
/// in the order they were sent.
/// </summary>
[Test]
public void SimpleSyncProducerSends2MessagesAndConsumerConnectorGetsThemBack()
{
    var producerConfiguration = this.SyncProducerConfig1;
    var consumerConfiguration = this.ZooKeeperBasedConsumerConfig;

    // Produce: both messages travel in one request.
    var firstMessage = new Message(Encoding.UTF8.GetBytes("kafka 1."));
    var secondMessage = new Message(Encoding.UTF8.GetBytes("kafka 2."));
    var request = new ProducerRequest(
        CurrentTestTopic,
        0,
        new List<Message> { firstMessage, secondMessage });

    using (var syncProducer = new SyncProducer(producerConfiguration))
    {
        syncProducer.Send(request);
    }

    // Consume: drain every stream of the topic until the consumer times out.
    var received = new List<Message>();
    using (IConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfiguration, true))
    {
        var streamsPerTopic = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
        var streamsByTopic = connector.CreateMessageStreams(streamsPerTopic);
        var topicStreams = streamsByTopic[CurrentTestTopic];

        try
        {
            foreach (var stream in topicStreams)
            {
                foreach (var item in stream)
                {
                    received.Add(item);
                }
            }
        }
        catch (ConsumerTimeoutException)
        {
            // Expected: the timeout is what ends the enumeration once nothing is left.
        }
    }

    Assert.AreEqual(2, received.Count);
    Assert.AreEqual(firstMessage.ToString(), received[0].ToString());
    Assert.AreEqual(secondMessage.ToString(), received[1].ToString());
}
/// <summary>
/// Sends one message and reads it back through a consumer stream enumerator,
/// verifies that the enumerator throws <see cref="ConsumerTimeoutException"/> when
/// nothing more is available and <see cref="IllegalStateException"/> on the call
/// after that, then resets the enumerator, sends a second message and checks that
/// it is received as well.
/// </summary>
[Test]
public void OneMessageIsSentAndReceivedThenExceptionsWhenNoMessageThenAnotherMessageIsSentAndReceived()
{
    var producerConfiguration = this.SyncProducerConfig1;
    var consumerConfiguration = this.ZooKeeperBasedConsumerConfig;

    var firstMessage = new Message(Encoding.UTF8.GetBytes("kafka 1."));

    // The producer stays open for the whole test because it is used twice.
    using (var syncProducer = new SyncProducer(producerConfiguration))
    {
        syncProducer.Send(
            new ProducerRequest(CurrentTestTopic, 0, new List<Message> { firstMessage }));

        using (IConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfiguration, true))
        {
            var streamsPerTopic = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
            var streamsByTopic = connector.CreateMessageStreams(streamsPerTopic);
            KafkaMessageStream stream = streamsByTopic[CurrentTestTopic][0];
            var cursor = stream.GetEnumerator();

            // First message arrives as sent.
            Assert.IsTrue(cursor.MoveNext());
            Assert.AreEqual(firstMessage.ToString(), cursor.Current.ToString());

            // Nothing left: the first attempt times out, the second finds the
            // iterator in its failed state until Reset is called.
            Assert.Throws<ConsumerTimeoutException>(() => cursor.MoveNext());
            Assert.Throws<IllegalStateException>(() => cursor.MoveNext());
            cursor.Reset();

            // Second round: produce another message on the same topic/partition.
            var secondMessage = new Message(Encoding.UTF8.GetBytes("kafka 2."));
            syncProducer.Send(
                new ProducerRequest(CurrentTestTopic, 0, new List<Message> { secondMessage }));

            // Fixed pause before reading again (presumably to let the consumer
            // fetch the new message - confirm before shortening).
            Thread.Sleep(3000);

            Assert.IsTrue(cursor.MoveNext());
            Assert.AreEqual(secondMessage.ToString(), cursor.Current.ToString());
        }
    }
}
/// <summary>
/// Sends one message to each of two topics derived from the current test topic
/// (suffixes "1" and "2"), then opens a single consumer connector subscribed to
/// both and checks that each topic yields exactly its own message.
/// </summary>
[Test]
public void ConsumerConnectorConsumesTwoDifferentTopics()
{
    var producerConfiguration = this.SyncProducerConfig1;
    var consumerConfiguration = this.ZooKeeperBasedConsumerConfig;

    string firstTopic = CurrentTestTopic + "1";
    string secondTopic = CurrentTestTopic + "2";

    // Produce: one message per topic, both to partition 0.
    var firstMessage = new Message(Encoding.UTF8.GetBytes("kafka 1."));
    var secondMessage = new Message(Encoding.UTF8.GetBytes("kafka 2."));

    using (var syncProducer = new SyncProducer(producerConfiguration))
    {
        syncProducer.Send(new ProducerRequest(firstTopic, 0, new List<Message> { firstMessage }));
        syncProducer.Send(new ProducerRequest(secondTopic, 0, new List<Message> { secondMessage }));
    }

    // Consume: each topic is drained separately until its stream times out.
    var receivedFromFirst = new List<Message>();
    var receivedFromSecond = new List<Message>();

    using (IConsumerConnector connector = new ZookeeperConsumerConnector(consumerConfiguration, true))
    {
        var streamsPerTopic = new Dictionary<string, int> { { firstTopic, 1 }, { secondTopic, 1 } };
        var streamsByTopic = connector.CreateMessageStreams(streamsPerTopic);

        Assert.IsTrue(streamsByTopic.ContainsKey(firstTopic));
        Assert.IsTrue(streamsByTopic.ContainsKey(secondTopic));

        var firstTopicStreams = streamsByTopic[firstTopic];
        try
        {
            foreach (var stream in firstTopicStreams)
            {
                foreach (var item in stream)
                {
                    receivedFromFirst.Add(item);
                }
            }
        }
        catch (ConsumerTimeoutException)
        {
            // Expected: the timeout ends the enumeration of the first topic.
        }

        var secondTopicStreams = streamsByTopic[secondTopic];
        try
        {
            foreach (var stream in secondTopicStreams)
            {
                foreach (var item in stream)
                {
                    receivedFromSecond.Add(item);
                }
            }
        }
        catch (ConsumerTimeoutException)
        {
            // Expected: the timeout ends the enumeration of the second topic.
        }
    }

    Assert.AreEqual(1, receivedFromFirst.Count);
    Assert.AreEqual(firstMessage.ToString(), receivedFromFirst[0].ToString());
    Assert.AreEqual(1, receivedFromSecond.Count);
    Assert.AreEqual(secondMessage.ToString(), receivedFromSecond[0].ToString());
}
/// <summary>
/// Creates message streams for the current test topic, injects
/// <c>ZookeeperConsumerConnector.ShutdownCommand</c> into every internal queue of
/// the connector, and checks that enumerating the streams then completes without
/// yielding any message.
/// </summary>
/// <remarks>
/// The queues are reached through reflection on the private instance field
/// <c>queues</c>. The lookup result is asserted explicitly so that a rename of
/// that field fails with a clear message instead of a NullReferenceException.
/// </remarks>
[Test]
public void ConsumerConnectorReceivesAShutdownSignal()
{
    var consumerConfig = this.ZooKeeperBasedConsumerConfig;

    using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
    {
        var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
        var messages = consumerConnector.CreateMessageStreams(topicCount);

        // Type.GetField returns null when no matching field exists.
        FieldInfo fi = typeof(ZookeeperConsumerConnector).GetField(
            "queues", BindingFlags.NonPublic | BindingFlags.Instance);
        Assert.IsNotNull(
            fi,
            "ZookeeperConsumerConnector has no private instance field named 'queues'; this test needs updating.");

        var value =
            (IDictionary<Tuple<string, string>, BlockingCollection<FetchedDataChunk>>)
            fi.GetValue(consumerConnector);
        Assert.IsNotNull(value, "The 'queues' field of the consumer connector is null.");

        // Put the shutdown command into every queue.
        foreach (var topicConsumerQueueMap in value)
        {
            topicConsumerQueueMap.Value.Add(ZookeeperConsumerConnector.ShutdownCommand);
        }

        // No ConsumerTimeoutException handler here: the enumeration is expected
        // to finish on its own once the shutdown command is seen.
        var sets = messages[CurrentTestTopic];
        var resultMessages = new List<Message>();
        foreach (var set in sets)
        {
            foreach (var message in set)
            {
                resultMessages.Add(message);
            }
        }

        Assert.AreEqual(0, resultMessages.Count);
    }
}
/// <summary>
/// Sends one message to partition 0 and another to partition 1 of the current
/// test topic, then reads the topic back through a single consumer stream and
/// checks that both messages are received.
/// </summary>
/// <remarks>
/// The received messages are compared without regard to order: the two messages
/// live in different partitions, and Kafka only orders messages within a
/// partition, so their relative arrival order is not something to assert on.
/// </remarks>
[Test]
public void ProducersSendMessagesToDifferentPartitionsAndConsumerConnectorGetsThemBack()
{
    var prodConfig = this.SyncProducerConfig1;
    var consumerConfig = this.ZooKeeperBasedConsumerConfig;

    // Produce: one message per partition.
    string payload1 = "kafka 1.";
    byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
    var msg1 = new Message(payloadData1);
    string payload2 = "kafka 2.";
    byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
    var msg2 = new Message(payloadData2);

    using (var producer = new SyncProducer(prodConfig))
    {
        var producerRequest1 = new ProducerRequest(CurrentTestTopic, 0, new List<Message> { msg1 });
        producer.Send(producerRequest1);
        var producerRequest2 = new ProducerRequest(CurrentTestTopic, 1, new List<Message> { msg2 });
        producer.Send(producerRequest2);
    }

    // Consume: drain the topic until the consumer times out.
    var resultMessages = new List<Message>();
    using (IConsumerConnector consumerConnector = new ZookeeperConsumerConnector(consumerConfig, true))
    {
        var topicCount = new Dictionary<string, int> { { CurrentTestTopic, 1 } };
        var messages = consumerConnector.CreateMessageStreams(topicCount);
        var sets = messages[CurrentTestTopic];
        try
        {
            foreach (var set in sets)
            {
                foreach (var message in set)
                {
                    resultMessages.Add(message);
                }
            }
        }
        catch (ConsumerTimeoutException)
        {
            // do nothing, this is expected
        }
    }

    Assert.AreEqual(2, resultMessages.Count);

    // Compare string forms as an unordered collection (see remarks).
    var expectedTexts = new List<string> { msg1.ToString(), msg2.ToString() };
    var actualTexts = new List<string>();
    foreach (var received in resultMessages)
    {
        actualTexts.Add(received.ToString());
    }

    CollectionAssert.AreEquivalent(expectedTexts, actualTexts);
}
}
}