blob: 4eae2ef68e1386074424aaf01109aad5f2d58383 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import kubernetes.client.models as k8s
import six
from airflow.configuration import conf
from airflow.kubernetes.k8s_model import append_to_pod
from airflow.kubernetes.pod_generator import PodGenerator
from airflow.kubernetes.secret import Secret
from airflow.utils.log.logging_mixin import LoggingMixin
class WorkerConfiguration(LoggingMixin):
    """
    Contains Kubernetes Airflow Worker configuration logic

    :param kube_config: the kubernetes configuration from airflow.cfg
    :type kube_config: airflow.executors.kubernetes_executor.KubeConfig
    """

    # Names of the volumes created / mounted on worker pods by this class.
    dags_volume_name = 'airflow-dags'
    logs_volume_name = 'airflow-logs'
    git_sync_ssh_secret_volume_name = 'git-sync-ssh-key'
    # Key inside the user-provided SSH secret that is projected to the file "ssh".
    git_ssh_key_secret_key = 'gitSshKey'
    git_sync_ssh_known_hosts_volume_name = 'git-sync-known-hosts'
    # Key of the known_hosts entry in the known-hosts configmap
    # (not referenced inside this class; kept for external users).
    git_ssh_known_hosts_configmap_key = 'known_hosts'

    def __init__(self, kube_config):
        self.kube_config = kube_config
        self.worker_airflow_home = self.kube_config.airflow_home
        self.worker_airflow_dags = self.kube_config.dags_folder
        self.worker_airflow_logs = self.kube_config.base_log_folder
        super(WorkerConfiguration, self).__init__()

    @staticmethod
    def _split_refs(value):
        """
        Split a comma-separated config value into a list of names.

        Whitespace around each entry is stripped and empty entries (for
        example from a trailing comma) are dropped, so ``"a, b,"`` becomes
        ``['a', 'b']``. A falsy ``value`` yields ``[]``.
        """
        if not value:
            return []
        return [part.strip() for part in value.split(',') if part.strip()]

    def _get_init_containers(self):
        """
        When using git to retrieve the DAGs, use the GitSync Init Container.

        :return: ``[]`` when DAGs come from a volume claim, a host path or are
            baked into the image; otherwise a one-element list holding the
            git-sync ``V1Container``.
        """
        # If we're using volume claims to mount the dags, no init container is needed
        if (self.kube_config.dags_volume_claim or
                self.kube_config.dags_volume_host or
                self.kube_config.dags_in_image):
            return []

        # Otherwise, define a git-sync init container
        init_environment = [
            k8s.V1EnvVar(name='GIT_SYNC_REPO', value=self.kube_config.git_repo),
            k8s.V1EnvVar(name='GIT_SYNC_BRANCH', value=self.kube_config.git_branch),
            k8s.V1EnvVar(name='GIT_SYNC_ROOT', value=self.kube_config.git_sync_root),
            k8s.V1EnvVar(name='GIT_SYNC_DEST', value=self.kube_config.git_sync_dest),
            k8s.V1EnvVar(name='GIT_SYNC_DEPTH', value=self.kube_config.git_sync_depth),
            k8s.V1EnvVar(name='GIT_SYNC_ONE_TIME', value='true'),
            k8s.V1EnvVar(name='GIT_SYNC_REV', value=self.kube_config.git_sync_rev),
        ]

        # User-supplied worker env vars are exposed to the init container too.
        for env_var_name, env_var_val in six.iteritems(self.kube_config.kube_env_vars):
            init_environment.append(k8s.V1EnvVar(
                name=env_var_name,
                value=env_var_val
            ))

        if self.kube_config.git_sync_credentials_secret:
            # Credentials come from the named secret. The plain-text
            # git_user / git_password values are intentionally not emitted as
            # well: doing so produced duplicate env names, with these
            # secret-backed entries listed last. The secret takes precedence.
            init_environment.extend([
                k8s.V1EnvVar(
                    name='GIT_SYNC_USERNAME',
                    value_from=k8s.V1EnvVarSource(
                        secret_key_ref=k8s.V1SecretKeySelector(
                            name=self.kube_config.git_sync_credentials_secret,
                            key='GIT_SYNC_USERNAME')
                    )
                ),
                k8s.V1EnvVar(
                    name='GIT_SYNC_PASSWORD',
                    value_from=k8s.V1EnvVarSource(
                        secret_key_ref=k8s.V1SecretKeySelector(
                            name=self.kube_config.git_sync_credentials_secret,
                            key='GIT_SYNC_PASSWORD')
                    )
                ),
            ])
        else:
            if self.kube_config.git_user:
                init_environment.append(k8s.V1EnvVar(
                    name='GIT_SYNC_USERNAME',
                    value=self.kube_config.git_user
                ))
            if self.kube_config.git_password:
                init_environment.append(k8s.V1EnvVar(
                    name='GIT_SYNC_PASSWORD',
                    value=self.kube_config.git_password
                ))

        # git-sync writes into the shared DAGs volume, so it is mounted read-write.
        volume_mounts = [k8s.V1VolumeMount(
            mount_path=self.kube_config.git_sync_root,
            name=self.dags_volume_name,
            read_only=False
        )]

        if self.kube_config.git_ssh_key_secret_name:
            # The key file is projected from the secret volume built in _get_volumes().
            volume_mounts.append(k8s.V1VolumeMount(
                name=self.git_sync_ssh_secret_volume_name,
                mount_path='/etc/git-secret/ssh',
                sub_path='ssh'
            ))
            init_environment.extend([
                k8s.V1EnvVar(name='GIT_SSH_KEY_FILE', value='/etc/git-secret/ssh'),
                k8s.V1EnvVar(name='GIT_SYNC_ADD_USER', value='true'),
                k8s.V1EnvVar(name='GIT_SYNC_SSH', value='true'),
            ])

        if self.kube_config.git_ssh_known_hosts_configmap_name:
            volume_mounts.append(k8s.V1VolumeMount(
                name=self.git_sync_ssh_known_hosts_volume_name,
                mount_path='/etc/git-secret/known_hosts',
                sub_path='known_hosts'
            ))
            init_environment.extend([
                k8s.V1EnvVar(name='GIT_KNOWN_HOSTS', value='true'),
                k8s.V1EnvVar(name='GIT_SSH_KNOWN_HOSTS_FILE', value='/etc/git-secret/known_hosts'),
            ])
        else:
            init_environment.append(k8s.V1EnvVar(
                name='GIT_KNOWN_HOSTS',
                value='false'
            ))

        init_containers = k8s.V1Container(
            name=self.kube_config.git_sync_init_container_name,
            image=self.kube_config.git_sync_container,
            env=init_environment,
            volume_mounts=volume_mounts
        )

        # An empty string means "not configured"; only then is run_as_user left unset.
        if self.kube_config.git_sync_run_as_user != "":
            init_containers.security_context = k8s.V1SecurityContext(
                run_as_user=self.kube_config.git_sync_run_as_user
            )  # git-sync user

        return [init_containers]

    def _get_environment(self):
        """
        Defines any necessary environment variables for the pod executor.

        :return: dict mapping env var name to value for the worker container.
        """
        env = {}

        for env_var_name, env_var_val in six.iteritems(self.kube_config.kube_env_vars):
            env[env_var_name] = env_var_val

        env["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor"

        if self.kube_config.airflow_configmap:
            env['AIRFLOW_HOME'] = self.worker_airflow_home
            env['AIRFLOW__CORE__DAGS_FOLDER'] = self.worker_airflow_dags
        # Without a mounted airflow.cfg the worker needs the DB connection
        # passed explicitly, unless the user supplies it via a kube secret.
        if (not self.kube_config.airflow_configmap and
                'AIRFLOW__CORE__SQL_ALCHEMY_CONN' not in self.kube_config.kube_secrets):
            env['AIRFLOW__CORE__SQL_ALCHEMY_CONN'] = conf.get("core", "SQL_ALCHEMY_CONN")
        if self.kube_config.git_dags_folder_mount_point:
            # /root/airflow/dags/repo/dags
            dag_volume_mount_path = os.path.join(
                self.kube_config.git_dags_folder_mount_point,
                self.kube_config.git_sync_dest,  # repo
                self.kube_config.git_subpath     # dags
            )
            env['AIRFLOW__CORE__DAGS_FOLDER'] = dag_volume_mount_path
        return env

    def _get_configmaps(self):
        """
        Backward-compatible alias of ``_get_env_from``.

        This used to be a verbatim copy of ``_get_env_from``; it now delegates
        so the two cannot drift apart.
        """
        return self._get_env_from()

    def _get_env_from(self):
        """
        Build the ``envFrom`` sources for the worker container.

        :return: one ``V1EnvFromSource`` per configmap in
            ``env_from_configmap_ref``, followed by one per secret in
            ``env_from_secret_ref`` (both comma-separated config values).
        """
        env_from = [
            k8s.V1EnvFromSource(config_map_ref=k8s.V1ConfigMapEnvSource(config_map_ref))
            for config_map_ref in self._split_refs(self.kube_config.env_from_configmap_ref)
        ]
        env_from.extend(
            k8s.V1EnvFromSource(secret_ref=k8s.V1SecretEnvSource(secret_ref))
            for secret_ref in self._split_refs(self.kube_config.env_from_secret_ref)
        )
        return env_from

    def _get_secrets(self):
        """
        Defines any necessary secrets for the pod executor.

        ``kube_secrets`` maps an env var name to ``"<secret_name>=<secret_key>"``;
        each entry becomes a keyed ``Secret``. Every name in
        ``env_from_secret_ref`` becomes a key-less ``Secret``.

        :raises ValueError: if a ``kube_secrets`` value is not of the form
            ``"<secret_name>=<secret_key>"``.
        """
        worker_secrets = []

        for env_var_name, obj_key_pair in six.iteritems(self.kube_config.kube_secrets):
            parts = obj_key_pair.split('=')
            if len(parts) != 2:
                raise ValueError(
                    "Invalid kubernetes secret reference {!r} for env var {!r}; "
                    "expected '<secret_name>=<secret_key>'".format(obj_key_pair, env_var_name)
                )
            k8s_secret_obj, k8s_secret_key = parts
            worker_secrets.append(
                Secret('env', env_var_name, k8s_secret_obj, k8s_secret_key)
            )

        for secret_ref in self._split_refs(self.kube_config.env_from_secret_ref):
            worker_secrets.append(
                Secret('env', None, secret_ref)
            )

        return worker_secrets

    def _get_image_pull_secrets(self):
        """
        Extracts any image pull secrets for fetching container(s).

        :return: a ``V1LocalObjectReference`` per name in the comma-separated
            ``image_pull_secrets`` config value (``[]`` when unset).
        """
        return [
            k8s.V1LocalObjectReference(name)
            for name in self._split_refs(self.kube_config.image_pull_secrets)
        ]

    def _get_security_context(self):
        """
        Defines the pod-level security context.

        ``worker_run_as_user`` / ``worker_fs_group`` are applied unless they
        are the empty string. When git SSH key auth is configured and no
        fs_group was given, fs_group defaults to 65533.
        """
        security_context = k8s.V1PodSecurityContext()
        if self.kube_config.worker_run_as_user != "":
            security_context.run_as_user = self.kube_config.worker_run_as_user
        if self.kube_config.worker_fs_group != "":
            security_context.fs_group = self.kube_config.worker_fs_group

        # set fs_group to 65533 if not explicitly specified and using git ssh keypair auth
        if self.kube_config.git_ssh_key_secret_name and security_context.fs_group is None:
            security_context.fs_group = 65533

        return security_context

    def _get_labels(self, kube_executor_labels, labels):
        """
        Merge label dicts into a new dict.

        Precedence (lowest to highest): ``kube_config.kube_labels``,
        ``kube_executor_labels``, ``labels``. The configured labels are copied,
        not mutated.
        """
        copy = self.kube_config.kube_labels.copy()
        copy.update(kube_executor_labels)
        copy.update(labels)
        return copy

    def _get_volume_mounts(self):
        """
        Build the worker container's volume mounts.

        Includes the DAGs mount (read-only; omitted when DAGs are in the image),
        the logs mount, and optional airflow.cfg / airflow_local_settings.py
        mounts from configmaps.
        """
        volume_mounts = {
            self.dags_volume_name: k8s.V1VolumeMount(
                name=self.dags_volume_name,
                mount_path=self.generate_dag_volume_mount_path(),
                read_only=True,
            ),
            self.logs_volume_name: k8s.V1VolumeMount(
                name=self.logs_volume_name,
                mount_path=self.worker_airflow_logs,
            )
        }

        if self.kube_config.dags_volume_subpath:
            volume_mounts[self.dags_volume_name].sub_path = self.kube_config.dags_volume_subpath

        if self.kube_config.logs_volume_subpath:
            volume_mounts[self.logs_volume_name].sub_path = self.kube_config.logs_volume_subpath

        if self.kube_config.dags_in_image:
            del volume_mounts[self.dags_volume_name]

        # Mount the airflow.cfg file via a configmap the user has specified
        if self.kube_config.airflow_configmap:
            config_volume_name = 'airflow-config'
            config_path = '{}/airflow.cfg'.format(self.worker_airflow_home)
            volume_mounts[config_volume_name] = k8s.V1VolumeMount(
                name=config_volume_name,
                mount_path=config_path,
                sub_path='airflow.cfg',
                read_only=True
            )

        if self.kube_config.airflow_local_settings_configmap:
            config_path = '{}/config/airflow_local_settings.py'.format(self.worker_airflow_home)

            if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
                config_volume_name = 'airflow-local-settings'
                volume_mounts[config_volume_name] = k8s.V1VolumeMount(
                    name=config_volume_name,
                    mount_path=config_path,
                    sub_path='airflow_local_settings.py',
                    read_only=True
                )
            else:
                # Same configmap as airflow.cfg: mount a second file from the
                # existing 'airflow-config' volume instead of a new volume.
                volume_mounts['airflow-local-settings'] = k8s.V1VolumeMount(
                    name='airflow-config',
                    mount_path=config_path,
                    sub_path='airflow_local_settings.py',
                    read_only=True
                )

        return list(volume_mounts.values())

    def _get_volumes(self):
        """
        Build the pod volumes matching ``_get_volume_mounts`` and the git-sync
        init container (SSH key secret and known-hosts configmap).
        """
        def _construct_volume(name, claim, host):
            # Prefer a PVC, then a host path, otherwise fall back to emptyDir.
            volume = k8s.V1Volume(name=name)

            if claim:
                volume.persistent_volume_claim = k8s.V1PersistentVolumeClaimVolumeSource(
                    claim_name=claim
                )
            elif host:
                volume.host_path = k8s.V1HostPathVolumeSource(
                    path=host,
                    type=''
                )
            else:
                volume.empty_dir = {}

            return volume

        volumes = {
            self.dags_volume_name: _construct_volume(
                self.dags_volume_name,
                self.kube_config.dags_volume_claim,
                self.kube_config.dags_volume_host
            ),
            self.logs_volume_name: _construct_volume(
                self.logs_volume_name,
                self.kube_config.logs_volume_claim,
                self.kube_config.logs_volume_host
            )
        }

        if self.kube_config.dags_in_image:
            del volumes[self.dags_volume_name]

        # Get the SSH key from secrets as a volume
        if self.kube_config.git_ssh_key_secret_name:
            volumes[self.git_sync_ssh_secret_volume_name] = k8s.V1Volume(
                name=self.git_sync_ssh_secret_volume_name,
                secret=k8s.V1SecretVolumeSource(
                    secret_name=self.kube_config.git_ssh_key_secret_name,
                    items=[k8s.V1KeyToPath(
                        key=self.git_ssh_key_secret_key,
                        path='ssh',
                        mode=0o440
                    )]
                )
            )

        if self.kube_config.git_ssh_known_hosts_configmap_name:
            volumes[self.git_sync_ssh_known_hosts_volume_name] = k8s.V1Volume(
                name=self.git_sync_ssh_known_hosts_volume_name,
                config_map=k8s.V1ConfigMapVolumeSource(
                    name=self.kube_config.git_ssh_known_hosts_configmap_name,
                    default_mode=0o440
                )
            )

        # Mount the airflow.cfg file via a configmap the user has specified
        if self.kube_config.airflow_configmap:
            config_volume_name = 'airflow-config'
            volumes[config_volume_name] = k8s.V1Volume(
                name=config_volume_name,
                config_map=k8s.V1ConfigMapVolumeSource(
                    name=self.kube_config.airflow_configmap
                )
            )

        if self.kube_config.airflow_local_settings_configmap:
            # Only a distinct configmap needs its own volume; otherwise the
            # 'airflow-config' volume above is reused.
            if self.kube_config.airflow_local_settings_configmap != self.kube_config.airflow_configmap:
                config_volume_name = 'airflow-local-settings'
                volumes[config_volume_name] = k8s.V1Volume(
                    name=config_volume_name,
                    config_map=k8s.V1ConfigMapVolumeSource(
                        name=self.kube_config.airflow_local_settings_configmap
                    )
                )

        return list(volumes.values())

    def generate_dag_volume_mount_path(self):
        """
        Generate path for DAG volume.

        Uses ``dags_volume_mount_point`` if set; else the DAGs folder when a
        volume claim or host path supplies DAGs; else the git-sync mount point.
        """
        if self.kube_config.dags_volume_mount_point:
            return self.kube_config.dags_volume_mount_point

        if self.kube_config.dags_volume_claim or self.kube_config.dags_volume_host:
            return self.worker_airflow_dags

        return self.kube_config.git_dags_folder_mount_point

    @staticmethod
    def _dedupe_env_from(container):
        """
        Remove repeated ``envFrom`` entries from ``container`` in place.

        The first occurrence of each entry is kept, in order. Entries are
        compared with ``==``; list membership is used rather than a set so the
        k8s model objects need not be hashable.
        """
        if not container.env_from:
            return
        unique = []
        for source in container.env_from:
            if source not in unique:
                unique.append(source)
        container.env_from = unique

    def as_pod(self):
        """
        Creates POD.

        If ``pod_template_file`` is configured, the pod is generated from that
        template alone. Otherwise a pod spec is built from the kube config,
        with envFrom sources, security context and secrets attached.
        """
        if self.kube_config.pod_template_file:
            return PodGenerator(pod_template_file=self.kube_config.pod_template_file).gen_pod()

        pod = PodGenerator(
            image=self.kube_config.kube_image,
            image_pull_policy=self.kube_config.kube_image_pull_policy or 'IfNotPresent',
            image_pull_secrets=self.kube_config.image_pull_secrets,
            volumes=self._get_volumes(),
            volume_mounts=self._get_volume_mounts(),
            init_containers=self._get_init_containers(),
            annotations=self.kube_config.kube_annotations,
            affinity=self.kube_config.kube_affinity,
            tolerations=self.kube_config.kube_tolerations,
            envs=self._get_environment(),
            node_selectors=self.kube_config.kube_node_selectors,
            service_account_name=self.kube_config.worker_service_account_name or 'default',
            restart_policy='Never'
        ).gen_pod()

        pod.spec.containers[0].env_from = pod.spec.containers[0].env_from or []
        pod.spec.containers[0].env_from.extend(self._get_env_from())
        pod.spec.security_context = self._get_security_context()

        pod = append_to_pod(pod, self._get_secrets())
        # env_from_secret_ref names are emitted both by _get_env_from() and, as
        # key-less Secret objects, by _get_secrets(). Presumably Secret
        # attaches those as envFrom secretRefs too, so drop exact duplicates.
        self._dedupe_env_from(pod.spec.containers[0])
        return pod