| #!/usr/bin/env bash |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Might be empty |
| AIRFLOW_COMMAND="${1}" |
| |
| set -euo pipefail |
| |
| # We want to avoid misleading messages and perform only forward lookup of the service IP address. |
| # Netcat when run without -n performs both forward and reverse lookup and fails if the reverse |
| # lookup name does not match the original name even if the host is reachable via IP. This happens |
| # randomly with docker-compose in Github Actions. |
| # Since we are not using reverse lookup elsewhere, we can perform forward lookup in python |
| # And use the IP in NC and add '-n' switch to disable any DNS use. |
| # Even if this message might be harmless, it might hide the real reason for the problem |
| # Which is the long time needed to start some services, seeing this message might be totally misleading |
| # when you try to analyse the problem, that's why it's best to avoid it, |
| function run_nc() { |
| local host=${1} |
| local port=${2} |
| local ip |
| ip=$(python -c "import socket; print(socket.gethostbyname('${host}'))") |
| |
| nc -zvvn "${ip}" "${port}" |
| } |
| |
| function wait_for_connection { |
| # Waits for Connection to the backend specified via URL passed as first parameter |
| # Detects backend type depending on the URL schema and assigns |
| # default port numbers if not specified in the URL. |
| # Then it loops until connection to the host/port specified can be established |
| # It tries `CONNECTION_CHECK_MAX_COUNT` times and sleeps `CONNECTION_CHECK_SLEEP_TIME` between checks |
| local connection_url |
| connection_url="${1}" |
| |
| local detected_backend="" |
| local detected_host="" |
| local detected_port="" |
| |
| |
| if [[ ${connection_url} != sqlite* ]]; then |
| # Auto-detect DB parameters |
| [[ ${connection_url} =~ ([^:]*)://([^:]*[@.*]?):([^@]*)@?([^/:]*):?([0-9]*)/([^\?]*)\??(.*) ]] && \ |
| detected_backend=${BASH_REMATCH[1]} && |
| # Not used USER match |
| # Not used PASSWORD match |
| detected_host=${BASH_REMATCH[4]} && |
| detected_port=${BASH_REMATCH[5]} && |
| # Not used SCHEMA match |
| # Not used PARAMS match |
| |
| echo BACKEND="${BACKEND:=${detected_backend}}" |
| readonly BACKEND |
| |
| if [[ -z "${detected_port=}" ]]; then |
| if [[ ${BACKEND} == "postgres"* ]]; then |
| detected_port=5432 |
| elif [[ ${BACKEND} == "mysql"* ]]; then |
| detected_port=3306 |
| elif [[ ${BACKEND} == "redis"* ]]; then |
| detected_port=6379 |
| elif [[ ${BACKEND} == "amqp"* ]]; then |
| detected_port=5672 |
| fi |
| fi |
| |
| detected_host=${detected_host:="localhost"} |
| |
| # Allow the DB parameters to be overridden by environment variable |
| echo DB_HOST="${DB_HOST:=${detected_host}}" |
| readonly DB_HOST |
| |
| echo DB_PORT="${DB_PORT:=${detected_port}}" |
| readonly DB_PORT |
| local countdown |
| countdown="${CONNECTION_CHECK_MAX_COUNT}" |
| while true |
| do |
| set +e |
| local last_check_result |
| local res |
| last_check_result=$(run_nc "${DB_HOST}" "${DB_PORT}" >/dev/null 2>&1) |
| res=$? |
| set -e |
| if [[ ${res} == 0 ]]; then |
| echo |
| break |
| else |
| echo -n "." |
| countdown=$((countdown-1)) |
| fi |
| if [[ ${countdown} == 0 ]]; then |
| echo |
| echo "ERROR! Maximum number of retries (${CONNECTION_CHECK_MAX_COUNT}) reached." |
| echo " while checking ${BACKEND} connection." |
| echo |
| echo "Last check result:" |
| echo |
| echo "${last_check_result}" |
| echo |
| exit 1 |
| else |
| sleep "${CONNECTION_CHECK_SLEEP_TIME}" |
| fi |
| done |
| fi |
| } |
| |
| function create_www_user() { |
| local local_password="" |
| # Warning: command environment variables (*_CMD) have priority over usual configuration variables |
| # for configuration parameters that require sensitive information. This is the case for the SQL database |
| # and the broker backend in this entrypoint script. |
| if [[ -n "${_AIRFLOW_WWW_USER_PASSWORD_CMD=}" ]]; then |
| local_password=$(eval "${_AIRFLOW_WWW_USER_PASSWORD_CMD}") |
| unset _AIRFLOW_WWW_USER_PASSWORD_CMD |
| elif [[ -n "${_AIRFLOW_WWW_USER_PASSWORD=}" ]]; then |
| local_password="${_AIRFLOW_WWW_USER_PASSWORD}" |
| unset _AIRFLOW_WWW_USER_PASSWORD |
| fi |
| if [[ -z ${local_password} ]]; then |
| echo |
| echo "ERROR! Airflow Admin password not set via _AIRFLOW_WWW_USER_PASSWORD or _AIRFLOW_WWW_USER_PASSWORD_CMD variables!" |
| echo |
| exit 1 |
| fi |
| |
| airflow users create \ |
| --username "${_AIRFLOW_WWW_USER_USERNAME="admin"}" \ |
| --firstname "${_AIRFLOW_WWW_USER_FIRSTNAME="Airflow"}" \ |
| --lastname "${_AIRFLOW_WWW_USER_LASTNME="Admin"}" \ |
| --email "${_AIRFLOW_WWW_USER_EMAIL="airflowadmin@example.com"}" \ |
| --role "${_AIRFLOW_WWW_USER_ROLE="Admin"}" \ |
| --password "${local_password}" || |
| airflow create_user \ |
| --username "${_AIRFLOW_WWW_USER_USERNAME="admin"}" \ |
| --firstname "${_AIRFLOW_WWW_USER_FIRSTNAME="Airflow"}" \ |
| --lastname "${_AIRFLOW_WWW_USER_LASTNME="Admin"}" \ |
| --email "${_AIRFLOW_WWW_USER_EMAIL="airflowadmin@example.com"}" \ |
| --role "${_AIRFLOW_WWW_USER_ROLE="Admin"}" \ |
| --password "${local_password}" || true |
| } |
| |
| function create_system_user_if_missing() { |
| # This is needed in case of OpenShift-compatible container execution. In case of OpenShift random |
| # User id is used when starting the image, however group 0 is kept as the user group. Our production |
| # Image is OpenShift compatible, so all permissions on all folders are set so that 0 group can exercise |
| # the same privileges as the default "airflow" user, this code checks if the user is already |
| # present in /etc/passwd and will create the system user dynamically, including setting its |
| # HOME directory to the /home/airflow so that (for example) the ${HOME}/.local folder where airflow is |
| # Installed can be automatically added to PYTHONPATH |
| if ! whoami &> /dev/null; then |
| if [[ -w /etc/passwd ]]; then |
| echo "${USER_NAME:-default}:x:$(id -u):0:${USER_NAME:-default} user:${AIRFLOW_USER_HOME_DIR}:/sbin/nologin" \ |
| >> /etc/passwd |
| fi |
| export HOME="${AIRFLOW_USER_HOME_DIR}" |
| fi |
| } |
| |
| function wait_for_airflow_db() { |
| # Verifies connection to the Airflow DB |
| if [[ -n "${AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD=}" ]]; then |
| wait_for_connection "$(eval "${AIRFLOW__CORE__SQL_ALCHEMY_CONN_CMD}")" |
| else |
| # if no DB configured - use sqlite db by default |
| AIRFLOW__CORE__SQL_ALCHEMY_CONN="${AIRFLOW__CORE__SQL_ALCHEMY_CONN:="sqlite:///${AIRFLOW_HOME}/airflow.db"}" |
| wait_for_connection "${AIRFLOW__CORE__SQL_ALCHEMY_CONN}" |
| fi |
| } |
| |
| function upgrade_db() { |
| # Runs airflow db upgrade |
| airflow db upgrade || airflow upgradedb || true |
| } |
| |
| function wait_for_celery_backend() { |
| # Verifies connection to Celery Broker |
| if [[ -n "${AIRFLOW__CELERY__BROKER_URL_CMD=}" ]]; then |
| wait_for_connection "$(eval "${AIRFLOW__CELERY__BROKER_URL_CMD}")" |
| else |
| AIRFLOW__CELERY__BROKER_URL=${AIRFLOW__CELERY__BROKER_URL:=} |
| if [[ -n ${AIRFLOW__CELERY__BROKER_URL=} ]]; then |
| wait_for_connection "${AIRFLOW__CELERY__BROKER_URL}" |
| fi |
| fi |
| } |
| |
| function exec_to_bash_or_python_command_if_specified() { |
| # If one of the commands: 'airflow', 'bash', 'python' is used, either run appropriate |
| # command with exec or update the command line parameters |
| if [[ ${AIRFLOW_COMMAND} == "bash" ]]; then |
| shift |
| exec "/bin/bash" "${@}" |
| elif [[ ${AIRFLOW_COMMAND} == "python" ]]; then |
| shift |
| exec "python" "${@}" |
| fi |
| } |
| |
| |
| CONNECTION_CHECK_MAX_COUNT=${CONNECTION_CHECK_MAX_COUNT:=20} |
| readonly CONNECTION_CHECK_MAX_COUNT |
| |
| CONNECTION_CHECK_SLEEP_TIME=${CONNECTION_CHECK_SLEEP_TIME:=3} |
| readonly CONNECTION_CHECK_SLEEP_TIME |
| |
| create_system_user_if_missing |
| wait_for_airflow_db |
| |
| if [[ -n "${_AIRFLOW_DB_UPGRADE=}" ]] ; then |
| upgrade_db |
| fi |
| |
| if [[ -n "${_AIRFLOW_WWW_USER_CREATE=}" ]] ; then |
| create_www_user |
| fi |
| |
| # The `bash` and `python` commands should also verify the basic connections |
| # So they are run after the DB check |
| exec_to_bash_or_python_command_if_specified "${@}" |
| |
| # Remove "airflow" if it is specified as airflow command |
| # This way both command types work the same way: |
| # |
| # docker run IMAGE airflow webserver |
| # docker run IMAGE webserver |
| # |
| if [[ ${AIRFLOW_COMMAND} == "airflow" ]]; then |
| AIRFLOW_COMMAND="${2}" |
| shift |
| fi |
| |
| # Note: the broker backend configuration concerns only a subset of Airflow components |
| if [[ ${AIRFLOW_COMMAND} =~ ^(scheduler|celery|worker|flower)$ ]]; then |
| wait_for_celery_backend "${@}" |
| fi |
| |
| exec "airflow" "${@}" |