blob: 6dfaa62b24bc0b90e5e702f753c434b8dde3054c [file]
#!/usr/bin/env python
# coding=utf-8
# Copyright [2020] [Apache Software Foundation]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from docker import DockerClient
from docker.errors import ImageNotFound, NotFound
from shutil import copyfile
from .misc import package_to_name, generate_engine_package, name_to_package
from .misc import get_chunk_and_untar, get_tar_data
from ..utils.log import get_logger
logger = get_logger('docker')
def _get_client(env=True, url=None):
    """Build a Docker SDK client.

    When ``env`` is truthy the client comes from ``DockerClient.from_env()``;
    otherwise a ``DockerClient`` is created with ``url`` as its ``base_url``.
    """
    return DockerClient.from_env() if env else DockerClient(base_url=url)
def rename_image(old_name, new_name):
    """Run ``docker tag <old_name> <new_name>`` through the system shell.

    The exit status of the command is not inspected and nothing is returned.
    """
    tag_command = "docker tag {} {}".format(old_name, new_name)
    os.system(tag_command)
def _search_docker_image(name):
    """Return True if an image called ``name`` can be fetched, else False.

    ``False`` is returned only when the lookup raises ``ImageNotFound``;
    any other error propagates to the caller.
    """
    docker_client = _get_client()
    try:
        docker_client.images.get(name)
    except ImageNotFound:
        return False
    return True
def _search_docker_container(name):
    """Return True if a container called ``name`` can be fetched, else False.

    ``False`` is returned only when the lookup raises ``NotFound``; any other
    error propagates to the caller.
    """
    docker_client = _get_client()
    try:
        docker_client.containers.get(name)
    except NotFound:
        return False
    return True
def search_docker_volume(name):
    """Return True if a volume called ``name`` can be fetched, else False.

    ``False`` is returned only when the lookup raises ``NotFound``; any other
    error propagates to the caller.
    """
    docker_client = _get_client()
    try:
        docker_client.volumes.get(name)
    except NotFound:
        return False
    return True
def search_engine_container(engine_name):
    """Tell whether the ``marvin-cont-<engine_name>`` container exists."""
    return _search_docker_container("marvin-cont-" + engine_name)
def search_engine_images(engine_name):
    """Tell whether the ``marvin-<engine_name>`` image exists."""
    return _search_docker_image("marvin-" + engine_name)
def create_engine_image(engine_name, engine_path):
    """Package the engine and build its ``marvin-<engine_name>`` image.

    The build context is ``<engine_path>/docker/develop/daemon``. The engine
    package is generated (via ``generate_engine_package``) before the build
    is started.
    """
    logger.info("Creating engine docker image ...")
    engine_package = name_to_package(engine_name)
    build_context = os.path.join(engine_path, "docker", "develop", "daemon")
    generate_engine_package(engine_package, engine_path)
    _get_client().images.build(path=build_context, tag="marvin-" + engine_name)
    logger.info("Creating engine docker image ... Done!")
def create_deploy_image_and_push(engine_name, engine_path, name_in_registry):
    """Build the deploy image for an engine and push it with ``docker push``.

    The build context is ``<engine_path>/docker/deploy/daemon``; the engine
    package is generated with that directory as ``dest`` before the build.
    The image is tagged ``name_in_registry`` and then pushed through the
    system shell (the exit status of the push is not inspected).
    """
    logger.info("Creating engine deploy image ...")
    engine_package = name_to_package(engine_name)
    build_context = os.path.join(engine_path, "docker", "deploy", "daemon")
    generate_engine_package(engine_package, engine_path, dest=build_context)
    _get_client().images.build(path=build_context, tag=name_in_registry)
    logger.info("Creating engine deploy image... Done!")

    logger.info("Pushing deploy image to registry...")
    push_command = "docker push {}".format(name_in_registry)
    os.system(push_command)
    logger.info("Pushing deploy image to registry... Done!")
def create_docker_volume(volume_name):
    """Create a Docker volume called ``volume_name``, logging before and after."""
    docker_client = _get_client()
    progress = "Creating {} docker volume...".format(volume_name)
    logger.info(progress)
    docker_client.volumes.create(volume_name)
    logger.info(progress + " Done!")
def shutdown_and_delete_container(name):
    """Stop and then remove the container called ``name``.

    A ``NotFound`` raised at any step is logged as a warning instead of being
    propagated.
    """
    docker_client = _get_client()
    try:
        target = docker_client.containers.get(name)
        # Each step: the action to run, the logger method and its message.
        steps = (
            (target.stop, logger.warning, "{} container was stopped!"),
            (target.remove, logger.info, "{} container was removed!"),
        )
        for action, report, template in steps:
            action()
            report(template.format(name))
    except NotFound:
        logger.warning("Container not found!")
def delete_image_and_volume(name, vol):
    """Remove image ``name`` and delete volume ``vol``.

    The two removals are attempted independently: a missing image no longer
    prevents the volume from being deleted. A ``NotFound`` for either
    resource is logged as a warning rather than raised; other Docker errors
    propagate.

    Args:
        name: name of the image to remove.
        vol: name of the volume to delete.
    """
    client = _get_client()

    try:
        client.images.remove(name)
    except NotFound:
        # ImageNotFound is a subclass of NotFound in docker.errors.
        logger.warning("{} image not found!".format(name))
    else:
        logger.warning("{} image was untagged, use prune to delete it permanently!".format(name))

    try:
        client.volumes.get(vol).remove()
    except NotFound:
        logger.warning("{} volume not found!".format(vol))
    else:
        logger.warning("{} volume was deleted!".format(vol))
def create_daemon_container(engine_name):
    """Start the detached ``marvin-cont-<engine_name>`` daemon container.

    The container runs the ``marvin-<engine_name>`` image with three
    read-write volumes (engine, data, log) and publishes container port 22 on
    host port 2022, ports 50051-50057 and port 8888 on the same host ports.
    """
    logger.info("Creating engine docker container ...")
    docker_client = _get_client()

    # (volume name, mount point inside the container)
    mount_points = (
        ("marvin-{}-vol".format(engine_name), '/home/marvin/engine'),
        ("marvin-data", '/home/marvin/data'),
        ("marvin-log", '/home/marvin/log'),
    )
    volume_spec = {
        volume: {'bind': target, 'mode': 'rw'}
        for volume, target in mount_points
    }

    port_spec = {'22/tcp': 2022}
    port_spec.update(
        ('{}/tcp'.format(port), port) for port in range(50051, 50058)
    )
    port_spec['8888/tcp'] = 8888

    docker_client.containers.run(
        image="marvin-" + engine_name,
        name="marvin-cont-" + engine_name,
        volumes=volume_spec,
        ports=port_spec,
        detach=True,
    )
    logger.info("Creating engine docker container ... Done!")
def create_executor_container(engine_name):
    """Start the detached ``marvin-executor-<engine_name>`` container.

    It runs the ``marvin-executor`` image with the engine volume mounted
    read-write at ``/home/marvin/engine`` and publishes port 8000.
    """
    logger.info("Creating engine executor container ...")
    docker_client = _get_client()
    engine_mount = {'bind': '/home/marvin/engine', 'mode': 'rw'}
    volume_spec = {"marvin-{}-vol".format(engine_name): engine_mount}
    port_spec = {'8000/tcp': 8000}
    docker_client.containers.run(
        image="marvin-executor",
        name="marvin-executor-" + engine_name,
        volumes=volume_spec,
        ports=port_spec,
        detach=True,
    )
    logger.info("Creating executor docker container ... Done!")
def create_tfserving_container(engine_name, model_path, serve=False):
    """Bake a model into a TensorFlow Serving image and optionally serve it.

    A temporary ``tfserving-<engine_name>`` container is started from
    ``tensorflow/serving:1.11.1``, the model is copied to
    ``/models/<engine_name>`` with ``docker cp``, and the container is
    committed as image ``tfserving:<engine_name>`` with ``MODEL_NAME`` set to
    the engine name. The temporary container is then stopped and removed.

    Args:
        engine_name: engine name; used for the container name, model folder,
            ``MODEL_NAME`` and image tag.
        model_path: local path of the model passed to ``docker cp``.
        serve: when true, start a detached container from the committed
            image with port 8500 published.
    """
    logger.info("Creating temporary container ...")
    _container_name = "tfserving-{}".format(engine_name)
    _dest_path = "/models/{}".format(engine_name)
    # Single source of truth for the committed image, so the serve step below
    # runs exactly the image that ``docker commit`` produced.
    _image_name = "tfserving:{}".format(engine_name)
    client = _get_client()
    client.containers.run(
        image="tensorflow/serving:1.11.1",
        name=_container_name,
        detach=True
    )
    logger.info("Creating temporary container ... Done!")

    logger.info("Writing model on image ...")
    # The shell commands' exit statuses are not inspected.
    os.system("docker cp {0} {1}:{2}".format(model_path, _container_name, _dest_path))
    os.system(
        "docker commit --change 'ENV MODEL_NAME {0}' {1} {2}"
        .format(engine_name, _container_name, _image_name)
    )
    logger.info("Writing model on image ... Done!")

    logger.info("Cleaning...")
    shutdown_and_delete_container(_container_name)
    logger.info("Cleaning... Done!")

    if serve:
        logger.info("Serving...")
        client.containers.run(
            image=_image_name,
            name=_container_name,
            ports={
                '8500/tcp': 8500
            },
            detach=True
        )
        logger.info("Serving... Done!")
def get_stats(engine_name):
    """Return a one-shot stats snapshot for the engine's daemon container.

    Returns the result of ``container.stats(stream=False)`` for
    ``marvin-cont-<engine_name>``, or ``None`` (after logging an error) when
    that container is not found.
    """
    docker_client = _get_client()
    if not search_engine_container(engine_name):
        logger.error("Engine container was not found!")
        return None
    daemon = docker_client.containers.get("marvin-cont-" + engine_name)
    return daemon.stats(stream=False)
class DaemonManagement:
    """File-management helpers for an engine's daemon container.

    Every operation targets the container named ``marvin-cont-<engine_name>``.
    """

    # Data and log directories inside the daemon container.
    data_path = '/home/marvin/data'
    log_path = '/home/marvin/log'

    def __init__(self, engine_name):
        """Remember the engine name and the engine directory in the container."""
        self.engine_name = engine_name
        self.engine_path = '/home/marvin/engine'

    @staticmethod
    def get_container(engine_name):
        """Return the ``marvin-cont-<engine_name>`` container object."""
        _engine_container = "marvin-cont-{}".format(engine_name)
        client = _get_client()
        return client.containers.get(_engine_container)

    def delete_from_daemon(self, source, folder):
        """Run ``rm -rf`` on ``source`` inside the daemon container.

        When ``folder`` is truthy, ``*`` is appended to ``source`` so the
        command targets the folder's contents rather than the folder itself.
        The outcome is logged based on the command's exit code.
        """
        if folder:
            source = os.path.join(source, '*')
        _container = self.get_container(self.engine_name)
        _response = _container.exec_run("sh -c 'rm -rf {}'".format(source))
        if _response.exit_code == 0:
            logger.info("Files were deleted sucessfully!")
        else:
            logger.error("Error deleting files.")

    def list_dir_from_daemon(self, source):
        """Print the output of ``ls <source>`` run inside the daemon container."""
        _container = self.get_container(self.engine_name)
        _response = _container.exec_run("ls {}".format(source))
        if _response.exit_code == 0:
            print(_response.output.decode())
        else:
            logger.error("Error listing files.")

    def copy_from_daemon(self, source, dest):
        """Fetch ``source`` from the container as an archive and unpack it.

        The archive stream is handed to ``get_chunk_and_untar`` together with
        ``dest`` (presumably the local extraction directory - confirm against
        that helper).
        """
        _container = self.get_container(self.engine_name)
        bits, stats = _container.get_archive(source)
        logger.info("Copying file from daemon container. Size:{} bytes...".format(stats['size']))
        get_chunk_and_untar(bits, dest)
        logger.info("Copying file from daemon container. Size:{} bytes... Done!".format(stats['size']))

    def copy_to_daemon(self, source, dest, folder=True):
        """Upload ``source`` into the container directory ``dest``.

        ``get_tar_data`` produces the archive data and the path of a
        temporary file, which is always removed afterwards - even if the
        upload raises.
        """
        _container = self.get_container(self.engine_name)
        data, filepath = get_tar_data(source, folder)
        try:
            _response = _container.put_archive(dest, data)
            if _response:
                logger.info("File copy was sucessfull!")
            else:
                logger.error("Error in file copy.")
        finally:
            # delete tmp file
            os.remove(filepath)

    def push_data(self, path):
        """Upload the single file ``path`` into the container's data directory."""
        self.copy_to_daemon(path, self.data_path, folder=False)

    def delete_data(self, file):
        """Delete ``file`` from the container's data directory."""
        _file_path = os.path.join(self.data_path, file)
        self.delete_from_daemon(_file_path, folder=False)

    def list_data_files(self):
        """Print the listing of the container's data directory."""
        self.list_dir_from_daemon(self.data_path)

    def clone_engine(self):
        """Copy the engine directory out of the container into ``$MARVIN_HOME``.

        After the copy, ``$MARVIN_HOME/engine`` is renamed to
        ``$MARVIN_HOME/<engine_name>``.
        """
        _dest = os.environ['MARVIN_HOME']
        _orig_folder_name = os.path.join(_dest, 'engine')
        _mod_folder_name = os.path.join(_dest, self.engine_name)
        self.copy_from_daemon(self.engine_path, _dest)
        os.rename(_orig_folder_name, _mod_folder_name)

    def push_engine(self):
        """Replace the container's engine directory contents with the local copy.

        ``$MARVIN_HOME/<engine_name>`` is renamed to ``$MARVIN_HOME/engine``,
        the container's engine directory is emptied, and the renamed folder
        is uploaded.
        """
        _source = os.environ['MARVIN_HOME']
        _mod_folder_name = os.path.join(_source, self.engine_name)
        _engine_folder = os.path.join(_source, 'engine')
        # NOTE(review): the local folder is left named 'engine' afterwards;
        # confirm whether it should be renamed back once the upload finishes.
        os.rename(_mod_folder_name, _engine_folder)
        self.delete_from_daemon(self.engine_path, folder=True)
        self.copy_to_daemon(_engine_folder, self.engine_path)

    def clone_artifacts(self):
        """Copy this engine's ``.artifacts`` folder from the container.

        The destination is ``$MARVIN_DATA_PATH/.artifacts/<engine_name>``.
        """
        _dest = os.path.join(os.environ['MARVIN_DATA_PATH'], '.artifacts', self.engine_name)
        _source = os.path.join(self.data_path, '.artifacts', self.engine_name)
        self.copy_from_daemon(_source, _dest)

    def clone_logs(self):
        """Copy the container's log directory to ``$MARVIN_DATA_PATH/.log/<engine_name>``."""
        _dest = os.path.join(os.environ['MARVIN_DATA_PATH'], '.log', self.engine_name)
        self.copy_from_daemon(self.log_path, _dest)