blob: 6f8d1d7579f9d02f219824ca74f3ba15e367ee6b [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import random
import string
import humanfriendly
from behave import step
from minifi_test_framework.containers.directory import Directory
from minifi_test_framework.steps import checking_steps # noqa: F401
from minifi_test_framework.steps import configuration_steps # noqa: F401
from minifi_test_framework.steps import core_steps # noqa: F401
from minifi_test_framework.steps import flow_building_steps # noqa: F401
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from minifi_test_framework.minifi.processor import Processor
from minifi_test_framework.core.helpers import wait_for_condition
from s3_server_container import S3ServerContainer
@step('a {processor_name} processor set up to communicate with an s3 server')
@step('a {processor_name} processor set up to communicate with the same s3 server')
def step_impl(context: MinifiTestContext, processor_name: str):
processor = Processor(processor_name, processor_name)
processor.add_property('Object Key', 'test_object_key')
processor.add_property('Bucket', 'test_bucket')
processor.add_property('Access Key', 'test_access_key')
processor.add_property('Secret Key', 'test_secret')
processor.add_property('Endpoint Override URL', f"http://s3-server-{context.scenario_id}:9090")
processor.add_property('Proxy Host', '')
processor.add_property('Proxy Port', '')
processor.add_property('Proxy Username', '')
processor.add_property('Proxy Password', '')
context.get_or_create_default_minifi_container().flow_definition.add_processor(processor)
@step('a s3 server is set up in correspondence with the {processor_name}')
@step('an s3 server is set up in correspondence with the {processor_name}')
def step_impl(context: MinifiTestContext, processor_name: str):
context.containers["s3-server"] = S3ServerContainer(context)
@step('the object on the s3 server is "{object_data}"')
def step_impl(context: MinifiTestContext, object_data: str):
s3_server_container = context.containers["s3-server"]
assert isinstance(s3_server_container, S3ServerContainer)
assert s3_server_container.check_s3_server_object_data(object_data)
@step('the object content type on the s3 server is "{content_type}" and the object metadata matches use metadata')
def step_impl(context: MinifiTestContext, content_type: str):
s3_server_container = context.containers["s3-server"]
assert isinstance(s3_server_container, S3ServerContainer)
assert s3_server_container.check_s3_server_object_metadata(content_type)
@step("the object bucket on the s3 server is empty in less than 10 seconds")
def step_impl(context):
s3_server_container = context.containers["s3-server"]
assert isinstance(s3_server_container, S3ServerContainer)
assert wait_for_condition(
condition=lambda: s3_server_container.is_s3_bucket_empty(),
timeout_seconds=10, bail_condition=lambda: s3_server_container.exited, context=context)
@step("the object on the s3 server is present and matches the original hash")
def step_impl(context):
s3_server_container = context.containers["s3-server"]
assert isinstance(s3_server_container, S3ServerContainer)
assert s3_server_container.check_s3_server_object_hash(context.original_hash)
def computeMD5hash(my_string):
m = hashlib.md5()
m.update(my_string.encode('utf-8'))
return m.hexdigest()
@step('there is a 6MB file at the "/tmp/input" directory and we keep track of the hash of that')
def step_impl(context):
size = humanfriendly.parse_size("6MB")
content = ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(size))
new_dir = Directory("/tmp/input")
new_dir.files["input.txt"] = content
context.get_or_create_default_minifi_container().dirs.append(new_dir)
context.original_hash = computeMD5hash(content)