blob: e8262053073f8db0af97c4c8b1525cba9d124b8a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Comprehensive test suite for the Python SDK.
This module contains all the tests for the Iggy Python SDK, organized by functionality.
Tests are marked as either 'unit' or 'integration' based on their requirements.
"""
import asyncio
import uuid
from datetime import timedelta
import pytest
from apache_iggy import AutoCommit, IggyClient, PollingStrategy, ReceiveMessage
from apache_iggy import SendMessage as Message
from .utils import get_server_config, wait_for_ping, wait_for_server
class TestConnectivity:
"""Test basic connectivity and authentication."""
@pytest.mark.asyncio
async def test_ping(self, iggy_client: IggyClient):
"""Test server ping functionality."""
await iggy_client.ping()
@pytest.mark.asyncio
async def test_client_not_none(self, iggy_client: IggyClient):
"""Test that client fixture is properly initialized."""
assert iggy_client is not None
@pytest.mark.asyncio
async def test_client_from_connection_string(self):
"""Test that client can be created and set up with a connection string."""
host, port = get_server_config()
wait_for_server(host, port)
client = IggyClient.from_connection_string(
f"iggy+tcp://iggy:iggy@{host}:{port}"
)
await client.connect()
await wait_for_ping(client)
class TestStreamOperations:
"""Test stream creation, retrieval, and management."""
@pytest.fixture
def unique_stream_name(self):
"""Generate unique stream name for each test."""
return f"test-stream-{uuid.uuid4().hex[:8]}"
@pytest.mark.asyncio
async def test_create_and_get_stream(
self, iggy_client: IggyClient, unique_stream_name
):
"""Test stream creation and retrieval."""
# Create stream
await iggy_client.create_stream(unique_stream_name)
# Get stream by name
stream = await iggy_client.get_stream(unique_stream_name)
assert stream is not None
assert stream.name == unique_stream_name
assert stream.id >= 0
@pytest.mark.asyncio
async def test_list_streams(self, iggy_client: IggyClient, unique_stream_name):
"""Test listing streams."""
# Create a stream first
await iggy_client.create_stream(unique_stream_name)
# Get the stream we just created
stream = await iggy_client.get_stream(unique_stream_name)
assert stream is not None
assert stream.name == unique_stream_name
assert stream.id > 0
assert stream.topics_count == 0 # New stream has no topics
class TestTopicOperations:
"""Test topic creation, retrieval, and management."""
@pytest.fixture
def unique_names(self):
"""Generate unique stream and topic names."""
unique_id = uuid.uuid4().hex[:8]
return {
"stream": f"test-stream-{unique_id}",
"topic": f"test-topic-{unique_id}",
}
@pytest.mark.asyncio
async def test_create_and_get_topic(self, iggy_client: IggyClient, unique_names):
"""Test topic creation and retrieval."""
stream_name = unique_names["stream"]
topic_name = unique_names["topic"]
# Create stream first
await iggy_client.create_stream(stream_name)
# Create topic
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=2
)
# Get topic by name
topic = await iggy_client.get_topic(stream_name, topic_name)
assert topic is not None
assert topic.name == topic_name
assert topic.id >= 0
assert topic.partitions_count == 2
@pytest.mark.asyncio
async def test_list_topics(self, iggy_client: IggyClient, unique_names):
"""Test listing topics in a stream."""
stream_name = unique_names["stream"]
topic_name = unique_names["topic"]
# Create stream and topic
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
# Get the topic we just created
topic = await iggy_client.get_topic(stream_name, topic_name)
assert topic is not None
assert topic.name == topic_name
assert topic.id >= 0
assert topic.partitions_count == 1
class TestMessageOperations:
"""Test message sending, polling, and processing."""
@pytest.fixture
def message_setup(self):
"""Setup unique names and test data for messaging tests."""
unique_id = uuid.uuid4().hex[:8]
return {
"stream": f"msg-stream-{unique_id}",
"topic": f"msg-topic-{unique_id}",
"partition_id": 0,
"messages": [f"Test message {i} - {unique_id}" for i in range(1, 4)],
}
@pytest.mark.asyncio
async def test_send_and_poll_messages(self, iggy_client: IggyClient, message_setup):
"""Test basic message sending and polling."""
stream_name = message_setup["stream"]
topic_name = message_setup["topic"]
partition_id = message_setup["partition_id"]
test_messages = message_setup["messages"]
# Setup stream and topic
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
# Send messages
messages = [Message(msg) for msg in test_messages]
await iggy_client.send_messages(
stream=stream_name,
topic=topic_name,
partitioning=partition_id,
messages=messages,
)
# Poll messages
polled_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.First(),
count=10,
auto_commit=True,
)
# Verify we got our messages
assert len(polled_messages) >= len(test_messages)
# Check the first few messages match what we sent
for i, expected_msg in enumerate(test_messages):
if i < len(polled_messages):
actual_payload = polled_messages[i].payload().decode("utf-8")
assert actual_payload == expected_msg
@pytest.mark.asyncio
async def test_send_and_poll_messages_as_bytes(
self, iggy_client: IggyClient, message_setup
):
"""Test basic message sending and polling with message payload as bytes."""
stream_name = message_setup["stream"]
topic_name = message_setup["topic"]
partition_id = message_setup["partition_id"]
test_messages = message_setup["messages"]
# Setup stream and topic
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
# Send messages
messages = [Message(msg.encode()) for msg in test_messages]
await iggy_client.send_messages(
stream=stream_name,
topic=topic_name,
partitioning=partition_id,
messages=messages,
)
# Poll messages
polled_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.First(),
count=10,
auto_commit=True,
)
# Verify we got our messages
assert len(polled_messages) >= len(test_messages)
# Check the first few messages match what we sent
for i, expected_msg in enumerate(test_messages):
if i < len(polled_messages):
actual_payload = polled_messages[i].payload().decode("utf-8")
assert actual_payload == expected_msg
@pytest.mark.asyncio
async def test_message_properties(self, iggy_client: IggyClient, message_setup):
"""Test access to message properties."""
stream_name = message_setup["stream"]
topic_name = message_setup["topic"]
partition_id = message_setup["partition_id"]
# Setup
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
# Send a test message
test_payload = f"Property test - {uuid.uuid4().hex[:8]}"
message = Message(test_payload)
await iggy_client.send_messages(
stream=stream_name,
topic=topic_name,
partitioning=partition_id,
messages=[message],
)
# Poll and verify properties
polled_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.Last(),
count=1,
auto_commit=True,
)
assert len(polled_messages) >= 1
msg = polled_messages[0]
# Test all message properties are accessible and have reasonable values
assert msg.payload().decode("utf-8") == test_payload
assert isinstance(msg.offset(), int) and msg.offset() >= 0
assert isinstance(msg.id(), int) and msg.id() > 0
assert isinstance(msg.timestamp(), int) and msg.timestamp() > 0
assert isinstance(msg.checksum(), int)
assert isinstance(msg.length(), int) and msg.length() > 0
class TestPollingStrategies:
"""Test different polling strategies."""
@pytest.fixture
def polling_setup(self):
"""Setup for polling strategy tests."""
unique_id = uuid.uuid4().hex[:8]
return {
"stream": f"poll-stream-{unique_id}",
"topic": f"poll-topic-{unique_id}",
"partition_id": 0,
"messages": [f"Polling test {i} - {unique_id}" for i in range(5)],
}
@pytest.mark.asyncio
async def test_polling_strategies(self, iggy_client: IggyClient, polling_setup):
"""Test different polling strategies work correctly."""
stream_name = polling_setup["stream"]
topic_name = polling_setup["topic"]
partition_id = polling_setup["partition_id"]
test_messages = polling_setup["messages"]
# Setup
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
# Send test messages
messages = [Message(msg) for msg in test_messages]
await iggy_client.send_messages(
stream=stream_name,
topic=topic_name,
partitioning=partition_id,
messages=messages,
)
# Test First strategy
first_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.First(),
count=1,
auto_commit=False,
)
assert len(first_messages) >= 1
# Test Last strategy
last_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.Last(),
count=1,
auto_commit=False,
)
assert len(last_messages) >= 1
# Test Next strategy
next_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.Next(),
count=2,
auto_commit=False,
)
assert len(next_messages) >= 1
# Test Offset strategy (if we have messages)
if first_messages:
offset_messages = await iggy_client.poll_messages(
stream=stream_name,
topic=topic_name,
partition_id=partition_id,
polling_strategy=PollingStrategy.Offset(
value=first_messages[0].offset()
),
count=1,
auto_commit=False,
)
assert len(offset_messages) >= 1
class TestErrorHandling:
"""Test error handling and edge cases."""
@pytest.mark.asyncio
async def test_duplicate_stream_creation(self, iggy_client: IggyClient):
"""Test that creating duplicate streams raises appropriate errors."""
stream_name = f"duplicate-test-{uuid.uuid4().hex[:8]}"
# Create stream first time - should succeed
await iggy_client.create_stream(stream_name)
# Create same stream again - should fail
with pytest.raises(RuntimeError) as exc_info:
await iggy_client.create_stream(stream_name)
assert "StreamNameAlreadyExists" in str(exc_info.value)
@pytest.mark.asyncio
async def test_get_nonexistent_stream(self, iggy_client: IggyClient):
"""Test getting a non-existent stream."""
nonexistent_name = f"nonexistent-{uuid.uuid4().hex}"
# get_stream returns None for non-existent streams
stream = await iggy_client.get_stream(nonexistent_name)
assert stream is None
@pytest.mark.asyncio
async def test_create_topic_in_nonexistent_stream(self, iggy_client: IggyClient):
"""Test creating a topic in a non-existent stream."""
nonexistent_stream = f"nonexistent-{uuid.uuid4().hex}"
topic_name = "test-topic"
with pytest.raises(RuntimeError):
await iggy_client.create_topic(
stream=nonexistent_stream, name=topic_name, partitions_count=1
)
class TestConsumerGroup:
"""Test consumer groups."""
@pytest.fixture(scope="function")
def consumer_group_setup(self):
"""Setup for polling strategy tests."""
unique_id = uuid.uuid4().hex[:8]
return {
"consumer": f"consumer-group-consumer-{unique_id}",
"stream": f"consumer-group-stream-{unique_id}",
"topic": f"consumer-group-topic-{unique_id}",
"partition_id": 0,
"messages": [f"Consumer group test {i} - {unique_id}" for i in range(5)],
}
@pytest.mark.asyncio
async def test_meta(self, iggy_client: IggyClient, consumer_group_setup):
"""Test that meta information can be read about the consumer group."""
consumer_name = consumer_group_setup["consumer"]
stream_name = consumer_group_setup["stream"]
topic_name = consumer_group_setup["topic"]
partition_id = consumer_group_setup["partition_id"]
# Setup
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
partition_id,
PollingStrategy.Next(),
10,
auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
poll_interval=timedelta(seconds=1),
)
assert consumer.stream() == stream_name
assert consumer.topic() == topic_name
# This internally loads `current_partition_id` which is set to 0 until you start consuming
assert consumer.partition_id() == 0
assert consumer.get_last_consumed_offset(partition_id) is None
assert consumer.get_last_stored_offset(partition_id) is None
@pytest.mark.asyncio
async def test_consume_messages(
self, iggy_client: IggyClient, consumer_group_setup
):
"""Test that the consumer group can consume messages."""
consumer_name = consumer_group_setup["consumer"]
stream_name = consumer_group_setup["stream"]
topic_name = consumer_group_setup["topic"]
partition_id = consumer_group_setup["partition_id"]
test_messages = consumer_group_setup["messages"]
# Setup
received_messages = []
shutdown_event = asyncio.Event()
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
partition_id,
PollingStrategy.Next(),
10,
auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
poll_interval=timedelta(seconds=1),
)
async def take(message: ReceiveMessage) -> None:
received_messages.append(message.payload().decode())
if len(received_messages) == 5:
shutdown_event.set()
async def send() -> None:
await iggy_client.send_messages(
stream_name,
topic_name,
partition_id,
[Message(m) for m in test_messages],
)
await asyncio.gather(consumer.consume_messages(take, shutdown_event), send())
assert received_messages == test_messages
@pytest.mark.asyncio
async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup):
"""Test that the consumer group can consume messages."""
consumer_name = consumer_group_setup["consumer"]
stream_name = consumer_group_setup["stream"]
topic_name = consumer_group_setup["topic"]
partition_id = consumer_group_setup["partition_id"]
test_messages = consumer_group_setup["messages"]
# Setup
received_messages = []
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
partition_id,
PollingStrategy.Next(),
10,
auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
poll_interval=timedelta(seconds=1),
)
await iggy_client.send_messages(
stream_name,
topic_name,
partition_id,
[Message(m) for m in test_messages],
)
async for message in consumer.iter_messages():
received_messages.append(message.payload().decode())
if len(received_messages) == 5:
break
assert received_messages == test_messages
@pytest.mark.asyncio
async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup):
"""Test that the consumer group can be signaled to shutdown."""
consumer_name = consumer_group_setup["consumer"]
stream_name = consumer_group_setup["stream"]
topic_name = consumer_group_setup["topic"]
partition_id = consumer_group_setup["partition_id"]
# Setup
shutdown_event = asyncio.Event()
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
partition_id,
PollingStrategy.Next(),
10,
auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
poll_interval=timedelta(seconds=1),
)
async def take(_) -> None:
pass
shutdown_event.set()
await consumer.consume_messages(take, shutdown_event)