# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import atexit
import multiprocessing
import os
import signal
import sys
import time
from dataclasses import dataclass
from multiprocessing.pool import Pool
from pathlib import Path
import yaml
from airflow_breeze.global_constants import DEFAULT_PYTHON_MAJOR_MINOR_VERSION
from airflow_breeze.utils.console import Output, get_console
from airflow_breeze.utils.github import download_constraints_file, download_file_from_github
from airflow_breeze.utils.path_utils import AIRFLOW_SOURCES_ROOT, FILES_DIR
from airflow_breeze.utils.run_utils import run_command
from airflow_breeze.utils.shared_options import get_dry_run
def start_cdxgen_server(application_root_path: Path, run_in_parallel: bool, parallelism: int) -> None:
"""
Start cdxgen server that is used to perform cdxgen scans of applications in child process
:param run_in_parallel: run parallel servers
:param parallelism: parallelism to use
:param application_root_path: path where the application to scan is located
"""
run_command(
[
"docker",
"pull",
"ghcr.io/cyclonedx/cdxgen",
],
check=True,
)
if not run_in_parallel:
fork_cdxgen_server(application_root_path)
else:
for i in range(parallelism):
fork_cdxgen_server(application_root_path, port=9091 + i)
time.sleep(1)
get_console().print("[info]Waiting for cdxgen server to start")
time.sleep(3)
def fork_cdxgen_server(application_root_path, port=9090):
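    """
    Fork a child process that runs the cdxgen server container on the given port.

    The child puts itself in a new process group so that the parent can terminate the whole
    server process tree at exit with a single SIGTERM sent to that group.
    """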
pid = os.fork()
if pid:
        # Parent process - register an exit handler that sends SIGTERM to the child's process group
        atexit.register(os.killpg, pid, signal.SIGTERM)
        # The wait for the server to start is done by the caller (start_cdxgen_server)
else:
# Check if we are not a group leader already (We should not be)
if os.getpid() != os.getsid(0):
# and create a new process group where we are the leader
os.setpgid(0, 0)
run_command(
[
"docker",
"run",
"--init",
"--rm",
"-p",
f"{port}:{port}",
"-v",
"/tmp:/tmp",
"-v",
f"{application_root_path}:/app",
"-t",
"ghcr.io/cyclonedx/cdxgen",
"--server",
"--server-host",
"0.0.0.0",
"--server-port",
str(port),
],
check=True,
)
# we should get here when the server gets terminated
sys.exit(0)
def get_port_mapping(x):
    # sleep so that every process in the pool picks up exactly one mapping task; without the sleep
    # a fast worker could handle several tasks and some workers would never get a port assigned
time.sleep(1)
return multiprocessing.current_process().name, 9091 + x
def get_cdxgen_port_mapping(parallelism: int, pool: Pool) -> dict[str, int]:
"""
Map processes from pool to port numbers so that there is always the same port
used by the same process in the pool - effectively having one multiprocessing
process talking to the same cdxgen server
:param parallelism: parallelism to use
:param pool: pool to map ports for
:return: mapping of process name to port
"""
port_map: dict[str, int] = dict(pool.map(get_port_mapping, range(parallelism)))
return port_map
def get_provider_requirement_image_name(airflow_version: str, python_version: str) -> str:
return f"apache/airflow-dev/base_requirements/{airflow_version}/python{python_version}"
def build_providers_base_image(airflow_version: str, python_version: str):
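    """
    Build a Docker image with the given Airflow version preinstalled in a dedicated virtualenv.

    The image is later used by get_requirements_for_provider to diff provider requirements
    against this clean Airflow installation.
    """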
image_name = get_provider_requirement_image_name(
airflow_version=airflow_version, python_version=python_version
)
dockerfile = f"""
FROM ghcr.io/apache/airflow/main/ci/python{python_version}
RUN pip install --upgrade pip
# Install Airflow into a dedicated virtualenv so provider requirements can later be diffed against a clean installation
RUN python -m venv /opt/airflow/providers
RUN /opt/airflow/providers/bin/pip install --upgrade pip
RUN /opt/airflow/providers/bin/pip install apache-airflow=={airflow_version} \
--constraint https://raw.githubusercontent.com/apache/airflow/\
constraints-{airflow_version}/constraints-{python_version}.txt
"""
run_command(["docker", "build", "--tag", image_name, "-"], input=dockerfile, text=True, check=True)
TARGET_DIR_NAME = "provider_requirements"
DOCKER_FILE_PREFIX = f"/files/{TARGET_DIR_NAME}/"
def get_requirements_for_provider(
provider_id: str,
airflow_version: str,
provider_version: str | None = None,
python_version: str = DEFAULT_PYTHON_MAJOR_MINOR_VERSION,
):
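    """
    Generate the list of extra requirements a provider adds on top of a clean Airflow installation.

    The provider is installed inside the pre-built base image and the `pip freeze` output before
    and after the installation is diffed; the resulting requirement files are written to the
    provider_requirements folder under the files directory.
    """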
provider_path_array = provider_id.split(".")
if not provider_version:
provider_file = (AIRFLOW_SOURCES_ROOT / "airflow" / "providers").joinpath(
*provider_path_array
) / "provider.yaml"
provider_version = yaml.safe_load(provider_file.read_text())["versions"][0]
airflow_file_name = f"provider-{provider_id}-{provider_version}-base-requirements.txt"
provider_with_airflow_file_name = f"provider-{provider_id}-{provider_version}-airflow-requirements.txt"
provider_file_name = f"provider-{provider_id}-{provider_version}-requirements.txt"
command = f"""
mkdir -pv {DOCKER_FILE_PREFIX}
/opt/airflow/providers/bin/pip freeze | sort > {DOCKER_FILE_PREFIX}{airflow_file_name}
/opt/airflow/providers/bin/pip install apache-airflow=={airflow_version} \
apache-airflow-providers-{provider_id}=={provider_version}
/opt/airflow/providers/bin/pip freeze | sort > {DOCKER_FILE_PREFIX}{provider_with_airflow_file_name}
chown --recursive {os.getuid()}:{os.getgid()} {DOCKER_FILE_PREFIX}
"""
run_command(
[
"docker",
"run",
"--rm",
"-e",
f"HOST_USER_ID={os.getuid()}",
"-e",
f"HOST_GROUP_ID={os.getgid()}",
"-v",
f"{AIRFLOW_SOURCES_ROOT}/files:/files",
get_provider_requirement_image_name(
airflow_version=airflow_version, python_version=python_version
),
"-c",
";".join(command.split("\n")[1:-1]),
]
)
target_dir = FILES_DIR / TARGET_DIR_NAME
airflow_file = target_dir / airflow_file_name
provider_with_airflow_file = target_dir / provider_with_airflow_file_name
get_console().print(f"[info]Airflow requirements in {airflow_file}")
get_console().print(f"[info]Provider requirements in {provider_with_airflow_file}")
    base_packages = {package.split("==")[0] for package in airflow_file.read_text().split("\n")}
base_packages.add("apache-airflow-providers-" + provider_id.replace(".", "-"))
provider_packages = sorted(
[
line
for line in provider_with_airflow_file.read_text().split("\n")
if line.split("==")[0] not in base_packages
]
)
get_console().print(
f"[info]Provider {provider_id} has {len(provider_packages)} transitively "
f"dependent packages (excluding airflow and it's dependencies)"
)
get_console().print(provider_packages)
provider_file = target_dir / provider_file_name
provider_file.write_text("\n".join(provider_packages) + "\n")
get_console().print(
f"[success]Generated {provider_id}:{provider_version} requirements in {provider_file}"
)
@dataclass
class SbomApplicationJob:
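    """Single SBOM-generation job: one Airflow version / Python version combination."""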
airflow_version: str
python_version: str
application_root_path: Path
include_provider_dependencies: bool
target_path: Path
def produce_sbom_for_application_via_cdxgen_server(
job: SbomApplicationJob, output: Output | None, port_map: dict[str, int] | None = None
) -> tuple[int, str]:
"""
Produces SBOM for application using cdxgen server.
:param job: Job to run
:param output: Output to use
:param port_map map of process name to port - making sure that one process talks to one server
in case parallel processing is used
:return: tuple with exit code and output
"""
import requests
if port_map is None:
port = 9090
else:
port = port_map[multiprocessing.current_process().name]
get_console(output=output).print(f"[info]Using port {port}")
get_console(output=output).print(
f"[info]Updating sbom for Airflow {job.airflow_version} and python {job.python_version}"
)
source_dir = job.application_root_path / job.airflow_version / job.python_version
source_dir.mkdir(parents=True, exist_ok=True)
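    # yarn.lock of the tagged version is needed so cdxgen can also report the JavaScript (www UI) dependencies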
lock_file_relative_path = "airflow/www/yarn.lock"
download_file_from_github(
tag=job.airflow_version, path=lock_file_relative_path, output_file=source_dir / "yarn.lock"
)
if not download_constraints_file(
airflow_version=job.airflow_version,
python_version=job.python_version,
include_provider_dependencies=job.include_provider_dependencies,
output_file=source_dir / "requirements.txt",
):
get_console(output=output).print(
f"[warning]Failed to download constraints file for "
f"{job.airflow_version} and {job.python_version}. Skipping"
)
return 0, f"SBOM Generate {job.airflow_version}:{job.python_version}"
get_console(output=output).print(
f"[info]Generating sbom for Airflow {job.airflow_version} and python {job.python_version} with cdxgen"
)
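    # Ask the cdxgen server to scan the sources mounted under /app for this version and return the SBOM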
url = (
f"http://127.0.0.1:{port}/sbom?path=/app/{job.airflow_version}/{job.python_version}&"
f"project-name=apache-airflow&project-version={job.airflow_version}&multiProject=true"
)
get_console(output=output).print(f"[info]Triggering sbom generation in {job.airflow_version} via {url}")
if not get_dry_run():
response = requests.get(url)
if response.status_code != 200:
get_console(output=output).print(
f"[error]Generation for Airflow {job.airflow_version}:{job.python_version} failed. "
f"Status code {response.status_code}"
)
return response.status_code, f"SBOM Generate {job.airflow_version}:{job.python_version}"
job.target_path.write_bytes(response.content)
get_console(output=output).print(
f"[success]Generated SBOM for {job.airflow_version}:{job.python_version}"
)
return 0, f"SBOM Generate {job.airflow_version}:{job.python_version}"