# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging

from minifi_test_framework.containers.container import Container
from minifi_test_framework.core.helpers import wait_for_condition
from minifi_test_framework.core.minifi_test_context import MinifiTestContext


class S3ServerContainer(Container):
    """Adobe S3Mock container providing a local S3-compatible endpoint for MiNiFi integration tests."""

    def __init__(self, test_context: MinifiTestContext):
        super().__init__("adobe/s3mock:3.12.0", f"s3-server-{test_context.scenario_id}", test_context.network)
        self.environment.append("initialBuckets=test_bucket")

    def deploy(self):
        super().deploy()
        finished_str = "Started S3MockApplication"
        return wait_for_condition(
            condition=lambda: finished_str in self.get_logs(),
            timeout_seconds=15,
            bail_condition=lambda: self.exited,
            context=None)

    def check_kinesis_server_record_data(self, container_name, record_data):
        (code, output) = self.exec_run(["node", "/app/consumer/consumer.js", record_data])
        return code == 0

    def check_s3_server_object_data(self, test_data):
        # S3Mock stores each object in its own directory under the bucket root
        (code, output) = self.exec_run(["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
        if code != 0:
            return False
        s3_mock_dir = output.strip()
        (code, file_data) = self.exec_run(["cat", s3_mock_dir + "/binaryData"])
        return code == 0 and file_data == test_data

    def check_s3_server_object_hash(self, expected_file_hash: str):
        (code, output) = self.exec_run(["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
        if code != 0:
            return False
        # Skip the "multiparts" bookkeeping directory and use the first object directory found
        s3_mock_dir = None
        for candidate in output.split("\n"):
            if candidate and "multiparts" not in candidate:
                s3_mock_dir = candidate
                break
        if s3_mock_dir is None:
            return False
        (code, md5_output) = self.exec_run(["md5sum", s3_mock_dir + "/binaryData"])
        if code != 0:
            return False
        file_hash = md5_output.split(' ')[0].strip()
        return file_hash == expected_file_hash

    def check_s3_server_object_metadata(self, content_type="application/octet-stream", metadata=None):
        metadata = metadata or {}
        (code, output) = self.exec_run(["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
        if code != 0:
            return False
        s3_mock_dir = output.strip()
        (code, output) = self.exec_run(["cat", s3_mock_dir + "/objectMetadata.json"])
        if code != 0:
            return False
        server_metadata = json.loads(output)
        return server_metadata["contentType"] == content_type and metadata == server_metadata["userMetadata"]

    def is_s3_bucket_empty(self):
        (code, output) = self.exec_run(["find", "/s3mockroot/test_bucket", "-mindepth", "1", "-maxdepth", "1", "-type", "d"])
        logging.info(f"is_s3_bucket_empty: {output}")
        return code == 0 and not output.strip()