blob: 2097f98ded7fee201eca4a81936f965c52ab6a5d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import argparse
import asyncio
from collections import namedtuple
from apache_iggy import IggyClient, PollingStrategy, ReceiveMessage
from loguru import logger
STREAM_NAME = "sample-stream"
TOPIC_NAME = "sample-topic"
STREAM_ID = 0
TOPIC_ID = 0
PARTITION_ID = 0
BATCHES_LIMIT = 5
ArgNamespace = namedtuple("ArgNamespace", ["connection_string"])
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"connection_string",
help=(
"Connection string for Iggy client, e.g. 'iggy+tcp://iggy:iggy@127.0.0.1:8090'"
),
default="iggy+tcp://iggy:iggy@127.0.0.1:8090",
nargs="?",
type=str,
)
return parser.parse_args()
async def main():
args: ArgNamespace = parse_args()
client = IggyClient.from_connection_string(args.connection_string)
logger.info("Connecting to Iggy")
await client.connect()
logger.info("Connected")
await consume_messages(client)
async def consume_messages(client: IggyClient):
interval = 0.5 # 500 milliseconds in seconds for asyncio.sleep
logger.info(
f"Messages will be consumed from stream: {STREAM_NAME}, topic: {TOPIC_NAME}, partition: {PARTITION_ID} with "
f"interval {interval * 1000} ms."
)
offset = 0
messages_per_batch = 10
n_consumed_batches = 0
while n_consumed_batches < BATCHES_LIMIT:
try:
logger.debug("Polling for messages...")
polled_messages = await client.poll_messages(
stream=STREAM_NAME,
topic=TOPIC_NAME,
partition_id=PARTITION_ID,
polling_strategy=PollingStrategy.Next(),
count=messages_per_batch,
auto_commit=True,
)
if not polled_messages:
logger.info("No messages found in current poll")
await asyncio.sleep(interval)
continue
offset += len(polled_messages)
for message in polled_messages:
handle_message(message)
n_consumed_batches += 1
await asyncio.sleep(interval)
except Exception as error:
logger.exception("Exception occurred while consuming messages: {}", error)
break
logger.info(f"Consumed {n_consumed_batches} batches of messages, exiting.")
def handle_message(message: ReceiveMessage):
payload = message.payload().decode("utf-8")
logger.info(
f"Handling message at offset: {message.offset()} with payload: {payload}..."
)
if __name__ == "__main__":
asyncio.run(main())