| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import os |
| import subprocess |
| import logging |
| |
| DEFAULT_DAGS_ZIP_NAME = 'liminal.zip' |
| DEFAULT_LIMINAL_HOME = os.path.expanduser('~/liminal_home') |
| DEFAULT_PIPELINES_SUBDIR = "pipelines" |
| LIMINAL_HOME_PARAM_NAME = "LIMINAL_HOME" |
| LIMINAL_VERSION_PARAM_NAME = 'LIMINAL_VERSION' |
| |
| |
| def get_liminal_home(): |
| if not os.environ.get(LIMINAL_HOME_PARAM_NAME): |
| logging.info("no environment parameter called LIMINAL_HOME detected") |
| logging.info(f"registering {DEFAULT_LIMINAL_HOME} as the LIMINAL_HOME directory") |
| os.environ[LIMINAL_HOME_PARAM_NAME] = DEFAULT_LIMINAL_HOME |
| return os.environ.get(LIMINAL_HOME_PARAM_NAME, DEFAULT_LIMINAL_HOME) |
| |
| |
| def get_dags_dir(): |
| # if we are inside airflow, we will take it from the configured dags folder |
| base_dir = os.environ.get("AIRFLOW__CORE__DAGS_FOLDER", get_liminal_home()) |
| return os.path.join(base_dir, DEFAULT_PIPELINES_SUBDIR) |
| |
| |
| def get_liminal_version(): |
| result = os.environ.get(LIMINAL_VERSION_PARAM_NAME, None) |
| if not result: |
| output = subprocess.run(['pip freeze | grep \'apache-liminal\''], capture_output=True, |
| env=os.environ, shell=True) |
| pip_res = output.stdout.decode('UTF-8').strip() |
| scripts_dir = os.path.join( |
| os.path.dirname(os.path.dirname(os.path.dirname(__file__))), |
| 'scripts' |
| ) |
| whl_files = [file for file in os.listdir(scripts_dir) if file.endswith(".whl")] |
| if whl_files: |
| value = 'file://' + os.path.join(scripts_dir, whl_files[0]) |
| elif ' @ ' in pip_res: |
| value = pip_res[pip_res.index(' @ ') + 3:] |
| else: |
| value = pip_res |
| logging.info(f'LIMINAL_VERSION not set. Setting it to currently installed version: {value}') |
| os.environ[LIMINAL_VERSION_PARAM_NAME] = value |
| return os.environ.get(LIMINAL_VERSION_PARAM_NAME, 'apache-liminal') |