| #!/usr/bin/env python3 |
| |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import logging |
| import os |
| import pathlib |
| import shutil |
| import subprocess |
| import sys |
| |
| import click |
| import yaml |
| |
| import scripts |
| from liminal.build import liminal_apps_builder |
| from liminal.core import environment |
| from liminal.core.util import files_util |
| from liminal.kubernetes import volume_util |
| from liminal.logging.logging_setup import logging_initialization |
| from liminal.runners.airflow import dag |
| |
| logging_initialization() |
| |
| try: |
| import importlib.resources as pkg_resources |
| except ImportError: |
| # Try backported to PY<37 `importlib_resources`. |
| import importlib_resources as pkg_resources |
| |
| |
| @click.group() |
| def cli(): |
| pass |
| |
| |
| def docker_is_running(): |
| try: |
| return not subprocess.check_output("docker info >/dev/null 2>&1", shell=True) |
| except subprocess.CalledProcessError: |
| msg = "Docker is not running. Please start docker service on your machine\n" |
| sys.stderr.write(f"ERROR: {msg}") |
| raise RuntimeError(msg) |
| |
| |
| @cli.command("build", short_help="builds dockers from your business logic") |
| @click.option('--path', default=os.getcwd(), help='Build within this path.') |
| def build(path): |
| click.echo(f'Building liminal apps in {path}') |
| if docker_is_running(): |
| liminal_apps_builder.build_liminal_apps(path) |
| |
| |
| def deploy_liminal_core_internal(clean): |
| # noinspection PyTypeChecker |
| with pkg_resources.path(dag, 'liminal_dags.py') as p: |
| dags_path = p |
| os.makedirs(environment.get_dags_dir(), exist_ok=True) |
| dag_target_file = os.path.join(environment.get_liminal_home(), 'liminal_dags.py') |
| shutil.copyfile(dags_path, dag_target_file) |
| # initialize the env. variable which indicates to the docke compose which |
| # liminal to install in airflow docker |
| liminal_version = environment.get_liminal_version() |
| logging.info(f'Deploying liminal version: {liminal_version}') |
| # if liminal is installed from local file - the developer needs to put it in the /scripts folder |
| # in which case it will end up inside the container during build |
| if liminal_version.find("file://") > -1: |
| local_file_name = os.path.basename(liminal_version) |
| full_path = os.path.join('/opt/airflow/dags', local_file_name) |
| logging.info( |
| f'Liminal was installed locally, changing the LIMINAL_VERSION parameter to {full_path}') |
| os.environ[environment.LIMINAL_VERSION_PARAM_NAME] = full_path |
| if clean: |
| docker_compose_command('down', ['--remove-orphans', '--rmi', 'local']) |
| docker_compose_command('build', ['--no-cache']) |
| docker_compose_command('run', ['webserver', 'resetdb', '-y']) |
| docker_compose_command('run', ['webserver', 'initdb']) |
| docker_compose_command('down', ['--remove-orphans']) |
| |
| |
| def docker_compose_command(command_name, args): |
| detached_mode = False |
| docker_compose_path, project_dir = get_docker_compose_paths() |
| concated_args = ' '.join(args) |
| run_command = [ |
| 'docker-compose ' |
| f'-f "{docker_compose_path}" ' |
| '-p liminal --project-directory ' |
| f'{project_dir} {command_name} {concated_args}' |
| ] |
| logging.info(run_command[0]) |
| if ('tail' in str(args) or '-d' in str(args) or command_name in ['ps', 'down']): |
| detached_mode = True |
| |
| if detached_mode: |
| output = subprocess.run(run_command, env=os.environ, shell=True, stdout=subprocess.PIPE, |
| stderr=subprocess.PIPE, universal_newlines=True) |
| return output.stdout, output.stderr |
| else: |
| subprocess.call(run_command, env=os.environ, shell=True) |
| return '', '' |
| |
| @cli.command("deploy", short_help="deploys your liminal.yaml files to $LIMINAL_HOME folder") |
| @click.option('--path', default=os.getcwd(), help="folder containing liminal.yaml files") |
| @click.option('--clean', is_flag=True, default=False, |
| help="re-deploy liminal airflow components from scratch") |
| def deploy_liminal_apps(path, clean): |
| click.echo("deploying liminal yaml files") |
| liminal_home = environment.get_liminal_home() |
| os.makedirs(liminal_home, exist_ok=True) |
| os.makedirs(environment.get_dags_dir(), exist_ok=True) |
| deploy_liminal_core_internal(clean) |
| config_files = files_util.find_config_files(path) |
| for config_file in config_files: |
| click.echo(f"deploying liminal file: {config_file}") |
| yml_name = os.path.basename(config_file) |
| target_yml_name = os.path.join(environment.get_dags_dir(), yml_name) |
| shutil.copyfile(config_file, target_yml_name) |
| with open(config_file) as stream: |
| config = yaml.safe_load(stream) |
| volume_util.create_local_volumes(config, os.path.dirname(config_file)) |
| |
| |
| def liminal_is_running(): |
| stdout, stderr = docker_compose_command('ps', []) |
| return "liminal" in stdout |
| |
| |
| @cli.command("stop", short_help="stops the docker compose") |
| def stop(): |
| if docker_is_running() and liminal_is_running(): |
| # initialize liminal home by default |
| environment.get_liminal_home() |
| docker_compose_command('down', []) |
| |
| |
| @cli.command("logs", short_help="display the docker compose logs") |
| @click.option('--follow', '-f', is_flag=True, default=False, help="Follow log output.") |
| @click.option('--tail', '-t', default=0, type=int, |
| help="Number of lines to show from the end of the log") |
| def logs(follow, tail): |
| if docker_is_running() and liminal_is_running(): |
| # initialize liminal home by default |
| environment.get_liminal_home() |
| if follow: |
| docker_compose_command('logs', ['--follow']) |
| if tail > 0: |
| stdout, stderr = docker_compose_command('logs', [f'--tail={tail}']) |
| logging.info(stdout) |
| |
| |
| @cli.command("start", |
| short_help="starts a local airflow in docker compose. should be run after deploy. " + |
| "Make sure docker is running on your machine") |
| @click.option('--detached-mode', '-d', is_flag=True, default=False, help="Start liminal in detached mode.") |
| def start(detached_mode): |
| liminal_version = environment.get_liminal_version() |
| logging.info(f'starting liminal version: {liminal_version}') |
| if docker_is_running(): |
| # initialize liminal home by default |
| environment.get_liminal_home() |
| if detached_mode: |
| docker_compose_command('up', ['-d']) |
| else: |
| docker_compose_command('up', []) |
| |
| |
| def get_docker_compose_paths(): |
| # noinspection PyTypeChecker |
| with pkg_resources.path(scripts, 'docker-compose.yml') as p: |
| docker_compose_path = p |
| project_dir = pathlib.Path(docker_compose_path).parent.parent.absolute() |
| return docker_compose_path, project_dir |
| |
| |
| if __name__ == '__main__': |
| cli() |