blob: 21936baf63c31eed55465aef09f169d055dd43cf [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Basic messaging BDD test implementation for Python SDK
"""
import asyncio
import socket
from pytest_bdd import scenarios, given, when, then, parsers
from apache_iggy import IggyClient, SendMessage, PollingStrategy
# Load scenarios from the shared feature file
scenarios('/app/features/basic_messaging.feature')
@given('I have a running Iggy server')
def running_server(context):
"""Ensure we have a running Iggy server and create client"""
async def _connect():
# Resolve hostname to IP if needed
host, port = context.server_addr.split(':')
try:
# Try to resolve hostname to IP
ip_addr = socket.gethostbyname(host)
resolved_addr = f"{ip_addr}:{port}"
except socket.gaierror:
# If resolution fails, use as-is (might be an IP already)
resolved_addr = context.server_addr
context.client = IggyClient(resolved_addr)
await context.client.connect()
await context.client.ping() # Health check
asyncio.run(_connect())
@given('I am authenticated as the root user')
def authenticated_root_user(context):
"""Authenticate as root user"""
async def _login():
await context.client.login_user("iggy", "iggy")
asyncio.run(_login())
@given('I have no streams in the system')
def no_streams_in_system(context):
"""Ensure no streams exist in the system"""
# With --fresh flag on server, this should already be clean
# Just verify by attempting to get a stream that shouldn't exist
pass
@when(parsers.parse('I create a stream with name {stream_name}'))
def create_stream(context, stream_name):
"""Create a stream with specified name"""
async def _create():
await context.client.create_stream(name=stream_name)
stream = await context.client.get_stream(stream_name)
if stream is None:
raise RuntimeError(f"Stream {stream_name} was not found after creation")
context.last_stream_id = stream.id
context.last_stream_name = stream_name
asyncio.run(_create())
@then('the stream should be created successfully')
def stream_created_successfully(context):
"""Verify stream was created successfully"""
async def _verify():
stream = await context.client.get_stream(context.last_stream_name)
assert stream is not None
asyncio.run(_verify())
@then(parsers.parse('the stream should have name {stream_name}'))
def verify_stream_properties(context, stream_name):
"""Verify stream has correct name"""
async def _verify():
stream = await context.client.get_stream(stream_name)
assert stream is not None
assert stream.name == stream_name
asyncio.run(_verify())
@when(parsers.parse('I create a topic with name {topic_name} in stream {stream_id:d} with {partitions:d} partitions'))
def create_topic(context, topic_name, stream_id, partitions):
"""Create a topic with specified parameters"""
async def _create():
await context.client.create_topic(
stream=stream_id,
name=topic_name,
partitions_count=partitions
)
topic = await context.client.get_topic(stream_id, topic_name)
if topic is None:
raise RuntimeError(f"Topic {topic_name} was not found after creation")
context.last_topic_id = topic.id
context.last_topic_name = topic_name
context.last_topic_partitions = partitions
asyncio.run(_create())
@then('the topic should be created successfully')
def topic_created_successfully(context):
"""Verify topic was created successfully"""
async def _verify():
topic = await context.client.get_topic(context.last_stream_id, context.last_topic_name)
assert topic is not None
asyncio.run(_verify())
@then(parsers.parse('the topic should have name {topic_name}'))
def verify_topic_properties(context, topic_name):
"""Verify topic has correct name"""
async def _verify():
topic = await context.client.get_topic(context.last_stream_id, topic_name)
assert topic is not None
assert topic.name == topic_name
asyncio.run(_verify())
@then(parsers.parse('the topic should have {partitions:d} partitions'))
def verify_topic_partitions(context, partitions):
"""Verify topic has correct number of partitions"""
async def _verify():
topic = await context.client.get_topic(context.last_stream_id, context.last_topic_name)
assert topic is not None
assert topic.partitions_count == partitions
asyncio.run(_verify())
@when(parsers.parse('I send {message_count:d} messages to stream {stream_id:d}, topic {topic_id:d}, partition {partition_id:d}'))
def send_messages(context, message_count, stream_id, topic_id, partition_id):
"""Send messages to specified stream, topic, and partition"""
async def _send():
messages = []
for i in range(message_count):
content = f"test message {i}"
messages.append(SendMessage(content))
await context.client.send_messages(
stream=stream_id,
topic=topic_id,
partitioning=partition_id,
messages=messages
)
# Store the last sent message content for comparison
if messages:
context.last_sent_message = f"test message {message_count - 1}"
asyncio.run(_send())
@then('all messages should be sent successfully')
def messages_sent_successfully(context):
"""Verify all messages were sent successfully"""
# If we got here without exception, messages were sent successfully
assert context.last_sent_message is not None
@when(parsers.parse('I poll messages from stream {stream_id:d}, topic {topic_id:d}, partition {partition_id:d} starting from offset {start_offset:d}'))
def poll_messages(context, stream_id, topic_id, partition_id, start_offset):
"""Poll messages from specified location"""
async def _poll():
context.last_polled_messages = await context.client.poll_messages(
stream=stream_id,
topic=topic_id,
partition_id=partition_id,
polling_strategy=PollingStrategy.Offset(value=start_offset),
count=100, # Poll up to 100 messages
auto_commit=True
)
asyncio.run(_poll())
@then(parsers.parse('I should receive {expected_count:d} messages'))
def verify_message_count(context, expected_count):
"""Verify correct number of messages received"""
assert context.last_polled_messages is not None
assert len(context.last_polled_messages) == expected_count
@then(parsers.parse('the messages should have sequential offsets from {start_offset:d} to {end_offset:d}'))
def verify_sequential_offsets(context, start_offset, end_offset):
"""Verify messages have sequential offsets"""
assert context.last_polled_messages is not None
for i, message in enumerate(context.last_polled_messages):
expected_offset = start_offset + i
assert message.offset() == expected_offset
last_message = context.last_polled_messages[-1]
assert last_message.offset() == end_offset
@then('each message should have the expected payload content')
def verify_payload_content(context):
"""Verify each message has expected payload content"""
assert context.last_polled_messages is not None
for i, message in enumerate(context.last_polled_messages):
expected_payload = f"test message {i}"
actual_payload = message.payload().decode('utf-8')
assert actual_payload == expected_payload
@then('the last polled message should match the last sent message')
def verify_last_message_match(context):
"""Verify last polled message matches last sent message"""
assert context.last_sent_message is not None
assert context.last_polled_messages is not None
last_polled = context.last_polled_messages[-1]
last_polled_payload = last_polled.payload().decode('utf-8')
assert last_polled_payload == context.last_sent_message