#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
docker_to_squash.py is a tool to facilitate the process of converting
Docker images into squashFS layers, manifests, and configs.
Tool dependencies: skopeo, squashfs-tools, tar, setfattr
"""
import argparse
from collections.abc import Iterable
import glob
import hashlib
import json
import logging
import os
import re
import shutil
import subprocess
import time
from threading import Timer
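# Global configuration placeholders; the real values are assigned from
# command-line arguments in main().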
LOG_LEVEL = None
HADOOP_BIN_DIR = "/undefined"
MAX_IMAGE_LAYERS = 0
MAX_IMAGE_SIZE = 0
HADOOP_PREFIX = "/undefined"
ARG_MAX = 10
HDFS_ROOT = "/undefined"
HDFS_MANIFEST_DIR = "/undefined"
HDFS_CONFIG_DIR = "/undefined"
HDFS_LAYERS_DIR = "/undefined"
HDFS_UNREF_DIR = "/undefined"
def shell_command(command, print_stdout, print_stderr, raise_on_error, timeout_sec=600):
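    """
    Run a command as a subprocess and return (stdout, stderr, returncode).
    Output is captured unless print_stdout/print_stderr request that it pass
    through to the terminal (stderr also passes through at DEBUG log level).
    The process is killed after timeout_sec seconds, and an exception is
    raised on a non-zero exit when raise_on_error is set.
    """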
global LOG_LEVEL
global ARG_MAX
stdout_val = subprocess.PIPE
stderr_val = subprocess.PIPE
logging.debug("command: %s", command)
for arg in command:
if len(arg) > ARG_MAX:
raise Exception(f"command length ({len(arg)}) greater than ARG_MAX ({ARG_MAX})")
if print_stdout:
stdout_val = None
if print_stderr or LOG_LEVEL == "DEBUG":
stderr_val = None
process = None
timer = None
out = None
err = None
try:
        process = subprocess.Popen(command, stdout=stdout_val, stderr=stderr_val, universal_newlines=True)
timer = Timer(timeout_sec, process_timeout, [process])
timer.start()
out, err = process.communicate()
if raise_on_error and process.returncode:
exception_string = f"Command: {command} failed with returncode: {process.returncode}"
if out:
exception_string = exception_string + "\nstdout: " + str(out)
if err:
exception_string = exception_string + "\nstderr: " + str(err)
raise Exception(exception_string)
except Exception as ex:
if process and process.poll() is None:
process.kill()
raise Exception("Popen failure, " + str(ex))
finally:
# Note that finally clause executes even when there is no exception, hence the "return" statement
if timer:
timer.cancel()
return out, err, (process.returncode if process else -1)
def process_timeout(process):
process.kill()
logging.error("Process killed due to timeout")
def does_hdfs_entry_exist(entry, raise_on_error=True):
out, err, rc = hdfs_ls(entry, raise_on_error=raise_on_error)
if rc:
return False
return True
def setup_hdfs_dirs(dirs):
if does_hdfs_entry_exist(dirs, raise_on_error=False):
return
hdfs_mkdir(dirs, create_parents=True)
chmod_dirs = []
for dir_entry in dirs:
directories = dir_entry.split("/")[1:]
dir_path = ""
for directory in directories:
dir_path = dir_path + "/" + directory
logging.info("dir_path: %s", str(dir_path))
chmod_dirs.append(dir_path)
hdfs_chmod("755", chmod_dirs)
def append_or_extend_to_list(src, src_list):
if isinstance(src, list):
src_list.extend(src)
else:
src_list.append(src)
def hdfs_get(src, dest, print_stdout=False, print_stderr=False, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-get"]
append_or_extend_to_list(src, command)
command.append(dest)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-ls"]
if options:
append_or_extend_to_list(options, command)
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_cat(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cat"]
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True, create_parents=False):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-mkdir"]
if create_parents:
command.append("-p")
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_rm(file_path, error_on_file_not_found=False, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-rm"]
if not error_on_file_not_found:
command.append("-f")
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_put(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-put"]
if force:
command.append("-f")
append_or_extend_to_list(src, command)
command.append(dest)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error, 60)
return out, err, rc
def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True, recursive=False):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-chmod"]
if recursive:
command.append("-R")
command.append(mode)
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_setrep(replication, file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)]
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_cp(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"]
if force:
command.append("-f")
append_or_extend_to_list(src, command)
command.append(dest)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_touchz(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-touchz"]
append_or_extend_to_list(file_path, command)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def hdfs_stat(file_path, fmt, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-stat"]
append_or_extend_to_list(fmt, command)
command.append(file_path)
out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
return out, err, rc
def get_working_dir(directory):
working_dir = "(undefined)"
try:
if os.path.isdir(directory):
working_dir = os.path.join(directory, "docker-to-squash")
else:
working_dir = directory
os.makedirs(working_dir)
except Exception as ex:
raise Exception(f"Could not create working_dir: {working_dir}, {ex}")
return working_dir
def is_sha256_hash(string):
if not re.findall(r"^[a-fA-F\d]{64}$", string):
return False
return True
def calculate_file_hash(filename):
sha = hashlib.sha256()
with open(filename, 'rb') as file_pointer:
while True:
data = file_pointer.read(65536)
if not data:
break
sha.update(data)
hex_digest = sha.hexdigest()
    if not hex_digest:
        raise Exception(f"Hex digest for file: {filename} returned nothing")
return hex_digest
def calculate_string_hash(string):
    sha = hashlib.sha256()
    # hashlib requires bytes; encode if a str was passed
    sha.update(string.encode("utf-8") if isinstance(string, str) else string)
    return sha.hexdigest()
def get_local_manifest_from_path(manifest_path):
    # Read as text so the hash matches the one computed for manifests
    # fetched from HDFS via hdfs_cat
    with open(manifest_path, "r") as file_pointer:
        out = file_pointer.read()
    manifest_hash = calculate_string_hash(out)
    manifest = json.loads(out)
    return manifest, manifest_hash
def get_hdfs_manifest_from_path(manifest_path):
out, err, rc = hdfs_cat(manifest_path)
manifest_hash = calculate_string_hash(str(out))
manifest = json.loads(out)
return manifest, manifest_hash
def get_hdfs_manifests_from_paths(manifest_paths):
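    """
    Cat multiple manifest files from HDFS in a single call and split the
    concatenated JSON output back into individual documents on the "}{"
    boundaries. Returns a list of (manifest, manifest_hash) tuples.
    """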
out, err, rc = hdfs_cat(manifest_paths)
manifests_list = out.split("}{")
manifests = []
for manifest_str in manifests_list:
if manifest_str[0] != "{":
manifest_str = "{" + manifest_str
if manifest_str[-1] != "}":
manifest_str = manifest_str + "}"
manifest_hash = calculate_string_hash(manifest_str)
logging.debug("manifest for %s:\n%s", manifest_hash, manifest_str)
manifest = json.loads(manifest_str)
manifests.append((manifest, manifest_hash))
return manifests
def get_config_hash_from_manifest(manifest):
config_hash = manifest['config']['digest'].split(":", 1)[1]
return config_hash
def check_total_layer_number(layers):
global MAX_IMAGE_LAYERS
if len(layers) > MAX_IMAGE_LAYERS:
logging.error("layers: " + str(layers))
raise Exception("Image has " + str(len(layers)) +
" layers, which is more than the maximum " + str(MAX_IMAGE_LAYERS) +
" layers. Failing out")
def check_total_layer_size(manifest, size):
global MAX_IMAGE_SIZE
if size > MAX_IMAGE_SIZE:
for layer in manifest['layers']:
logging.error("layer " + layer['digest'] + " has size " + str(layer['size']))
raise Exception("Image has total size " + str(size) +
" B. which is more than the maximum size " + str(MAX_IMAGE_SIZE) + " B. Failing out")
def get_layer_hashes_from_manifest(manifest, error_on_size_check=True):
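    """
    Return the sha256 hex digests of all layers in the manifest. When
    error_on_size_check is True, also enforce the MAX_IMAGE_LAYERS and
    MAX_IMAGE_SIZE limits.
    """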
layers = []
size = 0
for layer in manifest['layers']:
layers.append(layer['digest'].split(":", 1)[1])
size += layer['size']
if error_on_size_check:
check_total_layer_number(layers)
check_total_layer_size(manifest, size)
return layers
def get_pull_fmt_string(pull_format):
pull_fmt_string = pull_format + ":"
if pull_format == "docker":
pull_fmt_string = pull_fmt_string + "//"
return pull_fmt_string
def get_manifest_from_docker_image(pull_format, image):
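    """
    Inspect the image with "skopeo inspect --raw". If the registry returns a
    manifest list, resolve the amd64 entry and recurse on image@digest to
    fetch the platform-specific manifest. Returns (manifest, manifest_hash).
    """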
pull_fmt_string = get_pull_fmt_string(pull_format)
out, err, rc = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image], False, True, True)
manifest = json.loads(out)
if 'manifests' in manifest:
logging.debug("skopeo inspect --raw returned a list of manifests")
manifests_dict = manifest['manifests']
sha = None
for mfest in manifests_dict:
if mfest['platform']['architecture'] == "amd64":
sha = mfest['digest']
break
if not sha:
raise Exception("Could not find amd64 manifest for image " + image)
image_without_tag = image.split("/", 1)[-1].split(":", 1)[0]
image_and_sha = image_without_tag + "@" + sha
logging.debug("amd64 manifest sha is: %s", sha)
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image_and_sha)
else:
manifest_hash = calculate_string_hash(str(out))
logging.debug("manifest: %s", str(manifest))
return manifest, manifest_hash
def split_image_and_tag(image_and_tag):
split = image_and_tag.split(",")
image = split[0]
tags = split[1:]
return image, tags
def read_image_tag_to_hash(image_tag_to_hash):
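    """
    Parse the image-tag-to-hash file. Each non-empty line has the form
    "tag1,tag2,...:manifest_hash", optionally followed by "#comment".
    Returns (hash_to_tags, tag_to_hash) dictionaries.
    """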
hash_to_tags = dict()
tag_to_hash = dict()
    with open(image_tag_to_hash, 'r') as file_pointer:
while True:
line = file_pointer.readline()
if not line:
break
            line = line.rstrip()
if not line:
continue
comment_split_line = line.split("#", 1)
line = comment_split_line[0]
comment = comment_split_line[1:]
split_line = line.rsplit(":", 1)
manifest_hash = split_line[-1]
tags_list = ' '.join(split_line[:-1]).split(",")
if not is_sha256_hash(manifest_hash) or not tags_list:
logging.warning("image-tag-to-hash file malformed. Skipping entry %s", line)
continue
tags_and_comments = hash_to_tags.get(manifest_hash, None)
if tags_and_comments is None:
known_tags = tags_list
known_comment = comment
else:
known_tags = tags_and_comments[0]
for tag in tags_list:
if tag not in known_tags:
known_tags.append(tag)
known_comment = tags_and_comments[1]
known_comment.extend(comment)
hash_to_tags[manifest_hash] = (known_tags, known_comment)
for tag in tags_list:
cur_manifest = tag_to_hash.get(tag, None)
if cur_manifest is not None:
logging.warning(f"tag_to_hash already has manifest {cur_manifest} defined for tag {tag}. "
f"This entry will be overwritten")
tag_to_hash[tag] = manifest_hash
return hash_to_tags, tag_to_hash
def remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag):
if not hash_to_tags:
logging.debug("hash_to_tags is null. Not removing tag %s", tag)
return
prev_hash = tag_to_hash.get(tag, None)
if prev_hash is not None:
del tag_to_hash[tag]
prev_tags, prev_comment = hash_to_tags.get(prev_hash, (None, None))
prev_tags.remove(tag)
        if not prev_tags:
del hash_to_tags[prev_hash]
else:
hash_to_tags[prev_hash] = (prev_tags, prev_comment)
else:
logging.debug("Tag not found. Not removing tag: %s", tag)
def remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash):
if not hash_to_tags:
logging.debug("hash_to_tags is null. Not removing image_hash %s", image_hash)
return
logging.debug("hash_to_tags: %s", str(hash_to_tags))
logging.debug("Removing image_hash from dicts: %s", image_hash)
    prev_tags, prev_comments = hash_to_tags.get(image_hash, (None, None))
if prev_tags is not None:
hash_to_tags.pop(image_hash)
for tag in prev_tags:
del tag_to_hash[tag]
def add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
tag_to_hash[tag] = manifest_hash
new_tags_and_comments = hash_to_tags.get(manifest_hash, None)
if new_tags_and_comments is None:
new_tags = [tag]
new_comment = [comment]
else:
new_tags = new_tags_and_comments[0]
new_comment = new_tags_and_comments[1]
if tag not in new_tags:
new_tags.append(tag)
if comment and comment not in new_comment:
new_comment.append(comment)
hash_to_tags[manifest_hash] = (new_tags, new_comment)
def write_local_image_tag_to_hash(image_tag_to_hash, hash_to_tags):
file_contents = []
    for key, value in hash_to_tags.items():
manifest_hash = key
# Sort tags list to preserve consistent order
value[0].sort()
tags = ','.join(map(str, value[0]))
if tags:
# Sort comments list to preserve consistent order
value[1].sort()
comment = ', '.join(map(str, value[1]))
if comment:
comment = "#" + comment
file_contents.append(tags + ":" + manifest_hash + comment + "\n")
file_contents.sort()
with open(image_tag_to_hash, 'w') as file_pointer:
for val in file_contents:
file_pointer.write(val)
def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, comment):
for tag in tags:
update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
def update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
def remove_from_dicts(hash_to_tags, tag_to_hash, tags):
for tag in tags:
logging.debug("removing tag: %s", tag)
remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
def populate_tag_dicts(image_tag_to_hash, local_image_tag_to_hash):
return populate_tag_dicts_set_root(image_tag_to_hash, local_image_tag_to_hash, None)
def populate_tag_dicts_set_root(image_tag_to_hash, local_image_tag_to_hash, hdfs_root):
# Setting hdfs_root to None will default it to using the global
global HDFS_ROOT
if not hdfs_root:
hdfs_root = HDFS_ROOT
hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash, raise_on_error=True)
image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash)
if image_tag_to_hash_hash:
hash_to_tags, tag_to_hash = read_image_tag_to_hash(local_image_tag_to_hash)
else:
hash_to_tags = {}
tag_to_hash = {}
return hash_to_tags, tag_to_hash, image_tag_to_hash_hash
def setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path):
logging.debug("Setting up squashfs hdfs_dirs: %s", str(hdfs_dirs))
setup_hdfs_dirs(hdfs_dirs)
if not does_hdfs_entry_exist(image_tag_to_hash_path, raise_on_error=False):
hdfs_touchz(image_tag_to_hash_path)
hdfs_chmod("755", image_tag_to_hash_path)
def skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir):
logging.info("Pulling image: %s", image)
if os.path.isdir(skopeo_dir):
raise Exception("Skopeo output directory already exists. "
+ "Please delete and try again "
+ "Directory: " + skopeo_dir)
pull_fmt_string = get_pull_fmt_string(pull_format)
shell_command(["skopeo", "copy", pull_fmt_string + image,
skopeo_format + ":" + skopeo_dir], False, True, True)
def untar_layer(tmp_dir, layer_path):
shell_command(["tar", "-C", tmp_dir, "--xattrs", "--xattrs-include='*'", "-xf", layer_path], False, True, True)
def tar_file_search(archive, target):
out, err, rc = shell_command(["tar", "-xf", archive, target, "-O"], False, False, False)
return out
def set_fattr(directory):
shell_command(["setfattr", "-n", "trusted.overlay.opaque", "-v", "y", directory], False, True, True)
def make_whiteout_block_device(file_path, whiteout):
shell_command(["mknod", "-m", "000", file_path, "c", "0", "0"], False, True, True)
out, err, rc = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True)
perms = str(out).strip()
shell_command(["chown", perms, file_path], False, True, True)
def convert_oci_whiteouts(tmp_dir):
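    """
    Convert OCI/AUFS-style whiteouts in the expanded layer into their
    overlayfs equivalents: a ".wh..wh..opq" marker becomes a
    trusted.overlay.opaque xattr on its parent directory, and a ".wh.<name>"
    file becomes a 0:0 character device named <name> (ownership copied from
    the whiteout file). The whiteout files themselves are then removed.
    """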
out, err, rc = shell_command(["find", tmp_dir, "-name", ".wh.*"], False, False, True)
whiteouts = str(out).splitlines()
for whiteout in whiteouts:
        if not whiteout:
continue
basename = os.path.basename(whiteout)
directory = os.path.dirname(whiteout)
if basename == ".wh..wh..opq":
set_fattr(directory)
else:
whiteout_string = ".wh."
idx = basename.rfind(whiteout_string)
bname = basename[idx + len(whiteout_string):]
file_path = os.path.join(directory, bname)
make_whiteout_block_device(file_path, whiteout)
shell_command(["rm", whiteout], False, True, True)
def dir_to_squashfs(tmp_dir, squash_path):
shell_command(["mksquashfs", tmp_dir, squash_path, "-write-queue", "4096", "-read-queue", "4096",
"-fragment-queue", "4096"],
False, True, True)
def upload_to_hdfs(src, dest, replication, mode, force=False):
if does_hdfs_entry_exist(dest, raise_on_error=False):
if not force:
logging.warning("Not uploading to HDFS. File already exists: %s", dest)
return
logging.info("File already exists, but overwriting due to force option: %s", dest)
hdfs_put(src, dest, force)
hdfs_setrep(replication, dest)
hdfs_chmod(mode, dest)
logging.info(f"Uploaded file {dest} with replication {replication} and permissions {mode}")
def atomic_upload_mv_to_hdfs(src, dest, replication, image_tag_to_hash_file_hash):
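    """
    Upload src to "<dest>.tmp" and then swap it into place atomically with
    the Hadoop SymlinkTool "mvlink" command, so readers never see a
    partially written image-tag-to-hash file. The upload is skipped when the
    local file hash matches image_tag_to_hash_file_hash.
    """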
global HADOOP_PREFIX
global HADOOP_BIN_DIR
local_hash = calculate_file_hash(src)
if local_hash == image_tag_to_hash_file_hash:
logging.info("image_tag_to_hash file unchanged. Not uploading")
return
tmp_dest = dest + ".tmp"
try:
if does_hdfs_entry_exist(tmp_dest, raise_on_error=False):
hdfs_rm(tmp_dest)
hdfs_put(src, tmp_dest)
hdfs_setrep(replication, tmp_dest)
hdfs_chmod("444", tmp_dest)
jar_path = HADOOP_PREFIX + "/share/hadoop/tools/lib/hadoop-extras-*.jar"
jar_file = None
for file in glob.glob(jar_path):
jar_file = file
if not jar_file:
raise Exception("SymlinkTool Jar doesn't exist: %s" % jar_path)
logging.debug("jar_file: " + jar_file)
shell_command([HADOOP_BIN_DIR + "/hadoop", "jar", jar_file, "org.apache.hadoop.tools.SymlinkTool",
"mvlink", "-f", tmp_dest, dest], False, False, True)
except Exception as ex:
if does_hdfs_entry_exist(tmp_dest, raise_on_error=False):
hdfs_rm(tmp_dest)
raise Exception("image tag to hash file upload failed, exception=" + str(ex))
def docker_to_squash(layer_dir, layer, working_dir):
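    """
    Convert a single tar layer into a SquashFS file: untar it (preserving
    xattrs), convert whiteouts, run mksquashfs, then remove the original tar
    and the temporary expansion directory.
    """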
tmp_dir = os.path.join(working_dir, "expand_archive_" + layer)
layer_path = os.path.join(layer_dir, layer)
squash_path = layer_path + ".sqsh"
if os.path.isdir(tmp_dir):
raise Exception("tmp_dir already exists. Please delete and try again " +
"Directory: " + tmp_dir)
os.makedirs(tmp_dir)
try:
untar_layer(tmp_dir, layer_path)
convert_oci_whiteouts(tmp_dir)
dir_to_squashfs(tmp_dir, squash_path)
finally:
os.remove(layer_path)
shell_command(["rm", "-rf", tmp_dir], False, True, True)
def check_image_for_magic_file(magic_file, skopeo_dir, layers):
magic_file_absolute = magic_file.strip("/")
logging.debug("Searching for magic file %s", magic_file_absolute)
for layer in layers:
ret = tar_file_search(os.path.join(skopeo_dir, layer), magic_file_absolute)
if ret:
logging.debug("Found magic file %s in layer %s", magic_file_absolute, layer)
logging.debug("Magic file %s has contents:\n%s", magic_file_absolute, ret)
return ret
raise Exception(f"Magic file {magic_file_absolute} doesn't exist in any layer")
def pull_build_push_update(args):
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
skopeo_format = args.skopeo_format
pull_format = args.pull_format
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
force = args.force
images_and_tags = args.images_and_tags
check_magic_file = args.check_magic_file
magic_file = args.magic_file
bootstrap = args.bootstrap
working_dir = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
if bootstrap:
hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image or not tags:
raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
logging.info("Working on image %s with tags %s", image, str(tags))
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
layers = get_layer_hashes_from_manifest(manifest)
config_hash = get_config_hash_from_manifest(manifest)
logging.debug("Layers: %s", str(layers))
logging.debug("Config: %s", str(config_hash))
update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
hdfs_files_to_check = [HDFS_MANIFEST_DIR + "/" + manifest_hash, HDFS_CONFIG_DIR + "/" + config_hash]
for layer in layers:
hdfs_files_to_check.append(HDFS_LAYERS_DIR + "/" + layer + ".sqsh")
if does_hdfs_entry_exist(hdfs_files_to_check, raise_on_error=False):
if not force:
logging.info("All image files exist in HDFS, skipping this image")
continue
logging.info("All image files exist in HDFS, but force option set, so overwriting image")
skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
logging.debug("skopeo_dir: %s", skopeo_dir)
skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir)
if check_magic_file:
check_image_for_magic_file(magic_file, skopeo_dir, layers)
for layer in layers:
logging.info("Squashifying and uploading layer: %s", layer)
hdfs_squash_path = HDFS_LAYERS_DIR + "/" + layer + ".sqsh"
if does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
if force:
logging.info(f"Layer already exists, but overwriting due to force option: {layer}")
else:
logging.info(f"Layer exists. Skipping and not squashifying or uploading: {layer}")
continue
docker_to_squash(skopeo_dir, layer, working_dir)
squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
squash_name = os.path.basename(squash_path)
upload_to_hdfs(squash_path, HDFS_LAYERS_DIR + "/" + squash_name, replication, "444", force)
config_local_path = os.path.join(skopeo_dir, config_hash)
upload_to_hdfs(config_local_path,
HDFS_CONFIG_DIR + "/" + os.path.basename(config_local_path),
replication, "444", force)
manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
upload_to_hdfs(manifest_local_path, HDFS_MANIFEST_DIR + "/" + manifest_hash, replication, "444", force)
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash,
replication, image_tag_to_hash_hash)
finally:
if working_dir:
if os.path.isdir(working_dir):
shell_command(["rm", "-rf", working_dir], False, True, True)
def pull_build(args):
skopeo_format = args.skopeo_format
pull_format = args.pull_format
images_and_tags = args.images_and_tags
check_magic_file = args.check_magic_file
magic_file = args.magic_file
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image or not tags:
raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
logging.info(f"Working on image {image} with tags {tags}")
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
layers = get_layer_hashes_from_manifest(manifest)
config_hash = get_config_hash_from_manifest(manifest)
logging.debug(f"Layers: {layers}")
logging.debug(f"Config: {config_hash}")
skopeo_dir = None
try:
working_dir = get_working_dir(args.working_dir)
skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
logging.debug(f"skopeo_dir: {skopeo_dir}")
skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir)
if check_magic_file:
check_image_for_magic_file(magic_file, skopeo_dir, layers)
for layer in layers:
logging.info(f"Squashifying layer: {layer}")
docker_to_squash(skopeo_dir, layer, working_dir)
except Exception as _:
if skopeo_dir and os.path.isdir(skopeo_dir):
shutil.rmtree(skopeo_dir)
raise
def push_update(args):
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
force = args.force
images_and_tags = args.images_and_tags
bootstrap = args.bootstrap
local_image_tag_to_hash = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
if bootstrap:
hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image or not tags:
raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
logging.info("Working on image %s with tags %s", image, str(tags))
skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
if not os.path.exists(skopeo_dir):
raise Exception(f"skopeo_dir doesn't exists: {skopeo_dir}")
manifest, manifest_hash = get_local_manifest_from_path(f"{skopeo_dir}/manifest.json")
layers = get_layer_hashes_from_manifest(manifest)
config_hash = get_config_hash_from_manifest(manifest)
logging.debug("Layers: %s", str(layers))
logging.debug("Config: %s", str(config_hash))
update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
hdfs_files_to_check = [f"{HDFS_MANIFEST_DIR}/{manifest_hash}", f"{HDFS_CONFIG_DIR}/{config_hash}"]
for layer in layers:
hdfs_files_to_check.append(f"{HDFS_LAYERS_DIR}/{layer}.sqsh")
if does_hdfs_entry_exist(hdfs_files_to_check, raise_on_error=False):
if not force:
logging.info("All image files exist in HDFS, skipping this image")
continue
logging.info("All image files exist in HDFS, but force option set, so overwriting image")
for layer in layers:
hdfs_squash_path = f"{HDFS_LAYERS_DIR}/{layer}.sqsh"
if does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
if force:
logging.info(f"Layer already exists, but overwriting due to force option: {layer}")
else:
logging.info(f"Layer exists. Skipping and not squashifying or uploading: {layer}")
continue
squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
squash_name = os.path.basename(squash_path)
upload_to_hdfs(squash_path, f"{HDFS_LAYERS_DIR}/{squash_name}", replication, "444", force)
config_local_path = os.path.join(skopeo_dir, config_hash)
upload_to_hdfs(config_local_path,
f"{HDFS_CONFIG_DIR}/" + os.path.basename(config_local_path),
replication, "444", force)
manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
upload_to_hdfs(manifest_local_path, HDFS_MANIFEST_DIR + "/" + manifest_hash, replication, "444", force)
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
atomic_upload_mv_to_hdfs(local_image_tag_to_hash, f"{HDFS_ROOT}/{image_tag_to_hash}",
replication, image_tag_to_hash_hash)
finally:
if local_image_tag_to_hash:
if os.path.isfile(local_image_tag_to_hash):
os.remove(local_image_tag_to_hash)
def remove_image(args):
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
images_or_tags = args.images_or_tags
working_dir = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
images_and_tags_to_remove = []
for image_or_tag_arg in images_or_tags:
images_and_tags_to_remove.extend(image_or_tag_arg.split(","))
logging.debug("images_and_tags_to_remove:\n%s", images_and_tags_to_remove)
hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
logging.debug("hash_to_tags: %s", str(hash_to_tags))
logging.debug("tag_to_hash: %s", str(tag_to_hash))
known_images = get_all_known_images()
if not known_images:
logging.warn("No known images\n")
return
images_to_remove = get_images_from_args(images_and_tags_to_remove, tag_to_hash, known_images)
logging.debug("images_to_remove:\n%s", images_to_remove)
if not images_to_remove:
logging.warning("No images to remove")
return
delete_list = get_delete_list_from_images_to_remove(images_to_remove, known_images)
for image_to_remove in images_to_remove:
remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_to_remove.manifest_stat.name)
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash, replication,
image_tag_to_hash_hash)
hdfs_rm(delete_list)
finally:
if working_dir:
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
def get_images_from_args(images_and_tags, tag_to_hash, known_images):
images = []
if isinstance(images_and_tags, Iterable):
for image_arg in images_and_tags:
image = get_image_hash_from_arg(image_arg, tag_to_hash, known_images)
if image:
images.append(image)
else:
image_arg = images_and_tags[0]
image = get_image_hash_from_arg(image_arg, tag_to_hash, known_images)
if image:
images.append(image)
return images
def get_image_hash_from_arg(image_str, tag_to_hash, known_images):
if is_sha256_hash(image_str):
image_hash = image_str
else:
image_hash = tag_to_hash.get(image_str, None)
if image_hash:
image = get_known_image_by_hash(image_hash, known_images)
else:
logging.warn("image tag unknown: %s", image_str)
return None
return image
def get_delete_list_from_images_to_remove(images_to_remove, known_images):
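    """
    Build the list of HDFS paths to delete for images_to_remove: each
    image's manifest, config, and unreferenced-file entry, plus any layers
    that are not shared with an image being kept.
    """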
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
layers_to_keep = []
delete_list = []
for image in known_images:
if image not in images_to_remove:
layers_to_keep.extend(image.layers)
for image_to_remove in images_to_remove:
delete_list.append(HDFS_MANIFEST_DIR + "/" + image_to_remove.manifest_stat.name)
delete_list.append(HDFS_CONFIG_DIR + "/" + image_to_remove.config)
if image_to_remove.unref_file_stat:
delete_list.append(HDFS_UNREF_DIR + "/" + image_to_remove.unref_file_stat.name)
layers = image_to_remove.layers
for layer in layers:
if layer not in layers_to_keep:
layer_path = HDFS_LAYERS_DIR + "/" + layer + ".sqsh"
if layer_path not in delete_list:
delete_list.append(layer_path)
logging.debug("delete_list:\n%s", delete_list)
return delete_list
def add_remove_tag(args):
global HDFS_ROOT
global HDFS_MANIFEST_DIR
pull_format = args.pull_format
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
sub_command = args.sub_command
images_and_tags = args.images_and_tags
working_dir = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
for image_and_tag_arg in images_and_tags:
if sub_command == "add-tag":
image, tags = split_image_and_tag(image_and_tag_arg)
if is_sha256_hash(image):
manifest_hash = image
else:
manifest_hash = tag_to_hash.get(image, None)
if manifest_hash:
manifest_path = HDFS_MANIFEST_DIR + "/" + manifest_hash
out, err, returncode = hdfs_cat(manifest_path)
manifest = json.loads(out)
logging.debug("image tag exists for %s", image)
else:
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
elif sub_command == "remove-tag":
tags = image_and_tag_arg.split(",")
image = None
manifest = None
manifest_hash = 0
remove_from_dicts(hash_to_tags, tag_to_hash, tags)
else:
raise Exception(f"Invalid sub_command: {sub_command}")
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash, replication,
image_tag_to_hash_hash)
finally:
if working_dir:
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
def copy_update(args):
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
force = args.force
src_root = args.src_root
dest_root = args.dest_root
images_and_tags = args.images_and_tags
bootstrap = args.bootstrap
src_layers_dir = src_root + "/layers"
src_config_dir = src_root + "/config"
src_manifest_dir = src_root + "/manifests"
dest_layers_dir = dest_root + "/layers"
dest_config_dir = dest_root + "/config"
dest_manifest_dir = dest_root + "/manifests"
dest_unref_dir = dest_root + "/unreferenced"
if bootstrap:
hdfs_dirs = [dest_root, dest_layers_dir, dest_config_dir, dest_manifest_dir, dest_unref_dir]
image_tag_to_hash_path = dest_root + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
working_dir = None
try:
working_dir = get_working_dir(args.working_dir)
local_src_image_tag_to_hash = os.path.join(working_dir, "src-"
+ os.path.basename(image_tag_to_hash))
local_dest_image_tag_to_hash = os.path.join(working_dir, "dest-"
+ os.path.basename(image_tag_to_hash))
src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts_set_root(image_tag_to_hash,
local_src_image_tag_to_hash,
src_root)
dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts_set_root(
image_tag_to_hash, local_dest_image_tag_to_hash, dest_root)
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image:
raise Exception("Positional parameter requires an image: " + image_and_tag_arg)
if not tags:
logging.debug("Tag not given. Using image tag instead: %s", image)
tags = [image]
src_manifest_hash = src_tag_to_hash.get(image, None)
if not src_manifest_hash:
logging.info("Manifest not found for image %s. Skipping", image)
continue
src_manifest_path = src_manifest_dir + "/" + src_manifest_hash
dest_manifest_path = dest_manifest_dir + "/" + src_manifest_hash
src_manifest, src_manifest_hash = get_hdfs_manifest_from_path(src_manifest_path)
src_config_hash = get_config_hash_from_manifest(src_manifest)
src_config_path = src_config_dir + "/" + src_config_hash
dest_config_path = dest_config_dir + "/" + src_config_hash
src_layers = get_layer_hashes_from_manifest(src_manifest)
src_layers_paths = [src_layers_dir + "/" + layer + ".sqsh" for layer in src_layers]
dest_layers_paths = [dest_layers_dir + "/" + layer + ".sqsh" for layer in src_layers]
logging.debug("Copying Manifest: %s", str(src_manifest_path))
logging.debug("Copying Layers: %s", str(src_layers_paths))
logging.debug("Copying Config: %s", str(src_config_hash))
if not does_hdfs_entry_exist(dest_layers_paths, raise_on_error=False):
dest_layers_paths = []
for layer in src_layers:
dest_layer_path = dest_layers_dir + "/" + layer + ".sqsh"
src_layer_path = src_layers_dir + "/" + layer + ".sqsh"
if not does_hdfs_entry_exist(dest_layer_path, raise_on_error=False):
hdfs_cp(src_layer_path, dest_layer_path, force)
dest_layers_paths.append(dest_layer_path)
hdfs_setrep(replication, dest_layers_paths)
hdfs_chmod("444", dest_layers_paths)
if not does_hdfs_entry_exist(dest_config_path, raise_on_error=False):
hdfs_cp(src_config_path, dest_config_dir, force)
hdfs_setrep(replication, dest_config_path)
hdfs_chmod("444", dest_config_path)
if not does_hdfs_entry_exist(dest_manifest_path, raise_on_error=False):
hdfs_cp(src_manifest_path, dest_manifest_dir, force)
hdfs_setrep(replication, dest_manifest_path)
hdfs_chmod("444", dest_manifest_path)
for tag in tags:
comment = None
new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None)
if new_tags_and_comments:
comment = ', '.join(map(str, new_tags_and_comments[1]))
if comment is None:
comment = image
update_dicts(dest_hash_to_tags, dest_tag_to_hash, tag, src_manifest_hash, comment)
write_local_image_tag_to_hash(local_dest_image_tag_to_hash, dest_hash_to_tags)
atomic_upload_mv_to_hdfs(local_dest_image_tag_to_hash, dest_root + "/" + image_tag_to_hash,
replication,
dest_image_tag_to_hash_hash)
finally:
if working_dir:
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
def query_tag(args):
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
image_tag_to_hash = args.image_tag_to_hash
tags = args.tags
working_dir = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
logging.debug("hash_to_tags: %s", str(hash_to_tags))
logging.debug("tag_to_hash: %s", str(tag_to_hash))
for tag in tags:
image_hash = tag_to_hash.get(tag, None)
if not image_hash:
logging.info("image hash mapping doesn't exist for tag %s", tag)
continue
manifest_path = HDFS_MANIFEST_DIR + "/" + image_hash
if does_hdfs_entry_exist(manifest_path, raise_on_error=False):
logging.debug("image manifest for %s exists: %s", tag, manifest_path)
else:
logging.info("Image manifest for %s doesn't exist: %s", tag, manifest_path)
continue
manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
layers = get_layer_hashes_from_manifest(manifest, False)
config_hash = get_config_hash_from_manifest(manifest)
config_path = HDFS_CONFIG_DIR + "/" + config_hash
layers_paths = [HDFS_LAYERS_DIR + "/" + layer + ".sqsh" for layer in layers]
logging.info("Image info for '%s'", tag)
logging.info(manifest_path)
logging.info(config_path)
for layer in layers_paths:
logging.info(layer)
finally:
if working_dir:
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
def list_tags(args):
global HDFS_ROOT
image_tag_to_hash = args.image_tag_to_hash
hdfs_image_tag_to_hash = HDFS_ROOT + "/" + image_tag_to_hash
hdfs_cat(hdfs_image_tag_to_hash, True, True, True)
def bootstrap_setup(args):
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
image_tag_to_hash = args.image_tag_to_hash
hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
def cleanup_untagged_images(args):
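    """
    Walk images through their cleanup lifecycle: tagged images lose any
    unreferenced marker, newly untagged images get an unreferenced marker
    touched in HDFS_UNREF_DIR, stale images (unreferenced for longer than
    --untagged-transition-sec) have their manifests chmod'd to --dead-perms,
    and dead images that have aged a further transition period are deleted.
    """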
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
global DEAD_PERMS
image_tag_to_hash = args.image_tag_to_hash
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
try:
hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
logging.debug("hash_to_tags: %s\n", hash_to_tags)
logging.debug("tag_to_hash: %s\n", tag_to_hash)
known_images = get_all_known_images()
tagged_images = [image for image in known_images if image.manifest_stat.name in hash_to_tags.keys()]
untagged_images = get_untagged_images(known_images, tagged_images)
stale_images = get_stale_images(untagged_images)
dead_images = get_dead_images(untagged_images)
cleanup_handle_tagged_images(tagged_images)
cleanup_handle_untagged_images(untagged_images)
cleanup_handle_stale_images(stale_images)
cleanup_handle_dead_images(dead_images, known_images)
finally:
if working_dir:
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
class Filestat:
def __init__(self, perms, mod_time, name):
self.perms = perms
self.mod_time = mod_time
self.name = name
def __repr__(self):
return "Perms: {}, Mod time: {}, Name: {}\n".format(self.perms, self.mod_time, self.name)
def __str__(self):
return "Perms: {}, Mod time: {}, Name: {}\n".format(self.perms, self.mod_time, self.name)
class Image:
def __init__(self, manifest_stat, layers, config, unref_file_stat):
self.manifest_stat = manifest_stat
self.layers = layers
self.config = config
self.unref_file_stat = unref_file_stat
def __repr__(self):
return "Manifest: {}, Layers: {}, Config: {}, Unreferenced File: {}\n".format(self.manifest_stat, self.layers,
self.config, self.unref_file_stat)
def __str__(self):
return "Manifest: {}, Layers: {}, Config: {}, Unreferenced File: {}\n".format(self.manifest_stat, self.layers,
self.config, self.unref_file_stat)
def get_all_known_images():
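    """
    Stat every manifest in HDFS_MANIFEST_DIR and every marker in
    HDFS_UNREF_DIR, read the manifests to collect their layer and config
    hashes, and return a list of Image objects describing everything
    currently stored in HDFS.
    """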
global HDFS_MANIFEST_DIR
global HDFS_UNREF_DIR
known_manifest_paths, err, returncode = hdfs_stat(HDFS_MANIFEST_DIR + "/*", "%a %Y %n", False, False, False)
    known_manifests = [image for image in known_manifest_paths.split("\n") if image]
unref_manifest_paths, err, returncode = hdfs_stat(HDFS_UNREF_DIR + "/*", "%a %Y %n", False, False, False)
logging.debug("unref_manifest_paths:\n%s", unref_manifest_paths)
unref_manifests = []
if unref_manifest_paths:
        unref_manifests = [image for image in unref_manifest_paths.split("\n") if image]
logging.debug("unref_manifests:\n%s", unref_manifests)
unref_manifests_stats_dict = {}
if unref_manifests:
for unref_manifest in unref_manifests:
unref_manifest_split = unref_manifest.split()
unref_manifest_perms = unref_manifest_split[0]
            unref_manifest_mod_time = int(unref_manifest_split[1])
unref_manifest_name = unref_manifest_split[2]
unref_manifest_stat = Filestat(unref_manifest_perms, unref_manifest_mod_time, unref_manifest_name)
unref_manifests_stats_dict[unref_manifest_name] = unref_manifest_stat
logging.debug("unref_manifests_stats_dict:\n%s", unref_manifests_stats_dict)
known_manifests_names = [known_manifest.split()[2] for known_manifest in known_manifests]
layers_and_configs = get_all_layers_and_configs(known_manifests_names)
known_images = []
for manifest in known_manifests:
manifest_split = manifest.split()
manifest_perms = manifest_split[0]
        manifest_mod_time = int(manifest_split[1])
manifest_name = manifest_split[2]
manifest_stat = Filestat(manifest_perms, manifest_mod_time, manifest_name)
unref_image_stat = unref_manifests_stats_dict.get(manifest_name, None)
layers = layers_and_configs[manifest_name][0]
config = layers_and_configs[manifest_name][1]
known_image = Image(manifest_stat, layers, config, unref_image_stat)
known_images.append(known_image)
return known_images
def get_known_image_by_hash(image_hash, known_images):
for image in known_images:
if image_hash == image.manifest_stat.name:
return image
logging.debug("Couldn't find known image by hash:\n%s", image_hash)
return None
def get_all_layers_and_configs(manifest_names):
global HDFS_MANIFEST_DIR
manifests_tuples = get_hdfs_manifests_from_paths(
[HDFS_MANIFEST_DIR + "/" + manifest_name for manifest_name in manifest_names])
layers_and_configs = {}
for manifest_tuple in manifests_tuples:
manifest = manifest_tuple[0]
manifest_hash = manifest_tuple[1]
layers = []
layers.extend(get_layer_hashes_from_manifest(manifest, False))
config = get_config_hash_from_manifest(manifest)
layers_and_configs[manifest_hash] = (layers, config)
logging.debug("layers for %s:\n%s", manifest_hash, layers)
logging.debug("config for %s:\n%s", manifest_hash, config)
return layers_and_configs
def get_untagged_images(known_images, tagged_images):
untagged_images = []
for image in known_images:
if is_image_untagged(image, tagged_images):
untagged_images.append(image)
logging.debug("known_images:\n%s", known_images)
logging.debug("tagged_images:\n%s", tagged_images)
logging.debug("untagged_images:\n%s", untagged_images)
return untagged_images
def get_stale_images(untagged_images):
stale_images = [image for image in untagged_images if is_image_stale(image)]
logging.debug("stale_images:\n%s", stale_images)
return stale_images
def get_dead_images(untagged_images):
dead_images = [image for image in untagged_images if is_image_dead(image)]
logging.debug("dead_images:\n%s", dead_images)
return dead_images
def is_image_untagged(image, tagged_images):
for tagged_image in tagged_images:
if tagged_image.manifest_stat.name == image.manifest_stat.name:
return False
return True
def is_image_stale(image):
return does_image_have_unref_file(image) and not does_image_have_dead_perms(image)
def is_image_dead(image):
return does_image_have_unref_file(image) and does_image_have_dead_perms(image)
def does_image_have_unref_file(image):
    return image.unref_file_stat is not None
def does_image_have_dead_perms(image):
global DEAD_PERMS
return image.manifest_stat.perms == DEAD_PERMS
def is_mod_time_old(mod_time):
global UNTAGGED_TRANSITION_SEC
    cutoff_time = int(time.time() * 1000) - UNTAGGED_TRANSITION_SEC * 1000
logging.debug("Mod time: %d, Cutoff time: %d", mod_time, cutoff_time)
return mod_time < cutoff_time
def cleanup_handle_tagged_images(tagged_images):
# Remove unreferenced files if they exist
if not tagged_images:
return
unref_remove_list = []
for image in tagged_images:
if does_image_have_unref_file(image):
unref_remove_list.append(image)
remove_unref_files(unref_remove_list)
def cleanup_handle_untagged_images(untagged_images):
# Create unreferenced file
if not untagged_images:
return
touch_list = []
for image in untagged_images:
if not does_image_have_unref_file(image):
touch_list.append(image)
touch_unref_files(touch_list)
def cleanup_handle_stale_images(stale_images):
# Set blob permissions to 400 for old stale images
if not stale_images:
return
make_unreadable_list = []
for image in stale_images:
if is_mod_time_old(image.unref_file_stat.mod_time):
make_unreadable_list.append(image)
make_manifests_unreadable(make_unreadable_list)
touch_unref_files(make_unreadable_list)
def cleanup_handle_dead_images(dead_images, known_images):
# Remove old dead images
if not dead_images:
return
images_to_remove = []
for image in dead_images:
if is_mod_time_old(image.unref_file_stat.mod_time):
images_to_remove.append(image)
remove_dead_images(images_to_remove, known_images)
def make_manifests_unreadable(images):
global HDFS_MANIFEST_DIR
global DEAD_PERMS
if not images:
return
manifest_file_paths = [HDFS_MANIFEST_DIR + "/" + image.unref_file_stat.name for image in images]
logging.debug("Chmod %s manifest file:\n%s", DEAD_PERMS, manifest_file_paths)
hdfs_chmod(DEAD_PERMS, manifest_file_paths)
def touch_unref_files(images):
global HDFS_UNREF_DIR
if not images:
return
unref_file_paths = [HDFS_UNREF_DIR + "/" + image.manifest_stat.name for image in images]
logging.debug("Touching unref file:\n%s", unref_file_paths)
hdfs_touchz(unref_file_paths)
def remove_unref_files(images):
global HDFS_UNREF_DIR
if not images:
return
unref_file_paths = [HDFS_UNREF_DIR + "/" + image.manifest_stat.name for image in images]
logging.debug("Removing unref files:\n%s", unref_file_paths)
hdfs_rm(unref_file_paths)
def remove_dead_images(images_to_remove, known_images):
if not images_to_remove:
return
logging.debug("Removing dead images:\n%s", images_to_remove)
delete_list = get_delete_list_from_images_to_remove(images_to_remove, known_images)
if delete_list:
hdfs_rm(delete_list)
def create_parsers():
parser = argparse.ArgumentParser()
add_parser_default_arguments(parser)
subparsers = parser.add_subparsers(help='sub help', dest='sub_command')
parse_pull_build_push_update = subparsers.add_parser('pull-build-push-update',
help='Pull an image, build its squashfs'
+ ' layers, push it to hdfs, and'
+ ' atomically update the'
+ ' image-tag-to-hash file')
parse_pull_build_push_update.set_defaults(func=pull_build_push_update)
add_parser_default_arguments(parse_pull_build_push_update)
parse_pull_build_push_update.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
parse_pull_build = subparsers.add_parser('pull-build',
help='Pull an image and build its squashfs layers')
parse_pull_build.set_defaults(func=pull_build)
add_parser_default_arguments(parse_pull_build)
parse_pull_build.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
parse_push_update = subparsers.add_parser('push-update',
help='Push the squashfs layers to hdfs and update'
+ ' the image-tag-to-hash file')
parse_push_update.set_defaults(func=push_update)
add_parser_default_arguments(parse_push_update)
parse_push_update.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
parse_remove_image = subparsers.add_parser('remove-image',
help='Remove an image (manifest, config, layers)'
+ ' from hdfs based on its tag or manifest hash')
parse_remove_image.set_defaults(func=remove_image)
add_parser_default_arguments(parse_remove_image)
parse_remove_image.add_argument("images_or_tags", nargs="+",
help="Image or tag argument (can specify multiple)")
parse_remove_tag = subparsers.add_parser('remove-tag',
help='Remove an image to tag mapping in the'
+ ' image-tag-to-hash file')
parse_remove_tag.set_defaults(func=add_remove_tag)
add_parser_default_arguments(parse_remove_tag)
parse_remove_tag.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
parse_add_tag = subparsers.add_parser('add-tag',
help='Add an image to tag mapping in the'
+ ' image-tag-to-hash file')
parse_add_tag.set_defaults(func=add_remove_tag)
add_parser_default_arguments(parse_add_tag)
parse_add_tag.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
parse_copy_update = subparsers.add_parser('copy-update',
help='Copy an image from hdfs in one cluster to'
+ ' another and then update the'
+ ' image-tag-to-hash file')
parse_copy_update.set_defaults(func=copy_update)
add_parser_default_arguments(parse_copy_update)
parse_copy_update.add_argument("src_root",
help="HDFS path for source root directory")
parse_copy_update.add_argument("dest_root",
help="HDFS path for destination root directory")
parse_copy_update.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
parse_query_tag = subparsers.add_parser('query-tag',
help='Get the manifest, config, and layers'
+ ' associated with a tag')
parse_query_tag.set_defaults(func=query_tag)
add_parser_default_arguments(parse_query_tag)
parse_query_tag.add_argument("tags", nargs="+",
help="Image or tag argument (can specify multiple)")
parse_list_tags = subparsers.add_parser('list-tags',
help='List all tags in image-tag-to-hash file')
parse_list_tags.set_defaults(func=list_tags)
add_parser_default_arguments(parse_list_tags)
parse_bootstrap_setup = subparsers.add_parser('bootstrap', help='Bootstrap setup of required HDFS directories')
parse_bootstrap_setup.set_defaults(func=bootstrap_setup)
add_parser_default_arguments(parse_bootstrap_setup)
parse_cleanup_untagged_images = subparsers.add_parser('cleanup',
help='Cleanup untagged images in HDFS')
parse_cleanup_untagged_images.set_defaults(func=cleanup_untagged_images)
add_parser_default_arguments(parse_cleanup_untagged_images)
return parser
def add_parser_default_arguments(parser):
parser.add_argument("--working-dir", type=str, dest='working_dir', default="dts-work-dir",
help="Name of working directory")
parser.add_argument("--skopeo-format", type=str, dest='skopeo_format',
default='dir', help="Output format for skopeo copy")
parser.add_argument("--pull-format", type=str, dest='pull_format',
default='docker', help="Pull format for skopeo")
parser.add_argument("-l", "--log", type=str, dest='LOG_LEVEL',
default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)")
parser.add_argument("--hdfs-root", type=str, dest='HDFS_ROOT',
default='/runc-root', help="The root directory in HDFS for all of the"
+ "squashfs images")
parser.add_argument("--image-tag-to-hash", type=str,
dest='image_tag_to_hash', default='image-tag-to-hash',
help="image-tag-to-hash filename in hdfs")
parser.add_argument("-r", "--replication", type=int, dest='replication',
default=3, help="Replication factor for all files uploaded to HDFS")
parser.add_argument("--hadoop-prefix", type=str, dest='hadoop_prefix',
default=os.environ.get('HADOOP_PREFIX'),
help="hadoop prefix value for environment")
parser.add_argument("-f", "--force", dest='force',
action="store_true", default=False, help="Force overwrites in HDFS")
parser.add_argument("--check-magic-file", dest='check_magic_file',
action="store_true", default=False, help="Check for a specific magic file"
+ "in the image before uploading")
parser.add_argument("--magic-file", type=str, dest='magic_file',
default='etc/dockerfile-version', help="The magic file to check for"
+ "in the image")
parser.add_argument("--max-layers", type=int, dest='MAX_IMAGE_LAYERS',
default=37, help="Maximum number of layers an image is allowed to have")
parser.add_argument("--max-size", type=int, dest='MAX_IMAGE_SIZE',
default=10 * 1024 * 1024 * 1024, help="Maximum size an image is allowed to be")
parser.add_argument("--untagged-transition-sec", type=long, dest='UNTAGGED_TRANSITION_SEC',
default=7 * 24 * 60 * 60, help="Time that untagged images will spend in each state"
+ "before moving to the next one")
parser.add_argument("--dead-perms", type=str, dest='DEAD_PERMS',
default="400", help="Permissions to set for manifests that are untagged "
+ "before they are removed")
parser.add_argument("-b", "--bootstrap", dest='bootstrap',
action="store_true", default=False, help="Bootstrap setup"
+ " of required HDFS directories")
return parser
def check_dependencies():
global HADOOP_BIN_DIR
try:
command = [HADOOP_BIN_DIR + "/hadoop", "-h"]
shell_command(command, False, False, True)
except Exception as ex:
logging.error("Could not find hadoop. Make sure HADOOP_PREFIX " +
"is set correctly either in your environment or on the command line " +
"via --hadoop-prefix" +
", exception is " + str(ex))
return 1
try:
command = ["skopeo", "-v"]
shell_command(command, False, False, True)
except Exception as _:
logging.error("Could not find skopeo. Make sure it is installed and present on the PATH")
return 1
try:
command = ["/usr/sbin/mksquashfs", "-version"]
shell_command(command, False, False, True)
except Exception as _:
logging.error("Could not find /usr/sbin/mksquashfs. Make sure squashfs-tools is installed " +
"and /usr/sbin/mksquashfs is present on the the PATH")
return 1
try:
command = ["tar", "--version"]
shell_command(command, False, False, True)
except Exception as _:
logging.error("Could not find tar. Make sure it is installed and present on the PATH")
return 1
try:
command = ["setfattr", "--version"]
shell_command(command, False, False, True)
    except Exception as _:
logging.error("Could not find setfattr . Make sure it is installed and present on the PATH")
return 1
return 0
def main():
global LOG_LEVEL
global HADOOP_PREFIX
global HADOOP_BIN_DIR
global HDFS_ROOT
global HDFS_MANIFEST_DIR
global HDFS_CONFIG_DIR
global HDFS_LAYERS_DIR
global HDFS_UNREF_DIR
global MAX_IMAGE_LAYERS
global MAX_IMAGE_SIZE
global UNTAGGED_TRANSITION_SEC
global ARG_MAX
global DEAD_PERMS
if os.geteuid() != 0:
logging.error("Script must be run as root")
return
parser = create_parsers()
args, extra = parser.parse_known_args()
if extra:
raise Exception(f"Extra unknown arguments given: {extra}")
ARG_MAX = os.sysconf("SC_ARG_MAX")
HDFS_ROOT = args.HDFS_ROOT
HDFS_MANIFEST_DIR = HDFS_ROOT + "/manifests"
HDFS_CONFIG_DIR = HDFS_ROOT + "/config"
HDFS_LAYERS_DIR = HDFS_ROOT + "/layers"
HDFS_UNREF_DIR = HDFS_ROOT + "/unreferenced"
MAX_IMAGE_LAYERS = args.MAX_IMAGE_LAYERS
MAX_IMAGE_SIZE = args.MAX_IMAGE_SIZE
UNTAGGED_TRANSITION_SEC = args.UNTAGGED_TRANSITION_SEC
DEAD_PERMS = args.DEAD_PERMS
LOG_LEVEL = args.LOG_LEVEL.upper()
image_tag_to_hash = args.image_tag_to_hash
numeric_level = getattr(logging, LOG_LEVEL, None)
if not isinstance(numeric_level, int):
logging.error("Invalid log level: %s", LOG_LEVEL)
return
logging.basicConfig(format="%(levelname)s: %(message)s", level=numeric_level)
if args.hadoop_prefix is None:
logging.error("Hadoop prefix is not set. You may set it either " +
"in your environment or via --hadoop-prefix")
return
HADOOP_PREFIX = args.hadoop_prefix
HADOOP_BIN_DIR = HADOOP_PREFIX + "/bin"
if check_dependencies():
return
if "/" in image_tag_to_hash:
logging.error("image-tag-to-hash cannot contain a /")
return
logging.debug("args: %s", str(args))
logging.debug("extra: %s", str(extra))
logging.debug("image-tag-to-hash: %s", image_tag_to_hash)
logging.debug("LOG_LEVEL: %s", LOG_LEVEL)
logging.debug("HADOOP_BIN_DIR: %s", str(HADOOP_BIN_DIR))
args.func(args)
if __name__ == "__main__":
main()