[STORM-3855] Remove python2 support and move to python3. (#3475)
* [STORM-3855] Remove python2 support and move to python3.
* [STORM-3855] Change argument and I/O processing.
* [STORM-3855] Add test_storm.py
* [STORM-3855] PEP8 format, global var initialization, fix shell_command return value, simplify regex, use log.warning
* [STORM-3855] Add test class for docker-to-squash.py (which must later be renamed to docker_to_squash.py)
* [STORM-3855] Convert byte array to string; initialize variable
* [STORM-3855] Convert bytes to str immediately after reading from URL and eliminate unnecessary string conversions (see the sketch below this list).
* [STORM-3855] Reduce indent for main() call.
* [STORM-3855] Remove unused imports
* [STORM-3855] Add latest docker-to-squash from https://git.ouryahoo.com/hadoop/Hadoop/blob/y-branch-2.10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/bin/docker-to-squash.py
* [STORM-3855] Change to python3; add license; PEP8 format; rename returncode to rc; log.warn to log.warning; fix working_dir assignment;
* [STORM-3855] Remove __init__.py in bin directory.
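For context, a minimal sketch (not part of the patch) of the bytes-versus-str handling the migration needed: Python 3 returns bytes from subprocess pipes and URL reads, so the value is decoded once at the boundary and handled as str afterwards. The helper names below are illustrative only.

```python
import subprocess
from urllib.request import urlopen

def run(command):
    # communicate() yields bytes; decode immediately.
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = process.communicate()
    return out.decode("utf-8"), err.decode("utf-8"), process.returncode

def fetch(url):
    # read() also yields bytes; decode once, then treat the payload as str.
    with urlopen(url) as response:
        return response.read().decode("utf-8")
```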
diff --git a/DEVELOPER.md b/DEVELOPER.md
index 70f8871..bfd0073 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -261,10 +261,12 @@
in order to get started as fast as possible. Users can still install a specific version of `ruby` and/or `node` manually.
-You will also need the [mock](https://docs.python.org/3/library/unittest.mock.html) Python testing library (as well as [Python 2.7.x and Python 3.x](https://github.com/pyenv/pyenv)). With [pip](https://pip.pypa.io/en/stable/installing/) installed you can run
-
+You will also need the [mock](https://docs.python.org/3/library/unittest.mock.html)
+Python testing library (as well as [Python 3.x](https://github.com/pyenv/pyenv));
+Python 2.7.x is no longer supported.
+With [pip3](https://pip.pypa.io/en/stable/installation/) installed you can run
```
-pip install mock
+pip3 install mock
```
## Building
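As a rough illustration of the mock requirement added to DEVELOPER.md above, a self-contained Python 3 test might look like the following; the test class and the patched call are hypothetical, not Storm's actual test code.

```python
import subprocess
import unittest
from unittest import mock

class ShellCommandTest(unittest.TestCase):
    @mock.patch("subprocess.Popen")
    def test_popen_is_invoked(self, popen_mock):
        # Stub out the process object returned by Popen.
        popen_mock.return_value.communicate.return_value = (b"out", b"")
        popen_mock.return_value.returncode = 0
        process = subprocess.Popen(["echo", "hi"], stdout=subprocess.PIPE)
        out, err = process.communicate()
        self.assertEqual(out, b"out")
        self.assertEqual(process.returncode, 0)

if __name__ == "__main__":
    unittest.main()
```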
diff --git a/RELEASING.md b/RELEASING.md
index 8c37ef4..d352316 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -50,7 +50,7 @@
6. Run `dev-tools/release_notes.py` for the release version, piping the output to a RELEASE_NOTES.html file. Move that file to the svn release directory, sign it, and generate checksums, e.g.
```bash
-python dev-tools/release_notes.py 2.2.0 > RELEASE_NOTES.html
+python3 dev-tools/release_notes.py 2.2.0 > RELEASE_NOTES.html
gpg --armor --output RELEASE_NOTES.html.asc --detach-sig RELEASE_NOTES.html
gpg --print-md SHA512 RELEASE_NOTES.html > RELEASE_NOTES.html.sha512
```
diff --git a/bin/docker-to-squash.py b/bin/docker-to-squash.py
index d5515db..e63c05c 100755
--- a/bin/docker-to-squash.py
+++ b/bin/docker-to-squash.py
@@ -1,22 +1,22 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
docker_to_squash.py is a tool to facilitate the process of converting
Docker images into squashFS layers, manifests, and configs.
@@ -33,19 +33,34 @@
import re
import shutil
import subprocess
+import time
from threading import Timer
LOG_LEVEL = None
-HADOOP_BIN_DIR = None
+HADOOP_BIN_DIR = "/undefined"
+MAX_IMAGE_LAYERS = 0
+MAX_IMAGE_SIZE = 0
+HADOOP_PREFIX = "/undefined"
+ARG_MAX = 10
+HDFS_ROOT = "/undefined"
+HDFS_MANIFEST_DIR = "/undefined"
+HDFS_CONFIG_DIR = "/undefined"
+HDFS_LAYERS_DIR = "/undefined"
+HDFS_UNREF_DIR = "/undefined"
-def shell_command(command, print_stdout, print_stderr, raise_on_error,
- timeout_sec=600):
+
+def shell_command(command, print_stdout, print_stderr, raise_on_error, timeout_sec=600):
global LOG_LEVEL
+ global ARG_MAX
stdout_val = subprocess.PIPE
stderr_val = subprocess.PIPE
logging.debug("command: %s", command)
+ for arg in command:
+ if len(arg) > ARG_MAX:
+            raise Exception(f"command argument length ({len(arg)}) greater than ARG_MAX ({ARG_MAX})")
+
if print_stdout:
stdout_val = None
@@ -54,44 +69,45 @@
process = None
timer = None
+ out = None
+ err = None
try:
- process = subprocess.Popen(command, stdout=stdout_val,
- stderr=stderr_val)
+ process = subprocess.Popen(command, stdout=stdout_val, stderr=stderr_val)
timer = Timer(timeout_sec, process_timeout, [process])
timer.start()
out, err = process.communicate()
- if raise_on_error and process.returncode is not 0:
- exception_string = ("Command: " + str(command)
- + " failed with returncode: "
- + str(process.returncode))
- if out is not None:
+ if raise_on_error and process.returncode:
+ exception_string = f"Command: {command} failed with returncode: {process.returncode}"
+ if out:
exception_string = exception_string + "\nstdout: " + str(out)
- if err is not None:
+ if err:
exception_string = exception_string + "\nstderr: " + str(err)
raise Exception(exception_string)
-
- except:
+ except Exception as ex:
if process and process.poll() is None:
process.kill()
- raise Exception("Popen failure")
+ raise Exception("Popen failure, " + str(ex))
finally:
+ # Note that finally clause executes even when there is no exception, hence the "return" statement
if timer:
timer.cancel()
+ return out, err, (process.returncode if process else -1)
- return out, err, process.returncode
def process_timeout(process):
process.kill()
logging.error("Process killed due to timeout")
+
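A minimal standalone sketch of the Timer-based timeout pattern that shell_command() and process_timeout() use above, assuming `command` is a list of strings; this is not part of the patch.

```python
import subprocess
from threading import Timer

def run_with_timeout(command, timeout_sec=600):
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    timer = Timer(timeout_sec, process.kill)  # kill the child if it outlives the timeout
    try:
        timer.start()
        out, err = process.communicate()
    finally:
        timer.cancel()  # no-op if the timer already fired
    return out, err, process.returncode
```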
def does_hdfs_entry_exist(entry, raise_on_error=True):
- out, err, returncode = hdfs_ls(entry, raise_on_error=raise_on_error)
- if returncode is not 0:
+ out, err, rc = hdfs_ls(entry, raise_on_error=raise_on_error)
+ if rc:
return False
return True
+
def setup_hdfs_dirs(dirs):
if does_hdfs_entry_exist(dirs, raise_on_error=False):
return
@@ -102,59 +118,65 @@
directories = dir_entry.split("/")[1:]
dir_path = ""
for directory in directories:
- dir_path = dir_path + "/" + directory
+ dir_path = dir_path + "/" + directory
logging.info("dir_path: %s", str(dir_path))
chmod_dirs.append(dir_path)
hdfs_chmod("755", chmod_dirs)
+
def append_or_extend_to_list(src, src_list):
if isinstance(src, list):
src_list.extend(src)
else:
src_list.append(src)
+
def hdfs_get(src, dest, print_stdout=False, print_stderr=False, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-get"]
append_or_extend_to_list(src, command)
command.append(dest)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
-def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False,
- raise_on_error=True):
+
+def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-ls"]
if options:
append_or_extend_to_list(options, command)
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr,
- raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
+
def hdfs_cat(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cat"]
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
-def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True,
- create_parents=False):
+
+def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True, create_parents=False):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-mkdir"]
if create_parents:
command.append("-p")
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
-def hdfs_rm(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
+
+def hdfs_rm(file_path, error_on_file_not_found=False, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-rm"]
+ if not error_on_file_not_found:
+ command.append("-f")
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
+
def hdfs_put(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
@@ -163,64 +185,76 @@
command.append("-f")
append_or_extend_to_list(src, command)
command.append(dest)
- out, err, returncode = shell_command(command, print_stdout, print_stderr,
- raise_on_error, 60)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error, 60)
+ return out, err, rc
-def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True,
- recursive=False):
+
+def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True, recursive=False):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-chmod"]
if recursive:
command.append("-R")
command.append(mode)
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
+
def hdfs_setrep(replication, file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
- command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)]
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)]
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
+
def hdfs_cp(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
- command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"]
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"]
if force:
command.append("-f")
append_or_extend_to_list(src, command)
command.append(dest)
- out, err, returncode = shell_command(command, print_stdout, print_stderr,
- raise_on_error, 60)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
-def hdfs_touchz(file_path, print_stdout=False, print_stderr=True,
- raise_on_error=True):
+
+def hdfs_touchz(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
global HADOOP_BIN_DIR
command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-touchz"]
append_or_extend_to_list(file_path, command)
- out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
- return out, err, returncode
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
+
+
+def hdfs_stat(file_path, fmt, print_stdout=False, print_stderr=True, raise_on_error=True):
+ global HADOOP_BIN_DIR
+ command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-stat"]
+ append_or_extend_to_list(fmt, command)
+ command.append(file_path)
+ out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+ return out, err, rc
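Hypothetical call sites for the hdfs_* wrappers above (the HDFS paths are made up); each wrapper accepts either a single path or a list of paths because append_or_extend_to_list() flattens both forms into the hadoop command line.

```python
out, err, rc = hdfs_ls("/docker/manifests", options="-C", raise_on_error=False)
out, err, rc = hdfs_ls(["/docker/layers", "/docker/config"], raise_on_error=False)
if rc == 0:
    logging.info("both directories exist")
```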
def get_working_dir(directory):
+ working_dir = "(undefined)"
try:
if os.path.isdir(directory):
working_dir = os.path.join(directory, "docker-to-squash")
else:
working_dir = directory
os.makedirs(working_dir)
- except:
- raise Exception("Could not create working_dir: " + working_dir)
+ except Exception as ex:
+ raise Exception(f"Could not create working_dir: {working_dir}, {ex}")
return working_dir
+
def is_sha256_hash(string):
- if not re.findall(r"^[a-fA-F\d]{64,64}$", string):
+ if not re.findall(r"^[a-fA-F\d]{64}$", string):
return False
return True
+
def calculate_file_hash(filename):
sha = hashlib.sha256()
with open(filename, 'rb') as file_pointer:
@@ -229,16 +263,18 @@
if not data:
break
sha.update(data)
- hexdigest = sha.hexdigest()
- if hexdigest == 0:
- raise Exception("Hex digest for file: " + hexdigest + "returned 0")
- return hexdigest
+ hex_digest = sha.hexdigest()
+ if hex_digest == 0:
+ raise Exception(f"Hex digest for file: {filename} returned 0")
+ return hex_digest
+
def calculate_string_hash(string):
sha = hashlib.sha256()
sha.update(string)
return sha.hexdigest()
+
def get_local_manifest_from_path(manifest_path):
with open(manifest_path, "rb") as file_pointer:
out = file_pointer.read()
@@ -246,16 +282,35 @@
manifest = json.loads(out)
return manifest, manifest_hash
+
def get_hdfs_manifest_from_path(manifest_path):
- out, err, returncode = hdfs_cat(manifest_path)
+ out, err, rc = hdfs_cat(manifest_path)
manifest_hash = calculate_string_hash(str(out))
manifest = json.loads(out)
return manifest, manifest_hash
+
+def get_hdfs_manifests_from_paths(manifest_paths):
+ out, err, rc = hdfs_cat(manifest_paths)
+ manifests_list = out.split("}{")
+ manifests = []
+ for manifest_str in manifests_list:
+ if manifest_str[0] != "{":
+ manifest_str = "{" + manifest_str
+ if manifest_str[-1] != "}":
+ manifest_str = manifest_str + "}"
+ manifest_hash = calculate_string_hash(manifest_str)
+ logging.debug("manifest for %s:\n%s", manifest_hash, manifest_str)
+ manifest = json.loads(manifest_str)
+ manifests.append((manifest, manifest_hash))
+ return manifests
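Illustrative input for get_hdfs_manifests_from_paths() above: `hadoop fs -cat` over several manifest files returns the JSON documents back to back, and splitting on "}{" (then restoring the braces) lets each piece parse on its own. The values below are made up.

```python
concatenated = '{"schemaVersion": 2}{"schemaVersion": 2}'
pieces = []
for piece in concatenated.split("}{"):
    if not piece.startswith("{"):
        piece = "{" + piece
    if not piece.endswith("}"):
        piece = piece + "}"
    pieces.append(piece)
assert pieces == ['{"schemaVersion": 2}', '{"schemaVersion": 2}']
```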
+
+
def get_config_hash_from_manifest(manifest):
config_hash = manifest['config']['digest'].split(":", 1)[1]
return config_hash
+
def check_total_layer_number(layers):
global MAX_IMAGE_LAYERS
if len(layers) > MAX_IMAGE_LAYERS:
@@ -264,6 +319,7 @@
" layers, which is more than the maximum " + str(MAX_IMAGE_LAYERS) +
" layers. Failing out")
+
def check_total_layer_size(manifest, size):
global MAX_IMAGE_SIZE
if size > MAX_IMAGE_SIZE:
@@ -272,6 +328,7 @@
raise Exception("Image has total size " + str(size) +
" B. which is more than the maximum size " + str(MAX_IMAGE_SIZE) + " B. Failing out")
+
def get_layer_hashes_from_manifest(manifest, error_on_size_check=True):
layers = []
size = 0
@@ -286,27 +343,28 @@
return layers
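For reference, the general shape of the manifest fields these helpers read; the digests and sizes are made up, and the layer-hash extraction shown is an assumption since that part of get_layer_hashes_from_manifest() is outside this hunk.

```python
manifest = {
    "config": {"digest": "sha256:" + "a" * 64, "size": 7023},
    "layers": [
        {"digest": "sha256:" + "b" * 64, "size": 32654},
        {"digest": "sha256:" + "c" * 64, "size": 16724},
    ],
}
config_hash = manifest["config"]["digest"].split(":", 1)[1]   # matches get_config_hash_from_manifest()
layer_hashes = [layer["digest"].split(":", 1)[1] for layer in manifest["layers"]]
total_size = sum(layer["size"] for layer in manifest["layers"])
```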
+
def get_pull_fmt_string(pull_format):
pull_fmt_string = pull_format + ":"
if pull_format == "docker":
pull_fmt_string = pull_fmt_string + "//"
return pull_fmt_string
+
def get_manifest_from_docker_image(pull_format, image):
pull_fmt_string = get_pull_fmt_string(pull_format)
- out, err, returncode = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image],
- False, True, True, 60)
+ out, err, rc = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image], False, True, True)
manifest = json.loads(out)
if 'manifests' in manifest:
logging.debug("skopeo inspect --raw returned a list of manifests")
manifests_dict = manifest['manifests']
sha = None
for mfest in manifests_dict:
- if(mfest['platform']['architecture'] == "amd64"):
+ if mfest['platform']['architecture'] == "amd64":
sha = mfest['digest']
break
if not sha:
- raise Exception("Could not find amd64 manifest for" + image)
+ raise Exception("Could not find amd64 manifest for image " + image)
image_without_tag = image.split("/", 1)[-1].split(":", 1)[0]
image_and_sha = image_without_tag + "@" + sha
@@ -320,12 +378,14 @@
logging.debug("manifest: %s", str(manifest))
return manifest, manifest_hash
+
def split_image_and_tag(image_and_tag):
split = image_and_tag.split(",")
image = split[0]
tags = split[1:]
return image, tags
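A hypothetical positional argument as consumed by split_image_and_tag() above: the image reference comes first, followed by one or more comma-separated tags.

```python
image, tags = split_image_and_tag("library/centos:7,centos7,centos-latest")
assert image == "library/centos:7"
assert tags == ["centos7", "centos-latest"]
```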
+
def read_image_tag_to_hash(image_tag_to_hash):
hash_to_tags = dict()
tag_to_hash = dict()
@@ -334,7 +394,7 @@
line = file_pointer.readline()
if not line:
break
- line = line.rstrip()
+ line = str(line).rstrip()
if not line:
continue
@@ -348,7 +408,7 @@
tags_list = ' '.join(split_line[:-1]).split(",")
if not is_sha256_hash(manifest_hash) or not tags_list:
- logging.warn("image-tag-to-hash file malformed. Skipping entry %s", line)
+ logging.warning("image-tag-to-hash file malformed. Skipping entry %s", line)
continue
tags_and_comments = hash_to_tags.get(manifest_hash, None)
@@ -368,11 +428,12 @@
for tag in tags_list:
cur_manifest = tag_to_hash.get(tag, None)
if cur_manifest is not None:
- logging.warn("tag_to_hash already has manifest %s defined for tag %s."
- + "This entry will be overwritten", cur_manifest, tag)
+ logging.warning(f"tag_to_hash already has manifest {cur_manifest} defined for tag {tag}. "
+ f"This entry will be overwritten")
tag_to_hash[tag] = manifest_hash
return hash_to_tags, tag_to_hash
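An illustrative line of the image-tag-to-hash file that read_image_tag_to_hash() parses above (and that write_local_image_tag_to_hash() emits below): comma-separated tags, a colon, the 64-character sha256 manifest hash, and an optional "#"-prefixed comment. The tag names and hash are made up, and the parsing shown is a simplified stand-in for the real function.

```python
example_line = "centos7,centos:" + "ab" * 32 + "#centos7 base image"
tags_part, rest = example_line.split(":", 1)
manifest_hash, _, comment = rest.partition("#")
assert tags_part.split(",") == ["centos7", "centos"]
assert len(manifest_hash) == 64
assert comment == "centos7 base image"
```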
+
def remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag):
if not hash_to_tags:
logging.debug("hash_to_tags is null. Not removing tag %s", tag)
@@ -391,6 +452,7 @@
else:
logging.debug("Tag not found. Not removing tag: %s", tag)
+
def remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash):
if not hash_to_tags:
logging.debug("hash_to_tags is null. Not removing image_hash %s", image_hash)
@@ -404,6 +466,7 @@
for tag in prev_tags:
del tag_to_hash[tag]
+
def add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
tag_to_hash[tag] = manifest_hash
new_tags_and_comments = hash_to_tags.get(manifest_hash, None)
@@ -419,14 +482,19 @@
new_comment.append(comment)
hash_to_tags[manifest_hash] = (new_tags, new_comment)
+
def write_local_image_tag_to_hash(image_tag_to_hash, hash_to_tags):
file_contents = []
for key, value in hash_to_tags.iteritems():
manifest_hash = key
+ # Sort tags list to preserve consistent order
+ value[0].sort()
tags = ','.join(map(str, value[0]))
if tags:
+ # Sort comments list to preserve consistent order
+ value[1].sort()
comment = ', '.join(map(str, value[1]))
- if comment > 0:
+ if comment:
comment = "#" + comment
file_contents.append(tags + ":" + manifest_hash + comment + "\n")
@@ -435,29 +503,37 @@
for val in file_contents:
file_pointer.write(val)
-def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
- manifest_hash, comment):
+
+def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, comment):
for tag in tags:
update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
+
def update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
+
def remove_from_dicts(hash_to_tags, tag_to_hash, tags):
for tag in tags:
logging.debug("removing tag: %s", tag)
remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
-def populate_tag_dicts(hdfs_root, image_tag_to_hash, local_image_tag_to_hash):
- if does_hdfs_entry_exist(hdfs_root + "/" + image_tag_to_hash):
- hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash)
- image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash)
- else:
- image_tag_to_hash_hash = 0
+def populate_tag_dicts(image_tag_to_hash, local_image_tag_to_hash):
+ return populate_tag_dicts_set_root(image_tag_to_hash, local_image_tag_to_hash, None)
- if image_tag_to_hash_hash != 0:
+
+def populate_tag_dicts_set_root(image_tag_to_hash, local_image_tag_to_hash, hdfs_root):
+ # Setting hdfs_root to None will default it to using the global
+ global HDFS_ROOT
+ if not hdfs_root:
+ hdfs_root = HDFS_ROOT
+
+ hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash, raise_on_error=True)
+ image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash)
+
+ if image_tag_to_hash_hash:
hash_to_tags, tag_to_hash = read_image_tag_to_hash(local_image_tag_to_hash)
else:
hash_to_tags = {}
@@ -472,6 +548,7 @@
hdfs_touchz(image_tag_to_hash_path)
hdfs_chmod("755", image_tag_to_hash_path)
+
def skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir):
logging.info("Pulling image: %s", image)
if os.path.isdir(skopeo_dir):
@@ -480,34 +557,33 @@
+ "Directory: " + skopeo_dir)
pull_fmt_string = get_pull_fmt_string(pull_format)
shell_command(["skopeo", "copy", pull_fmt_string + image,
- skopeo_format + ":" + skopeo_dir], False, True, True, 600)
+ skopeo_format + ":" + skopeo_dir], False, True, True)
+
def untar_layer(tmp_dir, layer_path):
- shell_command(["tar", "-C", tmp_dir, "--xattrs",
- "--xattrs-include='*'", "-xf", layer_path],
- False, True, True, 600)
+ shell_command(["tar", "-C", tmp_dir, "--xattrs", "--xattrs-include='*'", "-xf", layer_path], False, True, True)
+
def tar_file_search(archive, target):
- out, err, returncode = shell_command(["tar", "-xf", archive, target, "-O"],
- False, False, False, 600)
+ out, err, rc = shell_command(["tar", "-xf", archive, target, "-O"], False, False, False)
return out
+
def set_fattr(directory):
- shell_command(["setfattr", "-n", "trusted.overlay.opaque",
- "-v", "y", directory], False, True, True)
+ shell_command(["setfattr", "-n", "trusted.overlay.opaque", "-v", "y", directory], False, True, True)
+
def make_whiteout_block_device(file_path, whiteout):
- shell_command(["mknod", "-m", "000", file_path,
- "c", "0", "0"], False, True, True)
+ shell_command(["mknod", "-m", "000", file_path, "c", "0", "0"], False, True, True)
- out, err, returncode = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True)
+ out, err, rc = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True)
perms = str(out).strip()
shell_command(["chown", perms, file_path], False, True, True)
+
def convert_oci_whiteouts(tmp_dir):
- out, err, returncode = shell_command(["find", tmp_dir, "-name", ".wh.*"],
- False, False, True, 60)
+ out, err, rc = shell_command(["find", tmp_dir, "-name", ".wh.*"], False, False, True)
whiteouts = str(out).splitlines()
for whiteout in whiteouts:
if whiteout == 0:
@@ -519,47 +595,47 @@
else:
whiteout_string = ".wh."
idx = basename.rfind(whiteout_string)
- bname = basename[idx+len(whiteout_string):]
+ bname = basename[idx + len(whiteout_string):]
file_path = os.path.join(directory, bname)
make_whiteout_block_device(file_path, whiteout)
shell_command(["rm", whiteout], False, True, True)
+
def dir_to_squashfs(tmp_dir, squash_path):
- shell_command(["/usr/sbin/mksquashfs", tmp_dir, squash_path, "-write-queue", "4096",
- "-read-queue", "4096", "-fragment-queue", "4096"],
- False, True, True, 600)
+ shell_command(["mksquashfs", tmp_dir, squash_path, "-write-queue", "4096", "-read-queue", "4096",
+ "-fragment-queue", "4096"],
+ False, True, True)
-def upload_to_hdfs(file_path, file_name, hdfs_dir, replication, mode, force=False):
- dest = hdfs_dir + "/" + file_name
+def upload_to_hdfs(src, dest, replication, mode, force=False):
if does_hdfs_entry_exist(dest, raise_on_error=False):
if not force:
- logging.warn("Not uploading to HDFS. File already exists: %s", dest)
+ logging.warning("Not uploading to HDFS. File already exists: %s", dest)
return
logging.info("File already exists, but overwriting due to force option: %s", dest)
- hdfs_put(file_path, dest, force)
+ hdfs_put(src, dest, force)
hdfs_setrep(replication, dest)
hdfs_chmod(mode, dest)
- logging.info("Uploaded file %s with replication %d and permissions %s",
- dest, replication, mode)
+ logging.info(f"Uploaded file {dest} with replication {replication} and permissions {mode}")
-def atomic_upload_mv_to_hdfs(file_path, file_name, hdfs_dir, replication, image_tag_to_hash_file_hash):
+
+def atomic_upload_mv_to_hdfs(src, dest, replication, image_tag_to_hash_file_hash):
global HADOOP_PREFIX
- local_hash = calculate_file_hash(file_path)
+ global HADOOP_BIN_DIR
+
+ local_hash = calculate_file_hash(src)
if local_hash == image_tag_to_hash_file_hash:
logging.info("image_tag_to_hash file unchanged. Not uploading")
return
- tmp_file_name = file_name + ".tmp"
- hdfs_tmp_path = hdfs_dir + "/" + tmp_file_name
- hdfs_file_path = hdfs_dir + "/" + file_name
+ tmp_dest = dest + ".tmp"
try:
- if does_hdfs_entry_exist(hdfs_tmp_path, raise_on_error=False):
- hdfs_rm(hdfs_tmp_path)
- hdfs_put(file_path, hdfs_tmp_path)
- hdfs_setrep(replication, hdfs_tmp_path)
- hdfs_chmod("444", hdfs_tmp_path)
+ if does_hdfs_entry_exist(tmp_dest, raise_on_error=False):
+ hdfs_rm(tmp_dest)
+ hdfs_put(src, tmp_dest)
+ hdfs_setrep(replication, tmp_dest)
+ hdfs_chmod("444", tmp_dest)
jar_path = HADOOP_PREFIX + "/share/hadoop/tools/lib/hadoop-extras-*.jar"
jar_file = None
@@ -567,17 +643,18 @@
jar_file = file
if not jar_file:
- raise Exception("SymlinkTool Jar doesn't exist: %s" % (jar_path))
+ raise Exception("SymlinkTool Jar doesn't exist: %s" % jar_path)
logging.debug("jar_file: " + jar_file)
- shell_command(["hadoop", "jar", jar_file, "org.apache.hadoop.tools.SymlinkTool",
- "mvlink", "-f", hdfs_tmp_path, hdfs_file_path], False, False, True)
+ shell_command([HADOOP_BIN_DIR + "/hadoop", "jar", jar_file, "org.apache.hadoop.tools.SymlinkTool",
+ "mvlink", "-f", tmp_dest, dest], False, False, True)
- except:
- if does_hdfs_entry_exist(hdfs_tmp_path, raise_on_error=False):
- hdfs_rm(hdfs_tmp_path)
- raise Exception("image tag to hash file upload failed")
+ except Exception as ex:
+ if does_hdfs_entry_exist(tmp_dest, raise_on_error=False):
+ hdfs_rm(tmp_dest)
+ raise Exception("image tag to hash file upload failed, exception=" + str(ex))
+
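A minimal local-filesystem analogue (not the patch's HDFS code) of the pattern atomic_upload_mv_to_hdfs() follows above: write to a temporary name first, then move it into place in one step so readers never observe a partially written file.

```python
import os
import tempfile

def atomic_write(path, data):
    directory = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    try:
        with os.fdopen(fd, "w") as tmp_file:
            tmp_file.write(data)
        os.replace(tmp_path, path)  # atomic rename within one filesystem
    except Exception:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
```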
def docker_to_squash(layer_dir, layer, working_dir):
tmp_dir = os.path.join(working_dir, "expand_archive_" + layer)
@@ -595,8 +672,7 @@
dir_to_squashfs(tmp_dir, squash_path)
finally:
os.remove(layer_path)
- shell_command(["rm", "-rf", tmp_dir],
- False, True, True)
+ shell_command(["rm", "-rf", tmp_dir], False, True, True)
def check_image_for_magic_file(magic_file, skopeo_dir, layers):
@@ -608,13 +684,18 @@
logging.debug("Found magic file %s in layer %s", magic_file_absolute, layer)
logging.debug("Magic file %s has contents:\n%s", magic_file_absolute, ret)
return ret
- raise Exception("Magic file %s doesn't exist in any layer" %
- (magic_file_absolute))
+ raise Exception(f"Magic file {magic_file_absolute} doesn't exist in any layer")
+
def pull_build_push_update(args):
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
+
skopeo_format = args.skopeo_format
pull_format = args.pull_format
- hdfs_root = args.hdfs_root
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
force = args.force
@@ -623,28 +704,22 @@
magic_file = args.magic_file
bootstrap = args.bootstrap
- hdfs_layers_dir = hdfs_root + "/layers"
- hdfs_config_dir = hdfs_root + "/config"
- hdfs_manifest_dir = hdfs_root + "/manifests"
working_dir = None
-
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
if bootstrap:
- hdfs_dirs = [hdfs_root, hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir]
- image_tag_to_hash_path = hdfs_root + "/" + image_tag_to_hash
+ hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
+ image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
- hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
- image_tag_to_hash,
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image or not tags:
- raise Exception("Positional parameter requires an image and at least 1 tag: "
- + image_and_tag_arg)
+ raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
logging.info("Working on image %s with tags %s", image, str(tags))
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
@@ -655,30 +730,18 @@
logging.debug("Layers: %s", str(layers))
logging.debug("Config: %s", str(config_hash))
- update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
- manifest_hash, image)
+ update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
- all_layers_exist = True
-
- if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash,
- raise_on_error=False):
- all_layers_exist = False
-
- if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash,
- raise_on_error=False):
- all_layers_exist = False
+ hdfs_files_to_check = [HDFS_MANIFEST_DIR + "/" + manifest_hash, HDFS_CONFIG_DIR + "/" + config_hash]
for layer in layers:
- hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
- if not does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
- all_layers_exist = False
- break
+ hdfs_files_to_check.append(HDFS_LAYERS_DIR + "/" + layer + ".sqsh")
- if all_layers_exist:
+ if does_hdfs_entry_exist(hdfs_files_to_check, raise_on_error=False):
if not force:
- logging.info("All layers exist in HDFS, skipping this image")
+ logging.info("All image files exist in HDFS, skipping this image")
continue
- logging.info("All layers exist in HDFS, but force option set, so overwriting image")
+ logging.info("All image files exist in HDFS, but force option set, so overwriting image")
skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
logging.debug("skopeo_dir: %s", skopeo_dir)
@@ -690,40 +753,35 @@
for layer in layers:
logging.info("Squashifying and uploading layer: %s", layer)
- hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+ hdfs_squash_path = HDFS_LAYERS_DIR + "/" + layer + ".sqsh"
if does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
if force:
- logging.info("Layer already exists, but overwriting due to force"
- + "option: %s", layer)
+ logging.info(f"Layer already exists, but overwriting due to force option: {layer}")
else:
- logging.info("Layer exists. Skipping and not squashifying or"
- + "uploading: %s", layer)
+ logging.info(f"Layer exists. Skipping and not squashifying or uploading: {layer}")
continue
docker_to_squash(skopeo_dir, layer, working_dir)
squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
squash_name = os.path.basename(squash_path)
- upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force)
-
+ upload_to_hdfs(squash_path, HDFS_LAYERS_DIR + "/" + squash_name, replication, "444", force)
config_local_path = os.path.join(skopeo_dir, config_hash)
upload_to_hdfs(config_local_path,
- os.path.basename(config_local_path),
- hdfs_config_dir, replication, "444", force)
+ HDFS_CONFIG_DIR + "/" + os.path.basename(config_local_path),
+ replication, "444", force)
manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
- upload_to_hdfs(manifest_local_path, manifest_hash,
- hdfs_manifest_dir, replication, "444", force)
+ upload_to_hdfs(manifest_local_path, HDFS_MANIFEST_DIR + "/" + manifest_hash, replication, "444", force)
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
- atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
- hdfs_root, replication,
- image_tag_to_hash_hash)
+ atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash,
+ replication, image_tag_to_hash_hash)
finally:
if working_dir:
if os.path.isdir(working_dir):
- shell_command(["rm", "-rf", working_dir],
- False, True, True)
+ shell_command(["rm", "-rf", working_dir], False, True, True)
+
def pull_build(args):
skopeo_format = args.skopeo_format
@@ -735,72 +793,72 @@
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image or not tags:
- raise Exception("Positional parameter requires an image and at least 1 tag: "
- + image_and_tag_arg)
+ raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
- logging.info("Working on image %s with tags %s", image, str(tags))
+ logging.info(f"Working on image {image} with tags {tags}")
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
layers = get_layer_hashes_from_manifest(manifest)
config_hash = get_config_hash_from_manifest(manifest)
- logging.debug("Layers: %s", str(layers))
- logging.debug("Config: %s", str(config_hash))
-
+ logging.debug(f"Layers: {layers}")
+ logging.debug(f"Config: {config_hash}")
+ skopeo_dir = None
try:
working_dir = get_working_dir(args.working_dir)
skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
- logging.debug("skopeo_dir: %s", skopeo_dir)
+ logging.debug(f"skopeo_dir: {skopeo_dir}")
skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir)
if check_magic_file:
check_image_for_magic_file(magic_file, skopeo_dir, layers)
for layer in layers:
- logging.info("Squashifying layer: %s", layer)
+ logging.info(f"Squashifying layer: {layer}")
docker_to_squash(skopeo_dir, layer, working_dir)
- except:
- if os.path.isdir(skopeo_dir):
+ except Exception as _:
+ if skopeo_dir and os.path.isdir(skopeo_dir):
shutil.rmtree(skopeo_dir)
raise
+
def push_update(args):
- hdfs_root = args.hdfs_root
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
+
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
force = args.force
images_and_tags = args.images_and_tags
bootstrap = args.bootstrap
- hdfs_layers_dir = hdfs_root + "/layers"
- hdfs_config_dir = hdfs_root + "/config"
- hdfs_manifest_dir = hdfs_root + "/manifests"
local_image_tag_to_hash = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
if bootstrap:
- hdfs_dirs = [hdfs_root, hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir]
- image_tag_to_hash_path = hdfs_root + "/" + image_tag_to_hash
+ hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
+ image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
- hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
- image_tag_to_hash,
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
if not image or not tags:
- raise Exception("Positional parameter requires an image and at least 1 tag: "
- + image_and_tag_arg)
+ raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
logging.info("Working on image %s with tags %s", image, str(tags))
skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
if not os.path.exists(skopeo_dir):
- raise Exception("skopeo_dir doesn't exists: %s" % (skopeo_dir))
- manifest, manifest_hash = get_local_manifest_from_path(skopeo_dir + "/manifest.json")
+ raise Exception(f"skopeo_dir doesn't exists: {skopeo_dir}")
+ manifest, manifest_hash = get_local_manifest_from_path(f"{skopeo_dir}/manifest.json")
layers = get_layer_hashes_from_manifest(manifest)
config_hash = get_config_hash_from_manifest(manifest)
@@ -808,60 +866,43 @@
logging.debug("Layers: %s", str(layers))
logging.debug("Config: %s", str(config_hash))
- update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
- manifest_hash, image)
+ update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
- all_layers_exist = True
-
- if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash,
- raise_on_error=False):
- all_layers_exist = False
-
- if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash,
- raise_on_error=False):
- all_layers_exist = False
+ hdfs_files_to_check = [f"{HDFS_MANIFEST_DIR}/{manifest_hash}", f"{HDFS_CONFIG_DIR}/{config_hash}"]
for layer in layers:
- hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
- if not does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
- all_layers_exist = False
- break
+ hdfs_files_to_check.append(f"{HDFS_LAYERS_DIR}/{layer}.sqsh")
- if all_layers_exist:
+ if does_hdfs_entry_exist(hdfs_files_to_check, raise_on_error=False):
if not force:
- logging.info("All layers exist in HDFS, skipping this image")
+ logging.info("All image files exist in HDFS, skipping this image")
continue
- logging.info("All layers exist in HDFS, but force option set, so overwriting image")
+ logging.info("All image files exist in HDFS, but force option set, so overwriting image")
for layer in layers:
- hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+ hdfs_squash_path = f"{HDFS_LAYERS_DIR}/{layer}.sqsh"
if does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
if force:
- logging.info("Layer already exists, but overwriting due to force"
- + "option: %s", layer)
+ logging.info(f"Layer already exists, but overwriting due to force option: {layer}")
else:
- logging.info("Layer exists. Skipping and not squashifying or"
- + "uploading: %s", layer)
+ logging.info(f"Layer exists. Skipping and not squashifying or uploading: {layer}")
continue
squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
squash_name = os.path.basename(squash_path)
- upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force)
-
+ upload_to_hdfs(squash_path, f"{HDFS_LAYERS_DIR}/{squash_name}", replication, "444", force)
config_local_path = os.path.join(skopeo_dir, config_hash)
upload_to_hdfs(config_local_path,
- os.path.basename(config_local_path),
- hdfs_config_dir, replication, "444", force)
+ f"{HDFS_CONFIG_DIR}/" + os.path.basename(config_local_path),
+ replication, "444", force)
manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
- upload_to_hdfs(manifest_local_path, manifest_hash,
- hdfs_manifest_dir, replication, "444", force)
+ upload_to_hdfs(manifest_local_path, HDFS_MANIFEST_DIR + "/" + manifest_hash, replication, "444", force)
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
- atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
- hdfs_root, replication,
- image_tag_to_hash_hash)
+ atomic_upload_mv_to_hdfs(local_image_tag_to_hash, f"{HDFS_ROOT}/{image_tag_to_hash}",
+ replication, image_tag_to_hash_hash)
finally:
if local_image_tag_to_hash:
if os.path.isfile(local_image_tag_to_hash):
@@ -869,7 +910,12 @@
def remove_image(args):
- hdfs_root = args.hdfs_root
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
+
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
images_or_tags = args.images_or_tags
@@ -878,107 +924,38 @@
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
- hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
- image_tag_to_hash,
- local_image_tag_to_hash)
-
- logging.debug("hash_to_tags: %s", str(hash_to_tags))
- logging.debug("tag_to_hash: %s", str(tag_to_hash))
-
- hdfs_layers_dir = hdfs_root + "/layers"
- hdfs_config_dir = hdfs_root + "/config"
- hdfs_manifest_dir = hdfs_root + "/manifests"
-
- delete_list = []
-
- known_images, err, returncode = hdfs_ls(hdfs_manifest_dir, "-C", False, False, False)
- known_images = known_images.split()
-
- logging.debug("known_images:\n%s", known_images)
-
- layers_to_keep = []
images_and_tags_to_remove = []
- images_to_remove = []
for image_or_tag_arg in images_or_tags:
images_and_tags_to_remove.extend(image_or_tag_arg.split(","))
logging.debug("images_and_tags_to_remove:\n%s", images_and_tags_to_remove)
- if isinstance(images_and_tags_to_remove, Iterable):
- for image in images_and_tags_to_remove:
- if is_sha256_hash(image):
- image_hash = image
- else:
- image_hash = tag_to_hash.get(image, None)
- if image_hash:
- images_to_remove.append(hdfs_manifest_dir + "/" + image_hash)
- else:
- image = images_and_tags_to_remove[0]
- if is_sha256_hash(image):
- image_hash = image
- else:
- image_hash = tag_to_hash.get(image, None)
- if image_hash:
- images_to_remove.append(hdfs_manifest_dir + "/" + image_hash)
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
+ local_image_tag_to_hash)
+
+ logging.debug("hash_to_tags: %s", str(hash_to_tags))
+ logging.debug("tag_to_hash: %s", str(tag_to_hash))
+
+ known_images = get_all_known_images()
+ if not known_images:
+            logging.warning("No known images\n")
+ return
+
+ images_to_remove = get_images_from_args(images_and_tags_to_remove, tag_to_hash, known_images)
logging.debug("images_to_remove:\n%s", images_to_remove)
if not images_to_remove:
- logging.warn("No images to remove")
+ logging.warning("No images to remove")
return
- for image in known_images:
- if image not in images_to_remove:
- manifest, manifest_hash = get_hdfs_manifest_from_path(image)
- layers = get_layer_hashes_from_manifest(manifest, False)
- layers_to_keep.extend(layers)
+ delete_list = get_delete_list_from_images_to_remove(images_to_remove, known_images)
- logging.debug("layers_to_keep:\n%s", layers_to_keep)
-
- for image_or_tag_arg in images_or_tags:
- images = image_or_tag_arg.split(",")
- for image in images:
- logging.info("removing image: %s", image)
- if is_sha256_hash(image):
- logging.debug("image is sha256")
- image_hash = image
- else:
- image_hash = tag_to_hash.get(image, None)
- if image_hash:
- logging.debug("image tag exists for %s", image)
- else:
- logging.info("Not removing %s. Image tag doesn't exist", image)
- continue
- manifest_path = hdfs_manifest_dir + "/" + image_hash
- if does_hdfs_entry_exist(manifest_path, raise_on_error=False):
- logging.debug("image manifest for %s exists: %s", image, manifest_path)
- else:
- logging.info("Not removing %s. Image manifest doesn't exist: %s", image, manifest_path)
- continue
-
- delete_list.append(manifest_path)
-
- manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
-
- config_hash = get_config_hash_from_manifest(manifest)
- logging.debug("config_hash: %s", config_hash)
-
- delete_list.append(hdfs_config_dir + "/" + config_hash)
-
- layers = get_layer_hashes_from_manifest(manifest, False)
- layers_paths = []
- for layer in layers:
- if layer not in layers_to_keep:
- layers_paths.append(hdfs_layers_dir + "/" + layer + ".sqsh")
- delete_list.extend(layers_paths)
-
- logging.debug("delete_list: %s", delete_list)
-
- remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash)
+ for image_to_remove in images_to_remove:
+ remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_to_remove.manifest_stat.name)
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
- atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
- hdfs_root, replication,
+ atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash, replication,
image_tag_to_hash_hash)
hdfs_rm(delete_list)
@@ -988,22 +965,87 @@
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
+
+def get_images_from_args(images_and_tags, tag_to_hash, known_images):
+ images = []
+
+ if isinstance(images_and_tags, Iterable):
+ for image_arg in images_and_tags:
+ image = get_image_hash_from_arg(image_arg, tag_to_hash, known_images)
+ if image:
+ images.append(image)
+ else:
+ image_arg = images_and_tags[0]
+ image = get_image_hash_from_arg(image_arg, tag_to_hash, known_images)
+ if image:
+ images.append(image)
+
+ return images
+
+
+def get_image_hash_from_arg(image_str, tag_to_hash, known_images):
+ if is_sha256_hash(image_str):
+ image_hash = image_str
+ else:
+ image_hash = tag_to_hash.get(image_str, None)
+
+ if image_hash:
+ image = get_known_image_by_hash(image_hash, known_images)
+ else:
+        logging.warning("image tag unknown: %s", image_str)
+ return None
+
+ return image
+
+
+def get_delete_list_from_images_to_remove(images_to_remove, known_images):
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
+
+ layers_to_keep = []
+ delete_list = []
+
+ for image in known_images:
+ if image not in images_to_remove:
+ layers_to_keep.extend(image.layers)
+
+ for image_to_remove in images_to_remove:
+ delete_list.append(HDFS_MANIFEST_DIR + "/" + image_to_remove.manifest_stat.name)
+ delete_list.append(HDFS_CONFIG_DIR + "/" + image_to_remove.config)
+ if image_to_remove.unref_file_stat:
+ delete_list.append(HDFS_UNREF_DIR + "/" + image_to_remove.unref_file_stat.name)
+
+ layers = image_to_remove.layers
+ for layer in layers:
+ if layer not in layers_to_keep:
+ layer_path = HDFS_LAYERS_DIR + "/" + layer + ".sqsh"
+ if layer_path not in delete_list:
+ delete_list.append(layer_path)
+
+ logging.debug("delete_list:\n%s", delete_list)
+
+ return delete_list
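A toy illustration (made-up image and layer names) of the shared-layer rule get_delete_list_from_images_to_remove() implements above: a layer is deleted only when no remaining image still references it.

```python
layers_of = {"image_a": ["layer1", "layer2"], "image_b": ["layer2", "layer3"]}
removing = ["image_a"]
layers_to_keep = [layer for image, layers in layers_of.items()
                  if image not in removing for layer in layers]
delete_layers = [layer for image in removing for layer in layers_of[image]
                 if layer not in layers_to_keep]
assert delete_layers == ["layer1"]  # layer2 survives because image_b still uses it
```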
+
+
def add_remove_tag(args):
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+
pull_format = args.pull_format
- hdfs_root = args.hdfs_root
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
sub_command = args.sub_command
images_and_tags = args.images_and_tags
- hdfs_manifest_dir = hdfs_root + "/manifests"
working_dir = None
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
- hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
- image_tag_to_hash,
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
for image_and_tag_arg in images_and_tags:
@@ -1015,31 +1057,33 @@
manifest_hash = tag_to_hash.get(image, None)
if manifest_hash:
- manifest_path = hdfs_manifest_dir + "/" + manifest_hash
+ manifest_path = HDFS_MANIFEST_DIR + "/" + manifest_hash
out, err, returncode = hdfs_cat(manifest_path)
manifest = json.loads(out)
logging.debug("image tag exists for %s", image)
else:
manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
- update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
- manifest_hash, image)
+ update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
elif sub_command == "remove-tag":
tags = image_and_tag_arg.split(",")
+ image = None
+ manifest = None
+ manifest_hash = 0
remove_from_dicts(hash_to_tags, tag_to_hash, tags)
else:
- raise Exception("Invalid sub_command: %s" % (sub_command))
+ raise Exception(f"Invalid sub_command: {sub_command}")
write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
- atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
- hdfs_root, replication,
+ atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash, replication,
image_tag_to_hash_hash)
finally:
if working_dir:
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
+
def copy_update(args):
image_tag_to_hash = args.image_tag_to_hash
replication = args.replication
@@ -1051,15 +1095,17 @@
src_layers_dir = src_root + "/layers"
src_config_dir = src_root + "/config"
- src_manifest_dir = src_root + "/manifests"
+ src_manifest_dir = src_root + "/manifests"
dest_layers_dir = dest_root + "/layers"
dest_config_dir = dest_root + "/config"
- dest_manifest_dir = dest_root + "/manifests"
+ dest_manifest_dir = dest_root + "/manifests"
+ dest_unref_dir = dest_root + "/unreferenced"
if bootstrap:
- hdfs_dirs = [dest_root, dest_layers_dir, dest_config_dir, dest_manifest_dir]
+ hdfs_dirs = [dest_root, dest_layers_dir, dest_config_dir, dest_manifest_dir, dest_unref_dir]
image_tag_to_hash_path = dest_root + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
+
working_dir = None
try:
@@ -1069,8 +1115,11 @@
local_dest_image_tag_to_hash = os.path.join(working_dir, "dest-"
+ os.path.basename(image_tag_to_hash))
- src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts(src_root, image_tag_to_hash, local_src_image_tag_to_hash)
- dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts(dest_root, image_tag_to_hash, local_dest_image_tag_to_hash)
+ src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts_set_root(image_tag_to_hash,
+ local_src_image_tag_to_hash,
+ src_root)
+ dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts_set_root(
+ image_tag_to_hash, local_dest_image_tag_to_hash, dest_root)
for image_and_tag_arg in images_and_tags:
image, tags = split_image_and_tag(image_and_tag_arg)
@@ -1101,21 +1150,30 @@
logging.debug("Copying Layers: %s", str(src_layers_paths))
logging.debug("Copying Config: %s", str(src_config_hash))
- hdfs_cp(src_layers_paths, dest_layers_dir, force)
- hdfs_cp(src_config_path, dest_config_dir, force)
- hdfs_cp(src_manifest_path, dest_manifest_dir, force)
+ if not does_hdfs_entry_exist(dest_layers_paths, raise_on_error=False):
+ dest_layers_paths = []
+ for layer in src_layers:
+ dest_layer_path = dest_layers_dir + "/" + layer + ".sqsh"
+ src_layer_path = src_layers_dir + "/" + layer + ".sqsh"
+ if not does_hdfs_entry_exist(dest_layer_path, raise_on_error=False):
+ hdfs_cp(src_layer_path, dest_layer_path, force)
+ dest_layers_paths.append(dest_layer_path)
+ hdfs_setrep(replication, dest_layers_paths)
+ hdfs_chmod("444", dest_layers_paths)
- hdfs_setrep(replication, dest_layers_paths)
- hdfs_setrep(replication, dest_config_path)
- hdfs_setrep(replication, dest_manifest_path)
+ if not does_hdfs_entry_exist(dest_config_path, raise_on_error=False):
+ hdfs_cp(src_config_path, dest_config_dir, force)
+ hdfs_setrep(replication, dest_config_path)
+ hdfs_chmod("444", dest_config_path)
- hdfs_chmod("444", dest_layers_paths)
- hdfs_chmod("444", dest_config_path)
- hdfs_chmod("444", dest_manifest_path)
+ if not does_hdfs_entry_exist(dest_manifest_path, raise_on_error=False):
+ hdfs_cp(src_manifest_path, dest_manifest_dir, force)
+ hdfs_setrep(replication, dest_manifest_path)
+ hdfs_chmod("444", dest_manifest_path)
for tag in tags:
- new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None)
comment = None
+ new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None)
if new_tags_and_comments:
comment = ', '.join(map(str, new_tags_and_comments[1]))
if comment is None:
@@ -1124,8 +1182,8 @@
update_dicts(dest_hash_to_tags, dest_tag_to_hash, tag, src_manifest_hash, comment)
write_local_image_tag_to_hash(local_dest_image_tag_to_hash, dest_hash_to_tags)
- atomic_upload_mv_to_hdfs(local_dest_image_tag_to_hash, image_tag_to_hash,
- dest_root, replication,
+ atomic_upload_mv_to_hdfs(local_dest_image_tag_to_hash, dest_root + "/" + image_tag_to_hash,
+ replication,
dest_image_tag_to_hash_hash)
finally:
@@ -1133,8 +1191,13 @@
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
+
def query_tag(args):
- hdfs_root = args.hdfs_root
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+
image_tag_to_hash = args.image_tag_to_hash
tags = args.tags
working_dir = None
@@ -1142,24 +1205,19 @@
try:
working_dir = get_working_dir(args.working_dir)
local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
- hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
- image_tag_to_hash,
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
local_image_tag_to_hash)
logging.debug("hash_to_tags: %s", str(hash_to_tags))
logging.debug("tag_to_hash: %s", str(tag_to_hash))
- hdfs_layers_dir = hdfs_root + "/layers"
- hdfs_config_dir = hdfs_root + "/config"
- hdfs_manifest_dir = hdfs_root + "/manifests"
-
for tag in tags:
image_hash = tag_to_hash.get(tag, None)
if not image_hash:
logging.info("image hash mapping doesn't exist for tag %s", tag)
continue
- manifest_path = hdfs_manifest_dir + "/" + image_hash
+ manifest_path = HDFS_MANIFEST_DIR + "/" + image_hash
if does_hdfs_entry_exist(manifest_path, raise_on_error=False):
logging.debug("image manifest for %s exists: %s", tag, manifest_path)
else:
@@ -1169,9 +1227,9 @@
manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
layers = get_layer_hashes_from_manifest(manifest, False)
config_hash = get_config_hash_from_manifest(manifest)
- config_path = hdfs_config_dir + "/" + config_hash
+ config_path = HDFS_CONFIG_DIR + "/" + config_hash
- layers_paths = [hdfs_layers_dir + "/" + layer + ".sqsh" for layer in layers]
+ layers_paths = [HDFS_LAYERS_DIR + "/" + layer + ".sqsh" for layer in layers]
logging.info("Image info for '%s'", tag)
logging.info(manifest_path)
@@ -1184,27 +1242,324 @@
if os.path.isdir(working_dir):
shutil.rmtree(working_dir)
+
def list_tags(args):
- hdfs_root = args.hdfs_root
+ global HDFS_ROOT
+
image_tag_to_hash = args.image_tag_to_hash
- hdfs_image_tag_to_hash = hdfs_root + "/" + image_tag_to_hash
- if does_hdfs_entry_exist(hdfs_image_tag_to_hash, raise_on_error=False):
- hdfs_cat(hdfs_image_tag_to_hash, True, True, False)
- else:
- logging.error("image-tag-to-hash file doesn't exist: %s", hdfs_image_tag_to_hash)
+ hdfs_image_tag_to_hash = HDFS_ROOT + "/" + image_tag_to_hash
+ hdfs_cat(hdfs_image_tag_to_hash, True, True, True)
+
def bootstrap_setup(args):
- hdfs_root = args.hdfs_root
- image_tag_to_hash = args.image_tag_to_hash
- hdfs_layers_dir = hdfs_root + "/layers"
- hdfs_config_dir = hdfs_root + "/config"
- hdfs_manifest_dir = hdfs_root + "/manifests"
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
- hdfs_dirs = [hdfs_root, hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir]
- image_tag_to_hash_path = hdfs_root + "/" + image_tag_to_hash
+ image_tag_to_hash = args.image_tag_to_hash
+
+ hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
+ image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
+
+def cleanup_untagged_images(args):
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
+ global DEAD_PERMS
+
+ image_tag_to_hash = args.image_tag_to_hash
+ working_dir = get_working_dir(args.working_dir)
+
+ local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+
+ try:
+ hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
+ local_image_tag_to_hash)
+ logging.debug("hash_to_tags: %s\n", hash_to_tags)
+ logging.debug("tag_to_hash: %s\n", tag_to_hash)
+
+ known_images = get_all_known_images()
+ tagged_images = [image for image in known_images if image.manifest_stat.name in hash_to_tags.keys()]
+ untagged_images = get_untagged_images(known_images, tagged_images)
+ stale_images = get_stale_images(untagged_images)
+ dead_images = get_dead_images(untagged_images)
+
+ cleanup_handle_tagged_images(tagged_images)
+ cleanup_handle_untagged_images(untagged_images)
+ cleanup_handle_stale_images(stale_images)
+ cleanup_handle_dead_images(dead_images, known_images)
+
+ finally:
+ if working_dir:
+ if os.path.isdir(working_dir):
+ shutil.rmtree(working_dir)
+
+
+class Filestat:
+ def __init__(self, perms, mod_time, name):
+ self.perms = perms
+ self.mod_time = mod_time
+ self.name = name
+
+ def __repr__(self):
+ return "Perms: {}, Mod time: {}, Name: {}\n".format(self.perms, self.mod_time, self.name)
+
+ def __str__(self):
+ return "Perms: {}, Mod time: {}, Name: {}\n".format(self.perms, self.mod_time, self.name)
+
+
+class Image:
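+    """A known squashfs image: its manifest Filestat, layer and config hashes, and the unreferenced-marker Filestat (None if no marker exists yet)."""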
+ def __init__(self, manifest_stat, layers, config, unref_file_stat):
+ self.manifest_stat = manifest_stat
+ self.layers = layers
+ self.config = config
+ self.unref_file_stat = unref_file_stat
+
+ def __repr__(self):
+ return "Manifest: {}, Layers: {}, Config: {}, Unreferenced File: {}\n".format(self.manifest_stat, self.layers,
+ self.config, self.unref_file_stat)
+
+ def __str__(self):
+ return "Manifest: {}, Layers: {}, Config: {}, Unreferenced File: {}\n".format(self.manifest_stat, self.layers,
+ self.config, self.unref_file_stat)
+
+
+def get_all_known_images():
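+    """Build an Image for every manifest under HDFS_MANIFEST_DIR, attaching its layer/config hashes and any matching unreferenced marker from HDFS_UNREF_DIR."""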
+ global HDFS_MANIFEST_DIR
+ global HDFS_UNREF_DIR
+
+ known_manifest_paths, err, returncode = hdfs_stat(HDFS_MANIFEST_DIR + "/*", "%a %Y %n", False, False, False)
+    known_manifests = [image for image in known_manifest_paths.split("\n") if image]
+
+ unref_manifest_paths, err, returncode = hdfs_stat(HDFS_UNREF_DIR + "/*", "%a %Y %n", False, False, False)
+ logging.debug("unref_manifest_paths:\n%s", unref_manifest_paths)
+ unref_manifests = []
+ if unref_manifest_paths:
+        unref_manifests = [image for image in unref_manifest_paths.split("\n") if image]
+ logging.debug("unref_manifests:\n%s", unref_manifests)
+
+ unref_manifests_stats_dict = {}
+ if unref_manifests:
+ for unref_manifest in unref_manifests:
+ unref_manifest_split = unref_manifest.split()
+ unref_manifest_perms = unref_manifest_split[0]
+            unref_manifest_mod_time = int(unref_manifest_split[1])
+ unref_manifest_name = unref_manifest_split[2]
+ unref_manifest_stat = Filestat(unref_manifest_perms, unref_manifest_mod_time, unref_manifest_name)
+ unref_manifests_stats_dict[unref_manifest_name] = unref_manifest_stat
+
+ logging.debug("unref_manifests_stats_dict:\n%s", unref_manifests_stats_dict)
+
+ known_manifests_names = [known_manifest.split()[2] for known_manifest in known_manifests]
+ layers_and_configs = get_all_layers_and_configs(known_manifests_names)
+
+ known_images = []
+ for manifest in known_manifests:
+ manifest_split = manifest.split()
+ manifest_perms = manifest_split[0]
+        manifest_mod_time = int(manifest_split[1])
+ manifest_name = manifest_split[2]
+ manifest_stat = Filestat(manifest_perms, manifest_mod_time, manifest_name)
+
+ unref_image_stat = unref_manifests_stats_dict.get(manifest_name, None)
+
+ layers = layers_and_configs[manifest_name][0]
+ config = layers_and_configs[manifest_name][1]
+ known_image = Image(manifest_stat, layers, config, unref_image_stat)
+ known_images.append(known_image)
+
+ return known_images
+
+
+def get_known_image_by_hash(image_hash, known_images):
+ for image in known_images:
+ if image_hash == image.manifest_stat.name:
+ return image
+ logging.debug("Couldn't find known image by hash:\n%s", image_hash)
+ return None
+
+
+def get_all_layers_and_configs(manifest_names):
+ global HDFS_MANIFEST_DIR
+
+ manifests_tuples = get_hdfs_manifests_from_paths(
+ [HDFS_MANIFEST_DIR + "/" + manifest_name for manifest_name in manifest_names])
+ layers_and_configs = {}
+
+ for manifest_tuple in manifests_tuples:
+ manifest = manifest_tuple[0]
+ manifest_hash = manifest_tuple[1]
+ layers = []
+ layers.extend(get_layer_hashes_from_manifest(manifest, False))
+ config = get_config_hash_from_manifest(manifest)
+ layers_and_configs[manifest_hash] = (layers, config)
+
+ logging.debug("layers for %s:\n%s", manifest_hash, layers)
+ logging.debug("config for %s:\n%s", manifest_hash, config)
+
+ return layers_and_configs
+
+
+def get_untagged_images(known_images, tagged_images):
+ untagged_images = []
+ for image in known_images:
+ if is_image_untagged(image, tagged_images):
+ untagged_images.append(image)
+
+ logging.debug("known_images:\n%s", known_images)
+ logging.debug("tagged_images:\n%s", tagged_images)
+ logging.debug("untagged_images:\n%s", untagged_images)
+ return untagged_images
+
+
+def get_stale_images(untagged_images):
+ stale_images = [image for image in untagged_images if is_image_stale(image)]
+ logging.debug("stale_images:\n%s", stale_images)
+ return stale_images
+
+
+def get_dead_images(untagged_images):
+ dead_images = [image for image in untagged_images if is_image_dead(image)]
+ logging.debug("dead_images:\n%s", dead_images)
+ return dead_images
+
+
+def is_image_untagged(image, tagged_images):
+ for tagged_image in tagged_images:
+ if tagged_image.manifest_stat.name == image.manifest_stat.name:
+ return False
+ return True
+
+
+def is_image_stale(image):
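+    # Stale: unreferenced, but the manifest has not yet been chmod'ed to DEAD_PERMS.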
+ return does_image_have_unref_file(image) and not does_image_have_dead_perms(image)
+
+
+def is_image_dead(image):
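+    # Dead: unreferenced and the manifest already carries DEAD_PERMS; eligible for removal once old enough.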
+ return does_image_have_unref_file(image) and does_image_have_dead_perms(image)
+
+
+def does_image_have_unref_file(image):
+    return image.unref_file_stat is not None
+
+
+def does_image_have_dead_perms(image):
+ global DEAD_PERMS
+ return image.manifest_stat.perms == DEAD_PERMS
+
+
+def is_mod_time_old(mod_time):
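+    # mod_time and cutoff_time are both in milliseconds since the epoch (HDFS stat %Y).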
+ global UNTAGGED_TRANSITION_SEC
+
+    cutoff_time = int(time.time() * 1000) - UNTAGGED_TRANSITION_SEC * 1000
+ logging.debug("Mod time: %d, Cutoff time: %d", mod_time, cutoff_time)
+ return mod_time < cutoff_time
+
+
+def cleanup_handle_tagged_images(tagged_images):
+ # Remove unreferenced files if they exist
+ if not tagged_images:
+ return
+
+ unref_remove_list = []
+ for image in tagged_images:
+ if does_image_have_unref_file(image):
+ unref_remove_list.append(image)
+
+ remove_unref_files(unref_remove_list)
+
+
+def cleanup_handle_untagged_images(untagged_images):
+ # Create unreferenced file
+ if not untagged_images:
+ return
+
+ touch_list = []
+ for image in untagged_images:
+ if not does_image_have_unref_file(image):
+ touch_list.append(image)
+
+ touch_unref_files(touch_list)
+
+
+def cleanup_handle_stale_images(stale_images):
+    # Chmod manifests to DEAD_PERMS for stale images that are old enough, and re-touch their unref markers
+ if not stale_images:
+ return
+
+ make_unreadable_list = []
+ for image in stale_images:
+ if is_mod_time_old(image.unref_file_stat.mod_time):
+ make_unreadable_list.append(image)
+
+ make_manifests_unreadable(make_unreadable_list)
+ touch_unref_files(make_unreadable_list)
+
+
+def cleanup_handle_dead_images(dead_images, known_images):
+ # Remove old dead images
+ if not dead_images:
+ return
+
+ images_to_remove = []
+ for image in dead_images:
+ if is_mod_time_old(image.unref_file_stat.mod_time):
+ images_to_remove.append(image)
+
+ remove_dead_images(images_to_remove, known_images)
+
+
+def make_manifests_unreadable(images):
+ global HDFS_MANIFEST_DIR
+ global DEAD_PERMS
+
+ if not images:
+ return
+
+ manifest_file_paths = [HDFS_MANIFEST_DIR + "/" + image.unref_file_stat.name for image in images]
+ logging.debug("Chmod %s manifest file:\n%s", DEAD_PERMS, manifest_file_paths)
+ hdfs_chmod(DEAD_PERMS, manifest_file_paths)
+
+
+def touch_unref_files(images):
+ global HDFS_UNREF_DIR
+
+ if not images:
+ return
+
+ unref_file_paths = [HDFS_UNREF_DIR + "/" + image.manifest_stat.name for image in images]
+ logging.debug("Touching unref file:\n%s", unref_file_paths)
+ hdfs_touchz(unref_file_paths)
+
+
+def remove_unref_files(images):
+ global HDFS_UNREF_DIR
+
+ if not images:
+ return
+
+ unref_file_paths = [HDFS_UNREF_DIR + "/" + image.manifest_stat.name for image in images]
+ logging.debug("Removing unref files:\n%s", unref_file_paths)
+ hdfs_rm(unref_file_paths)
+
+
+def remove_dead_images(images_to_remove, known_images):
+ if not images_to_remove:
+ return
+
+ logging.debug("Removing dead images:\n%s", images_to_remove)
+ delete_list = get_delete_list_from_images_to_remove(images_to_remove, known_images)
+ if delete_list:
+ hdfs_rm(delete_list)
+
+
def create_parsers():
parser = argparse.ArgumentParser()
add_parser_default_arguments(parser)
@@ -1223,7 +1578,7 @@
parse_pull_build = subparsers.add_parser('pull-build',
help='Pull an image and build its squashfs layers')
- parse_pull_build .set_defaults(func=pull_build)
+ parse_pull_build.set_defaults(func=pull_build)
add_parser_default_arguments(parse_pull_build)
parse_pull_build.add_argument("images_and_tags", nargs="+",
help="Image and tag argument (can specify multiple)")
@@ -1286,14 +1641,18 @@
parse_list_tags.set_defaults(func=list_tags)
add_parser_default_arguments(parse_list_tags)
- parse_bootstrap_setup= subparsers.add_parser('bootstrap',
- help='Bootstrap setup of required HDFS'
- + 'directories')
+ parse_bootstrap_setup = subparsers.add_parser('bootstrap', help='Bootstrap setup of required HDFS directories')
parse_bootstrap_setup.set_defaults(func=bootstrap_setup)
add_parser_default_arguments(parse_bootstrap_setup)
+ parse_cleanup_untagged_images = subparsers.add_parser('cleanup',
+ help='Cleanup untagged images in HDFS')
+ parse_cleanup_untagged_images.set_defaults(func=cleanup_untagged_images)
+ add_parser_default_arguments(parse_cleanup_untagged_images)
+
return parser
+
def add_parser_default_arguments(parser):
parser.add_argument("--working-dir", type=str, dest='working_dir', default="dts-work-dir",
help="Name of working directory")
@@ -1303,7 +1662,7 @@
default='docker', help="Pull format for skopeo")
parser.add_argument("-l", "--log", type=str, dest='LOG_LEVEL',
default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)")
- parser.add_argument("--hdfs-root", type=str, dest='hdfs_root',
+ parser.add_argument("--hdfs-root", type=str, dest='HDFS_ROOT',
default='/runc-root', help="The root directory in HDFS for all of the"
+ "squashfs images")
parser.add_argument("--image-tag-to-hash", type=str,
@@ -1325,35 +1684,43 @@
parser.add_argument("--max-layers", type=int, dest='MAX_IMAGE_LAYERS',
default=37, help="Maximum number of layers an image is allowed to have")
parser.add_argument("--max-size", type=int, dest='MAX_IMAGE_SIZE',
- default=10*1024*1024*1024, help="Maximum size an image is allowed to be")
+ default=10 * 1024 * 1024 * 1024, help="Maximum size an image is allowed to be")
+ parser.add_argument("--untagged-transition-sec", type=long, dest='UNTAGGED_TRANSITION_SEC',
+ default=7 * 24 * 60 * 60, help="Time that untagged images will spend in each state"
+ + "before moving to the next one")
+ parser.add_argument("--dead-perms", type=str, dest='DEAD_PERMS',
+ default="400", help="Permissions to set for manifests that are untagged "
+ + "before they are removed")
parser.add_argument("-b", "--bootstrap", dest='bootstrap',
action="store_true", default=False, help="Bootstrap setup"
+ " of required HDFS directories")
return parser
+
def check_dependencies():
global HADOOP_BIN_DIR
+
try:
- command = [HADOOP_BIN_DIR + "/hadoop", "version"]
+ command = [HADOOP_BIN_DIR + "/hadoop", "-h"]
shell_command(command, False, False, True)
- except:
+ except Exception as ex:
logging.error("Could not find hadoop. Make sure HADOOP_PREFIX " +
"is set correctly either in your environment or on the command line " +
- "via --hadoop-prefix")
+ "via --hadoop-prefix" +
+ ", exception is " + str(ex))
return 1
try:
command = ["skopeo", "-v"]
shell_command(command, False, False, True)
- except:
- logging.error("Could not find skopeo. Make sure it is installed and present " +
- "on the PATH")
+ except Exception as _:
+ logging.error("Could not find skopeo. Make sure it is installed and present on the PATH")
return 1
try:
command = ["/usr/sbin/mksquashfs", "-version"]
shell_command(command, False, False, True)
- except:
+ except Exception as _:
logging.error("Could not find /usr/sbin/mksquashfs. Make sure squashfs-tools is installed " +
"and /usr/sbin/mksquashfs is present on the the PATH")
return 1
@@ -1361,27 +1728,34 @@
try:
command = ["tar", "--version"]
shell_command(command, False, False, True)
- except:
- logging.error("Could not find tar. Make sure it is installed and present " +
- "on the PATH")
+ except Exception as _:
+ logging.error("Could not find tar. Make sure it is installed and present on the PATH")
return 1
try:
command = ["setfattr", "--version"]
shell_command(command, False, False, True)
except:
- logging.error("Could not find setfattr . Make sure it is installed and present " +
- "on the PATH")
+ logging.error("Could not find setfattr . Make sure it is installed and present on the PATH")
return 1
return 0
+
def main():
global LOG_LEVEL
global HADOOP_PREFIX
global HADOOP_BIN_DIR
+ global HDFS_ROOT
+ global HDFS_MANIFEST_DIR
+ global HDFS_CONFIG_DIR
+ global HDFS_LAYERS_DIR
+ global HDFS_UNREF_DIR
global MAX_IMAGE_LAYERS
global MAX_IMAGE_SIZE
+ global UNTAGGED_TRANSITION_SEC
+ global ARG_MAX
+ global DEAD_PERMS
if os.geteuid() != 0:
logging.error("Script must be run as root")
@@ -1391,10 +1765,18 @@
args, extra = parser.parse_known_args()
if extra:
- raise Exception("Extra unknown arguments given: %s" % (extra))
+ raise Exception(f"Extra unknown arguments given: {extra}")
+ ARG_MAX = os.sysconf("SC_ARG_MAX")
+ HDFS_ROOT = args.HDFS_ROOT
+ HDFS_MANIFEST_DIR = HDFS_ROOT + "/manifests"
+ HDFS_CONFIG_DIR = HDFS_ROOT + "/config"
+ HDFS_LAYERS_DIR = HDFS_ROOT + "/layers"
+ HDFS_UNREF_DIR = HDFS_ROOT + "/unreferenced"
MAX_IMAGE_LAYERS = args.MAX_IMAGE_LAYERS
MAX_IMAGE_SIZE = args.MAX_IMAGE_SIZE
+ UNTAGGED_TRANSITION_SEC = args.UNTAGGED_TRANSITION_SEC
+ DEAD_PERMS = args.DEAD_PERMS
LOG_LEVEL = args.LOG_LEVEL.upper()
image_tag_to_hash = args.image_tag_to_hash
@@ -1427,5 +1809,6 @@
args.func(args)
+
if __name__ == "__main__":
main()
diff --git a/bin/storm.py b/bin/storm.py
index 05dd1d2..ed51fc2 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
@@ -28,25 +28,27 @@
from argparse import HelpFormatter
from operator import attrgetter
-class SortingHelpFormatter(HelpFormatter):
+if sys.version_info[0] == 2:
+ raise Exception("Python version 2 is not supported. Please download and use python3")
+import configparser
+from urllib.parse import quote_plus
+
+
+class SortingHelpFormatter(HelpFormatter):
def add_arguments(self, actions):
actions = sorted(actions, key=attrgetter('option_strings'))
super(SortingHelpFormatter, self).add_arguments(actions)
-if sys.version_info[0] == 2:
- import ConfigParser as configparser
- from urllib import quote_plus
-else:
- import configparser
- from urllib.parse import quote_plus
def is_windows():
return sys.platform.startswith('win')
+
def identity(x):
return x
+
def cygpath(x):
command = ["cygpath", "-wp", x]
p = subprocess.Popen(command, stdout=subprocess.PIPE)
@@ -54,9 +56,11 @@
lines = output.split(os.linesep)
return lines[0]
+
def get_config_opts(storm_config_opts):
return "-Dstorm.options=" + ','.join([quote_plus(s) for s in storm_config_opts])
+
def get_jars_full(adir):
files = []
if os.path.isdir(adir):
@@ -66,8 +70,9 @@
return [os.path.join(adir, f) for f in files if f.endswith(".jar")]
-# If given path is a dir, make it a wildcard so the JVM will include all JARs in the directory.
+
def get_wildcard_dir(path):
+ """If given path is a dir, make it a wildcard so the JVM will include all JARs in the directory."""
ret = []
if os.path.isdir(path):
ret = [(os.path.join(path, "*"))]
@@ -75,12 +80,14 @@
ret = [path]
return ret
+
def get_java_cmd():
cmd = 'java' if not is_windows() else 'java.exe'
if JAVA_HOME:
cmd = os.path.join(JAVA_HOME, 'bin', cmd)
return cmd
+
def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, daemon=True):
command = [
JAVA_CMD, "-client", get_config_opts(storm_config_opts),
@@ -103,7 +110,7 @@
ret = get_wildcard_dir(STORM_DIR)
if client:
ret.extend(get_wildcard_dir(STORM_WORKER_LIB_DIR))
- else :
+ else:
ret.extend(get_wildcard_dir(STORM_LIB_DIR))
ret.extend(get_wildcard_dir(os.path.join(STORM_DIR, "extlib")))
if daemon:
@@ -116,7 +123,7 @@
return NORMAL_CLASS_PATH(os.pathsep.join(ret))
-def init_storm_env():
+def init_storm_env(within_unittest=False):
global NORMAL_CLASS_PATH, STORM_DIR, USER_CONF_DIR, STORM_CONF_DIR, STORM_WORKER_LIB_DIR, STORM_LIB_DIR,\
STORM_TOOLS_LIB_DIR, STORM_WEBAPP_LIB_DIR, STORM_BIN_DIR, STORM_LOG4J2_CONF_DIR, STORM_SUPERVISOR_LOG_FILE,\
@@ -130,7 +137,7 @@
CLUSTER_CONF_DIR = STORM_CONF_DIR if STORM_CONF_DIR else os.path.join(STORM_DIR, "conf")
- if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))):
+ if not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml")):
USER_CONF_DIR = CLUSTER_CONF_DIR
STORM_WORKER_LIB_DIR = os.path.join(STORM_DIR, "lib-worker")
@@ -148,10 +155,10 @@
JAVA_CMD = get_java_cmd()
if JAVA_HOME and not os.path.exists(JAVA_CMD):
- print("ERROR: JAVA_HOME is invalid. Could not find bin/java at %s." % JAVA_HOME)
+ print(f"ERROR: JAVA_HOME is invalid. Could not find bin/java at {JAVA_HOME}.")
sys.exit(1)
- if not os.path.exists(STORM_LIB_DIR):
+ if not (within_unittest or os.path.exists(STORM_LIB_DIR)):
print("*" * 20)
print('''The storm client can only be run from within a release.
You appear to be trying to run the client from a checkout of Storm's source code.
@@ -159,7 +166,6 @@
print("*" * 20)
sys.exit(1)
-
STORM_EXT_CLASSPATH = os.getenv('STORM_EXT_CLASSPATH', None)
STORM_EXT_CLASSPATH_DAEMON = os.getenv('STORM_EXT_CLASSPATH_DAEMON', None)
LOCAL_TTL_DEFAULT = "20"
@@ -181,13 +187,13 @@
if not artifacts:
return {}
- print("Resolving dependencies on demand: artifacts (%s) with repositories (%s)" % (artifacts, artifact_repositories))
+ print(f"Resolving dependencies on demand: artifacts ({artifacts}) with repositories ({artifact_repositories})")
if maven_local_repos_dir:
- print("Local repository directory: %s" % maven_local_repos_dir)
+ print(f"Local repository directory: {maven_local_repos_dir}")
if proxy_url:
- print("Proxy information: url (%s) username (%s)" % (proxy_url, proxy_username))
+ print(f"Proxy information: url ({proxy_url}) username ({proxy_username})")
sys.stdout.flush()
@@ -214,20 +220,19 @@
p = subprocess.Popen(command, stdout=subprocess.PIPE)
output, errors = p.communicate()
if p.returncode != 0:
- raise RuntimeError("dependency handler returns non-zero code: code<%s> syserr<%s>" % (p.returncode, errors))
+ raise RuntimeError(f"dependency handler returns non-zero code: code<{p.returncode}> syserr<{errors}>")
- # python 3
if not isinstance(output, str):
output = output.decode('utf-8')
# For debug purpose, uncomment when you need to debug DependencyResolver
- # print("Resolved dependencies: %s" % output)
+ # print(f"Resolved dependencies: {output}")
try:
out_dict = json.loads(output)
return out_dict
except:
- raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
+ raise RuntimeError(f"dependency handler returns non-json response: sysout<{output}>", )
def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[],
@@ -238,15 +243,15 @@
if storm_log_dir is None or storm_log_dir in ["null", ""]:
storm_log_dir = os.path.join(STORM_DIR, "logs")
all_args = [
- JAVA_CMD, jvmtype,
- "-Ddaemon.name=" + daemonName,
- get_config_opts(storm_config_opts),
- "-Dstorm.home=" + STORM_DIR,
- "-Dstorm.log.dir=" + storm_log_dir,
- "-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
- "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
- "-cp", get_classpath(extrajars, daemon, client=client),
- ] + jvmopts + [klass] + list(main_class_args)
+ JAVA_CMD, jvmtype,
+ f"-Ddaemon.name={daemonName}",
+ get_config_opts(storm_config_opts),
+ f"-Dstorm.home={STORM_DIR}",
+ f"-Dstorm.log.dir={storm_log_dir}",
+ "-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
+ "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
+ "-cp", get_classpath(extrajars, daemon, client=client),
+ ] + jvmopts + [klass] + list(main_class_args)
print("Running: " + " ".join(all_args))
sys.stdout.flush()
exit_code = 0
@@ -354,7 +359,6 @@
add_common_options(sub_parser)
-
def initialize_remoteconfvalue_subcommand(subparsers):
command_help = '''Prints out the value for conf-name in the cluster's Storm configs.
The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
@@ -380,6 +384,7 @@
nargs='*', help="Runs the main method with the specified arguments."
)
+
def remove_common_options(sys_args):
flags_to_filter = ["-c", "-storm_config_opts", "--config"]
filtered_sys_args = [
@@ -388,6 +393,7 @@
]
return filtered_sys_args
+
def add_topology_jar_options(parser):
parser.add_argument(
"topology_jar_path", metavar="topology-jar-path",
@@ -395,7 +401,7 @@
)
parser.add_argument(
"topology_main_class", metavar="topology-main-class",
- help="main class of the topology jar being submitted"
+ help="main class of the topology jar being submitted"
)
@@ -413,7 +419,6 @@
For example, -artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load jedis and kafka-clients artifact and all of transitive dependencies but exclude slf4j-api from kafka.
''', default="")
-
parser.add_argument("--artifactRepositories", help='''
When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with a comma-separated string.
Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters.
@@ -422,7 +427,6 @@
parser.add_argument("--mavenLocalRepositoryDirectory", help="You can provide local maven repository directory via --mavenLocalRepositoryDirectory if you would like to use specific directory. It might help when you don't have '.m2/repository' directory in home directory, because CWD is sometimes non-deterministic (fragile).", default="")
-
parser.add_argument("--proxyUrl", help="You can also provide proxy information to let dependency resolver utilizing proxy if needed. URL representation of proxy ('http://host:port')", default="")
parser.add_argument("--proxyUsername", help="username of proxy if it requires basic auth", default="")
parser.add_argument("--proxyPassword", help="password of proxy if it requires basic auth", default="")
@@ -518,15 +522,17 @@
def check_non_negative(value):
ivalue = int(value)
if ivalue < 0:
- raise argparse.ArgumentTypeError("%s is not a non-zero integer" % value)
+ raise argparse.ArgumentTypeError(f"{value} is not a non-zero integer")
return ivalue
+
def check_positive(value):
ivalue = int(value)
if ivalue <= 0:
raise argparse.ArgumentTypeError("%s is not a positive integer" % value)
return ivalue
+
def initialize_upload_credentials_subcommand(subparsers):
command_help = """Uploads a new set of credentials to a running topology."""
sub_parser = subparsers.add_parser("upload-credentials", help=command_help, formatter_class=SortingHelpFormatter)
@@ -569,7 +575,6 @@
add_client_jar_options(sub_parser)
-
sub_parser.add_argument("sql_file", metavar="sql-file")
group = sub_parser.add_mutually_exclusive_group(required=True)
@@ -608,7 +613,6 @@
cat_parser.add_argument("-f", '--FILE', default=None)
add_common_options(cat_parser)
-
create_parser = sub_sub_parsers.add_parser(
"create", help="create a new blob. Contents comes from a FILE or STDIN", formatter_class=SortingHelpFormatter
)
@@ -1133,8 +1137,8 @@
def upload_credentials(args):
- if (len(args.cred_list) % 2 != 0):
- raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + cred_list)
+ if len(args.cred_list) % 2 != 0:
+ raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + args.cred_list)
exec_storm_class(
"org.apache.storm.command.UploadCredentials",
main_class_args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
@@ -1171,6 +1175,7 @@
extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
overriding_conf_file=args.config)
+
def listtopos(args):
exec_storm_class(
"org.apache.storm.command.ListTopologies",
@@ -1179,6 +1184,7 @@
extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
overriding_conf_file=args.config)
+
def set_log_level(args):
for log_level in args.l:
try:
@@ -1195,6 +1201,7 @@
extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
overriding_conf_file=args.config)
+
def deactivate(args):
exec_storm_class(
"org.apache.storm.command.Deactivate",
@@ -1286,9 +1293,9 @@
"storm.log4j2.conf.dir", storm_config_opts=storm_config_opts,
extrapaths=cppaths, overriding_conf_file=args.config
)
- if(not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null"):
+ if not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null":
storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR
- elif(not os.path.isabs(storm_log4j2_conf_dir)):
+ elif not os.path.isabs(storm_log4j2_conf_dir):
storm_log4j2_conf_dir = os.path.join(STORM_DIR, storm_log4j2_conf_dir)
return storm_log4j2_conf_dir
@@ -1478,6 +1485,7 @@
jvmtype="-client",
extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
+
def main():
init_storm_env()
storm_parser = initialize_main_command()
diff --git a/bin/test_docker_to_squash.py b/bin/test_docker_to_squash.py
new file mode 100644
index 0000000..d7a44e9
--- /dev/null
+++ b/bin/test_docker_to_squash.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase
+# import docker-to-squash as dtsq
+dtsq = __import__('docker-to-squash')  # TODO: rename docker-to-squash.py as docker_to_squash.py
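+# __import__ is used because the hyphen in the module name makes a plain "import docker-to-squash" a syntax error.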
+
+
+class Test(TestCase):
+ def test_shell_command(self):
+ """
+        shell_command is used by many functions in docker_to_squash.py, so ensure that it works correctly.
+        The prior code did not return any values; this went undetected until PR https://github.com/apache/storm/pull/3475
+ :return:
+ """
+ # expect success
+ cmd = ["ls", "-l"]
+ out, err, rc = dtsq.shell_command(cmd, True, True, True, timeout_sec=10)
+ self.assertEqual(0, rc, f"Failed cmd={cmd}\nrc={rc}\nout={out}\nerr={err}")
+
+ # expect failure
+ cmd = ["badcmd", "-l"]
+ out, err, rc = dtsq.shell_command(cmd, True, True, True, timeout_sec=10)
+ self.assertNotEqual(0, rc, f"Expected to fail cmd={cmd}\nrc={rc}\nout={out}\nerr={err}")
+
+ # TODO:
+ # def test_read_image_tag_to_hash(self):
+ # """
+    #     The base method behaves differently, since strings in python3 are always unicode. The base method flips
+    #     between byte arrays and strings, which may not always work properly in python3.
+ # :return:
+ # """
+ # image_tag_to_hash = ""
+ # hash_to_tags, tag_to_hash = dtsq.read_image_tag_to_hash(image_tag_to_hash)
+
diff --git a/bin/test_storm.py b/bin/test_storm.py
new file mode 100644
index 0000000..11c8057
--- /dev/null
+++ b/bin/test_storm.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import storm
+import os
+
+
+class Test(unittest.TestCase):
+ """
+ Mostly just test coverage
+ """
+ _testMethodName = None
+ _testMethodDoc = None
+
+ def __init__(self, method_name="None"):
+ super().__init__()
+ self._testMethodName = method_name
+ storm.init_storm_env(within_unittest=True)
+
+ def test_get_jars_full(self):
+ storm.get_jars_full(".")
+
+ def test_get_wildcard_dir(self):
+ s = storm.get_wildcard_dir("./")
+ self.assertEqual(s, ["./*"])
+
+ def test_get_java_cmd(self):
+ s = storm.get_java_cmd()
+ expected = 'java' if not storm.is_windows() else 'java.exe'
+ if storm.JAVA_HOME:
+ expected = os.path.join(storm.JAVA_HOME, 'bin', expected)
+ self.assertEqual(s, expected)
+
+ def test_confvalue(self):
+ name = 'name'
+ storm_config_opts = {'ui.port': '8080'}
+ extrapaths = []
+ overriding_conf_file = None
+ daemon = True
+ s = storm.confvalue(name, storm_config_opts, extrapaths, overriding_conf_file, daemon)
+ expected = ""
+ self.assertEqual(s, expected)
+
+ def test_get_classpath(self):
+ extrajars = [f"jar{x}.jar" for x in range(5)]
+ daemon = True
+ client = False
+ s = storm.get_classpath(extrajars, daemon, client)
+ expected = ":".join(extrajars)
+ self.assertEqual(s[-len(expected):], expected)
+
+ def test_resolve_dependencies(self):
+ artifacts = "org.apache.commons.commons-api"
+ artifact_repositories = "maven-central"
+ maven_local_repos_dir = "~/.m2"
+ proxy_url = None
+ proxy_username = None
+ proxy_password = None
+ try:
+ output = storm.resolve_dependencies(artifacts, artifact_repositories, maven_local_repos_dir,
+ proxy_url, proxy_username, proxy_password)
+ except RuntimeError as ex:
+ print(f"Unexpected {ex=}, {type(ex)=}")
+ # test coverage only
+
+ def test_exec_storm_class(self):
+ klass = "org.apache.storm.starter.WordCountTopology"
+ storm_config_opts = []
+ jvmtype = "-server"
+ jvmopts = []
+ extrajars = []
+ main_class_args = []
+ fork = False
+ daemon = True
+ client = False
+ daemonName = ""
+ overriding_conf_file = None
+ # exit_code = storm.exec_storm_class(klass, storm_config_opts=storm_config_opts, jvmtype=jvmtype, jvmopts=jvmopts,
+ # extrajars=extrajars, main_class_args=main_class_args, fork=fork,
+ # daemon=daemon, client=client, daemonName=daemonName,
+ # overriding_conf_file=overriding_conf_file)
+
+ def test_run_client_jar(self):
+ pass
+
+ def test_print_localconfvalue(self):
+ class Args:
+ conf_name = self._testMethodName
+ storm_config_opts = {self._testMethodName: "confvalue"}
+ config = "config/file/path.yaml"
+
+ args = Args()
+ storm.print_localconfvalue(args)
+
+ def test_print_remoteconfvalue(self):
+ class Args:
+ conf_name = self._testMethodName
+ storm_config_opts = {self._testMethodName: "confvalue"}
+ config = "config/file/path.yaml"
+
+ args = Args()
+ storm.print_remoteconfvalue(args)
+
+ def test_initialize_main_command(self):
+ storm.initialize_main_command()
+
+ def test_jar(self):
+ pass
+
+ def test_local(self):
+ pass
+
+ def test_sql(self):
+ pass
+
+ def test_kill(self):
+ pass
+
+ def test_upload_credentials(self):
+ pass
+
+ def test_blob(self):
+ pass
+
+ def test_heartbeats(self):
+ pass
+
+ def test_activate(self):
+ pass
+
+ def test_listtopos(self):
+ pass
+
+ def test_set_log_level(self):
+ pass
+
+ def test_deactivate(self):
+ pass
+
+ def test_rebalance(self):
+ pass
+
+ def test_get_errors(self):
+ pass
+
+ def test_healthcheck(self):
+ pass
+
+ def test_kill_workers(self):
+ pass
+
+ def test_admin(self):
+ pass
+
+ def test_shell(self):
+ pass
+
+ def test_repl(self):
+ pass
+
+ def test_get_log4j2_conf_dir(self):
+ pass
+
+ def test_nimbus(self):
+ pass
+
+ def test_pacemaker(self):
+ pass
+
+ def test_supervisor(self):
+ pass
+
+ def test_ui(self):
+ pass
+
+ def test_logviewer(self):
+ pass
+
+ def test_drpc_client(self):
+ pass
+
+ def test_drpc(self):
+ pass
+
+ def test_dev_zookeeper(self):
+ pass
+
+ def test_version(self):
+ pass
+
+ def test_print_classpath(self):
+ storm.print_classpath(None)
+
+ def test_print_server_classpath(self):
+ storm.print_server_classpath(None)
+
+ def test_monitor(self):
+ pass
+
+
diff --git a/dev-tools/find-checkstyle-issues.py b/dev-tools/find-checkstyle-issues.py
index ea3520f..9a6d5a1 100755
--- a/dev-tools/find-checkstyle-issues.py
+++ b/dev-tools/find-checkstyle-issues.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,17 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import sys
import os
from optparse import OptionParser
import subprocess
+
def getCheckstyleFor(f, check_result):
f = os.path.abspath(f)
check_result = os.path.abspath(check_result)
ret = subprocess.check_output(['xsltproc', '--stringparam', 'target', f, './dev-tools/checkstyle.xslt', check_result])
if not ret.isspace():
- print ret
+            print(ret.decode())
+
def main():
parser = OptionParser(usage="usage: %prog [options]")
@@ -34,5 +35,6 @@
for f in args:
getCheckstyleFor(f, options.check_result)
+
if __name__ == "__main__":
main()
diff --git a/dev-tools/github/__init__.py b/dev-tools/github/__init__.py
old mode 100755
new mode 100644
index 48f397b..4cc659d
--- a/dev-tools/github/__init__.py
+++ b/dev-tools/github/__init__.py
@@ -13,7 +13,8 @@
import getpass
import base64
-import urllib2
+import urllib
+import urllib.request
from datetime import datetime
import re
@@ -23,12 +24,6 @@
import simplejson as json
-def mstr(obj):
- if obj is None:
- return ""
- return unicode(obj)
-
-
def git_time(obj):
if obj is None:
return None
@@ -61,13 +56,13 @@
# TODO def review_comments
def user(self):
- return mstr(self.data["user"]["login"])
+ return self.data["user"]["login"]
def from_branch(self):
- return mstr(self.data["head"]["ref"])
+ return self.data["head"]["ref"]
def from_repo(self):
- return mstr(self.data["head"]["repo"]["clone_url"])
+ return self.data["head"]["repo"]["clone_url"]
def merged(self):
return self.data["merged_at"] is not None
@@ -111,11 +106,10 @@
page = 1
ret = []
while True:
- url = "https://api.github.com/repos/" + user + "/" + repo + "/pulls?state=" + type + "&page=" + str(page)
-
- req = urllib2.Request(url, None, self.headers)
- result = urllib2.urlopen(req)
- contents = result.read()
+ url = f"https://api.github.com/repos/{user}/{repo}/pulls?state={type}&page={page}"
+ req = urllib.request.Request(url, None, self.headers)
+ result = urllib.request.urlopen(req)
+ contents = result.read().decode()
if result.getcode() != 200:
raise Exception(result.getcode() + " != 200 " + contents)
got = json.loads(contents)
@@ -129,10 +123,10 @@
return self.pulls(user, repo, "open")
def pull(self, user, repo, number):
- url = "https://api.github.com/repos/" + user + "/" + repo + "/pulls/" + number
- req = urllib2.Request(url, None, self.headers)
- result = urllib2.urlopen(req)
- contents = result.read()
+ url = f"https://api.github.com/repos/{user}/{repo}/pulls/{number}"
+ req = urllib.request.Request(url, None, self.headers)
+ result = urllib.request.urlopen(req)
+ contents = result.read().decode()
if result.getcode() != 200:
raise Exception(result.getcode() + " != 200 " + contents)
got = json.loads(contents)
diff --git a/dev-tools/jira-github-join.py b/dev-tools/jira-github-join.py
index 1931f19..3e23ea1 100755
--- a/dev-tools/jira-github-join.py
+++ b/dev-tools/jira-github-join.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,6 +18,16 @@
from jira_github import JiraRepo
from report.report_builder import CompleteReportBuilder
+"""
+If you get a certificate error when running on Mac
+(https://stackoverflow.com/questions/50236117/scraping-ssl-certificate-verify-failed-error-for-http-en-wikipedia-org),
+go to Macintosh HD
+ > Applications
+ > Python3.9 folder (or whatever version of python you're using)
+ > and double-click the "Install Certificates.command" file.
+
+"""
+
def main():
parser = OptionParser(usage="usage: %prog [options]")
@@ -29,8 +39,9 @@
jira_repo = JiraRepo("https://issues.apache.org/jira/rest/api/2")
github_repo = GitHub(options)
- print "Report generated on: %s (GMT)" % (datetime.strftime(datetime.utcnow(), "%Y-%m-%d %H:%M:%S"))
-
+ print("=" * 100)
+ print("Report generated on: %s (GMT)" % (datetime.strftime(datetime.utcnow(), "%Y-%m-%d %H:%M:%S")))
+ print("-" * 100)
report_builder = CompleteReportBuilder(jira_repo, github_repo)
report_builder.report.print_all()
diff --git a/dev-tools/jira_github/__init__.py b/dev-tools/jira_github/__init__.py
index c98ae31..46412e6 100644
--- a/dev-tools/jira_github/__init__.py
+++ b/dev-tools/jira_github/__init__.py
@@ -13,7 +13,8 @@
import re
import urllib
-import urllib2
+import urllib.request
+import urllib.parse
from datetime import datetime
try:
@@ -22,21 +23,16 @@
import simplejson as json
-def mstr(obj):
- if obj is None:
- return ""
- return unicode(obj)
-
-
def jiratime(obj):
if obj is None:
return None
return datetime.strptime(obj[0:19], "%Y-%m-%dT%H:%M:%S")
+
# Regex pattern definitions
-github_user = re.compile("Git[Hh]ub user ([\w-]+)")
-github_pull = re.compile("https://github.com/[^/\s]+/[^/\s]+/pull/[0-9]+")
-has_vote = re.compile("\s+([-+][01])\s*")
+github_user = re.compile(r"Git[Hh]ub user ([\w-]+)")
+github_pull = re.compile(r"https://github.com/[^/\s]+/[^/\s]+/pull/[0-9]+")
+has_vote = re.compile(r"\s+([-+][01])\s*")
is_diff = re.compile("--- End diff --")
@@ -52,7 +48,7 @@
def __init__(self, data):
self.data = data
- self.author = mstr(self.data['author']['name'])
+ self.author = self.data['author']['name']
self.github_author = None
self.githubPull = None
self.githubComment = (self.author == "githubbot")
@@ -72,7 +68,7 @@
return self.author
def get_body(self):
- return mstr(self.data['body'])
+ return self.data['body']
def get_pull(self):
return self.githubPull
@@ -104,16 +100,27 @@
self.comments = None
def get_id(self):
- return mstr(self.key)
+ """
+ Get Jira ID as a string from the string stored in self.key
+ :return: Jira id, example "STORM-1234"
+ """
+ return self.key
+
+ def get_id_num(self):
+ """
+        Get the Jira ID number as an integer from the string stored in self.key
+        :return: Numeric Jira id. Example: "STORM-1234" and "ZKP-1234" both return 1234
+ """
+ return int(self.key.split('-')[-1])
def get_description(self):
- return mstr(self.fields['description'])
+ return self.fields['description']
def getReleaseNote(self):
if self.notes is None:
field = self.parent.fieldIdMap['Release Note']
- if self.fields.has_key(field):
- self.notes = mstr(self.fields[field])
+ if field in self.fields:
+ self.notes = self.fields[field]
else:
self.notes = self.get_description()
return self.notes
@@ -123,28 +130,28 @@
status = self.fields['status']
if status is not None:
ret = status['name']
- return mstr(ret)
+ return ret
def get_priority(self):
ret = ""
pri = self.fields['priority']
if pri is not None:
ret = pri['name']
- return mstr(ret)
+ return ret
def get_assignee_email(self):
ret = ""
mid = self.fields['assignee']
if mid is not None:
ret = mid['emailAddress']
- return mstr(ret)
+ return ret
def get_assignee(self):
ret = ""
mid = self.fields['assignee']
if mid is not None:
ret = mid['displayName']
- return mstr(ret)
+ return ret
def get_components(self):
return " , ".join([comp['name'] for comp in self.fields['components']])
@@ -162,21 +169,21 @@
mid = self.fields['issuetype']
if mid is not None:
ret = mid['name']
- return mstr(ret)
+ return ret
def get_reporter(self):
ret = ""
mid = self.fields['reporter']
if mid is not None:
ret = mid['displayName']
- return mstr(ret)
+ return ret
def get_project(self):
ret = ""
mid = self.fields['project']
if mid is not None:
ret = mid['key']
- return mstr(ret)
+ return ret
def get_created(self):
return jiratime(self.fields['created'])
@@ -191,11 +198,12 @@
at = 0
end = 1
count = 100
- while (at < end):
- params = urllib.urlencode({'startAt': at, 'maxResults': count})
- resp = urllib2.urlopen(self.parent.baseUrl + "/issue/" + jiraId + "/comment?" + params)
- data = json.loads(resp.read())
- if (data.has_key('errorMessages')):
+ while at < end:
+ params = urllib.parse.urlencode({'startAt': at, 'maxResults': count})
+ resp = urllib.request.urlopen(self.parent.baseUrl + "/issue/" + jiraId + "/comment?" + params)
+ resp_str = resp.read().decode()
+ data = json.loads(resp_str)
+ if 'errorMessages' in data:
raise Exception(data['errorMessages'])
at = data['startAt'] + data['maxResults']
end = data['total']
@@ -213,12 +221,12 @@
def get_trimmed_comments(self, limit=40):
comments = self.get_comments()
- return comments if len(comments) < limit else comments[0:limit] + "..."
+ return comments if len(comments) < limit else comments[0:limit] + ["..."]
def raw(self):
return self.fields
- def storm_jira_cmp(x, y):
+ def storm_jira_cmp(self, x, y):
xn = x.get_id().split("-")[1]
yn = y.get_id().split("-")[1]
return int(xn) - int(yn)
@@ -229,17 +237,19 @@
def __init__(self, baseUrl):
self.baseUrl = baseUrl
- resp = urllib2.urlopen(baseUrl + "/field")
- data = json.loads(resp.read())
+ resp = urllib.request.urlopen(baseUrl + "/field")
+ resp_str = resp.read().decode()
+ data = json.loads(resp_str)
self.fieldIdMap = {}
for part in data:
self.fieldIdMap[part['name']] = part['id']
def get(self, id):
- resp = urllib2.urlopen(self.baseUrl + "/issue/" + id)
- data = json.loads(resp.read())
- if (data.has_key('errorMessages')):
+ resp = urllib.request.urlopen(self.baseUrl + "/issue/" + id)
+ resp_str = resp.read().decode()
+ data = json.loads(resp_str)
+ if 'errorMessages' in data:
raise Exception(data['errorMessages'])
j = Jira(data, self)
return j
@@ -249,12 +259,13 @@
at = 0
end = 1
count = 100
- while (at < end):
- params = urllib.urlencode({'jql': query, 'startAt': at, 'maxResults': count})
+ while at < end:
+ params = urllib.parse.urlencode({'jql': query, 'startAt': at, 'maxResults': count})
# print params
- resp = urllib2.urlopen(self.baseUrl + "/search?%s" % params)
- data = json.loads(resp.read())
- if (data.has_key('errorMessages')):
+ resp = urllib.request.urlopen(self.baseUrl + "/search?%s" % params)
+ resp_str = resp.read().decode()
+ data = json.loads(resp_str)
+ if 'errorMessages' in data:
raise Exception(data['errorMessages'])
at = data['startAt'] + data['maxResults']
end = data['total']
@@ -268,18 +279,18 @@
:param project: The JIRA project to search for unresolved issues
:return: All JIRA issues that have the field resolution = Unresolved
"""
- return self.query("project = " + project + " AND resolution = Unresolved")
+ return self.query(f"project = {project} AND resolution = Unresolved")
def open_jiras(self, project):
"""
:param project: The JIRA project to search for open issues
:return: All JIRA issues that have the field status = Open
"""
- return self.query("project = " + project + " AND status = Open")
+ return self.query(f"project = {project} AND status = Open")
def in_progress_jiras(self, project):
"""
:param project: The JIRA project to search for In Progress issues
:return: All JIRA issues that have the field status = 'In Progress'
"""
- return self.query("project = " + project + " AND status = 'In Progress'")
+ return self.query(f"project = {project} AND status = 'In Progress'")
diff --git a/dev-tools/release_notes.py b/dev-tools/release_notes.py
old mode 100644
new mode 100755
index c1e053a..1c8663d
--- a/dev-tools/release_notes.py
+++ b/dev-tools/release_notes.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
@@ -30,7 +30,7 @@
import itertools, sys
if len(sys.argv) < 2:
- print >>sys.stderr, "Usage: release_notes.py <version>"
+ print("Usage: release_notes.py <version>", file=sys.stderr)
sys.exit(1)
version = sys.argv[1]
@@ -38,19 +38,22 @@
JIRA_BASE_URL = 'https://issues.apache.org/jira'
MAX_RESULTS = 100 # This is constrained for cloud instances so we need to fix this value
+
def get_issues(jira, query, **kwargs):
"""
- Get all issues matching the JQL query from the JIRA instance. This handles expanding paginated results for you. Any additional keyword arguments are forwarded to the JIRA.search_issues call.
+ Get all issues matching the JQL query from the JIRA instance. This handles expanding paginated results for you.
+ Any additional keyword arguments are forwarded to the JIRA.search_issues call.
"""
results = []
startAt = 0
new_results = None
- while new_results == None or len(new_results) == MAX_RESULTS:
+ while new_results is None or len(new_results) == MAX_RESULTS:
new_results = jira.search_issues(query, startAt=startAt, maxResults=MAX_RESULTS, **kwargs)
results += new_results
startAt += len(new_results)
return results
+
def issue_link(issue):
return "%s/browse/%s" % (JIRA_BASE_URL, issue.key)
@@ -59,10 +62,11 @@
apache = JIRA(JIRA_BASE_URL)
issues = get_issues(apache, 'project=STORM and fixVersion=%s' % version)
if not issues:
- print >>sys.stderr, "Didn't find any issues for the target fix version"
+ print("Didn't find any issues for the target fix version", file=sys.stderr)
sys.exit(1)
- # Some resolutions, including a lack of resolution, indicate that the bug hasn't actually been addressed and we shouldn't even be able to create a release until they are fixed
+ # Some resolutions, including a lack of resolution, indicate that the bug hasn't actually been addressed and we
+ # shouldn't even be able to create a release until they are fixed
UNRESOLVED_RESOLUTIONS = [None,
"Unresolved",
"Duplicate",
@@ -77,42 +81,50 @@
"Workaround",
"Information Provided"
]
- unresolved_issues = [issue for issue in issues if issue.fields.resolution in UNRESOLVED_RESOLUTIONS or issue.fields.resolution.name in UNRESOLVED_RESOLUTIONS]
+ unresolved_issues = [issue for issue in issues
+ if issue.fields.resolution in UNRESOLVED_RESOLUTIONS
+ or issue.fields.resolution.name in UNRESOLVED_RESOLUTIONS]
if unresolved_issues:
- print >>sys.stderr, "The release is not completed since unresolved issues or improperly resolved issues were found still tagged with this release as the fix version:"
+ print("The release is not completed since unresolved issues or improperly resolved issues were found still "
+ "tagged with this release as the fix version:", file=sys.stderr)
for issue in unresolved_issues:
- print >>sys.stderr, "Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue))
- print >>sys.stderr
- print >>sys.stderr, "Note that for some resolutions, you should simply remove the fix version as they have not been truly fixed in this release."
+ print("Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue)),
+ file=sys.stderr)
+ print('', file=sys.stderr)
+ print("Note that for some resolutions, you should simply remove the fix version as they have not been truly "
+ "fixed in this release.", file=sys.stderr)
sys.exit(1)
- # Get list of (issue type, [issues]) sorted by the issue ID type, with each subset of issues sorted by their key so they
- # are in increasing order of bug #. To get a nice ordering of the issue types we customize the key used to sort by issue
- # type a bit to ensure features and improvements end up first.
+ # Get list of (issue type, [issues]) sorted by the issue ID type, with each subset of issues sorted by their key so
+ # they are in increasing order of bug #. To get a nice ordering of the issue types we customize the key used to sort
+ # by issue type a bit to ensure features and improvements end up first.
def issue_type_key(issue):
if issue.fields.issuetype.name == 'New Feature':
return -2
if issue.fields.issuetype.name == 'Improvement':
return -1
- return issue.fields.issuetype.id
- by_group = [(k,sorted(g, key=lambda issue: issue.id)) for k,g in itertools.groupby(sorted(issues, key=issue_type_key), lambda issue: issue.fields.issuetype.name)]
+ return int(issue.fields.issuetype.id)
- print "<!DOCTYPE html>"
- print "<html lang=\"en\">"
- print "<head>"
- print "<meta charset=\"utf-8\">"
- print "<title>Storm %(version)s Release Notes</title>" % { 'version': version }
- print "</head>"
- print "<body>"
- print "<h1>Release Notes for Storm %s</h1>" % version
- print """<p>JIRA issues addressed in the %(version)s release of Storm. Documentation for this
- release is available at the <a href="http://storm.apache.org/">Apache Storm
- project site</a>.</p>""" % { 'version': version }
- for itype, issues in by_group:
- print "<h2>%s</h2>" % itype
- print "<ul>"
- for issue in issues:
- print '<li>[<a href="%(link)s">%(key)s</a>] - %(summary)s</li>' % {'key': issue.key, 'link': issue_link(issue), 'summary': issue.fields.summary}
- print "</ul>"
- print "</body>"
- print "</html>"
+ by_group = [(k, sorted(g, key=lambda issue: issue.id))
+ for k, g in itertools.groupby(sorted(issues, key=issue_type_key),
+ lambda issue: issue.fields.issuetype.name)]
+ issues_str = "\n".join([
+ f"\n\t<h2>{itype}</h2>" +
+ f"\n\t<ul>" +
+ '\n\t\t'.join([f'<li>[<a href="{issue_link(issue)}">{issue.key}</a>] - {issue.fields.summary}</li>' for issue in issues]) +
+ "\n\t</ul>"
+ for itype, issues in by_group])
+
+ print(f"""<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta charset="utf-8">
+<title>Storm {version} Release Notes</title>
+</head>
+<body>
+<h1>Release Notes for Storm {version}</h1>
+<p>JIRA issues addressed in the {version} release of Storm. Documentation for this
+ release is available at the <a href="https://storm.apache.org/">Apache Storm project site</a>.</p>
+{issues_str}
+</body>
+</html>""")
diff --git a/dev-tools/report/formatter.py b/dev-tools/report/formatter.py
index 81f574d..1bdd558 100644
--- a/dev-tools/report/formatter.py
+++ b/dev-tools/report/formatter.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,14 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-def encode(obj, encoding='UTF-8'):
- """
- Check if the object supports encode() method, and if so, encodes it.
- Encoding defaults to UTF-8.
- For example objects of type 'int' do not support encode
- """
- return obj.encode(encoding) if 'encode' in dir(obj) else obj
-
class Formatter:
def __init__(self, fields_tuple=(), row_tuple=(), min_width_tuple=None):
# Format to pass as first argument to the print function, e.g. '%s%s%s'
@@ -46,7 +38,7 @@
sizes = []
padding = 3
for i in range(0, len(row_tuple)):
- max_len = max(len(encode(fields_tuple[i])), len(str(encode(row_tuple[i]))))
+ max_len = max(len(fields_tuple[i]), len(str(row_tuple[i])))
if min_width_tuple is not None:
max_len = max(max_len, min_width_tuple[i])
sizes += [max_len + padding]
@@ -62,7 +54,7 @@
# Returns a tuple where each entry has a string that is the result of
# statements with the pattern "{!s:43}".format("Text")
def row_str_format(self, row_tuple):
- format_with_values = [str(self.data_format[0].format(encode(row_tuple[0])))]
+ format_with_values = [self.data_format[0].format(row_tuple[0])]
for i in range(1, len(row_tuple)):
- format_with_values += [str(self.data_format[i].format(encode(row_tuple[i])))]
+ format_with_values += [self.data_format[i].format(row_tuple[i])]
return tuple(format_with_values)
diff --git a/dev-tools/report/report.py b/dev-tools/report/report.py
old mode 100644
new mode 100755
index d2df077..bba78bc
--- a/dev-tools/report/report.py
+++ b/dev-tools/report/report.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,9 +13,8 @@
# limitations under the License.
from datetime import datetime
-from github import mstr
from jira_github import Jira
-from formatter import Formatter, encode
+from .formatter import Formatter
def daydiff(a, b):
@@ -24,23 +23,25 @@
class Report:
now = datetime.utcnow()
+
def __init__(self, header=''):
self.header = header
# if padding starts with - it puts padding before contents, otherwise after
@staticmethod
def _build_tuple(contents, padding=''):
- if padding is not '':
+ if padding:
out = []
for i in range(len(contents)):
- out += [padding[1:] + str(contents[i])] if padding[0] is '-' else [str(contents[i]) + padding]
+ out += [padding[1:] + str(contents[i])] if padding[0] == '-' else [str(contents[i]) + padding]
return tuple(out)
return contents
# calls the native print function with the following format. Text1,Text2,... has the correct spacing
- # print ("%s%s%s" % ("Text1, Text2, Text3))
+ # print("%s%s%s" % ("Text1, Text2, Text3))
def print_(self, formatter, row_tuple):
- print (formatter.format % formatter.row_str_format(row_tuple))
+ print(formatter.format % formatter.row_str_format(row_tuple))
+
class JiraReport(Report):
def __init__(self, issues, header=''):
@@ -58,9 +59,7 @@
def values_view(self, excluded=None):
temp_dic = dict(self.issues) if excluded is None else self.view(excluded)
- values = temp_dic.values()
- values.sort(Jira.storm_jira_cmp, reverse=True)
- return values
+ return sorted(temp_dic.values(), key=lambda jira: jira.get_id_num(), reverse=True)
@staticmethod
def _row_tuple(jira):
@@ -71,7 +70,7 @@
return -1, 43, -1, -1
def print_report(self):
- print "%s (Count = %s) " % (self.header, len(self.issues))
+ print(f"{self.header} (Count = {len(self.issues)}) ")
jiras = self.values_view()
fields_tuple = ('Jira Id', 'Summary', 'Created', 'Last Updated (Days)')
row_tuple = self._row_tuple(jiras[0])
@@ -110,7 +109,7 @@
return -1, 43, -1, -1, -1
def print_report(self):
- print "%s (Count = %s) " % (self.header, len(self.pull_requests))
+ print("%s (Count = %s) " % (self.header, len(self.pull_requests)))
fields_tuple = self._build_tuple(('URL', 'Title', 'Created', 'Last Updated (Days)', 'User'), '')
if len(self.pull_requests) > 0:
@@ -132,6 +131,7 @@
jira_ids.append(pull.jira_id())
return sorted(jira_ids)
+
class JiraGitHubCombinedReport(Report):
def __init__(self, jira_report, github_report, header='', print_comments=False):
Report.__init__(self, header)
@@ -158,7 +158,7 @@
def _jira_id(self, pull_idx):
pull = self._pull_request(pull_idx)
- return encode(pull.jira_id())
+ return str(pull.jira_id())
def _jira_issue(self, jira_id):
return self.jira_report.issues[jira_id]
@@ -168,11 +168,11 @@
jira_id = self._jira_id(pull_idx)
jira_issue = self._jira_issue(jira_id)
- return (jira_id, mstr(pull), jira_issue.get_trimmed_summary(),
+ return (jira_id, str(pull) if pull else "No PR", jira_issue.get_trimmed_summary(),
daydiff(Report.now, jira_issue.get_created()),
- daydiff(Report.now, pull.created_at()),
+                daydiff(Report.now, pull.created_at()) if pull else "No PR",
daydiff(Report.now, jira_issue.get_updated()),
- daydiff(Report.now, pull.updated_at()),
+                daydiff(Report.now, pull.updated_at()) if pull else "No PR",
jira_issue.get_status(), pull.user())
def _row_tuple_1(self, pull_idx, comment_idx):
@@ -188,7 +188,10 @@
# variables and method names ending with _1 correspond to the comments part
def print_report(self, print_comments=False):
- print "%s (Count = %s) " % (self.header, len(self.github_report.pull_requests))
+ pull_request_cnt = len(self.github_report.pull_requests)
+ print("%s (Count = %s) " % (self.header, pull_request_cnt))
+ if not pull_request_cnt:
+ return
fields_tuple = ('JIRA ID', 'Pull Request', 'Jira Summary', 'JIRA Age',
'Pull Age', 'JIRA Update Age', 'Pull Update Age (Days)',
@@ -197,15 +200,12 @@
formatter = Formatter(fields_tuple, row_tuple)
self.print_(formatter, fields_tuple)
- row_tuple_1 = ()
- formatter_1 = Formatter()
-
if print_comments or self.print_comments:
fields_tuple_1 = self._build_tuple(('Comment Vote', 'Comment Author', 'Pull URL', 'Comment Age'), '-\t\t')
row_tuple_1 = self._build_tuple(self._row_tuple_1(*self._idx_1st_comment_with_vote()), '-\t\t')
formatter_1 = Formatter(fields_tuple_1, row_tuple_1)
self.print_(formatter_1, fields_tuple_1)
- print ''
+ print('')
for p in range(0, len(self.github_report.pull_requests)):
row_tuple = self._row_tuple(p)
@@ -219,10 +219,11 @@
if comment.has_vote():
row_tuple_1 = self._build_tuple(self._row_tuple_1(p, c), '-\t\t')
if row_tuple_1 is not None:
+ formatter_1 = Formatter()
self.print_(formatter_1, row_tuple_1)
has_vote = True
if has_vote:
- print ''
+ print('')
class CompleteReport(Report):
@@ -233,8 +234,8 @@
self.jira_github_combined_reports = []
def print_all(self):
- if self.header is not '':
- print self.header
+ if self.header:
+ print(self.header)
self._print_github_reports()
self._print_jira_github_combined_reports()
diff --git a/dev-tools/report/report_builder.py b/dev-tools/report/report_builder.py
index 4b8a468..6f5c842 100644
--- a/dev-tools/report/report_builder.py
+++ b/dev-tools/report/report_builder.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from report import CompleteReport, GitHubReport, JiraReport, JiraGitHubCombinedReport
+from .report import CompleteReport, GitHubReport, JiraReport, JiraGitHubCombinedReport
class ReportBuilder:
@@ -41,8 +41,10 @@
# all unresolved JIRA issues
jira_unresolved = JiraReport(self.jira_repo.unresolved_jiras("STORM"))
- jira_open = JiraReport(dict((x, y) for x, y in self.jira_repo.unresolved_jiras("STORM").items() if y.get_status() == 'Open'))
- jira_in_progress = JiraReport(dict((x, y) for x, y in self.jira_repo.in_progress_jiras("STORM").items() if y.get_status() == 'In Progress'),
+ jira_open = JiraReport(dict((x, y) for x, y in self.jira_repo.unresolved_jiras("STORM").items()
+ if y.get_status().lower() == 'open'))
+ jira_in_progress = JiraReport(dict((x, y) for x, y in self.jira_repo.in_progress_jiras("STORM").items()
+ if y.get_status() == 'In Progress'),
"\nIN PROGRESS JIRA ISSUES")
for pull in github_open.pull_requests:
diff --git a/dev-tools/storm-merge.py b/dev-tools/storm-merge.py
index 33690c2..c96a71b 100755
--- a/dev-tools/storm-merge.py
+++ b/dev-tools/storm-merge.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
@@ -14,18 +14,20 @@
from github import GitHub
from optparse import OptionParser
-def main():
- parser = OptionParser(usage="usage: %prog [options] [pull number]")
- parser.add_option("-g", "--github-user", dest="gituser",
- type="string", help="github user, if not supplied no auth is used", metavar="USER")
-
- (options, args) = parser.parse_args()
- github = GitHub(options)
- for pullNumber in args:
- pull = github.pull("apache", "storm", pullNumber)
- print "git pull --no-ff "+pull.from_repo()+" "+pull.from_branch()
+def main():
+ parser = OptionParser(usage="usage: %prog [options] [pull number]")
+ parser.add_option("-g", "--github-user", dest="gituser",
+ type="string", help="github user, if not supplied no auth is used", metavar="USER")
+
+ (options, args) = parser.parse_args()
+ github = GitHub(options)
+
+ for pullNumber in args:
+ pull = github.pull("apache", "storm", pullNumber)
+ print("git pull --no-ff " + pull.from_repo() + " " + pull.from_branch())
+
if __name__ == "__main__":
- main()
+ main()
diff --git a/dev-tools/travis/ratprint.py b/dev-tools/travis/ratprint.py
index 5031045..70e0d95 100755
--- a/dev-tools/travis/ratprint.py
+++ b/dev-tools/travis/ratprint.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,13 +14,13 @@
import sys
import re
-p = re.compile('Unapproved licenses:\s*([^\s\*]*).*\*\*\*')
+p = re.compile(r'Unapproved licenses:\s*([^\s\*]*).*\*\*\*')
with open (sys.argv[1]) as ratfile:
- rat = ratfile.read().replace('\n','')
+ rat = ratfile.read().replace('\n', '')
matches = p.search(rat)
-failed = matches.group(1)
+failed = matches.group(1)
-if re.search('\S', failed):
- print failed
+if re.search(r'\S', failed):
+ print(failed)
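
The regex literals above were switched to raw strings because sequences like `\s` are not valid string escapes; Python 3 flags them with a DeprecationWarning, while an `r'...'` literal passes the backslashes through to the `re` module untouched. A short sketch with a made-up input line, not taken from a real rat.txt:

```python
# Sketch only: raw strings keep regex backslashes intact.
import re

p = re.compile(r'Unapproved licenses:\s*([^\s\*]*)')
m = p.search('Unapproved licenses: 2 *** some/file.java')
if m:
    print(m.group(1))   # -> "2"
```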
diff --git a/dev-tools/travis/save-logs.py b/dev-tools/travis/save-logs.py
index 5f4ad28..a3a5215 100755
--- a/dev-tools/travis/save-logs.py
+++ b/dev-tools/travis/save-logs.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,13 +16,14 @@
import subprocess
from datetime import datetime, timedelta
+
def main(file, cmd):
- print cmd, "writing to", file
+ print(cmd, "writing to", file)
out = open(file, "w")
count = 0
process = subprocess.Popen(cmd,
- stderr=subprocess.STDOUT,
- stdout=subprocess.PIPE)
+ stderr=subprocess.STDOUT,
+ stdout=subprocess.PIPE)
start = datetime.now()
nextPrint = datetime.now() + timedelta(seconds=1)
@@ -30,10 +31,11 @@
pout = process.stdout
line = pout.readline()
while line:
+ line = line.decode('utf-8')
count = count + 1
if datetime.now() > nextPrint:
diff = datetime.now() - start
- sys.stdout.write("\r%d seconds %d log lines"%(diff.seconds, count))
+ sys.stdout.write(f"\r{diff.seconds} seconds {count} log lines")
sys.stdout.flush()
nextPrint = datetime.now() + timedelta(seconds=10)
out.write(line)
@@ -41,14 +43,15 @@
out.close()
errcode = process.wait()
diff = datetime.now() - start
- sys.stdout.write("\r%d seconds %d log lines"%(diff.seconds, count))
- print
- print cmd, "done", errcode
+ sys.stdout.write(f"\r{diff.seconds} seconds {count} log lines")
+ print()
+ print(cmd, "done", errcode)
return errcode
+
if __name__ == "__main__":
- if sys.argv < 1:
- print "Usage: %s [file info]" % sys.argv[0]
+ if len(sys.argv) < 3:
+ print(f"Usage: {sys.argv[0]} <file-path> <cmd>")
sys.exit(1)
sys.exit(main(sys.argv[1], sys.argv[2:]))
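
The added `line.decode('utf-8')` is needed because `Popen` pipes yield `bytes` on Python 3 while the log file is opened in text mode. A minimal sketch of the two options, using a hypothetical command rather than the Maven invocation from the script; `text=True` is the stdlib alternative if decoding each line by hand is undesirable:

```python
# Sketch only: reading subprocess output as str on Python 3.
import subprocess
import sys

cmd = [sys.executable, "-c", "print('hello')"]

# Option 1: pipes yield bytes; decode each line (what save-logs.py now does).
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
for raw in proc.stdout:
    print(raw.decode('utf-8').rstrip())

# Option 2: open the pipe in text mode and let subprocess decode.
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in proc.stdout:
    print(line.rstrip())
```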
diff --git a/dev-tools/travis/travis-install.sh b/dev-tools/travis/travis-install.sh
index d4ca821..6080625 100755
--- a/dev-tools/travis/travis-install.sh
+++ b/dev-tools/travis/travis-install.sh
@@ -11,41 +11,40 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-echo "Python version : " `python -V 2>&1`
-echo "Python3 version : " `python3 -V 2>&1`
-echo "Pip2 version : " `pip2 --version 2>&1`
-echo "Pip3 version : " `pip3 --version 2>&1`
+# shellcheck disable=SC2006
+echo "Python version : $(python -V 2>&1) (note python2 is not supported) "
+echo "Python3 version : $(python3 -V 2>&1) "
+echo "Pip3 version : $(pip3 --version 2>&1) "
-echo "Maven version : " `mvn -v`
+echo "Maven version : $(mvn -v)"
STORM_SRC_ROOT_DIR=$1
TRAVIS_SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
-pip2 install --user -r ${TRAVIS_SCRIPT_DIR}/requirements.txt
-pip3 install --user -r ${TRAVIS_SCRIPT_DIR}/requirements.txt
+pip3 install --user -r "${TRAVIS_SCRIPT_DIR}"/requirements.txt
-python ${TRAVIS_SCRIPT_DIR}/save-logs.py "storm-shaded-deps/install-shade.txt" mvn clean install --batch-mode -pl storm-shaded-deps -am
+python3 "${TRAVIS_SCRIPT_DIR}"/save-logs.py "storm-shaded-deps/install-shade.txt" mvn clean install --batch-mode -pl storm-shaded-deps -am
BUILD_RET_VAL=$?
if [[ "$BUILD_RET_VAL" != "0" ]];
then
- cat "storm-shaded-deps/install-shade.txt"
- exit ${BUILD_RET_VAL}
+ cat "storm-shaded-deps/install-shade.txt"
+ exit ${BUILD_RET_VAL}
fi
-cd ${STORM_SRC_ROOT_DIR}
-python ${TRAVIS_SCRIPT_DIR}/save-logs.py "install.txt" mvn clean install -DskipTests -Pnative,examples,externals -pl '!storm-shaded-deps' --batch-mode
+cd "${STORM_SRC_ROOT_DIR}" || ( echo "Cannot cd to ${STORM_SRC_ROOT_DIR}"; exit 1 )
+python3 "${TRAVIS_SCRIPT_DIR}"/save-logs.py "install.txt" mvn clean install -DskipTests -Pnative,examples,externals -pl '!storm-shaded-deps' --batch-mode
BUILD_RET_VAL=$?
if [[ "$BUILD_RET_VAL" != "0" ]];
then
- cat "install.txt"
- echo "Looking for unapproved licenses"
- for rat in `find . -name rat.txt`;
- do
- python ${TRAVIS_SCRIPT_DIR}/ratprint.py "${rat}"
- done
+ cat "install.txt"
+ echo "Looking for unapproved licenses"
+ for rat in $(find . -name rat.txt)
+ do
+ python3 "${TRAVIS_SCRIPT_DIR}"/ratprint.py "${rat}"
+ done
fi
diff --git a/dev-tools/travis/travis-script.sh b/dev-tools/travis/travis-script.sh
index b743688..6908596 100755
--- a/dev-tools/travis/travis-script.sh
+++ b/dev-tools/travis/travis-script.sh
@@ -11,10 +11,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-echo "Python version : " `python -V 2>&1`
-echo "Ruby version : " `ruby -v`
-echo "NodeJs version : " `node -v`
-echo "Maven version : " `mvn -v`
+echo "Python3 version: " $(python3 -V 2>&1)
+echo "Ruby version : " $(ruby -v)
+echo "NodeJs version : " $(node -v)
+echo "Maven version : " $(mvn -v)
set -x
@@ -22,31 +22,31 @@
TRAVIS_SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
-cd ${STORM_SRC_ROOT_DIR}
+cd "${STORM_SRC_ROOT_DIR}" || (echo "Cannot cd to ${STORM_SRC_ROOT_DIR}"; exit 1)
if [ "$2" == "Integration-Test" ]
- then
- exec ./integration-test/run-it.sh
+ then
+ exec ./integration-test/run-it.sh
elif [ "$2" == "Check-Updated-License-Files" ]
- then
- exec python3.6 dev-tools/validate-license-files.py --skip-build-storm
+ then
+ exec python3 dev-tools/validate-license-files.py --skip-build-storm
elif [ "$2" == "Client" ]
then
- TEST_MODULES=storm-client
+ TEST_MODULES=storm-client
elif [ "$2" == "Server" ]
then
- TEST_MODULES=storm-server,storm-webapp
+ TEST_MODULES=storm-server,storm-webapp
elif [ "$2" == "Core" ]
then
- TEST_MODULES=storm-core
+ TEST_MODULES=storm-core
elif [ "$2" == "External" ]
then
- if [ "$TRAVIS_JDK_VERSION" == "openjdk11" ]
- then
- TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps,!external/storm-cassandra,!external/storm-hive,!external/storm-hdfs,!external/storm-hbase,!sql/storm-sql-external/storm-sql-hdfs,!external/storm-hdfs-blobstore'
- else
- TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps'
- fi
+ if [ "$TRAVIS_JDK_VERSION" == "openjdk11" ]
+ then
+ TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps,!external/storm-cassandra,!external/storm-hive,!external/storm-hdfs,!external/storm-hbase,!sql/storm-sql-external/storm-sql-hdfs,!external/storm-hdfs-blobstore'
+ else
+ TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps'
+ fi
fi
# We should be concerned that Travis CI could be very slow because it uses VM
export STORM_TEST_TIMEOUT_MS=150000
@@ -56,10 +56,10 @@
mvn --batch-mode test -fae -Pnative,all-tests,examples,externals -Prat -pl "$TEST_MODULES"
BUILD_RET_VAL=$?
-for dir in `find . -type d -and -wholename \*/target/\*-reports`;
+for dir in $(find . -type d -and -wholename \*/target/\*-reports)
do
echo "Looking for errors in ${dir}"
- python ${TRAVIS_SCRIPT_DIR}/print-errors-from-test-reports.py "${dir}"
+ python3 "${TRAVIS_SCRIPT_DIR}"/print-errors-from-test-reports.py "${dir}"
done
exit ${BUILD_RET_VAL}
diff --git a/dev-tools/validate-license-files.py b/dev-tools/validate-license-files.py
index 66da6c0..98e6d0e 100755
--- a/dev-tools/validate-license-files.py
+++ b/dev-tools/validate-license-files.py
@@ -18,13 +18,11 @@
from contextlib import contextmanager
-from random import randint
from pathlib import Path
import os
import subprocess
import shlex
import filecmp
-import shutil
import re
import itertools
import argparse
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
index 0af6e8b..b11a7f0 100755
--- a/storm-multilang/python/src/main/resources/resources/storm.py
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
@@ -29,8 +30,9 @@
json_encode = lambda x: json.dumps(x)
json_decode = lambda x: json.loads(x)
-#reads lines and reconstructs newlines appropriately
+
def readMsg():
+ """reads lines and reconstructs newlines appropriately"""
msg = ""
while True:
line = sys.stdin.readline()
@@ -41,12 +43,14 @@
msg = msg + line
return json_decode(msg[0:-1])
+
MODE = None
ANCHOR_TUPLE = None
-#queue up commands we read while trying to read taskids
+# queue up commands we read while trying to read taskids
pending_commands = deque()
+
def readTaskIds():
if pending_taskids:
return pending_taskids.popleft()
@@ -57,9 +61,11 @@
msg = readMsg()
return msg
+
#queue up taskids we read while trying to read commands/tuples
pending_taskids = deque()
+
def readCommand():
if pending_commands:
return pending_commands.popleft()
@@ -70,31 +76,38 @@
msg = readMsg()
return msg
+
def readTuple():
cmd = readCommand()
return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
def sendMsgToParent(msg):
print(json_encode(msg))
print("end")
sys.stdout.flush()
+
def sync():
sendMsgToParent({'command':'sync'})
+
def sendpid(heartbeatdir):
pid = os.getpid()
sendMsgToParent({'pid':pid})
open(heartbeatdir + "/" + str(pid), "w").close()
+
def emit(*args, **kwargs):
__emit(*args, **kwargs)
return readTaskIds()
+
def emitDirect(task, *args, **kwargs):
kwargs["directTask"] = task
__emit(*args, **kwargs)
+
def __emit(*args, **kwargs):
global MODE
if MODE == Bolt:
@@ -102,7 +115,8 @@
elif MODE == Spout:
emitSpout(*args, **kwargs)
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
+
+def emitBolt(tup, stream=None, anchors=[], directTask=None):
global ANCHOR_TUPLE
if ANCHOR_TUPLE is not None:
anchors = [ANCHOR_TUPLE]
@@ -115,6 +129,7 @@
m["tuple"] = tup
sendMsgToParent(m)
+
def emitSpout(tup, stream=None, id=None, directTask=None):
m = {"command": "emit"}
if id is not None:
@@ -126,41 +141,53 @@
m["tuple"] = tup
sendMsgToParent(m)
+
def ack(tup):
sendMsgToParent({"command": "ack", "id": tup.id})
+
def fail(tup):
sendMsgToParent({"command": "fail", "id": tup.id})
+
def reportError(msg):
sendMsgToParent({"command": "error", "msg": msg})
+
def log(msg, level=2):
sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
def logTrace(msg):
log(msg, 0)
+
def logDebug(msg):
log(msg, 1)
+
def logInfo(msg):
log(msg, 2)
+
def logWarn(msg):
log(msg, 3)
+
def logError(msg):
log(msg, 4)
+
def rpcMetrics(name, params):
sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
def initComponent():
setupInfo = readMsg()
sendpid(setupInfo['pidDir'])
return [setupInfo['conf'], setupInfo['context']]
+
class Tuple(object):
def __init__(self, id, component, stream, task, values):
self.id = id
@@ -177,6 +204,7 @@
def is_heartbeat_tuple(self):
return self.task == -1 and self.stream == "__heartbeat"
+
class Bolt(object):
def initialize(self, stormconf, context):
pass
@@ -199,6 +227,7 @@
except Exception:
reportError(traceback.format_exc())
+
class BasicBolt(object):
def initialize(self, stormconf, context):
pass
@@ -228,6 +257,7 @@
except Exception:
reportError(traceback.format_exc())
+
class Spout(object):
def initialize(self, conf, context):
pass