blob: 968445b3647fa7b20c54e3f76fc5c1ed1bace14a [file] [log] [blame]
#!/usr/bin/env python3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import pathlib
import shutil
import subprocess
import sys
import click
import scripts
from liminal import settings
from liminal.build import liminal_apps_builder
from liminal.core import environment
from liminal.core.config.config import ConfigUtil
from liminal.core.util import files_util
from liminal.kubernetes import volume_util
from liminal.logging.logging_setup import logging_initialization
from liminal.runners.airflow import dag
logging_initialization()
settings.initialize()
try:
import importlib.resources as pkg_resources
except ImportError:
# Try backported to PY<37 `importlib_resources`.
import importlib_resources as pkg_resources
@click.group()
def cli():
pass
def docker_is_running():
try:
return not subprocess.check_output("docker info >/dev/null 2>&1", shell=True)
except subprocess.CalledProcessError:
msg = "Docker is not running. Please start docker service on your machine\n"
sys.stderr.write(f"ERROR: {msg}")
raise RuntimeError(msg)
@cli.command("build", short_help="builds dockers from your business logic")
@click.option('--path', default=os.getcwd(), help='Build within this path.')
def build(path):
click.echo(f'Building liminal apps in {path}')
if docker_is_running():
liminal_apps_builder.build_liminal_apps(path)
configs = ConfigUtil(path).safe_load(is_render_variables=True,
soft_merge=True)
for config in configs:
if config.get('volumes'):
logging.info(f'local volume is being created')
volume_util.create_local_volumes(config, os.path.dirname(
files_util.resolve_pipeline_source_file(config['name'])))
def deploy_liminal_core_internal(liminal_home, clean):
# noinspection PyTypeChecker
with pkg_resources.path(dag, 'liminal_dags.py') as p:
dags_path = p
os.makedirs(environment.get_dags_dir(), exist_ok=True)
dag_target_file = os.path.join(environment.get_liminal_home(), 'liminal_dags.py')
shutil.copyfile(dags_path, dag_target_file)
# initialize the env. variable which indicates to the docke compose which
# liminal to install in airflow docker
liminal_version = environment.get_liminal_version()
logging.info(f'Deploying liminal version: {liminal_version}')
# if liminal is installed from local file - the developer needs to put it in the /scripts folder
# in which case it will end up inside the container during build
if liminal_version.find("file://") > -1:
local_file_name = os.path.basename(liminal_version)
full_path = os.path.join('/opt/airflow/dags', local_file_name)
logging.info(f'Liminal was installed locally, setting LIMINAL_VERSION to {full_path}')
os.environ[environment.LIMINAL_VERSION_PARAM_NAME] = full_path
_, project_dir = get_docker_compose_paths()
shutil.copyfile(os.path.join(liminal_home, local_file_name),
os.path.join(project_dir, local_file_name))
if clean:
docker_compose_command('down', ['--remove-orphans', '--rmi', 'local'])
docker_compose_command('build', [])
docker_compose_command('run', ['webserver', 'db reset', '-y'])
docker_compose_command('run', ['webserver', 'db init'])
docker_compose_command('run', ['webserver', 'users create '
'--role Admin '
'--username admin '
'--email admin '
'--firstname admin '
'--lastname admin '
'--password admin'])
docker_compose_command('down', ['--remove-orphans'])
def docker_compose_command(command_name, args):
detached_mode = False
docker_compose_path, project_dir = get_docker_compose_paths()
concated_args = ' '.join(args)
run_command = [
'docker-compose '
f'-f "{docker_compose_path}" '
'-p liminal --project-directory '
f'{project_dir} {command_name} {concated_args}'
]
logging.info(run_command[0])
if 'tail' in str(args) or '-d' in str(args) or command_name in ['ps', 'down']:
detached_mode = True
if detached_mode:
output = subprocess.run(run_command, env=os.environ, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, universal_newlines=True)
return output.stdout, output.stderr
else:
subprocess.call(run_command, env=os.environ, shell=True)
return '', ''
@cli.command("deploy", short_help="deploys your liminal.yaml files to $LIMINAL_HOME folder")
@click.option('--path', default=os.getcwd(), help="folder containing liminal.yaml files")
@click.option('--clean', is_flag=True, default=False,
help="re-deploy liminal airflow components from scratch")
def deploy_liminal_apps(path, clean):
click.echo("deploying liminal yaml files")
liminal_home = environment.get_liminal_home()
if (clean):
click.echo("cleaning liminal home directory")
shutil.rmtree(liminal_home)
os.makedirs(liminal_home, exist_ok=True)
os.makedirs(environment.get_dags_dir(), exist_ok=True)
deploy_liminal_core_internal(liminal_home, clean)
config_files = files_util.find_config_files(path)
for config_file in config_files:
click.echo(f"deploying liminal file: {config_file}")
relative_path = os.path.relpath(config_file, os.path.dirname(path))
target_yml_name = os.path.join(environment.get_dags_dir(), relative_path)
pathlib.Path(target_yml_name).parent.mkdir(parents=True, exist_ok=True)
shutil.copyfile(config_file, target_yml_name)
def liminal_is_running():
stdout, stderr = docker_compose_command('ps', [])
return "liminal" in stdout
@cli.command("stop", short_help="stops the docker compose")
def stop():
if docker_is_running() and liminal_is_running():
# initialize liminal home by default
environment.get_liminal_home()
docker_compose_command('down', [])
@cli.command("logs", short_help="display the docker compose logs")
@click.option('--follow', '-f', is_flag=True, default=False, help="Follow log output.")
@click.option('--tail', '-t', default=0, type=int,
help="Number of lines to show from the end of the log")
def logs(follow, tail):
if docker_is_running() and liminal_is_running():
# initialize liminal home by default
environment.get_liminal_home()
if follow:
docker_compose_command('logs', ['--follow'])
if tail > 0:
stdout, stderr = docker_compose_command('logs', [f'--tail={tail}'])
logging.info(stdout)
@cli.command("start",
short_help="starts a local airflow in docker compose. should be run after deploy. " +
"Make sure docker is running on your machine")
@click.option('--detached-mode', '-d', is_flag=True, default=False,
help="Start liminal in detached mode.")
def start(detached_mode):
liminal_version = environment.get_liminal_version()
logging.info(f'starting liminal version: {liminal_version}')
if docker_is_running():
# initialize liminal home by default
environment.get_liminal_home()
if detached_mode:
docker_compose_command('up', ['-d'])
else:
docker_compose_command('up', [])
@cli.command("delete", short_help="delete liminal resource registration")
@click.option('--path', default=os.getcwd(), help='Delete within this path.')
@click.option('--clean', is_flag=True, default=False,
help="Delete all resource: DAGs, Volumes")
def delete(path, clean):
configs = ConfigUtil(path).safe_load(is_render_variables=True,
soft_merge=True)
for config in configs:
if config.get('volumes'):
logging.info(f'local volume is being deleted')
volume_util.delete_local_volumes(config, os.path.dirname(
files_util.resolve_pipeline_source_file(config['name'])))
if clean:
dir_path = os.path.abspath(environment.get_liminal_home())
shutil.rmtree(dir_path)
docker_compose_command('down', ['--remove-orphans', '--rmi', 'local'])
def get_docker_compose_paths():
# noinspection PyTypeChecker
with pkg_resources.path(scripts, 'docker-compose.yml') as p:
docker_compose_path = p
project_dir = pathlib.Path(docker_compose_path).parent.parent.absolute()
return docker_compose_path, project_dir
if __name__ == '__main__':
cli()