blob: 6daa5032ad2e49d0074263d299f8b9399b2abe16 [file] [log] [blame]
// // Licensed to the Apache Software Foundation (ASF) under one
// // or more contributor license agreements. See the NOTICE file
// // distributed with this work for additional information
// // regarding copyright ownership. The ASF licenses this file
// // to you under the Apache License, Version 2.0 (the
// // "License"); you may not use this file except in compliance
// // with the License. You may obtain a copy of the License at
// //
// // http://www.apache.org/licenses/LICENSE-2.0
// //
// // Unless required by applicable law or agreed to in writing,
// // software distributed under the License is distributed on an
// // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// // KIND, either express or implied. See the License for the
// // specific language governing permissions and limitations
// // under the License.
using Apache.Iggy.Contracts;
using Apache.Iggy.Enums;
using Apache.Iggy.Exceptions;
using Apache.Iggy.Messages;
using Apache.Iggy.Tests.Integrations.Fixtures;
using Apache.Iggy.Tests.Integrations.Helpers;
using Shouldly;
using Partitioning = Apache.Iggy.Kinds.Partitioning;
namespace Apache.Iggy.Tests.Integrations;
public class StreamsTests
{
private const string Name = "StreamTests";
private const uint StreamId = 10000;
private const uint StreamIdSecond = 10001;
[ClassDataSource<StreamsFixture>(Shared = SharedType.PerClass)]
public required StreamsFixture Fixture { get; init; }
[Test]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task CreateStream_HappyPath_Should_CreateStream_Successfully(Protocol protocol)
{
var response = await Fixture.Clients[protocol]
.CreateStreamAsync(Name.GetWithProtocol(protocol), StreamId.GetWithProtocol(protocol));
response.ShouldNotBeNull();
response.Id.ShouldBe(StreamId.GetWithProtocol(protocol));
response.Name.ShouldBe(Name.GetWithProtocol(protocol));
response.Size.ShouldBe(0u);
response.CreatedAt.UtcDateTime.ShouldBe(DateTimeOffset.UtcNow.UtcDateTime, TimeSpan.FromSeconds(20));
response.MessagesCount.ShouldBe(0u);
response.TopicsCount.ShouldBe(0);
response.Topics.ShouldBeEmpty();
}
[Test]
[DependsOn(nameof(CreateStream_HappyPath_Should_CreateStream_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task CreateStream_Duplicate_Should_Throw_InvalidResponse(Protocol protocol)
{
await Should.ThrowAsync<InvalidResponseException>(Fixture.Clients[protocol]
.CreateStreamAsync(Name.GetWithProtocol(protocol), StreamId.GetWithProtocol(protocol)));
}
[Test]
[DependsOn(nameof(CreateStream_Duplicate_Should_Throw_InvalidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetStreams_Should_ReturnValidResponse(Protocol protocol)
{
await Fixture.Clients[protocol].CreateStreamAsync("test-stream-2".GetWithProtocol(protocol),
StreamIdSecond.GetWithProtocol(protocol));
IReadOnlyList<StreamResponse> response = await Fixture.Clients[protocol].GetStreamsAsync();
response.ShouldNotBeNull();
response.Count.ShouldBeGreaterThanOrEqualTo(2);
response.ShouldContain(x => x.Id == StreamId.GetWithProtocol(protocol));
response.ShouldContain(x => x.Id == StreamIdSecond.GetWithProtocol(protocol));
}
[Test]
[DependsOn(nameof(GetStreams_Should_ReturnValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetStreamById_Should_ReturnValidResponse(Protocol protocol)
{
var response = await Fixture.Clients[protocol]
.GetStreamByIdAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)));
response.ShouldNotBeNull();
response.Id.ShouldBe(StreamId.GetWithProtocol(protocol));
response.Name.ShouldBe(Name.GetWithProtocol(protocol));
response.Size.ShouldBe(0u);
response.CreatedAt.UtcDateTime.ShouldBe(DateTimeOffset.UtcNow.UtcDateTime, TimeSpan.FromSeconds(20));
response.MessagesCount.ShouldBe(0u);
response.TopicsCount.ShouldBe(0);
response.Topics.ShouldBeEmpty();
}
[Test]
[DependsOn(nameof(GetStreamById_Should_ReturnValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetStreamById_WithTopics_Should_ReturnValidResponse(Protocol protocol)
{
var topicRequest1 = TopicFactory.CreateTopic("Topic1", messageExpiry: 100_000);
var topicRequest2 = TopicFactory.CreateTopic("Topic2", messageExpiry: 100_000);
await Fixture.Clients[protocol].CreateTopicAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)),
topicRequest1.Name, topicRequest1.PartitionsCount, messageExpiry: topicRequest1.MessageExpiry,
topicId: topicRequest1.TopicId);
await Fixture.Clients[protocol].CreateTopicAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)),
topicRequest2.Name, topicRequest2.PartitionsCount, messageExpiry: topicRequest2.MessageExpiry,
topicId: topicRequest2.TopicId);
await Fixture.Clients[protocol].SendMessagesAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)),
Identifier.String(topicRequest1.Name), Partitioning.None(),
[
new Message(Guid.NewGuid(), "Test message 1"u8.ToArray()),
new Message(Guid.NewGuid(), "Test message 2"u8.ToArray()),
new Message(Guid.NewGuid(), "Test message 3"u8.ToArray())
]);
await Fixture.Clients[protocol].SendMessagesAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)),
Identifier.String(topicRequest2.Name), Partitioning.None(), [
new Message(Guid.NewGuid(), "Test message 4"u8.ToArray()),
new Message(Guid.NewGuid(), "Test message 5"u8.ToArray()),
new Message(Guid.NewGuid(), "Test message 6"u8.ToArray()),
new Message(Guid.NewGuid(), "Test message 7"u8.ToArray())
]);
var response = await Fixture.Clients[protocol]
.GetStreamByIdAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)));
response.ShouldNotBeNull();
response.Id.ShouldBe(StreamId.GetWithProtocol(protocol));
response.Name.ShouldBe(Name.GetWithProtocol(protocol));
response.Size.ShouldBe(490u);
response.CreatedAt.UtcDateTime.ShouldBe(DateTimeOffset.UtcNow.UtcDateTime, TimeSpan.FromSeconds(20));
response.MessagesCount.ShouldBe(7u);
response.TopicsCount.ShouldBe(2);
response.Topics.Count().ShouldBe(2);
var topic = response.Topics.First(x => x.Name == topicRequest1.Name);
topic.CreatedAt.UtcDateTime.ShouldBe(DateTimeOffset.UtcNow.UtcDateTime, TimeSpan.FromSeconds(10));
topic.Name.ShouldBe(topicRequest1.Name);
topic.CompressionAlgorithm.ShouldBe(topicRequest1.CompressionAlgorithm);
topic.Partitions.ShouldBeNull();
topic.MessageExpiry.ShouldBe(topicRequest1.MessageExpiry);
topic.Size.ShouldBe(210u);
topic.PartitionsCount.ShouldBe(topicRequest1.PartitionsCount);
topic.ReplicationFactor.ShouldBe(topicRequest1.ReplicationFactor);
topic.MaxTopicSize.ShouldBeGreaterThan(0u);
topic.MessagesCount.ShouldBe(3u);
}
[Test]
[DependsOn(nameof(GetStreamById_WithTopics_Should_ReturnValidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task UpdateStream_Should_UpdateStream_Successfully(Protocol protocol)
{
await Fixture.Clients[protocol].UpdateStreamAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)),
"updated-test-stream".GetWithProtocol(protocol));
var result = await Fixture.Clients[protocol]
.GetStreamByIdAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)));
result.ShouldNotBeNull();
result.Name.ShouldBe("updated-test-stream".GetWithProtocol(protocol));
}
[Test]
[DependsOn(nameof(UpdateStream_Should_UpdateStream_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task PurgeStream_Should_PurgeStream_Successfully(Protocol protocol)
{
// Ensure the stream has messages (created in previous steps) before purging
var stream = await Fixture.Clients[protocol]
.GetStreamByIdAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)));
stream.ShouldNotBeNull();
stream.MessagesCount.ShouldBe(7u);
stream.TopicsCount.ShouldBe(2);
await Should.NotThrowAsync(() =>
Fixture.Clients[protocol].PurgeStreamAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol))));
stream = await Fixture.Clients[protocol]
.GetStreamByIdAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)));
stream.ShouldNotBeNull();
stream.MessagesCount.ShouldBe(0u);
stream.TopicsCount.ShouldBe(2);
}
[Test]
[DependsOn(nameof(PurgeStream_Should_PurgeStream_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task DeleteStream_Should_DeleteStream_Successfully(Protocol protocol)
{
await Should.NotThrowAsync(() =>
Fixture.Clients[protocol].DeleteStreamAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol))));
}
[Test]
[DependsOn(nameof(DeleteStream_Should_DeleteStream_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task DeleteStream_NotExists_Should_Throw_InvalidResponse(Protocol protocol)
{
await Should.ThrowAsync<InvalidResponseException>(() =>
Fixture.Clients[protocol].DeleteStreamAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol))));
}
[Test]
[DependsOn(nameof(DeleteStream_NotExists_Should_Throw_InvalidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetStreamById_AfterDelete_Should_Throw_InvalidResponse(Protocol protocol)
{
var stream = await Fixture.Clients[protocol]
.GetStreamByIdAsync(Identifier.Numeric(StreamId.GetWithProtocol(protocol)));
stream.ShouldBeNull();
}
[Test]
[DependsOn(nameof(GetStreamById_AfterDelete_Should_Throw_InvalidResponse))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task CreateStream_HappyPath_OnlyStreamName_Should_CreateStream_Successfully(Protocol protocol)
{
var name = "test-stream-3".GetWithProtocol(protocol);
var response = await Fixture.Clients[protocol].CreateStreamAsync(name);
response.ShouldNotBeNull();
response.Id.ShouldNotBe(0u);
response.Name.ShouldBe(name);
response.Size.ShouldBe(0u);
response.CreatedAt.UtcDateTime.ShouldBe(DateTimeOffset.UtcNow.UtcDateTime, TimeSpan.FromSeconds(20));
response.MessagesCount.ShouldBe(0u);
response.TopicsCount.ShouldBe(0);
response.Topics.ShouldBeEmpty();
}
[Test]
[DependsOn(nameof(CreateStream_HappyPath_OnlyStreamName_Should_CreateStream_Successfully))]
[MethodDataSource<IggyServerFixture>(nameof(IggyServerFixture.ProtocolData))]
public async Task GetStreams_ByStreamName_Should_ReturnValidResponse(Protocol protocol)
{
var name = "test-stream-3".GetWithProtocol(protocol);
var response = await Fixture.Clients[protocol].GetStreamByIdAsync(Identifier.String(name));
response.ShouldNotBeNull();
response.Id.ShouldNotBe(0u);
response.Name.ShouldBe(name);
response.Size.ShouldBe(0u);
response.CreatedAt.UtcDateTime.ShouldBe(DateTimeOffset.UtcNow.UtcDateTime, TimeSpan.FromSeconds(20));
response.MessagesCount.ShouldBe(0u);
response.TopicsCount.ShouldBe(0);
response.Topics.ShouldBeEmpty();
}
}