blob: 580ea14c1dce4b26d42955ce8e3735461373977b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
namespace Kafka.Client.IntegrationTests
{
using System.Collections.Generic;
using System.Linq;
using Kafka.Client.Cluster;
using Kafka.Client.Producers.Partitioning;
using Kafka.Client.Utils;
using Kafka.Client.ZooKeeperIntegration;
using Kafka.Client.ZooKeeperIntegration.Listeners;
using NUnit.Framework;
using ZooKeeperNet;
[TestFixture]
public class ZkBrokerPartitionInfoTests : IntegrationFixtureBase
{
[Test]
public void ZkBrokerPartitionInfoGetsAllBrokerInfo()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
var prodConfigNotZk = this.ConfigBasedSyncProdConfig;
IDictionary<int, Broker> allBrokerInfo;
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodConfig, null))
{
allBrokerInfo = brokerPartitionInfo.GetAllBrokerInfo();
}
Assert.AreEqual(prodConfigNotZk.Brokers.Count, allBrokerInfo.Count);
allBrokerInfo.Values.All(x => prodConfigNotZk.Brokers.Any(
y => x.Id == y.BrokerId
&& x.Host == y.Host
&& x.Port == y.Port));
}
[Test]
public void ZkBrokerPartitionInfoGetsBrokerPartitionInfo()
{
var prodconfig = this.ZooKeeperBasedSyncProdConfig;
SortedSet<Partition> partitions;
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodconfig, null))
{
partitions = brokerPartitionInfo.GetBrokerPartitionInfo("test");
}
Assert.NotNull(partitions);
Assert.GreaterOrEqual(partitions.Count, 2);
}
[Test]
public void ZkBrokerPartitionInfoGetsBrokerInfo()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
var prodConfigNotZk = this.ConfigBasedSyncProdConfig;
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(prodConfig, null))
{
var testBroker = prodConfigNotZk.Brokers[0];
Broker broker = brokerPartitionInfo.GetBrokerInfo(testBroker.BrokerId);
Assert.NotNull(broker);
Assert.AreEqual(testBroker.Host, broker.Host);
Assert.AreEqual(testBroker.Port, broker.Port);
}
}
[Test]
public void WhenNewTopicIsAddedBrokerTopicsListenerCreatesNewMapping()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
WaitUntillIdle(client, 500);
var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
client.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, brokerTopicsListener);
client.CreatePersistent(topicPath, true);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
WaitUntillIdle(client, 500);
client.DeleteRecursive(topicPath);
}
Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
}
[Test]
public void WhenNewBrokerIsAddedBrokerTopicsListenerUpdatesBrokersList()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
WaitUntillIdle(client, 500);
var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
WaitUntillIdle(client, 500);
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
}
Assert.IsTrue(brokers.ContainsKey(2345));
Assert.AreEqual("192.168.1.39", brokers[2345].Host);
Assert.AreEqual(9102, brokers[2345].Port);
Assert.AreEqual(2345, brokers[2345].Id);
}
[Test]
public void WhenBrokerIsRemovedBrokerTopicsListenerUpdatesBrokersList()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
WaitUntillIdle(client, 500);
var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
WaitUntillIdle(client, 500);
Assert.IsTrue(brokers.ContainsKey(2345));
client.DeleteRecursive(brokerPath);
WaitUntillIdle(client, 500);
Assert.IsFalse(brokers.ContainsKey(2345));
}
}
[Test]
public void WhenNewBrokerInTopicIsAddedBrokerTopicsListenerUpdatesMappings()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
string topicBrokerPath = topicPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
client.Connect();
WaitUntillIdle(client, 500);
var brokerTopicsListener = new BrokerTopicsListener(client, mappings, brokers, null);
client.Subscribe(ZooKeeperClient.DefaultBrokerIdsPath, brokerTopicsListener);
client.Subscribe(ZooKeeperClient.DefaultBrokerTopicsPath, brokerTopicsListener);
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
client.CreatePersistent(topicPath, true);
WaitUntillIdle(client, 500);
Assert.IsTrue(brokers.ContainsKey(2345));
Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
client.CreatePersistent(topicBrokerPath, true);
client.WriteData(topicBrokerPath, 5);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
client.DeleteRecursive(topicPath);
}
Assert.IsTrue(brokers.ContainsKey(2345));
Assert.IsTrue(mappings.Keys.Contains(CurrentTestTopic));
Assert.AreEqual(5, mappings[CurrentTestTopic].Count);
}
[Test]
public void WhenSessionIsExpiredListenerRecreatesEphemeralNodes()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
IDictionary<string, SortedSet<Partition>> mappings2;
IDictionary<int, Broker> brokers2;
using (
IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
client.Process(new WatchedEvent(KeeperState.Expired, EventType.None, null));
WaitUntillIdle(client, 3000);
brokers2 = brokerPartitionInfo.GetAllBrokerInfo();
mappings2 =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
}
}
Assert.NotNull(brokers2);
Assert.Greater(brokers2.Count, 0);
Assert.NotNull(mappings2);
Assert.Greater(mappings2.Count, 0);
Assert.AreEqual(brokers.Count, brokers2.Count);
Assert.AreEqual(mappings.Count, mappings2.Count);
}
[Test]
public void WhenNewTopicIsAddedZkBrokerPartitionInfoUpdatesMappings()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
client.CreatePersistent(topicPath, true);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
WaitUntillIdle(client, 500);
client.DeleteRecursive(topicPath);
}
}
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
}
[Test]
public void WhenNewBrokerIsAddedZkBrokerPartitionInfoUpdatesBrokersList()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.IsTrue(brokers.ContainsKey(2345));
Assert.AreEqual("192.168.1.39", brokers[2345].Host);
Assert.AreEqual(9102, brokers[2345].Port);
Assert.AreEqual(2345, brokers[2345].Id);
}
[Test]
public void WhenBrokerIsRemovedZkBrokerPartitionInfoUpdatesBrokersList()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
WaitUntillIdle(client, 500);
brokers = brokerPartitionInfo.GetAllBrokerInfo();
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
WaitUntillIdle(client, 500);
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.IsTrue(brokers.ContainsKey(2345));
client.DeleteRecursive(brokerPath);
WaitUntillIdle(client, 500);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.IsFalse(brokers.ContainsKey(2345));
}
[Test]
public void WhenNewBrokerInTopicIsAddedZkBrokerPartitionInfoUpdatesMappings()
{
var prodConfig = this.ZooKeeperBasedSyncProdConfig;
IDictionary<string, SortedSet<Partition>> mappings;
IDictionary<int, Broker> brokers;
string brokerPath = ZooKeeperClient.DefaultBrokerIdsPath + "/" + 2345;
string topicPath = ZooKeeperClient.DefaultBrokerTopicsPath + "/" + CurrentTestTopic;
string topicBrokerPath = topicPath + "/" + 2345;
using (IZooKeeperClient client = new ZooKeeperClient(
prodConfig.ZooKeeper.ZkConnect,
prodConfig.ZooKeeper.ZkSessionTimeoutMs,
ZooKeeperStringSerializer.Serializer))
{
using (var brokerPartitionInfo = new ZKBrokerPartitionInfo(client))
{
brokers = brokerPartitionInfo.GetAllBrokerInfo();
mappings =
ReflectionHelper.GetInstanceField<IDictionary<string, SortedSet<Partition>>>(
"topicBrokerPartitions", brokerPartitionInfo);
client.CreatePersistent(brokerPath, true);
client.WriteData(brokerPath, "192.168.1.39-1310449279123:192.168.1.39:9102");
client.CreatePersistent(topicPath, true);
WaitUntillIdle(client, 500);
Assert.IsTrue(brokers.ContainsKey(2345));
Assert.IsTrue(mappings.ContainsKey(CurrentTestTopic));
client.CreatePersistent(topicBrokerPath, true);
client.WriteData(topicBrokerPath, 5);
WaitUntillIdle(client, 500);
client.UnsubscribeAll();
WaitUntillIdle(client, 500);
client.DeleteRecursive(brokerPath);
client.DeleteRecursive(topicPath);
}
}
Assert.NotNull(brokers);
Assert.Greater(brokers.Count, 0);
Assert.NotNull(mappings);
Assert.Greater(mappings.Count, 0);
Assert.IsTrue(brokers.ContainsKey(2345));
Assert.IsTrue(mappings.Keys.Contains(CurrentTestTopic));
Assert.AreEqual(5, mappings[CurrentTestTopic].Count);
}
}
}