| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| {{/* Standard Airflow environment variables. */}} |
| {{/* Emits container env entries whose values are all read from Kubernetes secrets via secretKeyRef: */}} |
| {{/* - always: the Fernet key and the metadata connection (exposed under two variable names); */}} |
| {{/* - only when .Values.executor is "CeleryExecutor": the result backend (two variable names) and the broker URL; */}} |
| {{/* - only when .Values.elasticsearch.enabled is set: the Elasticsearch host (new and legacy variable names). */}} |
| {{/* Secret names come from the fernet_key_secret, airflow_metadata_secret, airflow_result_backend_secret and */}} |
| {{/* elasticsearch_secret templates; the broker URL secret is always "<release>-broker-url". */}} |
| {{- define "standard_airflow_environment" }} |
| # Hard Coded Airflow Envs |
| - name: AIRFLOW__CORE__FERNET_KEY |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "fernet_key_secret" . }} |
| key: fernet-key |
| - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "airflow_metadata_secret" . }} |
| key: connection |
| - name: AIRFLOW_CONN_AIRFLOW_DB |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "airflow_metadata_secret" . }} |
| key: connection |
| {{- if eq .Values.executor "CeleryExecutor" }} |
| - name: AIRFLOW__CELERY__CELERY_RESULT_BACKEND |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "airflow_result_backend_secret" . }} |
| key: connection |
| - name: AIRFLOW__CELERY__RESULT_BACKEND |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "airflow_result_backend_secret" . }} |
| key: connection |
| - name: AIRFLOW__CELERY__BROKER_URL |
| valueFrom: |
| secretKeyRef: |
| name: {{ .Release.Name }}-broker-url |
| key: connection |
| {{- end }} |
| {{- if .Values.elasticsearch.enabled }} |
| # The elasticsearch variables were updated to the shorter names in v1.10.4 |
| - name: AIRFLOW__ELASTICSEARCH__HOST |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "elasticsearch_secret" . }} |
| key: connection |
| # This is the older format for these variable names, kept here for backward compatibility |
| - name: AIRFLOW__ELASTICSEARCH__ELASTICSEARCH_HOST |
| valueFrom: |
| secretKeyRef: |
| name: {{ template "elasticsearch_secret" . }} |
| key: connection |
| {{- end }} |
| {{- end }} |
| |
| {{/* User defined Airflow environment variables. */}} |
| {{/* Renders one env entry per item of .Values.env (name, value) and one secretKeyRef entry per item of */}} |
| {{/* .Values.secret (envName, secretName, secretKey), where a missing secretKey defaults to "value". */}} |
| {{/* When .Values.executor is "KubernetesExecutor" every entry is additionally emitted under the */}} |
| {{/* AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__ and AIRFLOW__KUBERNETES_SECRETS__ prefixes. */}} |
| {{/* The AIRFLOW__KUBERNETES_SECRETS__ value applies the same "value" default as the secretKeyRef entry, */}} |
| {{/* so an item without secretKey renders "<secretName>=value" instead of a nil placeholder. */}} |
| {{- define "custom_airflow_environment" }} |
| # Dynamically created environment variables |
| {{- range $i, $config := .Values.env }} |
| - name: {{ $config.name }} |
| value: {{ $config.value | quote }} |
| {{- if eq $.Values.executor "KubernetesExecutor" }} |
| - name: AIRFLOW__KUBERNETES_ENVIRONMENT_VARIABLES__{{ $config.name }} |
| value: {{ $config.value | quote }} |
| {{- end }} |
| {{- end }} |
| # Dynamically created secret envs |
| {{- range $i, $config := .Values.secret }} |
| - name: {{ $config.envName }} |
| valueFrom: |
| secretKeyRef: |
| name: {{ $config.secretName }} |
| key: {{ default "value" $config.secretKey }} |
| {{- end }} |
| {{- if eq .Values.executor "KubernetesExecutor" }} |
| {{- range $i, $config := .Values.secret }} |
| - name: AIRFLOW__KUBERNETES_SECRETS__{{ $config.envName }} |
| value: {{ printf "%s=%s" $config.secretName (default "value" $config.secretKey) }} |
| {{- end }} |
| {{ end }} |
| {{- end }} |
| |
| {{/* Git ssh key volume. */}} |
| {{/* Declares a secret-backed volume named "git-sync-ssh-key" whose secret is .Values.dags.gitSync.sshKeySecret. */}} |
| {{/* defaultMode 288 is the decimal form of octal 0440. */}} |
| {{- define "git_sync_ssh_key_volume"}} |
| - name: git-sync-ssh-key |
| secret: |
| secretName: {{ .Values.dags.gitSync.sshKeySecret }} |
| defaultMode: 288 |
| {{- end }} |
| |
| {{/* Git sync container. */}} |
| {{/* Container name, image (containerRepository:containerTag) and runAsUser all come from .Values.dags.gitSync. */}} |
| {{/* Authentication env: when sshKeySecret is set, SSH is enabled with the key at /etc/git-secret/ssh and */}} |
| {{/* known-hosts checking is switched on or off depending on knownHosts; otherwise, when credentialsSecret is set, */}} |
| {{/* GIT_SYNC_USERNAME and GIT_SYNC_PASSWORD are read from that secret. */}} |
| {{/* The remaining GIT_SYNC_* variables are passed straight through from values (GIT_SYNC_ADD_USER is fixed to "true"). */}} |
| {{/* Mounts: the "dags" volume at gitSync.root; the SSH key and, with knownHosts, the known_hosts file from the */}} |
| {{/* "config" volume. */}} |
| {{/* NOTE(review): the SSH env vars are gated on sshKeySecret alone, while the SSH mounts also require */}} |
| {{/* gitSync.enabled - confirm the difference is intentional. */}} |
| {{- define "git_sync_container"}} |
| - name: {{ .Values.dags.gitSync.containerName }} |
| image: "{{ .Values.dags.gitSync.containerRepository }}:{{ .Values.dags.gitSync.containerTag }}" |
| securityContext: |
| runAsUser: {{ .Values.dags.gitSync.uid }} |
| env: |
| {{- if .Values.dags.gitSync.sshKeySecret }} |
| - name: GIT_SSH_KEY_FILE |
| value: "/etc/git-secret/ssh" |
| - name: GIT_SYNC_SSH |
| value: "true" |
| {{- if .Values.dags.gitSync.knownHosts }} |
| - name: GIT_KNOWN_HOSTS |
| value: "true" |
| - name: GIT_SSH_KNOWN_HOSTS_FILE |
| value: "/etc/git-secret/known_hosts" |
| {{- else }} |
| - name: GIT_KNOWN_HOSTS |
| value: "false" |
| {{- end }} |
| {{ else if .Values.dags.gitSync.credentialsSecret }} |
| - name: GIT_SYNC_USERNAME |
| valueFrom: |
| secretKeyRef: |
| name: {{ .Values.dags.gitSync.credentialsSecret | quote }} |
| key: GIT_SYNC_USERNAME |
| - name: GIT_SYNC_PASSWORD |
| valueFrom: |
| secretKeyRef: |
| name: {{ .Values.dags.gitSync.credentialsSecret | quote }} |
| key: GIT_SYNC_PASSWORD |
| {{- end }} |
| - name: GIT_SYNC_REV |
| value: {{ .Values.dags.gitSync.rev | quote }} |
| - name: GIT_SYNC_BRANCH |
| value: {{ .Values.dags.gitSync.branch | quote }} |
| - name: GIT_SYNC_REPO |
| value: {{ .Values.dags.gitSync.repo | quote }} |
| - name: GIT_SYNC_DEPTH |
| value: {{ .Values.dags.gitSync.depth | quote }} |
| - name: GIT_SYNC_ROOT |
| value: {{ .Values.dags.gitSync.root | quote }} |
| - name: GIT_SYNC_DEST |
| value: {{ .Values.dags.gitSync.dest | quote }} |
| - name: GIT_SYNC_ADD_USER |
| value: "true" |
| - name: GIT_SYNC_WAIT |
| value: {{ .Values.dags.gitSync.wait | quote }} |
| - name: GIT_SYNC_MAX_SYNC_FAILURES |
| value: {{ .Values.dags.gitSync.maxFailures | quote }} |
| volumeMounts: |
| - name: dags |
| mountPath: {{ .Values.dags.gitSync.root }} |
| {{- if and .Values.dags.gitSync.enabled .Values.dags.gitSync.sshKeySecret }} |
| - name: git-sync-ssh-key |
| mountPath: /etc/git-secret/ssh |
| readOnly: true |
| subPath: gitSshKey |
| {{- if .Values.dags.gitSync.knownHosts }} |
| - name: config |
| mountPath: /etc/git-secret/known_hosts |
| readOnly: true |
| subPath: known_hosts |
| {{- end }} |
| {{- end }} |
| {{- end }} |
| |
| # Airflow image as "repository:tag"; the result changes when customers deploy a new image. |
| # Each part is taken from .Values.images.airflow and falls back to the chart default when empty. |
| {{ define "airflow_image" -}} |
| {{- $repository := default .Values.defaultAirflowRepository .Values.images.airflow.repository -}} |
| {{- $tag := default .Values.defaultAirflowTag .Values.images.airflow.tag -}} |
| {{ printf "%s:%s" $repository $tag }} |
| {{- end }} |
| |
| # Chart-default Airflow image as "repository:tag", for airflow containers that do not need the users code. |
| {{ define "default_airflow_image" -}} |
| {{- $repository := .Values.defaultAirflowRepository -}} |
| {{- $tag := .Values.defaultAirflowTag -}} |
| {{ printf "%s:%s" $repository $tag }} |
| {{- end }} |
| |
| {{/* Flower image as "repository:tag"; each part falls back to the default Airflow repository or tag when */}} |
| {{/* .Values.images.flower leaves it empty. */}} |
| {{ define "flower_image" -}} |
| {{- $repository := default .Values.defaultAirflowRepository .Values.images.flower.repository -}} |
| {{- $tag := default .Values.defaultAirflowTag .Values.images.flower.tag -}} |
| {{ printf "%s:%s" $repository $tag }} |
| {{- end }} |
| |
| {{/* StatsD image as "repository:tag", read from .Values.images.statsd. */}} |
| {{ define "statsd_image" -}} |
| {{- $image := .Values.images.statsd -}} |
| {{ printf "%s:%s" $image.repository $image.tag }} |
| {{- end }} |
| |
| {{/* Redis image as "repository:tag", read from .Values.images.redis. */}} |
| {{ define "redis_image" -}} |
| {{- $image := .Values.images.redis -}} |
| {{ printf "%s:%s" $image.repository $image.tag }} |
| {{- end }} |
| |
| {{/* PgBouncer image as "repository:tag", read from .Values.images.pgbouncer. */}} |
| {{ define "pgbouncer_image" -}} |
| {{- $image := .Values.images.pgbouncer -}} |
| {{ printf "%s:%s" $image.repository $image.tag }} |
| {{- end }} |
| |
| {{/* PgBouncer exporter image as "repository:tag", read from .Values.images.pgbouncerExporter. */}} |
| {{ define "pgbouncer_exporter_image" -}} |
| {{- $image := .Values.images.pgbouncerExporter -}} |
| {{ printf "%s:%s" $image.repository $image.tag }} |
| {{- end }} |
| |
| {{/* Fernet key secret name: .Values.fernetKeySecretName when set, otherwise "<release>-fernet-key". */}} |
| {{ define "fernet_key_secret" -}} |
| {{- $generatedName := printf "%s-fernet-key" .Release.Name -}} |
| {{ .Values.fernetKeySecretName | default $generatedName }} |
| {{- end }} |
| |
| {{/* Redis password secret name: .Values.redis.passwordSecretName when set, otherwise "<release>-redis-password". */}} |
| {{ define "redis_password_secret" -}} |
| {{- $generatedName := printf "%s-redis-password" .Release.Name -}} |
| {{ .Values.redis.passwordSecretName | default $generatedName }} |
| {{- end }} |
| |
| {{/* Metadata secret name: .Values.data.metadataSecretName when set, otherwise "<release>-airflow-metadata". */}} |
| {{ define "airflow_metadata_secret" -}} |
| {{- $generatedName := printf "%s-airflow-metadata" .Release.Name -}} |
| {{ .Values.data.metadataSecretName | default $generatedName }} |
| {{- end }} |
| |
| {{/* Result backend secret name: .Values.data.resultBackendSecretName when set, otherwise */}} |
| {{/* "<release>-airflow-result-backend". */}} |
| {{ define "airflow_result_backend_secret" -}} |
| {{- $generatedName := printf "%s-airflow-result-backend" .Release.Name -}} |
| {{ .Values.data.resultBackendSecretName | default $generatedName }} |
| {{- end }} |
| |
| {{/* Path "<airflowHome>/pod_templates", built from .Values.airflowHome. */}} |
| {{ define "airflow_pod_template_file" -}} |
| {{ .Values.airflowHome | printf "%s/pod_templates" }} |
| {{- end }} |
| |
| {{/* PgBouncer config secret name, always "<release>-pgbouncer-config". */}} |
| {{ define "pgbouncer_config_secret" -}} |
| {{ printf "%s-pgbouncer-config" .Release.Name }} |
| {{- end }} |
| |
| {{/* PgBouncer stats secret name, always "<release>-pgbouncer-stats". */}} |
| {{ define "pgbouncer_stats_secret" -}} |
| {{ printf "%s-pgbouncer-stats" .Release.Name }} |
| {{- end }} |
| |
| {{/* Registry secret name: .Values.registry.secretName when set, otherwise "<release>-registry". */}} |
| {{ define "registry_secret" -}} |
| {{- $generatedName := printf "%s-registry" .Release.Name -}} |
| {{ .Values.registry.secretName | default $generatedName }} |
| {{- end }} |
| |
| {{/* Elasticsearch secret name: .Values.elasticsearch.secretName when set, otherwise "<release>-elasticsearch". */}} |
| {{ define "elasticsearch_secret" -}} |
| {{- $generatedName := printf "%s-elasticsearch" .Release.Name -}} |
| {{ .Values.elasticsearch.secretName | default $generatedName }} |
| {{- end }} |
| |
| {{/* PgBouncer configuration file contents. */}} |
| {{/* [databases] gets one entry each for the metadata and result-backend connections; a connection without a */}} |
| {{/* host falls back to "<release>-postgresql.<namespace>.svc.cluster.local". */}} |
| {{/* [pgbouncer] combines fixed settings with the port, limits and logging flags taken from values. */}} |
| {{ define "pgbouncer_config" }} |
| {{- $fallbackHost := printf "%s-%s.%s.svc.cluster.local" .Release.Name "postgresql" .Release.Namespace }} |
| {{- $metadata := .Values.data.metadataConnection }} |
| {{- $resultBackend := .Values.data.resultBackendConnection }} |
| {{- $pgbouncer := .Values.pgbouncer }} |
| [databases] |
| {{ .Release.Name }}-metadata = host={{ default $fallbackHost $metadata.host }} dbname={{ $metadata.db }} port={{ $metadata.port }} pool_size={{ $pgbouncer.metadataPoolSize }} |
| {{ .Release.Name }}-result-backend = host={{ default $fallbackHost $resultBackend.host }} dbname={{ $resultBackend.db }} port={{ $resultBackend.port }} pool_size={{ $pgbouncer.resultBackendPoolSize }} |
| |
| [pgbouncer] |
| pool_mode = transaction |
| listen_port = {{ .Values.ports.pgbouncer }} |
| listen_addr = * |
| auth_type = md5 |
| auth_file = /etc/pgbouncer/users.txt |
| stats_users = {{ $metadata.user }} |
| ignore_startup_parameters = extra_float_digits |
| max_client_conn = {{ $pgbouncer.maxClientConn }} |
| verbose = {{ $pgbouncer.verbose }} |
| log_disconnections = {{ $pgbouncer.logDisconnections }} |
| log_connections = {{ $pgbouncer.logConnections }} |
| {{- end }} |
| |
| {{/* PgBouncer user list: one line of quoted user and quoted password for the metadata connection, */}} |
| {{/* then one for the result-backend connection. */}} |
| {{ define "pgbouncer_users" }} |
| {{- $metadata := .Values.data.metadataConnection }} |
| {{- $resultBackend := .Values.data.resultBackendConnection }} |
| {{ quote $metadata.user }} {{ quote $metadata.pass }} |
| {{ quote $resultBackend.user }} {{ quote $resultBackend.pass }} |
| {{- end }} |
| |
| {{/* Quoted path "<airflowHome>/logs", built from .Values.airflowHome. */}} |
| {{ define "airflow_logs" -}} |
| {{ .Values.airflowHome | printf "%s/logs" | quote }} |
| {{- end }} |
| |
| {{/* DAGs path: "<airflowHome>/dags", extended with "/<dest>/<subPath>" from .Values.dags.gitSync */}} |
| {{/* when git sync is enabled. */}} |
| {{ define "airflow_dags" -}} |
| {{- $gitSync := .Values.dags.gitSync -}} |
| {{- $dagsRoot := printf "%s/dags" .Values.airflowHome -}} |
| {{- if not $gitSync.enabled -}} |
| {{ $dagsRoot }} |
| {{- else -}} |
| {{ printf "%s/%s/%s" $dagsRoot $gitSync.dest $gitSync.subPath }} |
| {{- end -}} |
| {{- end -}} |
| |
| {{/* DAGs volume claim name: .Values.dags.persistence.existingClaim when set, otherwise "<release>-dags". */}} |
| {{ define "airflow_dags_volume_claim" -}} |
| {{- $generatedClaim := printf "%s-dags" .Release.Name -}} |
| {{ .Values.dags.persistence.existingClaim | default $generatedClaim }} |
| {{- end -}} |
| |
| {{/* Path "<airflowHome>/dags", built from .Values.airflowHome. */}} |
| {{ define "airflow_dags_mount_path" -}} |
| {{ .Values.airflowHome | printf "%s/dags" }} |
| {{- end }} |
| |
| {{/* Quoted path "<airflowHome>/airflow.cfg", built from .Values.airflowHome. */}} |
| {{ define "airflow_config_path" -}} |
| {{ .Values.airflowHome | printf "%s/airflow.cfg" | quote }} |
| {{- end }} |
| |
| {{/* Quoted path "<airflowHome>/webserver_config.py", built from .Values.airflowHome. */}} |
| {{ define "airflow_webserver_config_path" -}} |
| {{ .Values.airflowHome | printf "%s/webserver_config.py" | quote }} |
| {{- end }} |
| |
| {{/* Quoted path "<airflowHome>/config/airflow_local_settings.py", built from .Values.airflowHome. */}} |
| {{ define "airflow_local_setting_path" -}} |
| {{ .Values.airflowHome | printf "%s/config/airflow_local_settings.py" | quote }} |
| {{- end }} |
| |
| {{/* Airflow config resource name, always "<release>-airflow-config". */}} |
| {{ define "airflow_config" -}} |
| {{ .Release.Name }}-airflow-config |
| {{- end }} |
| |
| {{/* Container command that blocks until the Airflow metadata database has all migrations applied. */}} |
| {{/* The inline Python script compares the Alembic heads shipped with the installed airflow package against */}} |
| {{/* the heads recorded in the database, re-checking about once per second, and raises TimeoutError once */}} |
| {{/* 60 checks have passed without a match. */}} |
| {{/* The script logs through the standard logging module; it previously called an undefined name "log", */}} |
| {{/* which raised NameError on the first wait iteration. */}} |
| {{ define "wait-for-migrations-command" }} |
| {{/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */}} |
| - python |
| - -c |
| - | |
|   import airflow |
|   import logging |
|   import os |
|   import time |
| |
|   from alembic.config import Config |
|   from alembic.runtime.migration import MigrationContext |
|   from alembic.script import ScriptDirectory |
| |
|   from airflow import settings |
| |
|   # Point Alembic at the migrations bundled with the installed airflow package. |
|   package_dir = os.path.abspath(os.path.dirname(airflow.__file__)) |
|   directory = os.path.join(package_dir, 'migrations') |
|   config = Config(os.path.join(package_dir, 'alembic.ini')) |
|   config.set_main_option('script_location', directory) |
|   config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN) |
|   script_ = ScriptDirectory.from_config(config) |
| |
|   timeout = 60 |
| |
|   with settings.engine.connect() as connection: |
|       context = MigrationContext.configure(connection) |
|       ticker = 0 |
|       while True: |
|           source_heads = set(script_.get_heads()) |
| |
|           db_heads = set(context.get_current_heads()) |
|           # Done as soon as the database is at the same heads as the code. |
|           if source_heads == db_heads: |
|               break |
| |
|           if ticker >= timeout: |
|               raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker)) |
|           ticker += 1 |
|           time.sleep(1) |
|           logging.info('Waiting for migrations... %s second(s)', ticker) |
| {{- end }} |
| |
| {{/* Docker registry config JSON of the form {"auths": {"<host>": {username, password, email, auth}}}, */}} |
| {{/* built from .Values.registry.connection; "auth" is the base64 encoding of "user:pass". */}} |
| {{ define "registry_docker_config" -}} |
| {{- $registry := .Values.registry.connection -}} |
| {{- $credentials := dict "username" $registry.user "password" $registry.pass "email" $registry.email -}} |
| {{- $_ := set $credentials "auth" (printf "%v:%v" $registry.user $registry.pass | b64enc) -}} |
| {{- $auths := dict -}} |
| {{- $_ := set $auths $registry.host $credentials -}} |
| {{ dict "auths" $auths | toJson }} |
| {{- end }} |