blob: b597ef32e26882992ffdba35454604c885a9533b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import os
import sys
from time import sleep
import base64, shutil
from pathlib import Path
from kubernetes import client, config
from kubernetes.client import V1PersistentVolume, V1PersistentVolumeClaim, V1Secret
# noinspection PyBroadException
try:
config.load_kube_config()
except Exception:
msg = "Kubernetes is not running\n"
sys.stdout.write(f"INFO: {msg}")
_LOG = logging.getLogger('volume_util')
_LOCAL_VOLUMES = set()
_kubernetes = client.CoreV1Api()
def get_volume_configs(liminal_config, base_dir):
volumes_config = liminal_config.get('volumes', [])
for volume_config in volumes_config:
if 'local' in volume_config:
path = volume_config['local']['path']
if path.startswith(".."):
path = os.path.join(base_dir, path)
if path.startswith("."):
path = os.path.join(base_dir, path[1:])
volume_config['local']['path'] = path
if 'path' not in volume_config:
volume_config['path'] = '~/.aws/credentials'
return volumes_config
def create_local_volumes(liminal_config, base_dir):
volumes_config = get_volume_configs(liminal_config, base_dir)
for volume_config in volumes_config:
if 'secret' in volume_config:
logging.info(f'Creating local kubernetes secret if needed: {volume_config}')
create_secret(volume_config)
else:
logging.info(f'Creating local kubernetes volume if needed: {volume_config}')
create_local_volume(volume_config)
def create_secret(conf, namespace='default') -> None:
name = conf['volume']
_LOG.info(f'Requested volume {name}')
if name not in _LOCAL_VOLUMES:
matching_secrets = _kubernetes.list_namespaced_secret(namespace, field_selector=f'metadata.name={name}').to_dict()[
'items'
]
while len(matching_secrets) == 0:
_create_secret(namespace, conf, name)
sleep(5)
matching_secrets = _kubernetes.list_namespaced_secret(namespace, field_selector=f'metadata.name={name}').to_dict()[
'items'
]
_LOCAL_VOLUMES.add(name)
def create_local_volume(conf, namespace='default') -> None:
name = conf['volume']
_LOG.info(f'Requested volume {name}')
if name not in _LOCAL_VOLUMES:
matching_volumes = _kubernetes.list_persistent_volume(field_selector=f'metadata.name={name}').to_dict()[
'items'
]
while len(matching_volumes) == 0:
_create_local_volume(conf, name)
sleep(5)
matching_volumes = _kubernetes.list_persistent_volume(field_selector=f'metadata.name={name}').to_dict()[
'items'
]
pvc_name = conf.get('claim_name', f'{name}-pvc')
matching_claims = _kubernetes.list_persistent_volume_claim_for_all_namespaces(
field_selector=f'metadata.name={pvc_name}'
).to_dict()['items']
while len(matching_claims) == 0:
_create_persistent_volume_claim(pvc_name, name, namespace)
sleep(5)
matching_claims = _kubernetes.list_persistent_volume_claim_for_all_namespaces(
field_selector=f'metadata.name={pvc_name}'
).to_dict()['items']
_LOCAL_VOLUMES.add(name)
def delete_local_volumes(liminal_config, base_dir):
volumes_config = get_volume_configs(liminal_config, base_dir)
for volume_config in volumes_config:
if 'secret' in volume_config:
logging.info(f'Delete local secret if needed: {volume_config}')
delete_local_secret(volume_config)
else:
logging.info(f'Delete local kubernetes volume if needed: {volume_config}')
delete_local_volume(volume_config['volume'])
def delete_local_secret(volume_config, namespace='default'):
name = volume_config['volume']
matching_secrets = _kubernetes.list_namespaced_secret(namespace, field_selector=f'metadata.name={name}').to_dict()[
'items'
]
if len(matching_secrets) > 0:
_LOG.info(f'Deleting secret {name}')
_kubernetes.delete_namespaced_secret(name, namespace)
if name in _LOCAL_VOLUMES:
_LOCAL_VOLUMES.remove(name)
def delete_local_volume(name, namespace='default'):
pvc_name = f'{name}-pvc'
matching_claims = _list_persistent_volume_claims(pvc_name)
if len(matching_claims) > 0:
_LOG.info(f'Deleting persistent volume claim {pvc_name}')
_kubernetes.delete_namespaced_persistent_volume_claim(pvc_name, namespace)
while len(matching_claims) > 0:
matching_claims = _list_persistent_volume_claims(pvc_name)
matching_volumes = _list_persistent_volumes(name)
if len(matching_volumes) > 0:
_LOG.info(f'Deleting persistent volume {name}')
_kubernetes.delete_persistent_volume(name)
while len(matching_volumes) > 0:
matching_volumes = _list_persistent_volumes(name)
if name in _LOCAL_VOLUMES:
_LOCAL_VOLUMES.remove(name)
def _list_persistent_volume_claims(name):
return _kubernetes.list_persistent_volume_claim_for_all_namespaces(
field_selector=f'metadata.name={name}'
).to_dict()['items']
def _list_persistent_volumes(name):
return _kubernetes.list_persistent_volume(field_selector=f'metadata.name={name}').to_dict()['items']
def _create_persistent_volume_claim(pvc_name, volume_name, namespace):
_LOG.info(f'Creating persistent volume claim {pvc_name} with volume {volume_name}')
spec = {
'volumeName': volume_name,
'volumeMode': 'Filesystem',
'storageClassName': 'local-storage',
'accessModes': ['ReadWriteOnce'],
'resources': {'requests': {'storage': '100Gi'}},
}
_kubernetes.create_namespaced_persistent_volume_claim(
namespace,
V1PersistentVolumeClaim(
api_version='v1', kind='PersistentVolumeClaim', metadata={'name': pvc_name}, spec=spec
),
)
def _create_secret(namespace, conf, name):
_LOG.info(f'Creating persistent volume {name} with spec {conf}')
_kubernetes.create_namespaced_secret(
namespace,
V1Secret(api_version='v1', kind='Secret',
metadata={
'name': name,
'labels': {
"apache/incubator-liminal" : "liminal.apache.org"
},
},
data={
'credentials': base64.b64encode(
Path(os.path.expanduser(conf['path'])).read_text().encode('ascii')).decode('ascii')
},
)
)
def _create_local_volume(conf, name):
_LOG.info(f'Creating persistent volume {name} with spec {conf}')
spec = {
'capacity': {'storage': '100Gi'},
'volumeMode': 'Filesystem',
'accessModes': ['ReadWriteOnce'],
'persistentVolumeReclaimPolicy': 'Retain',
'storageClassName': 'local-storage',
'nodeAffinity': {
'required': {
'nodeSelectorTerms': [
{'matchExpressions': [{'key': 'kubernetes.io/hostname', 'operator': 'NotIn', 'values': ['']}]}
]
}
},
}
spec.update(conf)
_kubernetes.create_persistent_volume(
V1PersistentVolume(api_version='v1', kind='PersistentVolume', metadata={'name': name}, spec=spec)
)