blob: 58894f3cf0bec02a87fe999b5b5d8b3dbfa5aac7 [file] [log] [blame]
// // Licensed to the Apache Software Foundation (ASF) under one
// // or more contributor license agreements. See the NOTICE file
// // distributed with this work for additional information
// // regarding copyright ownership. The ASF licenses this file
// // to you under the Apache License, Version 2.0 (the
// // "License"); you may not use this file except in compliance
// // with the License. You may obtain a copy of the License at
// //
// // http://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing,
// // software distributed under the License is distributed on an
// // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// // KIND, either express or implied. See the License for the
// // specific language governing permissions and limitations
// // under the License.
using Apache.Iggy.Contracts;
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
using Apache.Iggy.IggyClient;
using Apache.Iggy.Tests.Integrations.Attributes;
using Apache.Iggy.Tests.Integrations.Fixtures;
using Apache.Iggy.Tests.Integrations.Helpers;
using Shouldly;
namespace Apache.Iggy.Tests.Integrations;
public class ConsumerGroupTests
{
private static readonly uint GroupId = 1;
private static readonly string GroupName = "test_consumer_group";
private Identifier TopicId => Identifier.String(Fixture.TopicId);
[ClassDataSource<ConsumerGroupFixture>(Shared = SharedType.PerClass)]
public required ConsumerGroupFixture Fixture { get; init; }
[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task CreateConsumerGroup_HappyPath_Should_CreateConsumerGroup_Successfully(Protocol protocol)
{
var consumerGroup
= await Fixture.Clients[protocol]
.CreateConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
GroupName, GroupId);
consumerGroup.ShouldNotBeNull();
consumerGroup.Id.ShouldBe(GroupId);
consumerGroup.PartitionsCount.ShouldBe(Fixture.PartitionsCount);
consumerGroup.MembersCount.ShouldBe(0u);
consumerGroup.Name.ShouldBe(GroupName);
}
[Test]
[DependsOn(nameof(CreateConsumerGroup_HappyPath_Should_CreateConsumerGroup_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task CreateConsumerGroup_Should_Throw_InvalidResponse(Protocol protocol)
{
await Should.ThrowAsync<InvalidResponseException>(() =>
Fixture.Clients[protocol]
.CreateConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
GroupName, GroupId));
}
[Test]
[DependsOn(nameof(CreateConsumerGroup_Should_Throw_InvalidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetConsumerGroupById_Should_Return_ValidResponse(Protocol protocol)
{
var response = await Fixture.Clients[protocol]
.GetConsumerGroupByIdAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId));
response.ShouldNotBeNull();
response.Id.ShouldBe(GroupId);
response.Name.ShouldBe(GroupName);
response.PartitionsCount.ShouldBe(Fixture.PartitionsCount);
response.MembersCount.ShouldBe(0u);
}
[Test]
[DependsOn(nameof(GetConsumerGroupById_Should_Return_ValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetConsumerGroups_Should_Return_ValidResponse(Protocol protocol)
{
IReadOnlyList<ConsumerGroupResponse> response
= await Fixture.Clients[protocol]
.GetConsumerGroupsAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId);
response.ShouldNotBeNull();
response.Count.ShouldBe(1);
var group = response.FirstOrDefault();
group.ShouldNotBeNull();
group.Id.ShouldBe(GroupId);
group.Name.ShouldBe(GroupName);
group.PartitionsCount.ShouldBe(Fixture.PartitionsCount);
group.MembersCount.ShouldBe(0u);
}
[Test]
[SkipHttp]
[DependsOn(nameof(GetConsumerGroups_Should_Return_ValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task JoinConsumerGroup_Tcp_Should_JoinConsumerGroup_Successfully(Protocol protocol)
{
await Should.NotThrowAsync(() =>
Fixture.Clients[protocol]
.JoinConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId)));
}
[Test]
[SkipTcp]
[DependsOn(nameof(GetConsumerGroups_Should_Return_ValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task JoinConsumerGroup_Http_Should_Throw_FeatureUnavailable(Protocol protocol)
{
await Should.ThrowAsync<FeatureUnavailableException>(() =>
Fixture.Clients[protocol]
.JoinConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId)));
}
[Test]
[SkipHttp]
[DependsOn(nameof(JoinConsumerGroup_Tcp_Should_JoinConsumerGroup_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task LeaveConsumerGroup_Tcp_Should_LeaveConsumerGroup_Successfully(Protocol protocol)
{
await Should.NotThrowAsync(() =>
Fixture.Clients[protocol]
.LeaveConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId)));
}
[Test]
[SkipTcp]
[DependsOn(nameof(JoinConsumerGroup_Http_Should_Throw_FeatureUnavailable))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task LeaveConsumerGroup_Http_Should_Throw_FeatureUnavailable(Protocol protocol)
{
await Should.ThrowAsync<FeatureUnavailableException>(() =>
Fixture.Clients[protocol]
.LeaveConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId)));
}
[Test]
[DependsOn(nameof(LeaveConsumerGroup_Http_Should_Throw_FeatureUnavailable))]
[DependsOn(nameof(LeaveConsumerGroup_Tcp_Should_LeaveConsumerGroup_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetConsumerGroupById_WithMembers_Should_Return_ValidResponse(Protocol protocol)
{
var clientIds = new List<uint>();
var clients = new List<IIggyClient>();
for (var i = 0; i < 2; i++)
{
var client = Fixture.IggyServerFixture.CreateClient(Protocol.Tcp, protocol);
clients.Add(client);
await client.LoginUser("iggy", "iggy");
var me = await client.GetMeAsync();
clientIds.Add(me!.ClientId);
await client.JoinConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId));
}
var response = await Fixture.Clients[protocol]
.GetConsumerGroupByIdAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId));
response.ShouldNotBeNull();
response.Id.ShouldBe(GroupId);
response.PartitionsCount.ShouldBe(Fixture.PartitionsCount);
response.MembersCount.ShouldBe(2u);
response.Members.ShouldNotBeNull();
response.Members.Count.ShouldBe(2);
response.Members.ShouldAllBe(m => clientIds.Contains(m.Id));
response.Members.ShouldAllBe(x => x.PartitionsCount == 5);
response.Members.ShouldAllBe(x => x.Partitions.Count == 5);
}
[Test]
[DependsOn(nameof(GetConsumerGroupById_WithMembers_Should_Return_ValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task DeleteConsumerGroup_Should_DeleteConsumerGroup_Successfully(Protocol protocol)
{
await Should.NotThrowAsync(() =>
Fixture.Clients[protocol].DeleteConsumerGroupAsync(
Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId, Identifier.Numeric(GroupId)));
}
[Test]
[SkipHttp]
[DependsOn(nameof(DeleteConsumerGroup_Should_DeleteConsumerGroup_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task JoinConsumerGroup_Tcp_Should_Throw_InvalidResponse(Protocol protocol)
{
await Should.ThrowAsync<InvalidResponseException>(() =>
Fixture.Clients[protocol]
.JoinConsumerGroupAsync(Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId,
Identifier.Numeric(GroupId)));
}
[Test]
[DependsOn(nameof(JoinConsumerGroup_Tcp_Should_Throw_InvalidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task DeleteConsumerGroup_Should_Throw_InvalidResponse(Protocol protocol)
{
await Should.ThrowAsync<InvalidResponseException>(() =>
Fixture.Clients[protocol].DeleteConsumerGroupAsync(
Identifier.String(Fixture.StreamId.GetWithProtocol(protocol)), TopicId, Identifier.Numeric(GroupId)));
}
}