blob: c38a2dccb0de5abfaae5f72aee1e727358badd2d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import json
import logging
import os
import shlex
import tempfile
import docker
from docker.models.networks import Network
from minifi_test_framework.containers.directory import Directory
from minifi_test_framework.containers.file import File
from minifi_test_framework.containers.host_file import HostFile
class Container:
def __init__(self, image_name: str, container_name: str, network: Network):
self.image_name: str = image_name
self.container_name: str = container_name
self.network: Network = network
self.user: str = "0:0"
self.client = docker.from_env()
self.container = None
self.files: list[File] = []
self.dirs: list[Directory] = []
self.host_files: list[HostFile] = []
self.volumes = {}
self.command = None
self._temp_dir = None
self.ports = None
self.environment: list[str] = []
def deploy(self) -> bool:
self._temp_dir = tempfile.TemporaryDirectory()
for file in self.files:
temp_path = os.path.join(self._temp_dir.name, file.host_filename)
with open(temp_path, "w") as temp_file:
temp_file.write(file.content)
self.volumes[temp_path] = {"bind": os.path.join(file.path, file.host_filename), "mode": file.mode}
for directory in self.dirs:
temp_path = self._temp_dir.name + directory.path
os.makedirs(temp_path, exist_ok=True)
for file_name, content in directory.files.items():
file_path = os.path.join(temp_path, file_name)
with open(file_path, "w") as temp_file:
temp_file.write(content)
self.volumes[temp_path] = {"bind": directory.path, "mode": directory.mode}
for host_file in self.host_files:
self.volumes[host_file.container_path] = {"bind": host_file.host_path, "mode": host_file.mode}
try:
existing_container = self.client.containers.get(self.container_name)
logging.warning(f"Found existing container '{self.container_name}'. Removing it first.")
existing_container.remove(force=True)
except docker.errors.NotFound:
pass
try:
logging.info(f"Creating and starting container '{self.container_name}'...")
self.container = self.client.containers.run(
image=self.image_name, name=self.container_name, ports=self.ports,
environment=self.environment, volumes=self.volumes, network=self.network.name,
command=self.command, user=self.user, detach=True)
except Exception as e:
logging.error(f"Error starting container: {e}")
raise
return True
def clean_up(self):
if self.container:
self.container.remove(force=True)
def exec_run(self, command) -> tuple[int | None, str]:
if self.container:
(code, output) = self.container.exec_run(command)
return code, output.decode("utf-8")
return None, "Container not running."
def not_empty_dir_exists(self, directory_path: str) -> bool:
if not self.container:
return False
dir_exists_exit_code, dir_exists_output = self.exec_run(
"sh -c {}".format(shlex.quote(f"test -d {directory_path}")))
if dir_exists_exit_code != 0:
return False
dir_not_empty_ec, dir_not_empty_output = self.exec_run(
"sh -c {}".format(shlex.quote(f'[ "$(ls -A {directory_path})" ]')))
return dir_not_empty_ec == 0
def directory_contains_file_with_content(self, directory_path: str, expected_content: str) -> bool:
if not self.container or not self.not_empty_dir_exists(directory_path):
return False
quoted_content = shlex.quote(expected_content)
command = "sh -c {}".format(shlex.quote(f"grep -l -F -- {quoted_content} {directory_path}/*"))
exit_code, output = self.exec_run(command)
return exit_code == 0
def directory_contains_file_with_regex(self, directory_path: str, regex_str: str) -> bool:
if not self.container or not self.not_empty_dir_exists(directory_path):
return False
safe_dir_path = shlex.quote(directory_path)
safe_regex_str = shlex.quote(regex_str)
command = (f"find {safe_dir_path} -maxdepth 1 -type f -print0 | "
f"xargs -0 -r grep -l -E -- {safe_regex_str}")
exit_code, output = self.exec_run(f"sh -c \"{command}\"")
if exit_code != 0:
logging.warning(f"directory_contains_file_with_regex {output}")
return exit_code == 0
def path_with_content_exists(self, path: str, content: str) -> bool:
count_command = f"sh -c 'cat {path} | grep \"^{content}$\" | wc -l'"
exit_code, output = self.exec_run(count_command)
if exit_code != 0:
logging.error(f"Error running command '{count_command}': {output}")
return False
try:
file_count = int(output.strip())
except (ValueError, IndexError):
logging.error(f"Error parsing output '{output}' from command '{count_command}'")
return False
return file_count == 1
def directory_has_single_file_with_content(self, directory_path: str, expected_content: str) -> bool:
if not self.container or not self.not_empty_dir_exists(directory_path):
return False
count_command = f"sh -c 'find {directory_path} -maxdepth 1 -type f | wc -l'"
exit_code, output = self.exec_run(count_command)
if exit_code != 0:
logging.error(f"Error running command '{count_command}': {output}")
return False
try:
file_count = int(output.strip())
except (ValueError, IndexError):
logging.error(f"Error parsing output '{output}' from command '{count_command}'")
return False
if file_count != 1:
logging.error(f"{directory_path} has too many or too few ({file_count}) files")
return False
content_command = f"sh -c 'cat {directory_path}/*'"
exit_code, output = self.exec_run(content_command)
if exit_code != 0:
logging.error(f"Error running command '{content_command}': {output}")
return False
actual_content = output.strip()
logging.debug(f"Comparing: '{actual_content}' vs {expected_content}")
return actual_content == expected_content.strip()
def get_logs(self) -> str:
logging.debug("Getting logs from container '%s'", self.container_name)
if not self.container:
return ""
logs_as_bytes = self.container.logs()
return logs_as_bytes.decode('utf-8')
@property
def exited(self) -> bool:
if not self.container:
return False
try:
self.container.reload()
return self.container.status == 'exited'
except docker.errors.NotFound:
self.container = None
return False
except Exception:
return False
def get_number_of_files(self, directory_path: str) -> int:
if not self.container or not self.not_empty_dir_exists(directory_path):
return -1
count_command = f"sh -c 'find {directory_path} -maxdepth 1 -type f | wc -l'"
exit_code, output = self.exec_run(count_command)
if exit_code != 0:
logging.error(f"Error running command '{count_command}': {output}")
return -1
try:
return int(output.strip())
except (ValueError, IndexError):
logging.error(f"Error parsing output '{output}' from command '{count_command}'")
return -1
def verify_file_contents(self, directory_path: str, expected_contents: list[str]) -> bool:
if not self.container or not self.not_empty_dir_exists(directory_path):
return False
safe_dir_path = shlex.quote(directory_path)
list_files_command = f"find {safe_dir_path} -mindepth 1 -maxdepth 1 -type f -print0"
exit_code, output = self.exec_run(f"sh -c \"{list_files_command}\"")
if exit_code != 0:
logging.error(f"Error running command '{list_files_command}': {output}")
return False
actual_filepaths = [path for path in output.split('\0') if path]
if len(actual_filepaths) != len(expected_contents):
logging.debug(f"Expected {len(expected_contents)} files, but found {len(actual_filepaths)}")
return False
actual_file_contents = []
for path in actual_filepaths:
safe_path = shlex.quote(path)
read_command = f"cat {safe_path}"
exit_code, content = self.exec_run(read_command)
if exit_code != 0:
error_message = f"Command to read file '{path}' failed with exit code {exit_code}"
logging.error(error_message)
return False
actual_file_contents.append(content)
return sorted(actual_file_contents) == sorted(expected_contents)
def log_app_output(self) -> bool:
logs = self.get_logs()
logging.info("Logs of container '%s':", self.container_name)
for line in logs.splitlines():
logging.info(line)
return False
def verify_path_with_json_content(self, directory_path: str, expected_str: str) -> bool:
if not self.container or not self.not_empty_dir_exists(directory_path):
return False
count_command = f"sh -c 'find {directory_path} -maxdepth 1 -type f | wc -l'"
exit_code, output = self.exec_run(count_command)
if exit_code != 0:
logging.error(f"Error running command '{count_command}': {output}")
return False
try:
file_count = int(output.strip())
except (ValueError, IndexError):
logging.error(f"Error parsing output '{output}' from command '{count_command}'")
return False
if file_count != 1:
logging.error(f"{directory_path} has too many or too few ({file_count}) files")
return False
content_command = f"sh -c 'cat {directory_path}/*'"
exit_code, output = self.exec_run(content_command)
if exit_code != 0:
logging.error(f"Error running command '{content_command}': {output}")
return False
actual_content = output.strip()
actual_json = json.loads(actual_content)
expected_json = json.loads(expected_str)
return actual_json == expected_json