blob: 7417e709f571d82ea1a270d5cd68be1865ff3e5c [file]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import binascii
import time
from behave import step, when, given
from minifi_behave.steps import checking_steps # noqa: F401
from minifi_behave.steps import configuration_steps # noqa: F401
from minifi_behave.steps import core_steps # noqa: F401
from minifi_behave.steps import flow_building_steps # noqa: F401
from minifi_behave.core.minifi_test_context import MinifiTestContext
from minifi_behave.minifi.processor import Processor
from containers.kafka_server_container import KafkaServer
@step("a Kafka server is set up")
def setup_kafka_server(context):
context.containers["kafka-server"] = KafkaServer(context)
@step("ConsumeKafka processor is set up to communicate with that server")
def setup_consume_kafka_processor(context):
consume_kafka = Processor("ConsumeKafka", "ConsumeKafka")
consume_kafka.add_property("Kafka Brokers", f"kafka-server-{context.scenario_id}:9092")
consume_kafka.add_property("Topic Names", "ConsumeKafkaTest")
consume_kafka.add_property("Topic Name Format", "Names")
consume_kafka.add_property("Honor Transactions", "true")
consume_kafka.add_property("Group ID", "docker_test_group")
consume_kafka.add_property("Offset Reset", "earliest")
consume_kafka.add_property("Key Attribute Encoding", "UTF-8")
consume_kafka.add_property("Message Header Encoding", "UTF-8")
consume_kafka.add_property("Max Poll Time", "4 sec")
consume_kafka.add_property("Session Timeout", "6 sec")
context.get_or_create_default_minifi_container().flow_definition.add_processor(consume_kafka)
@step("PublishKafka processor is set up to communicate with that server")
def setup_publish_kafka_processor(context):
publish_kafka = Processor("PublishKafka", "PublishKafka")
publish_kafka.add_property("Known Brokers", f"kafka-server-{context.scenario_id}:9092")
publish_kafka.add_property("Client Name", "minifi-client")
publish_kafka.add_property("Topic Name", "test")
publish_kafka.add_property("Batch Size", "10")
publish_kafka.add_property("Compress Codec", "none")
publish_kafka.add_property("Delivery Guarantee", "1")
publish_kafka.add_property("Request Timeout", "10 sec")
publish_kafka.add_property("Message Timeout", "12 sec")
context.get_or_create_default_minifi_container().flow_definition.add_processor(publish_kafka)
@step("the Kafka server is started")
def start_kafka_server(context: MinifiTestContext):
kafka_server_container = context.containers["kafka-server"]
assert isinstance(kafka_server_container, KafkaServer)
assert kafka_server_container.deploy(context)
@step('the topic "{topic_name}" is initialized on the kafka broker')
def initialize_kafka_topic(context: MinifiTestContext, topic_name: str):
kafka_server_container = context.containers["kafka-server"]
assert isinstance(kafka_server_container, KafkaServer)
assert kafka_server_container.create_topic(topic_name=topic_name) or kafka_server_container.log_app_output()
@when('a message with content "{message}" is published to the "{topic_name}" topic')
def publish_message_to_topic(context: MinifiTestContext, message: str, topic_name: str):
kafka_server_container = context.containers["kafka-server"]
assert isinstance(kafka_server_container, KafkaServer)
assert kafka_server_container.produce_message(topic_name=topic_name, message=message) or kafka_server_container.log_app_output()
@step('a message with content "{message}" is published to the "{topic_name}" topic with key "{key}"')
def publish_message_with_key_to_topic(context: MinifiTestContext, message: str, topic_name: str, key: str):
kafka_server_container = context.containers["kafka-server"]
assert isinstance(kafka_server_container, KafkaServer)
assert kafka_server_container.produce_message_with_key(topic_name=topic_name, message=message, message_key=key) or kafka_server_container.log_app_output()
@given("the \"{property_name}\" property of the {processor_name} processor is set to match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
def set_property_to_match_message_key(context, property_name, processor_name, key_attribute_encoding, message_key):
if key_attribute_encoding.lower() == "hex":
encoded_key = binascii.hexlify(message_key.encode("utf-8")).upper()
elif key_attribute_encoding.lower() == "(not set)":
encoded_key = message_key.encode("utf-8")
else:
encoded_key = message_key.encode(key_attribute_encoding)
filtering = "${kafka.key:equals('" + encoded_key.decode("utf-8") + "')}"
processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name)
processor.add_property(property_name, filtering)
@when("the publisher performs a {transaction_type} transaction publishing to the \"{topic_name}\" topic these messages: {messages}")
def publish_to_topic_transaction_style(context, transaction_type, topic_name, messages):
if transaction_type == "SINGLE_COMMITTED_TRANSACTION":
python_code = f"""
from confluent_kafka import Producer
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092", "transactional.id": "1001"}})
producer.init_transactions()
producer.begin_transaction()
for content in "{messages}".split(", "):
producer.produce("{topic_name}", content.encode("utf-8"))
producer.commit_transaction()
producer.flush(10)
"""
elif transaction_type == "TWO_SEPARATE_TRANSACTIONS":
python_code = f"""
from confluent_kafka import Producer
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092", "transactional.id": "1001"}})
producer.init_transactions()
for content in "{messages}".split(", "):
producer.begin_transaction()
producer.produce("{topic_name}", content.encode("utf-8"))
producer.commit_transaction()
producer.flush(10)
"""
elif transaction_type == "NON_COMMITTED_TRANSACTION":
python_code = f"""
from confluent_kafka import Producer
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092", "transactional.id": "1001"}})
producer.init_transactions()
producer.begin_transaction()
for content in "{messages}".split(", "):
producer.produce("{topic_name}", content.encode("utf-8"))
producer.flush(10)
"""
elif transaction_type == "CANCELLED_TRANSACTION":
python_code = f"""
from confluent_kafka import Producer
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092", "transactional.id": "1001"}})
producer.init_transactions()
producer.begin_transaction()
for content in "{messages}".split(", "):
producer.produce("{topic_name}", content.encode("utf-8"))
producer.flush(10)
producer.abort_transaction()
"""
else:
raise Exception("Unknown transaction type.")
assert context.containers["kafka-server"].run_python_in_kafka_helper_docker(python_code) or context.containers["kafka-server"].log_app_output()
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with headers \"{semicolon_separated_headers}\"")
def publish_with_headers_to_topic(context, content, topic_name, semicolon_separated_headers):
python_code = f"""
from confluent_kafka import Producer
headers = []
for header in "{semicolon_separated_headers}".split(";"):
kv = header.split(":")
headers.append((kv[0].strip(), kv[1].strip().encode("utf-8")))
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092"}})
producer.produce("{topic_name}", "{content}".encode("utf-8"), headers=headers)
producer.flush(10)
"""
assert context.containers["kafka-server"].run_python_in_kafka_helper_docker(python_code) or context.containers["kafka-server"].log_app_output()
@when("two messages with content \"{content_one}\" and \"{content_two}\" is published to the \"{topic_name}\" topic")
def publish_two_messages_to_topic(context, content_one, content_two, topic_name):
python_code = f"""
from confluent_kafka import Producer
import uuid
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092"}})
producer.produce("{topic_name}", "{content_one}")
producer.produce("{topic_name}", "{content_two}")
producer.flush(10)
"""
assert context.containers["kafka-server"].run_python_in_kafka_helper_docker(python_code) or context.containers["kafka-server"].log_app_output()
@when("{number_of_messages} kafka messages are sent to the topic \"{topic_name}\"")
def publish_batch_to_topic(context, number_of_messages, topic_name):
python_code = f"""
from confluent_kafka import Producer
import uuid
producer = Producer({{"bootstrap.servers": "kafka-server-{context.scenario_id}:9092"}})
for i in range(0, int({number_of_messages})):
producer.produce("{topic_name}", str(uuid.uuid4()).encode("utf-8"))
producer.flush(10)
"""
assert context.containers["kafka-server"].run_python_in_kafka_helper_docker(python_code) or context.containers["kafka-server"].log_app_output()
@when("the Kafka consumer is registered in kafka broker")
def wait_for_consumer_registration(context):
assert context.containers["kafka-server"].wait_for_kafka_consumer_to_be_registered(1) or context.containers["kafka-server"].log_app_output()
# After the consumer is registered there is still some time needed for consumer-broker synchronization
# Unfortunately there are no additional log messages that could be checked for this
time.sleep(2)
@when("the Kafka consumer is reregistered in kafka broker")
def wait_for_consumer_reregistration(context):
assert context.containers["kafka-server"].wait_for_kafka_consumer_to_be_registered(2) or context.containers["kafka-server"].log_app_output()
# After the consumer is registered there is still some time needed for consumer-broker synchronization
# Unfortunately there are no additional log messages that could be checked for this
time.sleep(2)