[STORM-3855] Remove python2 support and move to python3. (#3475)

* [STORM-3855] Remove python2 support and move to python3.

* [STORM-3855] Change argument and I/O processing.

* [STORM-3855] Add test_storm.py

* [STORM-3855] PEP8 format,  global var initialization, fix shell_command return value, simpify regex, use log.warning

* [STORM-3855] Add test class for docker-to-squash.py (which must be later renamed to docker_to_squash.py)

* [STORM-3855] Convert byte array to string; initialize variable

* [STORM-3855] Convert bytes to str immediatelly after reading from URL and eliminate unnecesary string conversions.

* [STORM-3855] Reduce indent for main() call.

* [STORM-3855] Remove unused imports

* [STORM-3855] Add latest docker-to-squash from https://git.ouryahoo.com/hadoop/Hadoop/blob/y-branch-2.10/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/bin/docker-to-squash.py

* [STORM-3855] Change to python3; add license; PEP8 format; rename returncode to rc; log.warn to log.warning; fix working_dir assignment;

* [YSTORM-3855] Remove __init__.py in bin directory.
diff --git a/DEVELOPER.md b/DEVELOPER.md
index 70f8871..bfd0073 100644
--- a/DEVELOPER.md
+++ b/DEVELOPER.md
@@ -261,10 +261,12 @@
 
 in order to get started as fast as possible. Users can still install a specific version of `ruby` and/or `node` manually.
 
-You will also need the [mock](https://docs.python.org/3/library/unittest.mock.html) Python testing library (as well as [Python 2.7.x and Python 3.x](https://github.com/pyenv/pyenv)). With [pip](https://pip.pypa.io/en/stable/installing/) installed you can run
-
+You will also need the [mock](https://docs.python.org/3/library/unittest.mock.html) 
+python testing library (as well as [Python 3.x](https://github.com/pyenv/pyenv)). 
+With [pip3](https://pip.pypa.io/en/stable/installation/) installed you can run
+(python 2.7.x is not supported).
 ```
-pip install mock
+pip3 install mock
 ```
 
 ## Building
diff --git a/RELEASING.md b/RELEASING.md
index 8c37ef4..d352316 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -50,7 +50,7 @@
 
 6. Run `dev-tools/release_notes.py` for the release version, piping the output to a RELEASE_NOTES.html file. Move that file to the svn release directory, sign it, and generate checksums, e.g.
 ```bash
-python dev-tools/release_notes.py 2.2.0 > RELEASE_NOTES.html
+python3 dev-tools/release_notes.py 2.2.0 > RELEASE_NOTES.html
 gpg --armor --output RELEASE_NOTES.html.asc --detach-sig RELEASE_NOTES.html
 gpg --print-md SHA512 RELEASE_NOTES.html > RELEASE_NOTES.html.sha512
 ```
diff --git a/bin/docker-to-squash.py b/bin/docker-to-squash.py
index d5515db..e63c05c 100755
--- a/bin/docker-to-squash.py
+++ b/bin/docker-to-squash.py
@@ -1,22 +1,22 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 """
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-
 docker_to_squash.py is a tool to facilitate the process of converting
 Docker images into squashFS layers, manifests, and configs.
 
@@ -33,19 +33,34 @@
 import re
 import shutil
 import subprocess
+import time
 from threading import Timer
 
 LOG_LEVEL = None
-HADOOP_BIN_DIR = None
+HADOOP_BIN_DIR = "/undefined"
+MAX_IMAGE_LAYERS = 0
+MAX_IMAGE_SIZE = 0
+HADOOP_PREFIX = "/undefined"
+ARG_MAX = 10
+HDFS_ROOT = "/undefined"
+HDFS_MANIFEST_DIR = "/undefined"
+HDFS_CONFIG_DIR = "/undefined"
+HDFS_LAYERS_DIR = "/undefined"
+HDFS_UNREF_DIR = "/undefined"
 
-def shell_command(command, print_stdout, print_stderr, raise_on_error,
-                  timeout_sec=600):
+
+def shell_command(command, print_stdout, print_stderr, raise_on_error, timeout_sec=600):
     global LOG_LEVEL
+    global ARG_MAX
     stdout_val = subprocess.PIPE
     stderr_val = subprocess.PIPE
 
     logging.debug("command: %s", command)
 
+    for arg in command:
+        if len(arg) > ARG_MAX:
+            raise Exception(f"command length ({len(arg)}) greater than ARG_MAX ({ARG_MAX})")
+
     if print_stdout:
         stdout_val = None
 
@@ -54,44 +69,45 @@
 
     process = None
     timer = None
+    out = None
+    err = None
     try:
-        process = subprocess.Popen(command, stdout=stdout_val,
-                                   stderr=stderr_val)
+        process = subprocess.Popen(command, stdout=stdout_val, stderr=stderr_val)
         timer = Timer(timeout_sec, process_timeout, [process])
 
         timer.start()
         out, err = process.communicate()
 
-        if raise_on_error and process.returncode is not 0:
-            exception_string = ("Command: " + str(command)
-                                + " failed with returncode: "
-                                + str(process.returncode))
-            if out is not None:
+        if raise_on_error and process.returncode:
+            exception_string = f"Command: {command} failed with returncode: {process.returncode}"
+            if out:
                 exception_string = exception_string + "\nstdout: " + str(out)
-            if err is not None:
+            if err:
                 exception_string = exception_string + "\nstderr: " + str(err)
             raise Exception(exception_string)
-
-    except:
+    except Exception as ex:
         if process and process.poll() is None:
             process.kill()
-        raise Exception("Popen failure")
+        raise Exception("Popen failure, " + str(ex))
     finally:
+        # Note that finally clause executes even when there is no exception, hence the "return" statement
         if timer:
             timer.cancel()
+        return out, err, (process.returncode if process else -1)
 
-    return out, err, process.returncode
 
 def process_timeout(process):
     process.kill()
     logging.error("Process killed due to timeout")
 
+
 def does_hdfs_entry_exist(entry, raise_on_error=True):
-    out, err, returncode = hdfs_ls(entry, raise_on_error=raise_on_error)
-    if returncode is not 0:
+    out, err, rc = hdfs_ls(entry, raise_on_error=raise_on_error)
+    if rc:
         return False
     return True
 
+
 def setup_hdfs_dirs(dirs):
     if does_hdfs_entry_exist(dirs, raise_on_error=False):
         return
@@ -102,59 +118,65 @@
         directories = dir_entry.split("/")[1:]
         dir_path = ""
         for directory in directories:
-            dir_path = dir_path + "/" +  directory
+            dir_path = dir_path + "/" + directory
             logging.info("dir_path: %s", str(dir_path))
             chmod_dirs.append(dir_path)
     hdfs_chmod("755", chmod_dirs)
 
+
 def append_or_extend_to_list(src, src_list):
     if isinstance(src, list):
         src_list.extend(src)
     else:
         src_list.append(src)
 
+
 def hdfs_get(src, dest, print_stdout=False, print_stderr=False, raise_on_error=True):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-get"]
     append_or_extend_to_list(src, command)
     command.append(dest)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
 
-def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False,
-            raise_on_error=True):
+
+def hdfs_ls(file_path, options="", print_stdout=False, print_stderr=False, raise_on_error=True):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-ls"]
     if options:
         append_or_extend_to_list(options, command)
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr,
-                                         raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
+
 
 def hdfs_cat(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cat"]
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
 
-def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True,
-               create_parents=False):
+
+def hdfs_mkdir(file_path, print_stdout=False, print_stderr=True, raise_on_error=True, create_parents=False):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-mkdir"]
     if create_parents:
         command.append("-p")
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
 
-def hdfs_rm(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
+
+def hdfs_rm(file_path, error_on_file_not_found=False, print_stdout=False, print_stderr=True, raise_on_error=True):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-rm"]
+    if not error_on_file_not_found:
+        command.append("-f")
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
+
 
 def hdfs_put(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
     global HADOOP_BIN_DIR
@@ -163,64 +185,76 @@
         command.append("-f")
     append_or_extend_to_list(src, command)
     command.append(dest)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr,
-                                         raise_on_error, 60)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error, 60)
+    return out, err, rc
 
-def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True,
-               recursive=False):
+
+def hdfs_chmod(mode, file_path, print_stdout=False, print_stderr=True, raise_on_error=True, recursive=False):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-chmod"]
     if recursive:
         command.append("-R")
     command.append(mode)
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
+
 
 def hdfs_setrep(replication, file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
     global HADOOP_BIN_DIR
-    command = [HADOOP_BIN_DIR +  "/hadoop", "fs", "-setrep", str(replication)]
+    command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-setrep", str(replication)]
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
+
 
 def hdfs_cp(src, dest, force=False, print_stdout=False, print_stderr=True, raise_on_error=True):
     global HADOOP_BIN_DIR
-    command = [HADOOP_BIN_DIR +  "/hadoop", "fs", "-cp"]
+    command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-cp"]
     if force:
         command.append("-f")
     append_or_extend_to_list(src, command)
     command.append(dest)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr,
-                                         raise_on_error, 60)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
 
-def hdfs_touchz(file_path, print_stdout=False, print_stderr=True,
-                raise_on_error=True):
+
+def hdfs_touchz(file_path, print_stdout=False, print_stderr=True, raise_on_error=True):
     global HADOOP_BIN_DIR
     command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-touchz"]
     append_or_extend_to_list(file_path, command)
-    out, err, returncode = shell_command(command, print_stdout, print_stderr, raise_on_error)
-    return out, err, returncode
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
+
+
+def hdfs_stat(file_path, fmt, print_stdout=False, print_stderr=True, raise_on_error=True):
+    global HADOOP_BIN_DIR
+    command = [HADOOP_BIN_DIR + "/hadoop", "fs", "-stat"]
+    append_or_extend_to_list(fmt, command)
+    command.append(file_path)
+    out, err, rc = shell_command(command, print_stdout, print_stderr, raise_on_error)
+    return out, err, rc
 
 
 def get_working_dir(directory):
+    working_dir = "(undefined)"
     try:
         if os.path.isdir(directory):
             working_dir = os.path.join(directory, "docker-to-squash")
         else:
             working_dir = directory
         os.makedirs(working_dir)
-    except:
-        raise Exception("Could not create working_dir: " + working_dir)
+    except Exception as ex:
+        raise Exception(f"Could not create working_dir: {working_dir}, {ex}")
     return working_dir
 
+
 def is_sha256_hash(string):
-    if not re.findall(r"^[a-fA-F\d]{64,64}$", string):
+    if not re.findall(r"^[a-fA-F\d]{64}$", string):
         return False
     return True
 
+
 def calculate_file_hash(filename):
     sha = hashlib.sha256()
     with open(filename, 'rb') as file_pointer:
@@ -229,16 +263,18 @@
             if not data:
                 break
             sha.update(data)
-    hexdigest = sha.hexdigest()
-    if hexdigest == 0:
-        raise Exception("Hex digest for file: " + hexdigest + "returned 0")
-    return hexdigest
+    hex_digest = sha.hexdigest()
+    if hex_digest == 0:
+        raise Exception(f"Hex digest for file: {filename} returned 0")
+    return hex_digest
+
 
 def calculate_string_hash(string):
     sha = hashlib.sha256()
     sha.update(string)
     return sha.hexdigest()
 
+
 def get_local_manifest_from_path(manifest_path):
     with open(manifest_path, "rb") as file_pointer:
         out = file_pointer.read()
@@ -246,16 +282,35 @@
     manifest = json.loads(out)
     return manifest, manifest_hash
 
+
 def get_hdfs_manifest_from_path(manifest_path):
-    out, err, returncode = hdfs_cat(manifest_path)
+    out, err, rc = hdfs_cat(manifest_path)
     manifest_hash = calculate_string_hash(str(out))
     manifest = json.loads(out)
     return manifest, manifest_hash
 
+
+def get_hdfs_manifests_from_paths(manifest_paths):
+    out, err, rc = hdfs_cat(manifest_paths)
+    manifests_list = out.split("}{")
+    manifests = []
+    for manifest_str in manifests_list:
+        if manifest_str[0] != "{":
+            manifest_str = "{" + manifest_str
+        if manifest_str[-1] != "}":
+            manifest_str = manifest_str + "}"
+        manifest_hash = calculate_string_hash(manifest_str)
+        logging.debug("manifest for %s:\n%s", manifest_hash, manifest_str)
+        manifest = json.loads(manifest_str)
+        manifests.append((manifest, manifest_hash))
+    return manifests
+
+
 def get_config_hash_from_manifest(manifest):
     config_hash = manifest['config']['digest'].split(":", 1)[1]
     return config_hash
 
+
 def check_total_layer_number(layers):
     global MAX_IMAGE_LAYERS
     if len(layers) > MAX_IMAGE_LAYERS:
@@ -264,6 +319,7 @@
                         " layers, which is more than the maximum " + str(MAX_IMAGE_LAYERS) +
                         " layers. Failing out")
 
+
 def check_total_layer_size(manifest, size):
     global MAX_IMAGE_SIZE
     if size > MAX_IMAGE_SIZE:
@@ -272,6 +328,7 @@
         raise Exception("Image has total size " + str(size) +
                         " B. which is more than the maximum size " + str(MAX_IMAGE_SIZE) + " B. Failing out")
 
+
 def get_layer_hashes_from_manifest(manifest, error_on_size_check=True):
     layers = []
     size = 0
@@ -286,27 +343,28 @@
 
     return layers
 
+
 def get_pull_fmt_string(pull_format):
     pull_fmt_string = pull_format + ":"
     if pull_format == "docker":
         pull_fmt_string = pull_fmt_string + "//"
     return pull_fmt_string
 
+
 def get_manifest_from_docker_image(pull_format, image):
     pull_fmt_string = get_pull_fmt_string(pull_format)
-    out, err, returncode = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image],
-                                         False, True, True, 60)
+    out, err, rc = shell_command(["skopeo", "inspect", "--raw", pull_fmt_string + image], False, True, True)
     manifest = json.loads(out)
     if 'manifests' in manifest:
         logging.debug("skopeo inspect --raw returned a list of manifests")
         manifests_dict = manifest['manifests']
         sha = None
         for mfest in manifests_dict:
-            if(mfest['platform']['architecture'] == "amd64"):
+            if mfest['platform']['architecture'] == "amd64":
                 sha = mfest['digest']
                 break
         if not sha:
-            raise Exception("Could not find amd64 manifest for" + image)
+            raise Exception("Could not find amd64 manifest for image " + image)
 
         image_without_tag = image.split("/", 1)[-1].split(":", 1)[0]
         image_and_sha = image_without_tag + "@" + sha
@@ -320,12 +378,14 @@
     logging.debug("manifest: %s", str(manifest))
     return manifest, manifest_hash
 
+
 def split_image_and_tag(image_and_tag):
     split = image_and_tag.split(",")
     image = split[0]
     tags = split[1:]
     return image, tags
 
+
 def read_image_tag_to_hash(image_tag_to_hash):
     hash_to_tags = dict()
     tag_to_hash = dict()
@@ -334,7 +394,7 @@
             line = file_pointer.readline()
             if not line:
                 break
-            line = line.rstrip()
+            line = str(line).rstrip()
 
             if not line:
                 continue
@@ -348,7 +408,7 @@
             tags_list = ' '.join(split_line[:-1]).split(",")
 
             if not is_sha256_hash(manifest_hash) or not tags_list:
-                logging.warn("image-tag-to-hash file malformed. Skipping entry %s", line)
+                logging.warning("image-tag-to-hash file malformed. Skipping entry %s", line)
                 continue
 
             tags_and_comments = hash_to_tags.get(manifest_hash, None)
@@ -368,11 +428,12 @@
             for tag in tags_list:
                 cur_manifest = tag_to_hash.get(tag, None)
                 if cur_manifest is not None:
-                    logging.warn("tag_to_hash already has manifest %s defined for tag %s."
-                                 + "This entry will be overwritten", cur_manifest, tag)
+                    logging.warning(f"tag_to_hash already has manifest {cur_manifest} defined for tag {tag}. "
+                                    f"This entry will be overwritten")
                 tag_to_hash[tag] = manifest_hash
     return hash_to_tags, tag_to_hash
 
+
 def remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag):
     if not hash_to_tags:
         logging.debug("hash_to_tags is null. Not removing tag %s", tag)
@@ -391,6 +452,7 @@
     else:
         logging.debug("Tag not found. Not removing tag: %s", tag)
 
+
 def remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash):
     if not hash_to_tags:
         logging.debug("hash_to_tags is null. Not removing image_hash %s", image_hash)
@@ -404,6 +466,7 @@
         for tag in prev_tags:
             del tag_to_hash[tag]
 
+
 def add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
     tag_to_hash[tag] = manifest_hash
     new_tags_and_comments = hash_to_tags.get(manifest_hash, None)
@@ -419,14 +482,19 @@
             new_comment.append(comment)
     hash_to_tags[manifest_hash] = (new_tags, new_comment)
 
+
 def write_local_image_tag_to_hash(image_tag_to_hash, hash_to_tags):
     file_contents = []
     for key, value in hash_to_tags.iteritems():
         manifest_hash = key
+        # Sort tags list to preserve consistent order
+        value[0].sort()
         tags = ','.join(map(str, value[0]))
         if tags:
+            # Sort comments list to preserve consistent order
+            value[1].sort()
             comment = ', '.join(map(str, value[1]))
-            if comment > 0:
+            if comment:
                 comment = "#" + comment
             file_contents.append(tags + ":" + manifest_hash + comment + "\n")
 
@@ -435,29 +503,37 @@
         for val in file_contents:
             file_pointer.write(val)
 
-def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
-                                   manifest_hash, comment):
+
+def update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, comment):
     for tag in tags:
         update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
 
+
 def update_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment):
     remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
     add_tag_to_dicts(hash_to_tags, tag_to_hash, tag, manifest_hash, comment)
 
+
 def remove_from_dicts(hash_to_tags, tag_to_hash, tags):
     for tag in tags:
         logging.debug("removing tag: %s", tag)
         remove_tag_from_dicts(hash_to_tags, tag_to_hash, tag)
 
-def populate_tag_dicts(hdfs_root, image_tag_to_hash, local_image_tag_to_hash):
 
-    if does_hdfs_entry_exist(hdfs_root + "/" + image_tag_to_hash):
-        hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash)
-        image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash)
-    else:
-        image_tag_to_hash_hash = 0
+def populate_tag_dicts(image_tag_to_hash, local_image_tag_to_hash):
+    return populate_tag_dicts_set_root(image_tag_to_hash, local_image_tag_to_hash, None)
 
-    if image_tag_to_hash_hash != 0:
+
+def populate_tag_dicts_set_root(image_tag_to_hash, local_image_tag_to_hash, hdfs_root):
+    # Setting hdfs_root to None will default it to using the global
+    global HDFS_ROOT
+    if not hdfs_root:
+        hdfs_root = HDFS_ROOT
+
+    hdfs_get(hdfs_root + "/" + image_tag_to_hash, local_image_tag_to_hash, raise_on_error=True)
+    image_tag_to_hash_hash = calculate_file_hash(local_image_tag_to_hash)
+
+    if image_tag_to_hash_hash:
         hash_to_tags, tag_to_hash = read_image_tag_to_hash(local_image_tag_to_hash)
     else:
         hash_to_tags = {}
@@ -472,6 +548,7 @@
         hdfs_touchz(image_tag_to_hash_path)
         hdfs_chmod("755", image_tag_to_hash_path)
 
+
 def skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir):
     logging.info("Pulling image: %s", image)
     if os.path.isdir(skopeo_dir):
@@ -480,34 +557,33 @@
                         + "Directory: " + skopeo_dir)
     pull_fmt_string = get_pull_fmt_string(pull_format)
     shell_command(["skopeo", "copy", pull_fmt_string + image,
-                   skopeo_format + ":" + skopeo_dir], False, True, True, 600)
+                   skopeo_format + ":" + skopeo_dir], False, True, True)
+
 
 def untar_layer(tmp_dir, layer_path):
-    shell_command(["tar", "-C", tmp_dir, "--xattrs",
-                   "--xattrs-include='*'", "-xf", layer_path],
-                  False, True, True, 600)
+    shell_command(["tar", "-C", tmp_dir, "--xattrs", "--xattrs-include='*'", "-xf", layer_path], False, True, True)
+
 
 def tar_file_search(archive, target):
-    out, err, returncode = shell_command(["tar", "-xf", archive, target, "-O"],
-                                         False, False, False, 600)
+    out, err, rc = shell_command(["tar", "-xf", archive, target, "-O"], False, False, False)
     return out
 
+
 def set_fattr(directory):
-    shell_command(["setfattr", "-n", "trusted.overlay.opaque",
-                   "-v", "y", directory], False, True, True)
+    shell_command(["setfattr", "-n", "trusted.overlay.opaque", "-v", "y", directory], False, True, True)
+
 
 def make_whiteout_block_device(file_path, whiteout):
-    shell_command(["mknod", "-m", "000", file_path,
-                   "c", "0", "0"], False, True, True)
+    shell_command(["mknod", "-m", "000", file_path, "c", "0", "0"], False, True, True)
 
-    out, err, returncode = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True)
+    out, err, rc = shell_command(["stat", "-c", "%U:%G", whiteout], False, True, True)
     perms = str(out).strip()
 
     shell_command(["chown", perms, file_path], False, True, True)
 
+
 def convert_oci_whiteouts(tmp_dir):
-    out, err, returncode = shell_command(["find", tmp_dir, "-name", ".wh.*"],
-                                         False, False, True, 60)
+    out, err, rc = shell_command(["find", tmp_dir, "-name", ".wh.*"], False, False, True)
     whiteouts = str(out).splitlines()
     for whiteout in whiteouts:
         if whiteout == 0:
@@ -519,47 +595,47 @@
         else:
             whiteout_string = ".wh."
             idx = basename.rfind(whiteout_string)
-            bname = basename[idx+len(whiteout_string):]
+            bname = basename[idx + len(whiteout_string):]
             file_path = os.path.join(directory, bname)
             make_whiteout_block_device(file_path, whiteout)
         shell_command(["rm", whiteout], False, True, True)
 
+
 def dir_to_squashfs(tmp_dir, squash_path):
-    shell_command(["/usr/sbin/mksquashfs", tmp_dir, squash_path, "-write-queue", "4096",
-                   "-read-queue", "4096", "-fragment-queue", "4096"],
-                  False, True, True, 600)
+    shell_command(["mksquashfs", tmp_dir, squash_path, "-write-queue", "4096", "-read-queue", "4096",
+                   "-fragment-queue", "4096"],
+                  False, True, True)
 
-def upload_to_hdfs(file_path, file_name, hdfs_dir, replication, mode, force=False):
-    dest = hdfs_dir + "/" + file_name
 
+def upload_to_hdfs(src, dest, replication, mode, force=False):
     if does_hdfs_entry_exist(dest, raise_on_error=False):
         if not force:
-            logging.warn("Not uploading to HDFS. File already exists: %s", dest)
+            logging.warning("Not uploading to HDFS. File already exists: %s", dest)
             return
         logging.info("File already exists, but overwriting due to force option: %s", dest)
 
-    hdfs_put(file_path, dest, force)
+    hdfs_put(src, dest, force)
     hdfs_setrep(replication, dest)
     hdfs_chmod(mode, dest)
-    logging.info("Uploaded file %s with replication %d and permissions %s",
-                 dest, replication, mode)
+    logging.info(f"Uploaded file {dest} with replication {replication} and permissions {mode}")
 
-def atomic_upload_mv_to_hdfs(file_path, file_name, hdfs_dir, replication, image_tag_to_hash_file_hash):
+
+def atomic_upload_mv_to_hdfs(src, dest, replication, image_tag_to_hash_file_hash):
     global HADOOP_PREFIX
-    local_hash = calculate_file_hash(file_path)
+    global HADOOP_BIN_DIR
+
+    local_hash = calculate_file_hash(src)
     if local_hash == image_tag_to_hash_file_hash:
         logging.info("image_tag_to_hash file unchanged. Not uploading")
         return
 
-    tmp_file_name = file_name + ".tmp"
-    hdfs_tmp_path = hdfs_dir + "/" + tmp_file_name
-    hdfs_file_path = hdfs_dir + "/" + file_name
+    tmp_dest = dest + ".tmp"
     try:
-        if does_hdfs_entry_exist(hdfs_tmp_path, raise_on_error=False):
-            hdfs_rm(hdfs_tmp_path)
-        hdfs_put(file_path, hdfs_tmp_path)
-        hdfs_setrep(replication, hdfs_tmp_path)
-        hdfs_chmod("444", hdfs_tmp_path)
+        if does_hdfs_entry_exist(tmp_dest, raise_on_error=False):
+            hdfs_rm(tmp_dest)
+        hdfs_put(src, tmp_dest)
+        hdfs_setrep(replication, tmp_dest)
+        hdfs_chmod("444", tmp_dest)
 
         jar_path = HADOOP_PREFIX + "/share/hadoop/tools/lib/hadoop-extras-*.jar"
         jar_file = None
@@ -567,17 +643,18 @@
             jar_file = file
 
         if not jar_file:
-            raise Exception("SymlinkTool Jar doesn't exist: %s" % (jar_path))
+            raise Exception("SymlinkTool Jar doesn't exist: %s" % jar_path)
 
         logging.debug("jar_file: " + jar_file)
 
-        shell_command(["hadoop", "jar", jar_file, "org.apache.hadoop.tools.SymlinkTool",
-                       "mvlink", "-f", hdfs_tmp_path, hdfs_file_path], False, False, True)
+        shell_command([HADOOP_BIN_DIR + "/hadoop", "jar", jar_file, "org.apache.hadoop.tools.SymlinkTool",
+                       "mvlink", "-f", tmp_dest, dest], False, False, True)
 
-    except:
-        if does_hdfs_entry_exist(hdfs_tmp_path, raise_on_error=False):
-            hdfs_rm(hdfs_tmp_path)
-        raise Exception("image tag to hash file upload failed")
+    except Exception as ex:
+        if does_hdfs_entry_exist(tmp_dest, raise_on_error=False):
+            hdfs_rm(tmp_dest)
+        raise Exception("image tag to hash file upload failed, exception=" + str(ex))
+
 
 def docker_to_squash(layer_dir, layer, working_dir):
     tmp_dir = os.path.join(working_dir, "expand_archive_" + layer)
@@ -595,8 +672,7 @@
         dir_to_squashfs(tmp_dir, squash_path)
     finally:
         os.remove(layer_path)
-        shell_command(["rm", "-rf", tmp_dir],
-                      False, True, True)
+        shell_command(["rm", "-rf", tmp_dir], False, True, True)
 
 
 def check_image_for_magic_file(magic_file, skopeo_dir, layers):
@@ -608,13 +684,18 @@
             logging.debug("Found magic file %s in layer %s", magic_file_absolute, layer)
             logging.debug("Magic file %s has contents:\n%s", magic_file_absolute, ret)
             return ret
-    raise Exception("Magic file %s doesn't exist in any layer" %
-                    (magic_file_absolute))
+    raise Exception(f"Magic file {magic_file_absolute} doesn't exist in any layer")
+
 
 def pull_build_push_update(args):
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
+
     skopeo_format = args.skopeo_format
     pull_format = args.pull_format
-    hdfs_root = args.hdfs_root
     image_tag_to_hash = args.image_tag_to_hash
     replication = args.replication
     force = args.force
@@ -623,28 +704,22 @@
     magic_file = args.magic_file
     bootstrap = args.bootstrap
 
-    hdfs_layers_dir = hdfs_root + "/layers"
-    hdfs_config_dir = hdfs_root + "/config"
-    hdfs_manifest_dir = hdfs_root + "/manifests"
     working_dir = None
 
-
     try:
         working_dir = get_working_dir(args.working_dir)
         local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
         if bootstrap:
-            hdfs_dirs = [hdfs_root, hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir]
-            image_tag_to_hash_path = hdfs_root + "/" + image_tag_to_hash
+            hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
+            image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
             setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
-        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
-                                                                               image_tag_to_hash,
+        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
                                                                                local_image_tag_to_hash)
 
         for image_and_tag_arg in images_and_tags:
             image, tags = split_image_and_tag(image_and_tag_arg)
             if not image or not tags:
-                raise Exception("Positional parameter requires an image and at least 1 tag: "
-                                + image_and_tag_arg)
+                raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
 
             logging.info("Working on image %s with tags %s", image, str(tags))
             manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
@@ -655,30 +730,18 @@
             logging.debug("Layers: %s", str(layers))
             logging.debug("Config: %s", str(config_hash))
 
-            update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
-                                           manifest_hash, image)
+            update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
 
-            all_layers_exist = True
-
-            if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash,
-                                         raise_on_error=False):
-                all_layers_exist = False
-
-            if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash,
-                                         raise_on_error=False):
-                all_layers_exist = False
+            hdfs_files_to_check = [HDFS_MANIFEST_DIR + "/" + manifest_hash, HDFS_CONFIG_DIR + "/" + config_hash]
 
             for layer in layers:
-                hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
-                if not does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
-                    all_layers_exist = False
-                    break
+                hdfs_files_to_check.append(HDFS_LAYERS_DIR + "/" + layer + ".sqsh")
 
-            if all_layers_exist:
+            if does_hdfs_entry_exist(hdfs_files_to_check, raise_on_error=False):
                 if not force:
-                    logging.info("All layers exist in HDFS, skipping this image")
+                    logging.info("All image files exist in HDFS, skipping this image")
                     continue
-                logging.info("All layers exist in HDFS, but force option set, so overwriting image")
+                logging.info("All image files exist in HDFS, but force option set, so overwriting image")
 
             skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
             logging.debug("skopeo_dir: %s", skopeo_dir)
@@ -690,40 +753,35 @@
 
             for layer in layers:
                 logging.info("Squashifying and uploading layer: %s", layer)
-                hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+                hdfs_squash_path = HDFS_LAYERS_DIR + "/" + layer + ".sqsh"
                 if does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
                     if force:
-                        logging.info("Layer already exists, but overwriting due to force"
-                                     + "option: %s", layer)
+                        logging.info(f"Layer already exists, but overwriting due to force option: {layer}")
                     else:
-                        logging.info("Layer exists. Skipping and not squashifying or"
-                                     + "uploading: %s", layer)
+                        logging.info(f"Layer exists. Skipping and not squashifying or uploading: {layer}")
                         continue
 
                 docker_to_squash(skopeo_dir, layer, working_dir)
                 squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
                 squash_name = os.path.basename(squash_path)
-                upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force)
-
+                upload_to_hdfs(squash_path, HDFS_LAYERS_DIR + "/" + squash_name, replication, "444", force)
 
             config_local_path = os.path.join(skopeo_dir, config_hash)
             upload_to_hdfs(config_local_path,
-                           os.path.basename(config_local_path),
-                           hdfs_config_dir, replication, "444", force)
+                           HDFS_CONFIG_DIR + "/" + os.path.basename(config_local_path),
+                           replication, "444", force)
 
             manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
-            upload_to_hdfs(manifest_local_path, manifest_hash,
-                           hdfs_manifest_dir, replication, "444", force)
+            upload_to_hdfs(manifest_local_path, HDFS_MANIFEST_DIR + "/" + manifest_hash, replication, "444", force)
 
         write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
-        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
-                                 hdfs_root, replication,
-                                 image_tag_to_hash_hash)
+        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash,
+                                 replication, image_tag_to_hash_hash)
     finally:
         if working_dir:
             if os.path.isdir(working_dir):
-                shell_command(["rm", "-rf", working_dir],
-                              False, True, True)
+                shell_command(["rm", "-rf", working_dir], False, True, True)
+
 
 def pull_build(args):
     skopeo_format = args.skopeo_format
@@ -735,72 +793,72 @@
     for image_and_tag_arg in images_and_tags:
         image, tags = split_image_and_tag(image_and_tag_arg)
         if not image or not tags:
-            raise Exception("Positional parameter requires an image and at least 1 tag: "
-                            + image_and_tag_arg)
+            raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
 
-        logging.info("Working on image %s with tags %s", image, str(tags))
+        logging.info(f"Working on image {image} with tags {tags}")
         manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
 
         layers = get_layer_hashes_from_manifest(manifest)
         config_hash = get_config_hash_from_manifest(manifest)
 
-        logging.debug("Layers: %s", str(layers))
-        logging.debug("Config: %s", str(config_hash))
-
+        logging.debug(f"Layers: {layers}")
+        logging.debug(f"Config: {config_hash}")
+        skopeo_dir = None
 
         try:
             working_dir = get_working_dir(args.working_dir)
             skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
-            logging.debug("skopeo_dir: %s", skopeo_dir)
+            logging.debug(f"skopeo_dir: {skopeo_dir}")
             skopeo_copy_image(pull_format, image, skopeo_format, skopeo_dir)
 
             if check_magic_file:
                 check_image_for_magic_file(magic_file, skopeo_dir, layers)
 
             for layer in layers:
-                logging.info("Squashifying layer: %s", layer)
+                logging.info(f"Squashifying layer: {layer}")
                 docker_to_squash(skopeo_dir, layer, working_dir)
 
-        except:
-            if os.path.isdir(skopeo_dir):
+        except Exception as _:
+            if skopeo_dir and os.path.isdir(skopeo_dir):
                 shutil.rmtree(skopeo_dir)
             raise
 
+
 def push_update(args):
-    hdfs_root = args.hdfs_root
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
+
     image_tag_to_hash = args.image_tag_to_hash
     replication = args.replication
     force = args.force
     images_and_tags = args.images_and_tags
     bootstrap = args.bootstrap
 
-    hdfs_layers_dir = hdfs_root + "/layers"
-    hdfs_config_dir = hdfs_root + "/config"
-    hdfs_manifest_dir = hdfs_root + "/manifests"
     local_image_tag_to_hash = None
 
     try:
         working_dir = get_working_dir(args.working_dir)
         local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
         if bootstrap:
-            hdfs_dirs = [hdfs_root, hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir]
-            image_tag_to_hash_path = hdfs_root + "/" + image_tag_to_hash
+            hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
+            image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
             setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
-        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
-                                                                               image_tag_to_hash,
+        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
                                                                                local_image_tag_to_hash)
 
         for image_and_tag_arg in images_and_tags:
             image, tags = split_image_and_tag(image_and_tag_arg)
             if not image or not tags:
-                raise Exception("Positional parameter requires an image and at least 1 tag: "
-                                + image_and_tag_arg)
+                raise Exception("Positional parameter requires an image and at least 1 tag: " + image_and_tag_arg)
 
             logging.info("Working on image %s with tags %s", image, str(tags))
             skopeo_dir = os.path.join(working_dir, image.split("/")[-1])
             if not os.path.exists(skopeo_dir):
-                raise Exception("skopeo_dir doesn't exists: %s" % (skopeo_dir))
-            manifest, manifest_hash = get_local_manifest_from_path(skopeo_dir + "/manifest.json")
+                raise Exception(f"skopeo_dir doesn't exists: {skopeo_dir}")
+            manifest, manifest_hash = get_local_manifest_from_path(f"{skopeo_dir}/manifest.json")
 
             layers = get_layer_hashes_from_manifest(manifest)
             config_hash = get_config_hash_from_manifest(manifest)
@@ -808,60 +866,43 @@
             logging.debug("Layers: %s", str(layers))
             logging.debug("Config: %s", str(config_hash))
 
-            update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
-                                           manifest_hash, image)
+            update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
 
-            all_layers_exist = True
-
-            if not does_hdfs_entry_exist(hdfs_manifest_dir + "/" + manifest_hash,
-                                         raise_on_error=False):
-                all_layers_exist = False
-
-            if not does_hdfs_entry_exist(hdfs_config_dir + "/" + config_hash,
-                                         raise_on_error=False):
-                all_layers_exist = False
+            hdfs_files_to_check = [f"{HDFS_MANIFEST_DIR}/{manifest_hash}", f"{HDFS_CONFIG_DIR}/{config_hash}"]
 
             for layer in layers:
-                hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
-                if not does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
-                    all_layers_exist = False
-                    break
+                hdfs_files_to_check.append(f"{HDFS_LAYERS_DIR}/{layer}.sqsh")
 
-            if all_layers_exist:
+            if does_hdfs_entry_exist(hdfs_files_to_check, raise_on_error=False):
                 if not force:
-                    logging.info("All layers exist in HDFS, skipping this image")
+                    logging.info("All image files exist in HDFS, skipping this image")
                     continue
-                logging.info("All layers exist in HDFS, but force option set, so overwriting image")
+                logging.info("All image files exist in HDFS, but force option set, so overwriting image")
 
             for layer in layers:
-                hdfs_squash_path = hdfs_layers_dir + "/" + layer + ".sqsh"
+                hdfs_squash_path = f"{HDFS_LAYERS_DIR}/{layer}.sqsh"
                 if does_hdfs_entry_exist(hdfs_squash_path, raise_on_error=False):
                     if force:
-                        logging.info("Layer already exists, but overwriting due to force"
-                                     + "option: %s", layer)
+                        logging.info(f"Layer already exists, but overwriting due to force option: {layer}")
                     else:
-                        logging.info("Layer exists. Skipping and not squashifying or"
-                                     + "uploading: %s", layer)
+                        logging.info(f"Layer exists. Skipping and not squashifying or uploading: {layer}")
                         continue
 
                 squash_path = os.path.join(skopeo_dir, layer + ".sqsh")
                 squash_name = os.path.basename(squash_path)
-                upload_to_hdfs(squash_path, squash_name, hdfs_layers_dir, replication, "444", force)
-
+                upload_to_hdfs(squash_path, f"{HDFS_LAYERS_DIR}/{squash_name}", replication, "444", force)
 
             config_local_path = os.path.join(skopeo_dir, config_hash)
             upload_to_hdfs(config_local_path,
-                           os.path.basename(config_local_path),
-                           hdfs_config_dir, replication, "444", force)
+                           f"{HDFS_CONFIG_DIR}/" + os.path.basename(config_local_path),
+                           replication, "444", force)
 
             manifest_local_path = os.path.join(skopeo_dir, "manifest.json")
-            upload_to_hdfs(manifest_local_path, manifest_hash,
-                           hdfs_manifest_dir, replication, "444", force)
+            upload_to_hdfs(manifest_local_path, HDFS_MANIFEST_DIR + "/" + manifest_hash, replication, "444", force)
 
         write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
-        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
-                                 hdfs_root, replication,
-                                 image_tag_to_hash_hash)
+        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, f"{HDFS_ROOT}/{image_tag_to_hash}",
+                                 replication, image_tag_to_hash_hash)
     finally:
         if local_image_tag_to_hash:
             if os.path.isfile(local_image_tag_to_hash):
@@ -869,7 +910,12 @@
 
 
 def remove_image(args):
-    hdfs_root = args.hdfs_root
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
+
     image_tag_to_hash = args.image_tag_to_hash
     replication = args.replication
     images_or_tags = args.images_or_tags
@@ -878,107 +924,38 @@
     try:
         working_dir = get_working_dir(args.working_dir)
         local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
-        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
-                                                                               image_tag_to_hash,
-                                                                               local_image_tag_to_hash)
-
-        logging.debug("hash_to_tags: %s", str(hash_to_tags))
-        logging.debug("tag_to_hash: %s", str(tag_to_hash))
-
-        hdfs_layers_dir = hdfs_root + "/layers"
-        hdfs_config_dir = hdfs_root + "/config"
-        hdfs_manifest_dir = hdfs_root + "/manifests"
-
-        delete_list = []
-
-        known_images, err, returncode = hdfs_ls(hdfs_manifest_dir, "-C", False, False, False)
-        known_images = known_images.split()
-
-        logging.debug("known_images:\n%s", known_images)
-
-        layers_to_keep = []
 
         images_and_tags_to_remove = []
-        images_to_remove = []
         for image_or_tag_arg in images_or_tags:
             images_and_tags_to_remove.extend(image_or_tag_arg.split(","))
 
         logging.debug("images_and_tags_to_remove:\n%s", images_and_tags_to_remove)
 
-        if isinstance(images_and_tags_to_remove, Iterable):
-            for image in images_and_tags_to_remove:
-                if is_sha256_hash(image):
-                    image_hash = image
-                else:
-                    image_hash = tag_to_hash.get(image, None)
-                if image_hash:
-                    images_to_remove.append(hdfs_manifest_dir + "/" + image_hash)
-        else:
-            image = images_and_tags_to_remove[0]
-            if is_sha256_hash(image):
-                image_hash = image
-            else:
-                image_hash = tag_to_hash.get(image, None)
-            if image_hash:
-                images_to_remove.append(hdfs_manifest_dir + "/" + image_hash)
+        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
+                                                                               local_image_tag_to_hash)
+
+        logging.debug("hash_to_tags: %s", str(hash_to_tags))
+        logging.debug("tag_to_hash: %s", str(tag_to_hash))
+
+        known_images = get_all_known_images()
+        if not known_images:
+            logging.warn("No known images\n")
+            return
+
+        images_to_remove = get_images_from_args(images_and_tags_to_remove, tag_to_hash, known_images)
 
         logging.debug("images_to_remove:\n%s", images_to_remove)
         if not images_to_remove:
-            logging.warn("No images to remove")
+            logging.warning("No images to remove")
             return
 
-        for image in known_images:
-            if image not in images_to_remove:
-                manifest, manifest_hash = get_hdfs_manifest_from_path(image)
-                layers = get_layer_hashes_from_manifest(manifest, False)
-                layers_to_keep.extend(layers)
+        delete_list = get_delete_list_from_images_to_remove(images_to_remove, known_images)
 
-        logging.debug("layers_to_keep:\n%s", layers_to_keep)
-
-        for image_or_tag_arg in images_or_tags:
-            images = image_or_tag_arg.split(",")
-            for image in images:
-                logging.info("removing image: %s", image)
-                if is_sha256_hash(image):
-                    logging.debug("image is sha256")
-                    image_hash = image
-                else:
-                    image_hash = tag_to_hash.get(image, None)
-                    if image_hash:
-                        logging.debug("image tag exists for %s", image)
-                    else:
-                        logging.info("Not removing %s. Image tag doesn't exist", image)
-                        continue
-                manifest_path = hdfs_manifest_dir + "/" + image_hash
-                if does_hdfs_entry_exist(manifest_path, raise_on_error=False):
-                    logging.debug("image manifest for %s exists: %s", image, manifest_path)
-                else:
-                    logging.info("Not removing %s. Image manifest doesn't exist: %s", image, manifest_path)
-                    continue
-
-                delete_list.append(manifest_path)
-
-                manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
-
-                config_hash = get_config_hash_from_manifest(manifest)
-                logging.debug("config_hash: %s", config_hash)
-
-                delete_list.append(hdfs_config_dir + "/" + config_hash)
-
-                layers = get_layer_hashes_from_manifest(manifest, False)
-                layers_paths = []
-                for layer in layers:
-                    if layer not in layers_to_keep:
-                        layers_paths.append(hdfs_layers_dir + "/" + layer + ".sqsh")
-                delete_list.extend(layers_paths)
-
-                logging.debug("delete_list: %s", delete_list)
-
-                remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_hash)
+        for image_to_remove in images_to_remove:
+            remove_image_hash_from_dicts(hash_to_tags, tag_to_hash, image_to_remove.manifest_stat.name)
 
         write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
-        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
-                                 hdfs_root, replication,
+        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash, replication,
                                  image_tag_to_hash_hash)
 
         hdfs_rm(delete_list)
@@ -988,22 +965,87 @@
             if os.path.isdir(working_dir):
                 shutil.rmtree(working_dir)
 
+
+def get_images_from_args(images_and_tags, tag_to_hash, known_images):
+    images = []
+
+    if isinstance(images_and_tags, Iterable):
+        for image_arg in images_and_tags:
+            image = get_image_hash_from_arg(image_arg, tag_to_hash, known_images)
+            if image:
+                images.append(image)
+    else:
+        image_arg = images_and_tags[0]
+        image = get_image_hash_from_arg(image_arg, tag_to_hash, known_images)
+        if image:
+            images.append(image)
+
+    return images
+
+
+def get_image_hash_from_arg(image_str, tag_to_hash, known_images):
+    if is_sha256_hash(image_str):
+        image_hash = image_str
+    else:
+        image_hash = tag_to_hash.get(image_str, None)
+
+    if image_hash:
+        image = get_known_image_by_hash(image_hash, known_images)
+    else:
+        logging.warn("image tag unknown: %s", image_str)
+        return None
+
+    return image
+
+
+def get_delete_list_from_images_to_remove(images_to_remove, known_images):
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
+
+    layers_to_keep = []
+    delete_list = []
+
+    for image in known_images:
+        if image not in images_to_remove:
+            layers_to_keep.extend(image.layers)
+
+    for image_to_remove in images_to_remove:
+        delete_list.append(HDFS_MANIFEST_DIR + "/" + image_to_remove.manifest_stat.name)
+        delete_list.append(HDFS_CONFIG_DIR + "/" + image_to_remove.config)
+        if image_to_remove.unref_file_stat:
+            delete_list.append(HDFS_UNREF_DIR + "/" + image_to_remove.unref_file_stat.name)
+
+        layers = image_to_remove.layers
+        for layer in layers:
+            if layer not in layers_to_keep:
+                layer_path = HDFS_LAYERS_DIR + "/" + layer + ".sqsh"
+                if layer_path not in delete_list:
+                    delete_list.append(layer_path)
+
+    logging.debug("delete_list:\n%s", delete_list)
+
+    return delete_list
+
+
 def add_remove_tag(args):
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+
     pull_format = args.pull_format
-    hdfs_root = args.hdfs_root
     image_tag_to_hash = args.image_tag_to_hash
     replication = args.replication
     sub_command = args.sub_command
     images_and_tags = args.images_and_tags
 
-    hdfs_manifest_dir = hdfs_root + "/manifests"
     working_dir = None
 
     try:
         working_dir = get_working_dir(args.working_dir)
         local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
-        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
-                                                                               image_tag_to_hash,
+        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
                                                                                local_image_tag_to_hash)
 
         for image_and_tag_arg in images_and_tags:
@@ -1015,31 +1057,33 @@
                     manifest_hash = tag_to_hash.get(image, None)
 
                 if manifest_hash:
-                    manifest_path = hdfs_manifest_dir + "/" + manifest_hash
+                    manifest_path = HDFS_MANIFEST_DIR + "/" + manifest_hash
                     out, err, returncode = hdfs_cat(manifest_path)
                     manifest = json.loads(out)
                     logging.debug("image tag exists for %s", image)
                 else:
                     manifest, manifest_hash = get_manifest_from_docker_image(pull_format, image)
 
-                update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags,
-                                               manifest_hash, image)
+                update_dicts_for_multiple_tags(hash_to_tags, tag_to_hash, tags, manifest_hash, image)
 
             elif sub_command == "remove-tag":
                 tags = image_and_tag_arg.split(",")
+                image = None
+                manifest = None
+                manifest_hash = 0
                 remove_from_dicts(hash_to_tags, tag_to_hash, tags)
             else:
-                raise Exception("Invalid sub_command: %s" % (sub_command))
+                raise Exception(f"Invalid sub_command: {sub_command}")
 
         write_local_image_tag_to_hash(local_image_tag_to_hash, hash_to_tags)
-        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, image_tag_to_hash,
-                                 hdfs_root, replication,
+        atomic_upload_mv_to_hdfs(local_image_tag_to_hash, HDFS_ROOT + "/" + image_tag_to_hash, replication,
                                  image_tag_to_hash_hash)
     finally:
         if working_dir:
             if os.path.isdir(working_dir):
                 shutil.rmtree(working_dir)
 
+
 def copy_update(args):
     image_tag_to_hash = args.image_tag_to_hash
     replication = args.replication
@@ -1051,15 +1095,17 @@
 
     src_layers_dir = src_root + "/layers"
     src_config_dir = src_root + "/config"
-    src_manifest_dir = src_root  + "/manifests"
+    src_manifest_dir = src_root + "/manifests"
     dest_layers_dir = dest_root + "/layers"
     dest_config_dir = dest_root + "/config"
-    dest_manifest_dir = dest_root  + "/manifests"
+    dest_manifest_dir = dest_root + "/manifests"
+    dest_unref_dir = dest_root + "/unreferenced"
 
     if bootstrap:
-        hdfs_dirs = [dest_root, dest_layers_dir, dest_config_dir, dest_manifest_dir]
+        hdfs_dirs = [dest_root, dest_layers_dir, dest_config_dir, dest_manifest_dir, dest_unref_dir]
         image_tag_to_hash_path = dest_root + "/" + image_tag_to_hash
         setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
+
     working_dir = None
 
     try:
@@ -1069,8 +1115,11 @@
         local_dest_image_tag_to_hash = os.path.join(working_dir, "dest-"
                                                     + os.path.basename(image_tag_to_hash))
 
-        src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts(src_root, image_tag_to_hash, local_src_image_tag_to_hash)
-        dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts(dest_root, image_tag_to_hash, local_dest_image_tag_to_hash)
+        src_hash_to_tags, src_tag_to_hash, src_image_tag_to_hash_hash = populate_tag_dicts_set_root(image_tag_to_hash,
+                                                                                                    local_src_image_tag_to_hash,
+                                                                                                    src_root)
+        dest_hash_to_tags, dest_tag_to_hash, dest_image_tag_to_hash_hash = populate_tag_dicts_set_root(
+            image_tag_to_hash, local_dest_image_tag_to_hash, dest_root)
 
         for image_and_tag_arg in images_and_tags:
             image, tags = split_image_and_tag(image_and_tag_arg)
@@ -1101,21 +1150,30 @@
             logging.debug("Copying Layers: %s", str(src_layers_paths))
             logging.debug("Copying Config: %s", str(src_config_hash))
 
-            hdfs_cp(src_layers_paths, dest_layers_dir, force)
-            hdfs_cp(src_config_path, dest_config_dir, force)
-            hdfs_cp(src_manifest_path, dest_manifest_dir, force)
+            if not does_hdfs_entry_exist(dest_layers_paths, raise_on_error=False):
+                dest_layers_paths = []
+                for layer in src_layers:
+                    dest_layer_path = dest_layers_dir + "/" + layer + ".sqsh"
+                    src_layer_path = src_layers_dir + "/" + layer + ".sqsh"
+                    if not does_hdfs_entry_exist(dest_layer_path, raise_on_error=False):
+                        hdfs_cp(src_layer_path, dest_layer_path, force)
+                        dest_layers_paths.append(dest_layer_path)
+                hdfs_setrep(replication, dest_layers_paths)
+                hdfs_chmod("444", dest_layers_paths)
 
-            hdfs_setrep(replication, dest_layers_paths)
-            hdfs_setrep(replication, dest_config_path)
-            hdfs_setrep(replication, dest_manifest_path)
+            if not does_hdfs_entry_exist(dest_config_path, raise_on_error=False):
+                hdfs_cp(src_config_path, dest_config_dir, force)
+                hdfs_setrep(replication, dest_config_path)
+                hdfs_chmod("444", dest_config_path)
 
-            hdfs_chmod("444", dest_layers_paths)
-            hdfs_chmod("444", dest_config_path)
-            hdfs_chmod("444", dest_manifest_path)
+            if not does_hdfs_entry_exist(dest_manifest_path, raise_on_error=False):
+                hdfs_cp(src_manifest_path, dest_manifest_dir, force)
+                hdfs_setrep(replication, dest_manifest_path)
+                hdfs_chmod("444", dest_manifest_path)
 
             for tag in tags:
-                new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None)
                 comment = None
+                new_tags_and_comments = src_hash_to_tags.get(src_manifest_hash, None)
                 if new_tags_and_comments:
                     comment = ', '.join(map(str, new_tags_and_comments[1]))
                 if comment is None:
@@ -1124,8 +1182,8 @@
                 update_dicts(dest_hash_to_tags, dest_tag_to_hash, tag, src_manifest_hash, comment)
 
             write_local_image_tag_to_hash(local_dest_image_tag_to_hash, dest_hash_to_tags)
-            atomic_upload_mv_to_hdfs(local_dest_image_tag_to_hash, image_tag_to_hash,
-                                     dest_root, replication,
+            atomic_upload_mv_to_hdfs(local_dest_image_tag_to_hash, dest_root + "/" + image_tag_to_hash,
+                                     replication,
                                      dest_image_tag_to_hash_hash)
 
     finally:
@@ -1133,8 +1191,13 @@
             if os.path.isdir(working_dir):
                 shutil.rmtree(working_dir)
 
+
 def query_tag(args):
-    hdfs_root = args.hdfs_root
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+
     image_tag_to_hash = args.image_tag_to_hash
     tags = args.tags
     working_dir = None
@@ -1142,24 +1205,19 @@
     try:
         working_dir = get_working_dir(args.working_dir)
         local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
-        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(hdfs_root,
-                                                                               image_tag_to_hash,
+        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
                                                                                local_image_tag_to_hash)
 
         logging.debug("hash_to_tags: %s", str(hash_to_tags))
         logging.debug("tag_to_hash: %s", str(tag_to_hash))
 
-        hdfs_layers_dir = hdfs_root + "/layers"
-        hdfs_config_dir = hdfs_root + "/config"
-        hdfs_manifest_dir = hdfs_root + "/manifests"
-
         for tag in tags:
             image_hash = tag_to_hash.get(tag, None)
             if not image_hash:
                 logging.info("image hash mapping doesn't exist for tag %s", tag)
                 continue
 
-            manifest_path = hdfs_manifest_dir + "/" + image_hash
+            manifest_path = HDFS_MANIFEST_DIR + "/" + image_hash
             if does_hdfs_entry_exist(manifest_path, raise_on_error=False):
                 logging.debug("image manifest for %s exists: %s", tag, manifest_path)
             else:
@@ -1169,9 +1227,9 @@
             manifest, manifest_hash = get_hdfs_manifest_from_path(manifest_path)
             layers = get_layer_hashes_from_manifest(manifest, False)
             config_hash = get_config_hash_from_manifest(manifest)
-            config_path = hdfs_config_dir + "/" + config_hash
+            config_path = HDFS_CONFIG_DIR + "/" + config_hash
 
-            layers_paths = [hdfs_layers_dir + "/" + layer + ".sqsh" for layer in layers]
+            layers_paths = [HDFS_LAYERS_DIR + "/" + layer + ".sqsh" for layer in layers]
 
             logging.info("Image info for '%s'", tag)
             logging.info(manifest_path)
@@ -1184,27 +1242,324 @@
             if os.path.isdir(working_dir):
                 shutil.rmtree(working_dir)
 
+
 def list_tags(args):
-    hdfs_root = args.hdfs_root
+    global HDFS_ROOT
+
     image_tag_to_hash = args.image_tag_to_hash
 
-    hdfs_image_tag_to_hash = hdfs_root + "/" + image_tag_to_hash
-    if does_hdfs_entry_exist(hdfs_image_tag_to_hash, raise_on_error=False):
-        hdfs_cat(hdfs_image_tag_to_hash, True, True, False)
-    else:
-        logging.error("image-tag-to-hash file doesn't exist: %s", hdfs_image_tag_to_hash)
+    hdfs_image_tag_to_hash = HDFS_ROOT + "/" + image_tag_to_hash
+    hdfs_cat(hdfs_image_tag_to_hash, True, True, True)
+
 
 def bootstrap_setup(args):
-    hdfs_root = args.hdfs_root
-    image_tag_to_hash = args.image_tag_to_hash
-    hdfs_layers_dir = hdfs_root + "/layers"
-    hdfs_config_dir = hdfs_root + "/config"
-    hdfs_manifest_dir = hdfs_root + "/manifests"
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
 
-    hdfs_dirs = [hdfs_root, hdfs_layers_dir, hdfs_config_dir, hdfs_manifest_dir]
-    image_tag_to_hash_path = hdfs_root + "/" + image_tag_to_hash
+    image_tag_to_hash = args.image_tag_to_hash
+
+    hdfs_dirs = [HDFS_ROOT, HDFS_LAYERS_DIR, HDFS_CONFIG_DIR, HDFS_MANIFEST_DIR, HDFS_UNREF_DIR]
+    image_tag_to_hash_path = HDFS_ROOT + "/" + image_tag_to_hash
     setup_squashfs_hdfs_dirs(hdfs_dirs, image_tag_to_hash_path)
 
+
+def cleanup_untagged_images(args):
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
+    global DEAD_PERMS
+
+    image_tag_to_hash = args.image_tag_to_hash
+    working_dir = get_working_dir(args.working_dir)
+
+    local_image_tag_to_hash = os.path.join(working_dir, os.path.basename(image_tag_to_hash))
+
+    try:
+        hash_to_tags, tag_to_hash, image_tag_to_hash_hash = populate_tag_dicts(image_tag_to_hash,
+                                                                               local_image_tag_to_hash)
+        logging.debug("hash_to_tags: %s\n", hash_to_tags)
+        logging.debug("tag_to_hash: %s\n", tag_to_hash)
+
+        known_images = get_all_known_images()
+        tagged_images = [image for image in known_images if image.manifest_stat.name in hash_to_tags.keys()]
+        untagged_images = get_untagged_images(known_images, tagged_images)
+        stale_images = get_stale_images(untagged_images)
+        dead_images = get_dead_images(untagged_images)
+
+        cleanup_handle_tagged_images(tagged_images)
+        cleanup_handle_untagged_images(untagged_images)
+        cleanup_handle_stale_images(stale_images)
+        cleanup_handle_dead_images(dead_images, known_images)
+
+    finally:
+        if working_dir:
+            if os.path.isdir(working_dir):
+                shutil.rmtree(working_dir)
+
+
+class Filestat:
+    def __init__(self, perms, mod_time, name):
+        self.perms = perms
+        self.mod_time = mod_time
+        self.name = name
+
+    def __repr__(self):
+        return "Perms: {}, Mod time: {}, Name: {}\n".format(self.perms, self.mod_time, self.name)
+
+    def __str__(self):
+        return "Perms: {}, Mod time: {}, Name: {}\n".format(self.perms, self.mod_time, self.name)
+
+
+class Image:
+    def __init__(self, manifest_stat, layers, config, unref_file_stat):
+        self.manifest_stat = manifest_stat
+        self.layers = layers
+        self.config = config
+        self.unref_file_stat = unref_file_stat
+
+    def __repr__(self):
+        return "Manifest: {}, Layers: {}, Config: {}, Unreferenced File: {}\n".format(self.manifest_stat, self.layers,
+                                                                                      self.config, self.unref_file_stat)
+
+    def __str__(self):
+        return "Manifest: {}, Layers: {}, Config: {}, Unreferenced File: {}\n".format(self.manifest_stat, self.layers,
+                                                                                      self.config, self.unref_file_stat)
+
+
+def get_all_known_images():
+    global HDFS_MANIFEST_DIR
+    global HDFS_UNREF_DIR
+
+    known_manifest_paths, err, returncode = hdfs_stat(HDFS_MANIFEST_DIR + "/*", "%a %Y %n", False, False, False)
+    known_manifests = [image for image in known_manifest_paths.split("\n") if image is not "" or None]
+
+    unref_manifest_paths, err, returncode = hdfs_stat(HDFS_UNREF_DIR + "/*", "%a %Y %n", False, False, False)
+    logging.debug("unref_manifest_paths:\n%s", unref_manifest_paths)
+    unref_manifests = []
+    if unref_manifest_paths:
+        unref_manifests = [image for image in unref_manifest_paths.split("\n") if image is not "" or None]
+    logging.debug("unref_manifests:\n%s", unref_manifests)
+
+    unref_manifests_stats_dict = {}
+    if unref_manifests:
+        for unref_manifest in unref_manifests:
+            unref_manifest_split = unref_manifest.split()
+            unref_manifest_perms = unref_manifest_split[0]
+            unref_manifest_mod_time = long(unref_manifest_split[1])
+            unref_manifest_name = unref_manifest_split[2]
+            unref_manifest_stat = Filestat(unref_manifest_perms, unref_manifest_mod_time, unref_manifest_name)
+            unref_manifests_stats_dict[unref_manifest_name] = unref_manifest_stat
+
+    logging.debug("unref_manifests_stats_dict:\n%s", unref_manifests_stats_dict)
+
+    known_manifests_names = [known_manifest.split()[2] for known_manifest in known_manifests]
+    layers_and_configs = get_all_layers_and_configs(known_manifests_names)
+
+    known_images = []
+    for manifest in known_manifests:
+        manifest_split = manifest.split()
+        manifest_perms = manifest_split[0]
+        manifest_mod_time = long(manifest_split[1])
+        manifest_name = manifest_split[2]
+        manifest_stat = Filestat(manifest_perms, manifest_mod_time, manifest_name)
+
+        unref_image_stat = unref_manifests_stats_dict.get(manifest_name, None)
+
+        layers = layers_and_configs[manifest_name][0]
+        config = layers_and_configs[manifest_name][1]
+        known_image = Image(manifest_stat, layers, config, unref_image_stat)
+        known_images.append(known_image)
+
+    return known_images
+
+
+def get_known_image_by_hash(image_hash, known_images):
+    for image in known_images:
+        if image_hash == image.manifest_stat.name:
+            return image
+    logging.debug("Couldn't find known image by hash:\n%s", image_hash)
+    return None
+
+
+def get_all_layers_and_configs(manifest_names):
+    global HDFS_MANIFEST_DIR
+
+    manifests_tuples = get_hdfs_manifests_from_paths(
+        [HDFS_MANIFEST_DIR + "/" + manifest_name for manifest_name in manifest_names])
+    layers_and_configs = {}
+
+    for manifest_tuple in manifests_tuples:
+        manifest = manifest_tuple[0]
+        manifest_hash = manifest_tuple[1]
+        layers = []
+        layers.extend(get_layer_hashes_from_manifest(manifest, False))
+        config = get_config_hash_from_manifest(manifest)
+        layers_and_configs[manifest_hash] = (layers, config)
+
+        logging.debug("layers for %s:\n%s", manifest_hash, layers)
+        logging.debug("config for %s:\n%s", manifest_hash, config)
+
+    return layers_and_configs
+
+
+def get_untagged_images(known_images, tagged_images):
+    untagged_images = []
+    for image in known_images:
+        if is_image_untagged(image, tagged_images):
+            untagged_images.append(image)
+
+    logging.debug("known_images:\n%s", known_images)
+    logging.debug("tagged_images:\n%s", tagged_images)
+    logging.debug("untagged_images:\n%s", untagged_images)
+    return untagged_images
+
+
+def get_stale_images(untagged_images):
+    stale_images = [image for image in untagged_images if is_image_stale(image)]
+    logging.debug("stale_images:\n%s", stale_images)
+    return stale_images
+
+
+def get_dead_images(untagged_images):
+    dead_images = [image for image in untagged_images if is_image_dead(image)]
+    logging.debug("dead_images:\n%s", dead_images)
+    return dead_images
+
+
+def is_image_untagged(image, tagged_images):
+    for tagged_image in tagged_images:
+        if tagged_image.manifest_stat.name == image.manifest_stat.name:
+            return False
+    return True
+
+
+def is_image_stale(image):
+    return does_image_have_unref_file(image) and not does_image_have_dead_perms(image)
+
+
+def is_image_dead(image):
+    return does_image_have_unref_file(image) and does_image_have_dead_perms(image)
+
+
+def does_image_have_unref_file(image):
+    return image.unref_file_stat != None
+
+
+def does_image_have_dead_perms(image):
+    global DEAD_PERMS
+    return image.manifest_stat.perms == DEAD_PERMS
+
+
+def is_mod_time_old(mod_time):
+    global UNTAGGED_TRANSITION_SEC
+
+    cutoff_time = long(time.time() * 1000) - UNTAGGED_TRANSITION_SEC * 1000
+    logging.debug("Mod time: %d, Cutoff time: %d", mod_time, cutoff_time)
+    return mod_time < cutoff_time
+
+
+def cleanup_handle_tagged_images(tagged_images):
+    # Remove unreferenced files if they exist
+    if not tagged_images:
+        return
+
+    unref_remove_list = []
+    for image in tagged_images:
+        if does_image_have_unref_file(image):
+            unref_remove_list.append(image)
+
+    remove_unref_files(unref_remove_list)
+
+
+def cleanup_handle_untagged_images(untagged_images):
+    # Create unreferenced file
+    if not untagged_images:
+        return
+
+    touch_list = []
+    for image in untagged_images:
+        if not does_image_have_unref_file(image):
+            touch_list.append(image)
+
+    touch_unref_files(touch_list)
+
+
+def cleanup_handle_stale_images(stale_images):
+    # Set blob permissions to 400 for old stale images
+    if not stale_images:
+        return
+
+    make_unreadable_list = []
+    for image in stale_images:
+        if is_mod_time_old(image.unref_file_stat.mod_time):
+            make_unreadable_list.append(image)
+
+    make_manifests_unreadable(make_unreadable_list)
+    touch_unref_files(make_unreadable_list)
+
+
+def cleanup_handle_dead_images(dead_images, known_images):
+    # Remove old dead images
+    if not dead_images:
+        return
+
+    images_to_remove = []
+    for image in dead_images:
+        if is_mod_time_old(image.unref_file_stat.mod_time):
+            images_to_remove.append(image)
+
+    remove_dead_images(images_to_remove, known_images)
+
+
+def make_manifests_unreadable(images):
+    global HDFS_MANIFEST_DIR
+    global DEAD_PERMS
+
+    if not images:
+        return
+
+    manifest_file_paths = [HDFS_MANIFEST_DIR + "/" + image.unref_file_stat.name for image in images]
+    logging.debug("Chmod %s manifest file:\n%s", DEAD_PERMS, manifest_file_paths)
+    hdfs_chmod(DEAD_PERMS, manifest_file_paths)
+
+
+def touch_unref_files(images):
+    global HDFS_UNREF_DIR
+
+    if not images:
+        return
+
+    unref_file_paths = [HDFS_UNREF_DIR + "/" + image.manifest_stat.name for image in images]
+    logging.debug("Touching unref file:\n%s", unref_file_paths)
+    hdfs_touchz(unref_file_paths)
+
+
+def remove_unref_files(images):
+    global HDFS_UNREF_DIR
+
+    if not images:
+        return
+
+    unref_file_paths = [HDFS_UNREF_DIR + "/" + image.manifest_stat.name for image in images]
+    logging.debug("Removing unref files:\n%s", unref_file_paths)
+    hdfs_rm(unref_file_paths)
+
+
+def remove_dead_images(images_to_remove, known_images):
+    if not images_to_remove:
+        return
+
+    logging.debug("Removing dead images:\n%s", images_to_remove)
+    delete_list = get_delete_list_from_images_to_remove(images_to_remove, known_images)
+    if delete_list:
+        hdfs_rm(delete_list)
+
+
 def create_parsers():
     parser = argparse.ArgumentParser()
     add_parser_default_arguments(parser)
@@ -1223,7 +1578,7 @@
 
     parse_pull_build = subparsers.add_parser('pull-build',
                                              help='Pull an image and build its  squashfs layers')
-    parse_pull_build .set_defaults(func=pull_build)
+    parse_pull_build.set_defaults(func=pull_build)
     add_parser_default_arguments(parse_pull_build)
     parse_pull_build.add_argument("images_and_tags", nargs="+",
                                   help="Image and tag argument (can specify multiple)")
@@ -1286,14 +1641,18 @@
     parse_list_tags.set_defaults(func=list_tags)
     add_parser_default_arguments(parse_list_tags)
 
-    parse_bootstrap_setup= subparsers.add_parser('bootstrap',
-                                                 help='Bootstrap setup of required HDFS'
-                                                      + 'directories')
+    parse_bootstrap_setup = subparsers.add_parser('bootstrap', help='Bootstrap setup of required HDFS directories')
     parse_bootstrap_setup.set_defaults(func=bootstrap_setup)
     add_parser_default_arguments(parse_bootstrap_setup)
 
+    parse_cleanup_untagged_images = subparsers.add_parser('cleanup',
+                                                          help='Cleanup untagged images in HDFS')
+    parse_cleanup_untagged_images.set_defaults(func=cleanup_untagged_images)
+    add_parser_default_arguments(parse_cleanup_untagged_images)
+
     return parser
 
+
 def add_parser_default_arguments(parser):
     parser.add_argument("--working-dir", type=str, dest='working_dir', default="dts-work-dir",
                         help="Name of working directory")
@@ -1303,7 +1662,7 @@
                         default='docker', help="Pull format for skopeo")
     parser.add_argument("-l", "--log", type=str, dest='LOG_LEVEL',
                         default="INFO", help="Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)")
-    parser.add_argument("--hdfs-root", type=str, dest='hdfs_root',
+    parser.add_argument("--hdfs-root", type=str, dest='HDFS_ROOT',
                         default='/runc-root', help="The root directory in HDFS for all of the"
                                                    + "squashfs images")
     parser.add_argument("--image-tag-to-hash", type=str,
@@ -1325,35 +1684,43 @@
     parser.add_argument("--max-layers", type=int, dest='MAX_IMAGE_LAYERS',
                         default=37, help="Maximum number of layers an image is allowed to have")
     parser.add_argument("--max-size", type=int, dest='MAX_IMAGE_SIZE',
-                        default=10*1024*1024*1024, help="Maximum size an image is allowed to be")
+                        default=10 * 1024 * 1024 * 1024, help="Maximum size an image is allowed to be")
+    parser.add_argument("--untagged-transition-sec", type=long, dest='UNTAGGED_TRANSITION_SEC',
+                        default=7 * 24 * 60 * 60, help="Time that untagged images will spend in each state"
+                                                       + "before moving to the next one")
+    parser.add_argument("--dead-perms", type=str, dest='DEAD_PERMS',
+                        default="400", help="Permissions to set for manifests that are untagged "
+                                            + "before they are removed")
     parser.add_argument("-b", "--bootstrap", dest='bootstrap',
                         action="store_true", default=False, help="Bootstrap setup"
                                                                  + " of required HDFS directories")
     return parser
 
+
 def check_dependencies():
     global HADOOP_BIN_DIR
+
     try:
-        command = [HADOOP_BIN_DIR + "/hadoop", "version"]
+        command = [HADOOP_BIN_DIR + "/hadoop", "-h"]
         shell_command(command, False, False, True)
-    except:
+    except Exception as ex:
         logging.error("Could not find hadoop. Make sure HADOOP_PREFIX " +
                       "is set correctly either in your environment or on the command line " +
-                      "via --hadoop-prefix")
+                      "via --hadoop-prefix" +
+                      ", exception is " + str(ex))
         return 1
 
     try:
         command = ["skopeo", "-v"]
         shell_command(command, False, False, True)
-    except:
-        logging.error("Could not find skopeo. Make sure it is installed and present " +
-                      "on the PATH")
+    except Exception as _:
+        logging.error("Could not find skopeo. Make sure it is installed and present on the PATH")
         return 1
 
     try:
         command = ["/usr/sbin/mksquashfs", "-version"]
         shell_command(command, False, False, True)
-    except:
+    except Exception as _:
         logging.error("Could not find /usr/sbin/mksquashfs. Make sure squashfs-tools is installed " +
                       "and /usr/sbin/mksquashfs is present on the the PATH")
         return 1
@@ -1361,27 +1728,34 @@
     try:
         command = ["tar", "--version"]
         shell_command(command, False, False, True)
-    except:
-        logging.error("Could not find tar. Make sure it is installed and present " +
-                      "on the PATH")
+    except Exception as _:
+        logging.error("Could not find tar. Make sure it is installed and present on the PATH")
         return 1
 
     try:
         command = ["setfattr", "--version"]
         shell_command(command, False, False, True)
     except:
-        logging.error("Could not find setfattr . Make sure it is installed and present " +
-                      "on the PATH")
+        logging.error("Could not find setfattr . Make sure it is installed and present on the PATH")
         return 1
 
     return 0
 
+
 def main():
     global LOG_LEVEL
     global HADOOP_PREFIX
     global HADOOP_BIN_DIR
+    global HDFS_ROOT
+    global HDFS_MANIFEST_DIR
+    global HDFS_CONFIG_DIR
+    global HDFS_LAYERS_DIR
+    global HDFS_UNREF_DIR
     global MAX_IMAGE_LAYERS
     global MAX_IMAGE_SIZE
+    global UNTAGGED_TRANSITION_SEC
+    global ARG_MAX
+    global DEAD_PERMS
 
     if os.geteuid() != 0:
         logging.error("Script must be run as root")
@@ -1391,10 +1765,18 @@
     args, extra = parser.parse_known_args()
 
     if extra:
-        raise Exception("Extra unknown arguments given: %s" % (extra))
+        raise Exception(f"Extra unknown arguments given: {extra}")
 
+    ARG_MAX = os.sysconf("SC_ARG_MAX")
+    HDFS_ROOT = args.HDFS_ROOT
+    HDFS_MANIFEST_DIR = HDFS_ROOT + "/manifests"
+    HDFS_CONFIG_DIR = HDFS_ROOT + "/config"
+    HDFS_LAYERS_DIR = HDFS_ROOT + "/layers"
+    HDFS_UNREF_DIR = HDFS_ROOT + "/unreferenced"
     MAX_IMAGE_LAYERS = args.MAX_IMAGE_LAYERS
     MAX_IMAGE_SIZE = args.MAX_IMAGE_SIZE
+    UNTAGGED_TRANSITION_SEC = args.UNTAGGED_TRANSITION_SEC
+    DEAD_PERMS = args.DEAD_PERMS
     LOG_LEVEL = args.LOG_LEVEL.upper()
     image_tag_to_hash = args.image_tag_to_hash
 
@@ -1427,5 +1809,6 @@
 
     args.func(args)
 
+
 if __name__ == "__main__":
     main()
diff --git a/bin/storm.py b/bin/storm.py
index 05dd1d2..ed51fc2 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
@@ -28,25 +28,27 @@
 from argparse import HelpFormatter
 from operator import attrgetter
 
-class SortingHelpFormatter(HelpFormatter):
+if sys.version_info[0] == 2:
+    raise Exception("Python version 2 is not supported. Please download and use python3")
 
+import configparser
+from urllib.parse import quote_plus
+
+
+class SortingHelpFormatter(HelpFormatter):
     def add_arguments(self, actions):
         actions = sorted(actions, key=attrgetter('option_strings'))
         super(SortingHelpFormatter, self).add_arguments(actions)
 
-if sys.version_info[0] == 2:
-    import ConfigParser as configparser
-    from urllib import quote_plus
-else:
-    import configparser
-    from urllib.parse import quote_plus
 
 def is_windows():
     return sys.platform.startswith('win')
 
+
 def identity(x):
     return x
 
+
 def cygpath(x):
     command = ["cygpath", "-wp", x]
     p = subprocess.Popen(command, stdout=subprocess.PIPE)
@@ -54,9 +56,11 @@
     lines = output.split(os.linesep)
     return lines[0]
 
+
 def get_config_opts(storm_config_opts):
     return "-Dstorm.options=" + ','.join([quote_plus(s) for s in storm_config_opts])
 
+
 def get_jars_full(adir):
     files = []
     if os.path.isdir(adir):
@@ -66,8 +70,9 @@
 
     return [os.path.join(adir, f) for f in files if f.endswith(".jar")]
 
-# If given path is a dir, make it a wildcard so the JVM will include all JARs in the directory.
+
 def get_wildcard_dir(path):
+    """If given path is a dir, make it a wildcard so the JVM will include all JARs in the directory."""
     ret = []
     if os.path.isdir(path):
         ret = [(os.path.join(path, "*"))]
@@ -75,12 +80,14 @@
         ret = [path]
     return ret
 
+
 def get_java_cmd():
     cmd = 'java' if not is_windows() else 'java.exe'
     if JAVA_HOME:
         cmd = os.path.join(JAVA_HOME, 'bin', cmd)
     return cmd
 
+
 def confvalue(name, storm_config_opts, extrapaths, overriding_conf_file=None, daemon=True):
     command = [
         JAVA_CMD, "-client", get_config_opts(storm_config_opts),
@@ -103,7 +110,7 @@
     ret = get_wildcard_dir(STORM_DIR)
     if client:
         ret.extend(get_wildcard_dir(STORM_WORKER_LIB_DIR))
-    else :
+    else:
         ret.extend(get_wildcard_dir(STORM_LIB_DIR))
     ret.extend(get_wildcard_dir(os.path.join(STORM_DIR, "extlib")))
     if daemon:
@@ -116,7 +123,7 @@
     return NORMAL_CLASS_PATH(os.pathsep.join(ret))
 
 
-def init_storm_env():
+def init_storm_env(within_unittest=False):
 
     global NORMAL_CLASS_PATH, STORM_DIR, USER_CONF_DIR, STORM_CONF_DIR, STORM_WORKER_LIB_DIR, STORM_LIB_DIR,\
         STORM_TOOLS_LIB_DIR, STORM_WEBAPP_LIB_DIR, STORM_BIN_DIR, STORM_LOG4J2_CONF_DIR, STORM_SUPERVISOR_LOG_FILE,\
@@ -130,7 +137,7 @@
 
     CLUSTER_CONF_DIR = STORM_CONF_DIR if STORM_CONF_DIR else os.path.join(STORM_DIR, "conf")
 
-    if (not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml"))):
+    if not os.path.isfile(os.path.join(USER_CONF_DIR, "storm.yaml")):
         USER_CONF_DIR = CLUSTER_CONF_DIR
 
     STORM_WORKER_LIB_DIR = os.path.join(STORM_DIR, "lib-worker")
@@ -148,10 +155,10 @@
     JAVA_CMD = get_java_cmd()
 
     if JAVA_HOME and not os.path.exists(JAVA_CMD):
-        print("ERROR:  JAVA_HOME is invalid.  Could not find bin/java at %s." % JAVA_HOME)
+        print(f"ERROR:  JAVA_HOME is invalid.  Could not find bin/java at {JAVA_HOME}.")
         sys.exit(1)
 
-    if not os.path.exists(STORM_LIB_DIR):
+    if not (within_unittest or os.path.exists(STORM_LIB_DIR)):
         print("*" * 20)
         print('''The storm client can only be run from within a release. 
 You appear to be trying to run the client from a checkout of Storm's source code.
@@ -159,7 +166,6 @@
         print("*" * 20)
         sys.exit(1)
 
-
     STORM_EXT_CLASSPATH = os.getenv('STORM_EXT_CLASSPATH', None)
     STORM_EXT_CLASSPATH_DAEMON = os.getenv('STORM_EXT_CLASSPATH_DAEMON', None)
     LOCAL_TTL_DEFAULT = "20"
@@ -181,13 +187,13 @@
     if not artifacts:
         return {}
 
-    print("Resolving dependencies on demand: artifacts (%s) with repositories (%s)" % (artifacts, artifact_repositories))
+    print(f"Resolving dependencies on demand: artifacts ({artifacts}) with repositories ({artifact_repositories})")
 
     if maven_local_repos_dir:
-        print("Local repository directory: %s" % maven_local_repos_dir)
+        print(f"Local repository directory: {maven_local_repos_dir}")
 
     if proxy_url:
-        print("Proxy information: url (%s) username (%s)" % (proxy_url, proxy_username))
+        print(f"Proxy information: url ({proxy_url}) username ({proxy_username})")
 
     sys.stdout.flush()
 
@@ -214,20 +220,19 @@
     p = subprocess.Popen(command, stdout=subprocess.PIPE)
     output, errors = p.communicate()
     if p.returncode != 0:
-        raise RuntimeError("dependency handler returns non-zero code: code<%s> syserr<%s>" % (p.returncode, errors))
+        raise RuntimeError(f"dependency handler returns non-zero code: code<{p.returncode}> syserr<{errors}>")
 
-    # python 3
     if not isinstance(output, str):
         output = output.decode('utf-8')
 
     # For debug purpose, uncomment when you need to debug DependencyResolver
-    # print("Resolved dependencies: %s" % output)
+    # print(f"Resolved dependencies: {output}")
 
     try:
         out_dict = json.loads(output)
         return out_dict
     except:
-        raise RuntimeError("dependency handler returns non-json response: sysout<%s>", output)
+        raise RuntimeError(f"dependency handler returns non-json response: sysout<{output}>", )
 
 
 def exec_storm_class(klass, storm_config_opts, jvmtype="-server", jvmopts=[],
@@ -238,15 +243,15 @@
     if storm_log_dir is None or storm_log_dir in ["null", ""]:
         storm_log_dir = os.path.join(STORM_DIR, "logs")
     all_args = [
-        JAVA_CMD, jvmtype,
-        "-Ddaemon.name=" + daemonName,
-        get_config_opts(storm_config_opts),
-       "-Dstorm.home=" + STORM_DIR,
-       "-Dstorm.log.dir=" + storm_log_dir,
-       "-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
-       "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
-       "-cp", get_classpath(extrajars, daemon, client=client),
-    ] + jvmopts + [klass] + list(main_class_args)
+            JAVA_CMD, jvmtype,
+            f"-Ddaemon.name={daemonName}",
+            get_config_opts(storm_config_opts),
+            f"-Dstorm.home={STORM_DIR}",
+            f"-Dstorm.log.dir={storm_log_dir}",
+            "-Djava.library.path=" + confvalue("java.library.path", storm_config_opts, extrajars, daemon=daemon),
+            "-Dstorm.conf.file=" + (overriding_conf_file if overriding_conf_file else ""),
+            "-cp", get_classpath(extrajars, daemon, client=client),
+        ] + jvmopts + [klass] + list(main_class_args)
     print("Running: " + " ".join(all_args))
     sys.stdout.flush()
     exit_code = 0
@@ -354,7 +359,6 @@
     add_common_options(sub_parser)
 
 
-
 def initialize_remoteconfvalue_subcommand(subparsers):
     command_help = '''Prints out the value for conf-name in the cluster's Storm configs.
     The cluster's Storm configs are the ones in $STORM-PATH/conf/storm.yaml
@@ -380,6 +384,7 @@
             nargs='*', help="Runs the main method with the specified arguments."
         )
 
+
 def remove_common_options(sys_args):
     flags_to_filter = ["-c", "-storm_config_opts", "--config"]
     filtered_sys_args = [
@@ -388,6 +393,7 @@
     ]
     return filtered_sys_args
 
+
 def add_topology_jar_options(parser):
     parser.add_argument(
         "topology_jar_path", metavar="topology-jar-path",
@@ -395,7 +401,7 @@
     )
     parser.add_argument(
         "topology_main_class", metavar="topology-main-class",
-    help="main class of the topology jar being submitted"
+        help="main class of the topology jar being submitted"
     )
 
 
@@ -413,7 +419,6 @@
     For example, -artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load jedis and kafka-clients artifact and all of transitive dependencies but exclude slf4j-api from kafka.
         ''', default="")
 
-
     parser.add_argument("--artifactRepositories", help='''
     When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with a comma-separated string.
     Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters.
@@ -422,7 +427,6 @@
 
     parser.add_argument("--mavenLocalRepositoryDirectory", help="You can provide local maven repository directory via --mavenLocalRepositoryDirectory if you would like to use specific directory. It might help when you don't have '.m2/repository' directory in home directory, because CWD is sometimes non-deterministic (fragile).", default="")
 
-
     parser.add_argument("--proxyUrl", help="You can also provide proxy information to let dependency resolver utilizing proxy if needed. URL representation of proxy ('http://host:port')", default="")
     parser.add_argument("--proxyUsername", help="username of proxy if it requires basic auth", default="")
     parser.add_argument("--proxyPassword", help="password of proxy if it requires basic auth", default="")
@@ -518,15 +522,17 @@
 def check_non_negative(value):
     ivalue = int(value)
     if ivalue < 0:
-        raise argparse.ArgumentTypeError("%s is not a non-zero integer" % value)
+        raise argparse.ArgumentTypeError(f"{value} is not a non-zero integer")
     return ivalue
 
+
 def check_positive(value):
     ivalue = int(value)
     if ivalue <= 0:
         raise argparse.ArgumentTypeError("%s is not a positive integer" % value)
     return ivalue
 
+
 def initialize_upload_credentials_subcommand(subparsers):
     command_help = """Uploads a new set of credentials to a running topology."""
     sub_parser = subparsers.add_parser("upload-credentials", help=command_help, formatter_class=SortingHelpFormatter)
@@ -569,7 +575,6 @@
 
     add_client_jar_options(sub_parser)
 
-
     sub_parser.add_argument("sql_file", metavar="sql-file")
 
     group = sub_parser.add_mutually_exclusive_group(required=True)
@@ -608,7 +613,6 @@
     cat_parser.add_argument("-f", '--FILE', default=None)
     add_common_options(cat_parser)
 
-
     create_parser = sub_sub_parsers.add_parser(
         "create", help="create a new blob. Contents comes from a FILE or STDIN", formatter_class=SortingHelpFormatter
     )
@@ -1133,8 +1137,8 @@
 
 
 def upload_credentials(args):
-    if (len(args.cred_list) % 2 != 0):
-        raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + cred_list)
+    if len(args.cred_list) % 2 != 0:
+        raise argparse.ArgumentTypeError("please provide a list of cred key and value pairs " + args.cred_list)
     exec_storm_class(
         "org.apache.storm.command.UploadCredentials",
         main_class_args=remove_common_options(sys.argv[2:]), storm_config_opts=args.storm_config_opts,
@@ -1171,6 +1175,7 @@
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
         overriding_conf_file=args.config)
 
+
 def listtopos(args):
     exec_storm_class(
         "org.apache.storm.command.ListTopologies",
@@ -1179,6 +1184,7 @@
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
         overriding_conf_file=args.config)
 
+
 def set_log_level(args):
     for log_level in args.l:
         try:
@@ -1195,6 +1201,7 @@
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR],
         overriding_conf_file=args.config)
 
+
 def deactivate(args):
     exec_storm_class(
         "org.apache.storm.command.Deactivate",
@@ -1286,9 +1293,9 @@
         "storm.log4j2.conf.dir", storm_config_opts=storm_config_opts,
         extrapaths=cppaths, overriding_conf_file=args.config
     )
-    if(not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null"):
+    if not storm_log4j2_conf_dir or storm_log4j2_conf_dir == "null":
         storm_log4j2_conf_dir = STORM_LOG4J2_CONF_DIR
-    elif(not os.path.isabs(storm_log4j2_conf_dir)):
+    elif not os.path.isabs(storm_log4j2_conf_dir):
         storm_log4j2_conf_dir = os.path.join(STORM_DIR, storm_log4j2_conf_dir)
     return storm_log4j2_conf_dir
 
@@ -1478,6 +1485,7 @@
         jvmtype="-client",
         extrajars=[USER_CONF_DIR, STORM_BIN_DIR])
 
+
 def main():
     init_storm_env()
     storm_parser = initialize_main_command()
diff --git a/bin/test_docker_to_squash.py b/bin/test_docker_to_squash.py
new file mode 100644
index 0000000..d7a44e9
--- /dev/null
+++ b/bin/test_docker_to_squash.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import TestCase
+# import docker-to-squash as dtsq
+dtsq = __import__('docker-to-squash')  # TODO: rename docker-to-squash.py as docker_to_squash.py
+
+
+class Test(TestCase):
+    def test_shell_command(self):
+        """
+        shell_command is used by many functions in docker_to_squash.py. Ensure that it works correctly.
+        Prior code was not returning any values, and was not detected till PR https://github.com/apache/storm/pull/3475
+        :return:
+        """
+        # expect success
+        cmd = ["ls", "-l"]
+        out, err, rc = dtsq.shell_command(cmd, True, True, True, timeout_sec=10)
+        self.assertEqual(0, rc, f"Failed cmd={cmd}\nrc={rc}\nout={out}\nerr={err}")
+
+        # expect failure
+        cmd = ["badcmd", "-l"]
+        out, err, rc = dtsq.shell_command(cmd, True, True, True, timeout_sec=10)
+        self.assertNotEqual(0, rc, f"Expected to fail cmd={cmd}\nrc={rc}\nout={out}\nerr={err}")
+
+    # TODO:
+    #  def test_read_image_tag_to_hash(self):
+    #     """
+    #     The base method behaves differently, since string in python3 is always unicode. Base method was flips between
+    #     byte arrays and strings. This may not always work properly in python3.
+    #     :return:
+    #     """
+    #     image_tag_to_hash = ""
+    #     hash_to_tags, tag_to_hash = dtsq.read_image_tag_to_hash(image_tag_to_hash)
+
diff --git a/bin/test_storm.py b/bin/test_storm.py
new file mode 100644
index 0000000..11c8057
--- /dev/null
+++ b/bin/test_storm.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import unittest
+import storm
+import os
+
+
+class Test(unittest.TestCase):
+    """
+    Mostly just test coverage
+    """
+    _testMethodName = None
+    _testMethodDoc = None
+
+    def __init__(self, method_name="None"):
+        super().__init__()
+        self._testMethodName = method_name
+        storm.init_storm_env(within_unittest=True)
+
+    def test_get_jars_full(self):
+        storm.get_jars_full(".")
+
+    def test_get_wildcard_dir(self):
+        s = storm.get_wildcard_dir("./")
+        self.assertEqual(s, ["./*"])
+
+    def test_get_java_cmd(self):
+        s = storm.get_java_cmd()
+        expected = 'java' if not storm.is_windows() else 'java.exe'
+        if storm.JAVA_HOME:
+            expected = os.path.join(storm.JAVA_HOME, 'bin', expected)
+        self.assertEqual(s, expected)
+
+    def test_confvalue(self):
+        name = 'name'
+        storm_config_opts = {'ui.port': '8080'}
+        extrapaths = []
+        overriding_conf_file = None
+        daemon = True
+        s = storm.confvalue(name, storm_config_opts, extrapaths, overriding_conf_file, daemon)
+        expected = ""
+        self.assertEqual(s, expected)
+
+    def test_get_classpath(self):
+        extrajars = [f"jar{x}.jar" for x in range(5)]
+        daemon = True
+        client = False
+        s = storm.get_classpath(extrajars, daemon, client)
+        expected = ":".join(extrajars)
+        self.assertEqual(s[-len(expected):], expected)
+
+    def test_resolve_dependencies(self):
+        artifacts = "org.apache.commons.commons-api"
+        artifact_repositories = "maven-central"
+        maven_local_repos_dir = "~/.m2"
+        proxy_url = None
+        proxy_username = None
+        proxy_password = None
+        try:
+            output = storm.resolve_dependencies(artifacts, artifact_repositories, maven_local_repos_dir,
+                                                proxy_url, proxy_username, proxy_password)
+        except RuntimeError as ex:
+            print(f"Unexpected {ex=}, {type(ex)=}")
+        # test coverage only
+
+    def test_exec_storm_class(self):
+        klass = "org.apache.storm.starter.WordCountTopology"
+        storm_config_opts = []
+        jvmtype = "-server"
+        jvmopts = []
+        extrajars = []
+        main_class_args = []
+        fork = False
+        daemon = True
+        client = False
+        daemonName = ""
+        overriding_conf_file = None
+        # exit_code = storm.exec_storm_class(klass, storm_config_opts=storm_config_opts, jvmtype=jvmtype, jvmopts=jvmopts,
+        #                                    extrajars=extrajars, main_class_args=main_class_args, fork=fork,
+        #                                    daemon=daemon, client=client, daemonName=daemonName,
+        #                                    overriding_conf_file=overriding_conf_file)
+
+    def test_run_client_jar(self):
+        pass
+
+    def test_print_localconfvalue(self):
+        class Args:
+            conf_name = self._testMethodName
+            storm_config_opts = {self._testMethodName: "confvalue"}
+            config = "config/file/path.yaml"
+
+        args = Args()
+        storm.print_localconfvalue(args)
+
+    def test_print_remoteconfvalue(self):
+        class Args:
+            conf_name = self._testMethodName
+            storm_config_opts = {self._testMethodName: "confvalue"}
+            config = "config/file/path.yaml"
+
+        args = Args()
+        storm.print_remoteconfvalue(args)
+
+    def test_initialize_main_command(self):
+        storm.initialize_main_command()
+
+    def test_jar(self):
+        pass
+
+    def test_local(self):
+        pass
+
+    def test_sql(self):
+        pass
+
+    def test_kill(self):
+        pass
+
+    def test_upload_credentials(self):
+        pass
+
+    def test_blob(self):
+        pass
+
+    def test_heartbeats(self):
+        pass
+
+    def test_activate(self):
+        pass
+
+    def test_listtopos(self):
+        pass
+
+    def test_set_log_level(self):
+        pass
+
+    def test_deactivate(self):
+        pass
+
+    def test_rebalance(self):
+        pass
+
+    def test_get_errors(self):
+        pass
+
+    def test_healthcheck(self):
+        pass
+
+    def test_kill_workers(self):
+        pass
+
+    def test_admin(self):
+        pass
+
+    def test_shell(self):
+        pass
+
+    def test_repl(self):
+        pass
+
+    def test_get_log4j2_conf_dir(self):
+        pass
+
+    def test_nimbus(self):
+        pass
+
+    def test_pacemaker(self):
+        pass
+
+    def test_supervisor(self):
+        pass
+
+    def test_ui(self):
+        pass
+
+    def test_logviewer(self):
+        pass
+
+    def test_drpc_client(self):
+        pass
+
+    def test_drpc(self):
+        pass
+
+    def test_dev_zookeeper(self):
+        pass
+
+    def test_version(self):
+        pass
+
+    def test_print_classpath(self):
+        storm.print_classpath(None)
+
+    def test_print_server_classpath(self):
+        storm.print_server_classpath(None)
+
+    def test_monitor(self):
+        pass
+
+
diff --git a/dev-tools/find-checkstyle-issues.py b/dev-tools/find-checkstyle-issues.py
index ea3520f..9a6d5a1 100755
--- a/dev-tools/find-checkstyle-issues.py
+++ b/dev-tools/find-checkstyle-issues.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -12,17 +12,18 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import sys
 import os
 from optparse import OptionParser
 import subprocess
 
+
 def getCheckstyleFor(f, check_result):
     f = os.path.abspath(f)
     check_result = os.path.abspath(check_result)
     ret = subprocess.check_output(['xsltproc', '--stringparam', 'target', f, './dev-tools/checkstyle.xslt', check_result])
     if not ret.isspace():
-        print ret
+        print(ret)
+
 
 def main():
     parser = OptionParser(usage="usage: %prog [options]")
@@ -34,5 +35,6 @@
     for f in args:
         getCheckstyleFor(f, options.check_result)
 
+
 if __name__ == "__main__":
     main()
diff --git a/dev-tools/github/__init__.py b/dev-tools/github/__init__.py
old mode 100755
new mode 100644
index 48f397b..4cc659d
--- a/dev-tools/github/__init__.py
+++ b/dev-tools/github/__init__.py
@@ -13,7 +13,8 @@
 
 import getpass
 import base64
-import urllib2
+import urllib
+import urllib.request
 from datetime import datetime
 import re
 
@@ -23,12 +24,6 @@
     import simplejson as json
 
 
-def mstr(obj):
-    if obj is None:
-        return ""
-    return unicode(obj)
-
-
 def git_time(obj):
     if obj is None:
         return None
@@ -61,13 +56,13 @@
         # TODO def review_comments
 
     def user(self):
-        return mstr(self.data["user"]["login"])
+        return self.data["user"]["login"]
 
     def from_branch(self):
-        return mstr(self.data["head"]["ref"])
+        return self.data["head"]["ref"]
 
     def from_repo(self):
-        return mstr(self.data["head"]["repo"]["clone_url"])
+        return self.data["head"]["repo"]["clone_url"]
 
     def merged(self):
         return self.data["merged_at"] is not None
@@ -111,11 +106,10 @@
         page = 1
         ret = []
         while True:
-            url = "https://api.github.com/repos/" + user + "/" + repo + "/pulls?state=" + type + "&page=" + str(page)
-
-            req = urllib2.Request(url, None, self.headers)
-            result = urllib2.urlopen(req)
-            contents = result.read()
+            url = f"https://api.github.com/repos/{user}/{repo}/pulls?state={type}&page={page}"
+            req = urllib.request.Request(url, None, self.headers)
+            result = urllib.request.urlopen(req)
+            contents = result.read().decode()
             if result.getcode() != 200:
                 raise Exception(result.getcode() + " != 200 " + contents)
             got = json.loads(contents)
@@ -129,10 +123,10 @@
         return self.pulls(user, repo, "open")
 
     def pull(self, user, repo, number):
-        url = "https://api.github.com/repos/" + user + "/" + repo + "/pulls/" + number
-        req = urllib2.Request(url, None, self.headers)
-        result = urllib2.urlopen(req)
-        contents = result.read()
+        url = f"https://api.github.com/repos/{user}/{repo}/pulls/{number}"
+        req = urllib.request.Request(url, None, self.headers)
+        result = urllib.request.urlopen(req)
+        contents = result.read().decode()
         if result.getcode() != 200:
             raise Exception(result.getcode() + " != 200 " + contents)
         got = json.loads(contents)
diff --git a/dev-tools/jira-github-join.py b/dev-tools/jira-github-join.py
index 1931f19..3e23ea1 100755
--- a/dev-tools/jira-github-join.py
+++ b/dev-tools/jira-github-join.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -18,6 +18,16 @@
 from jira_github import JiraRepo
 from report.report_builder import CompleteReportBuilder
 
+"""
+If you get certificate error when running on Mac, 
+(https://stackoverflow.com/questions/50236117/scraping-ssl-certificate-verify-failed-error-for-http-en-wikipedia-org)
+Go to Macintosh HD 
+    > Applications 
+    > Python3.9 folder (or whatever version of python you're using) 
+    > double click on "Install Certificates.command" file. 
+    
+"""
+
 
 def main():
     parser = OptionParser(usage="usage: %prog [options]")
@@ -29,8 +39,9 @@
     jira_repo = JiraRepo("https://issues.apache.org/jira/rest/api/2")
     github_repo = GitHub(options)
 
-    print "Report generated on: %s (GMT)" % (datetime.strftime(datetime.utcnow(), "%Y-%m-%d %H:%M:%S"))
-
+    print("=" * 100)
+    print("Report generated on: %s (GMT)" % (datetime.strftime(datetime.utcnow(), "%Y-%m-%d %H:%M:%S")))
+    print("-" * 100)
     report_builder = CompleteReportBuilder(jira_repo, github_repo)
     report_builder.report.print_all()
 
diff --git a/dev-tools/jira_github/__init__.py b/dev-tools/jira_github/__init__.py
index c98ae31..46412e6 100644
--- a/dev-tools/jira_github/__init__.py
+++ b/dev-tools/jira_github/__init__.py
@@ -13,7 +13,8 @@
 
 import re
 import urllib
-import urllib2
+import urllib.request
+import urllib.parse
 from datetime import datetime
 
 try:
@@ -22,21 +23,16 @@
     import simplejson as json
 
 
-def mstr(obj):
-    if obj is None:
-        return ""
-    return unicode(obj)
-
-
 def jiratime(obj):
     if obj is None:
         return None
     return datetime.strptime(obj[0:19], "%Y-%m-%dT%H:%M:%S")
 
+
 # Regex pattern definitions
-github_user = re.compile("Git[Hh]ub user ([\w-]+)")
-github_pull = re.compile("https://github.com/[^/\s]+/[^/\s]+/pull/[0-9]+")
-has_vote = re.compile("\s+([-+][01])\s*")
+github_user = re.compile(r"Git[Hh]ub user ([\w-]+)")
+github_pull = re.compile(r"https://github.com/[^/\s]+/[^/\s]+/pull/[0-9]+")
+has_vote = re.compile(r"\s+([-+][01])\s*")
 is_diff = re.compile("--- End diff --")
 
 
@@ -52,7 +48,7 @@
 
     def __init__(self, data):
         self.data = data
-        self.author = mstr(self.data['author']['name'])
+        self.author = self.data['author']['name']
         self.github_author = None
         self.githubPull = None
         self.githubComment = (self.author == "githubbot")
@@ -72,7 +68,7 @@
         return self.author
 
     def get_body(self):
-        return mstr(self.data['body'])
+        return self.data['body']
 
     def get_pull(self):
         return self.githubPull
@@ -104,16 +100,27 @@
         self.comments = None
 
     def get_id(self):
-        return mstr(self.key)
+        """
+        Get Jira ID as a string from the string stored in self.key
+        :return: Jira id, example "STORM-1234"
+        """
+        return self.key
+
+    def get_id_num(self):
+        """
+        Get Jira ID number as an integer from the string stored in self.key
+        :return: Numeric Jira Id as a number. Example "STORM-1234" and "ZKP-1234" will both return 1234
+        """
+        return int(self.key.split('-')[-1])
 
     def get_description(self):
-        return mstr(self.fields['description'])
+        return self.fields['description']
 
     def getReleaseNote(self):
         if self.notes is None:
             field = self.parent.fieldIdMap['Release Note']
-            if self.fields.has_key(field):
-                self.notes = mstr(self.fields[field])
+            if field in self.fields:
+                self.notes = self.fields[field]
             else:
                 self.notes = self.get_description()
         return self.notes
@@ -123,28 +130,28 @@
         status = self.fields['status']
         if status is not None:
             ret = status['name']
-        return mstr(ret)
+        return ret
 
     def get_priority(self):
         ret = ""
         pri = self.fields['priority']
         if pri is not None:
             ret = pri['name']
-        return mstr(ret)
+        return ret
 
     def get_assignee_email(self):
         ret = ""
         mid = self.fields['assignee']
         if mid is not None:
             ret = mid['emailAddress']
-        return mstr(ret)
+        return ret
 
     def get_assignee(self):
         ret = ""
         mid = self.fields['assignee']
         if mid is not None:
             ret = mid['displayName']
-        return mstr(ret)
+        return ret
 
     def get_components(self):
         return " , ".join([comp['name'] for comp in self.fields['components']])
@@ -162,21 +169,21 @@
         mid = self.fields['issuetype']
         if mid is not None:
             ret = mid['name']
-        return mstr(ret)
+        return ret
 
     def get_reporter(self):
         ret = ""
         mid = self.fields['reporter']
         if mid is not None:
             ret = mid['displayName']
-        return mstr(ret)
+        return ret
 
     def get_project(self):
         ret = ""
         mid = self.fields['project']
         if mid is not None:
             ret = mid['key']
-        return mstr(ret)
+        return ret
 
     def get_created(self):
         return jiratime(self.fields['created'])
@@ -191,11 +198,12 @@
             at = 0
             end = 1
             count = 100
-            while (at < end):
-                params = urllib.urlencode({'startAt': at, 'maxResults': count})
-                resp = urllib2.urlopen(self.parent.baseUrl + "/issue/" + jiraId + "/comment?" + params)
-                data = json.loads(resp.read())
-                if (data.has_key('errorMessages')):
+            while at < end:
+                params = urllib.parse.urlencode({'startAt': at, 'maxResults': count})
+                resp = urllib.request.urlopen(self.parent.baseUrl + "/issue/" + jiraId + "/comment?" + params)
+                resp_str = resp.read().decode()
+                data = json.loads(resp_str)
+                if 'errorMessages' in data:
                     raise Exception(data['errorMessages'])
                 at = data['startAt'] + data['maxResults']
                 end = data['total']
@@ -213,12 +221,12 @@
 
     def get_trimmed_comments(self, limit=40):
         comments = self.get_comments()
-        return comments if len(comments) < limit else comments[0:limit] + "..."
+        return comments if len(comments) < limit else comments[0:limit] + ["..."]
 
     def raw(self):
         return self.fields
 
-    def storm_jira_cmp(x, y):
+    def storm_jira_cmp(self, x, y):
         xn = x.get_id().split("-")[1]
         yn = y.get_id().split("-")[1]
         return int(xn) - int(yn)
@@ -229,17 +237,19 @@
 
     def __init__(self, baseUrl):
         self.baseUrl = baseUrl
-        resp = urllib2.urlopen(baseUrl + "/field")
-        data = json.loads(resp.read())
+        resp = urllib.request.urlopen(baseUrl + "/field")
+        resp_str = resp.read().decode()
+        data = json.loads(resp_str)
 
         self.fieldIdMap = {}
         for part in data:
             self.fieldIdMap[part['name']] = part['id']
 
     def get(self, id):
-        resp = urllib2.urlopen(self.baseUrl + "/issue/" + id)
-        data = json.loads(resp.read())
-        if (data.has_key('errorMessages')):
+        resp = urllib.request.urlopen(self.baseUrl + "/issue/" + id)
+        resp_str = resp.read().decode()
+        data = json.loads(resp_str)
+        if 'errorMessages' in data:
             raise Exception(data['errorMessages'])
         j = Jira(data, self)
         return j
@@ -249,12 +259,13 @@
         at = 0
         end = 1
         count = 100
-        while (at < end):
-            params = urllib.urlencode({'jql': query, 'startAt': at, 'maxResults': count})
+        while at < end:
+            params = urllib.parse.urlencode({'jql': query, 'startAt': at, 'maxResults': count})
             # print params
-            resp = urllib2.urlopen(self.baseUrl + "/search?%s" % params)
-            data = json.loads(resp.read())
-            if (data.has_key('errorMessages')):
+            resp = urllib.request.urlopen(self.baseUrl + "/search?%s" % params)
+            resp_str = resp.read().decode()
+            data = json.loads(resp_str)
+            if 'errorMessages' in data:
                 raise Exception(data['errorMessages'])
             at = data['startAt'] + data['maxResults']
             end = data['total']
@@ -268,18 +279,18 @@
         :param project: The JIRA project to search for unresolved issues
         :return: All JIRA issues that have the field resolution = Unresolved
         """
-        return self.query("project = " + project + " AND resolution = Unresolved")
+        return self.query(f"project = {project} AND resolution = Unresolved")
 
     def open_jiras(self, project):
         """
         :param project: The JIRA project to search for open issues
         :return: All JIRA issues that have the field status = Open
         """
-        return self.query("project = " + project + " AND status = Open")
+        return self.query(f"project = {project} AND status = Open")
 
     def in_progress_jiras(self, project):
         """
         :param project: The JIRA project to search for In Progress issues
         :return: All JIRA issues that have the field status = 'In Progress'
         """
-        return self.query("project = " + project + " AND status = 'In Progress'")
+        return self.query(f"project = {project} AND status = 'In Progress'")
diff --git a/dev-tools/release_notes.py b/dev-tools/release_notes.py
old mode 100644
new mode 100755
index c1e053a..1c8663d
--- a/dev-tools/release_notes.py
+++ b/dev-tools/release_notes.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -30,7 +30,7 @@
 import itertools, sys
 
 if len(sys.argv) < 2:
-    print >>sys.stderr, "Usage: release_notes.py <version>"
+    print("Usage: release_notes.py <version>", file=sys.stderr)
     sys.exit(1)
 
 version = sys.argv[1]
@@ -38,19 +38,22 @@
 JIRA_BASE_URL = 'https://issues.apache.org/jira'
 MAX_RESULTS = 100 # This is constrained for cloud instances so we need to fix this value
 
+
 def get_issues(jira, query, **kwargs):
     """
-    Get all issues matching the JQL query from the JIRA instance. This handles expanding paginated results for you. Any additional keyword arguments are forwarded to the JIRA.search_issues call.
+    Get all issues matching the JQL query from the JIRA instance. This handles expanding paginated results for you.
+    Any additional keyword arguments are forwarded to the JIRA.search_issues call.
     """
     results = []
     startAt = 0
     new_results = None
-    while new_results == None or len(new_results) == MAX_RESULTS:
+    while new_results is None or len(new_results) == MAX_RESULTS:
         new_results = jira.search_issues(query, startAt=startAt, maxResults=MAX_RESULTS, **kwargs)
         results += new_results
         startAt += len(new_results)
     return results
 
+
 def issue_link(issue):
     return "%s/browse/%s" % (JIRA_BASE_URL, issue.key)
 
@@ -59,10 +62,11 @@
     apache = JIRA(JIRA_BASE_URL)
     issues = get_issues(apache, 'project=STORM and fixVersion=%s' % version)
     if not issues:
-        print >>sys.stderr, "Didn't find any issues for the target fix version"
+        print("Didn't find any issues for the target fix version", file=sys.stderr)
         sys.exit(1)
 
-    # Some resolutions, including a lack of resolution, indicate that the bug hasn't actually been addressed and we shouldn't even be able to create a release until they are fixed
+    # Some resolutions, including a lack of resolution, indicate that the bug hasn't actually been addressed and we
+    # shouldn't even be able to create a release until they are fixed
     UNRESOLVED_RESOLUTIONS = [None,
                               "Unresolved",
                               "Duplicate",
@@ -77,42 +81,50 @@
                               "Workaround",
                               "Information Provided"
                               ]
-    unresolved_issues = [issue for issue in issues if issue.fields.resolution in UNRESOLVED_RESOLUTIONS or issue.fields.resolution.name in UNRESOLVED_RESOLUTIONS]
+    unresolved_issues = [issue for issue in issues
+                         if issue.fields.resolution in UNRESOLVED_RESOLUTIONS
+                         or issue.fields.resolution.name in UNRESOLVED_RESOLUTIONS]
     if unresolved_issues:
-        print >>sys.stderr, "The release is not completed since unresolved issues or improperly resolved issues were found still tagged with this release as the fix version:"
+        print("The release is not completed since unresolved issues or improperly resolved issues were found still "
+              "tagged with this release as the fix version:", file=sys.stderr)
         for issue in unresolved_issues:
-            print >>sys.stderr, "Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue))
-        print >>sys.stderr
-        print >>sys.stderr, "Note that for some resolutions, you should simply remove the fix version as they have not been truly fixed in this release."
+            print("Unresolved issue: %15s %20s %s" % (issue.key, issue.fields.resolution, issue_link(issue)),
+                  file=sys.stderr)
+        print('', file=sys.stderr)
+        print("Note that for some resolutions, you should simply remove the fix version as they have not been truly "
+              "fixed in this release.", file=sys.stderr)
         sys.exit(1)
 
-    # Get list of (issue type, [issues]) sorted by the issue ID type, with each subset of issues sorted by their key so they
-    # are in increasing order of bug #. To get a nice ordering of the issue types we customize the key used to sort by issue
-    # type a bit to ensure features and improvements end up first.
+    # Get list of (issue type, [issues]) sorted by the issue ID type, with each subset of issues sorted by their key so
+    # they are in increasing order of bug #. To get a nice ordering of the issue types we customize the key used to sort
+    # by issue type a bit to ensure features and improvements end up first.
     def issue_type_key(issue):
         if issue.fields.issuetype.name == 'New Feature':
             return -2
         if issue.fields.issuetype.name == 'Improvement':
             return -1
-        return issue.fields.issuetype.id
-    by_group = [(k,sorted(g, key=lambda issue: issue.id)) for k,g in itertools.groupby(sorted(issues, key=issue_type_key), lambda issue: issue.fields.issuetype.name)]
+        return int(issue.fields.issuetype.id)
 
-    print "<!DOCTYPE html>"
-    print "<html lang=\"en\">"
-    print "<head>"
-    print "<meta charset=\"utf-8\">"
-    print "<title>Storm %(version)s Release Notes</title>" % { 'version': version }
-    print "</head>"
-    print "<body>"
-    print "<h1>Release Notes for Storm %s</h1>" % version
-    print """<p>JIRA issues addressed in the %(version)s release of Storm. Documentation for this
-    release is available at the <a href="http://storm.apache.org/">Apache Storm
-    project site</a>.</p>""" % { 'version': version }
-    for itype, issues in by_group:
-        print "<h2>%s</h2>" % itype
-        print "<ul>"
-        for issue in issues:
-            print '<li>[<a href="%(link)s">%(key)s</a>] - %(summary)s</li>' % {'key': issue.key, 'link': issue_link(issue), 'summary': issue.fields.summary}
-        print "</ul>"
-    print "</body>"
-    print "</html>"
+    by_group = [(k, sorted(g, key=lambda issue: issue.id))
+                for k, g in itertools.groupby(sorted(issues, key=issue_type_key),
+                                              lambda issue: issue.fields.issuetype.name)]
+    issues_str = "\n".join([
+        f"\n\t<h2>{itype}</h2>" +
+        f"\n\t<ul>" +
+        '\n\t\t'.join([f'<li>[<a href="{issue_link(issue)}">{issue.key}</a>] - {issue.fields.summary}</li>' for issue in issues]) +
+        "\n\t</ul>"
+        for itype, issues in by_group])
+
+    print(f"""<!DOCTYPE html>
+<html lang="en">
+<head>
+<meta charset="utf-8">
+<title>Storm {version} Release Notes</title>
+</head>
+<body>
+<h1>Release Notes for Storm {version}</h1>
+<p>JIRA issues addressed in the {version} release of Storm. Documentation for this
+    release is available at the <a href="https://storm.apache.org/">Apache Storm project site</a>.</p>
+{issues_str}
+</body>
+</html>""")
diff --git a/dev-tools/report/formatter.py b/dev-tools/report/formatter.py
index 81f574d..1bdd558 100644
--- a/dev-tools/report/formatter.py
+++ b/dev-tools/report/formatter.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -12,14 +12,6 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-def encode(obj, encoding='UTF-8'):
-    """
-    Check if the object supports encode() method, and if so, encodes it.
-    Encoding defaults to UTF-8.
-    For example objects of type 'int' do not support encode
-    """
-    return obj.encode(encoding) if 'encode' in dir(obj) else obj
-
 class Formatter:
     def __init__(self, fields_tuple=(), row_tuple=(), min_width_tuple=None):
         # Format to pass as first argument to the print function, e.g. '%s%s%s'
@@ -46,7 +38,7 @@
         sizes = []
         padding = 3
         for i in range(0, len(row_tuple)):
-            max_len = max(len(encode(fields_tuple[i])), len(str(encode(row_tuple[i]))))
+            max_len = max(len(fields_tuple[i]), len(str(row_tuple[i])))
             if min_width_tuple is not None:
                 max_len = max(max_len, min_width_tuple[i])
             sizes += [max_len + padding]
@@ -62,7 +54,7 @@
     # Returns a tuple where each entry has a string that is the result of
     # statements with the pattern "{!s:43}".format("Text")
     def row_str_format(self, row_tuple):
-        format_with_values = [str(self.data_format[0].format(encode(row_tuple[0])))]
+        format_with_values = [self.data_format[0].format(row_tuple[0])]
         for i in range(1, len(row_tuple)):
-            format_with_values += [str(self.data_format[i].format(encode(row_tuple[i])))]
+            format_with_values += [self.data_format[i].format(row_tuple[i])]
         return tuple(format_with_values)
diff --git a/dev-tools/report/report.py b/dev-tools/report/report.py
old mode 100644
new mode 100755
index d2df077..bba78bc
--- a/dev-tools/report/report.py
+++ b/dev-tools/report/report.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -13,9 +13,8 @@
 #  limitations under the License.
 
 from datetime import datetime
-from github import mstr
 from jira_github import Jira
-from formatter import Formatter, encode
+from .formatter import Formatter
 
 
 def daydiff(a, b):
@@ -24,23 +23,25 @@
 
 class Report:
     now = datetime.utcnow()
+
     def __init__(self, header=''):
         self.header = header
 
     # if padding starts with - it puts padding before contents, otherwise after
     @staticmethod
     def _build_tuple(contents, padding=''):
-        if padding is not '':
+        if padding:
             out = []
             for i in range(len(contents)):
-                out += [padding[1:] + str(contents[i])] if padding[0] is '-' else [str(contents[i]) + padding]
+                out += [padding[1:] + str(contents[i])] if padding[0] == '-' else [str(contents[i]) + padding]
             return tuple(out)
         return contents
 
     # calls the native print function with the following format. Text1,Text2,... has the correct spacing
-    # print ("%s%s%s" % ("Text1, Text2, Text3))
+    # print("%s%s%s" % ("Text1, Text2, Text3))
     def print_(self, formatter, row_tuple):
-        print (formatter.format % formatter.row_str_format(row_tuple))
+        print(formatter.format % formatter.row_str_format(row_tuple))
+
 
 class JiraReport(Report):
     def __init__(self, issues, header=''):
@@ -58,9 +59,7 @@
 
     def values_view(self, excluded=None):
         temp_dic = dict(self.issues) if excluded is None else self.view(excluded)
-        values = temp_dic.values()
-        values.sort(Jira.storm_jira_cmp, reverse=True)
-        return values
+        return sorted(temp_dic.values(), key=lambda jira: jira.get_id_num(), reverse=True)
 
     @staticmethod
     def _row_tuple(jira):
@@ -71,7 +70,7 @@
         return -1, 43, -1, -1
 
     def print_report(self):
-        print "%s (Count = %s) " % (self.header, len(self.issues))
+        print(f"{self.header} (Count = {len(self.issues)}) ")
         jiras = self.values_view()
         fields_tuple = ('Jira Id', 'Summary', 'Created', 'Last Updated (Days)')
         row_tuple = self._row_tuple(jiras[0])
@@ -110,7 +109,7 @@
         return -1, 43, -1, -1, -1
 
     def print_report(self):
-        print "%s (Count = %s) " % (self.header, len(self.pull_requests))
+        print("%s (Count = %s) " % (self.header, len(self.pull_requests)))
 
         fields_tuple = self._build_tuple(('URL', 'Title', 'Created', 'Last Updated (Days)', 'User'), '')
         if len(self.pull_requests) > 0:
@@ -132,6 +131,7 @@
             jira_ids.append(pull.jira_id())
         return sorted(jira_ids)
 
+
 class JiraGitHubCombinedReport(Report):
     def __init__(self, jira_report, github_report, header='', print_comments=False):
         Report.__init__(self, header)
@@ -158,7 +158,7 @@
 
     def _jira_id(self, pull_idx):
         pull = self._pull_request(pull_idx)
-        return encode(pull.jira_id())
+        return str(pull.jira_id())
 
     def _jira_issue(self, jira_id):
         return self.jira_report.issues[jira_id]
@@ -168,11 +168,11 @@
         jira_id = self._jira_id(pull_idx)
         jira_issue = self._jira_issue(jira_id)
 
-        return (jira_id, mstr(pull), jira_issue.get_trimmed_summary(),
+        return (jira_id, str(pull) if pull else "No PR", jira_issue.get_trimmed_summary(),
                 daydiff(Report.now, jira_issue.get_created()),
-                daydiff(Report.now, pull.created_at()),
+                daydiff(Report.now, pull.created_at() if pull else "No PR"),
                 daydiff(Report.now, jira_issue.get_updated()),
-                daydiff(Report.now, pull.updated_at()),
+                daydiff(Report.now, pull.updated_at() if pull else "No PR"),
                 jira_issue.get_status(), pull.user())
 
     def _row_tuple_1(self, pull_idx, comment_idx):
@@ -188,7 +188,10 @@
 
     # variables and method names ending with _1 correspond to the comments part
     def print_report(self, print_comments=False):
-        print "%s (Count = %s) " % (self.header, len(self.github_report.pull_requests))
+        pull_request_cnt = len(self.github_report.pull_requests)
+        print("%s (Count = %s) " % (self.header, pull_request_cnt))
+        if not pull_request_cnt:
+            return
 
         fields_tuple = ('JIRA ID', 'Pull Request', 'Jira Summary', 'JIRA Age',
                         'Pull Age', 'JIRA Update Age', 'Pull Update Age (Days)',
@@ -197,15 +200,12 @@
         formatter = Formatter(fields_tuple, row_tuple)
         self.print_(formatter, fields_tuple)
 
-        row_tuple_1 = ()
-        formatter_1 = Formatter()
-
         if print_comments or self.print_comments:
             fields_tuple_1 = self._build_tuple(('Comment Vote', 'Comment Author', 'Pull URL', 'Comment Age'), '-\t\t')
             row_tuple_1 = self._build_tuple(self._row_tuple_1(*self._idx_1st_comment_with_vote()), '-\t\t')
             formatter_1 = Formatter(fields_tuple_1, row_tuple_1)
             self.print_(formatter_1, fields_tuple_1)
-            print ''
+            print('')
 
         for p in range(0, len(self.github_report.pull_requests)):
             row_tuple = self._row_tuple(p)
@@ -219,10 +219,11 @@
                     if comment.has_vote():
                         row_tuple_1 = self._build_tuple(self._row_tuple_1(p, c), '-\t\t')
                         if row_tuple_1 is not None:
+                            formatter_1 = Formatter()
                             self.print_(formatter_1, row_tuple_1)
                             has_vote = True
                 if has_vote:
-                    print ''
+                    print('')
 
 
 class CompleteReport(Report):
@@ -233,8 +234,8 @@
         self.jira_github_combined_reports = []
 
     def print_all(self):
-        if self.header is not '':
-            print self.header
+        if self.header:
+            print(self.header)
 
         self._print_github_reports()
         self._print_jira_github_combined_reports()
diff --git a/dev-tools/report/report_builder.py b/dev-tools/report/report_builder.py
index 4b8a468..6f5c842 100644
--- a/dev-tools/report/report_builder.py
+++ b/dev-tools/report/report_builder.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -11,7 +11,7 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
-from report import CompleteReport, GitHubReport, JiraReport, JiraGitHubCombinedReport
+from .report import CompleteReport, GitHubReport, JiraReport, JiraGitHubCombinedReport
 
 
 class ReportBuilder:
@@ -41,8 +41,10 @@
 
         # all unresolved JIRA issues
         jira_unresolved = JiraReport(self.jira_repo.unresolved_jiras("STORM"))
-        jira_open = JiraReport(dict((x, y) for x, y in self.jira_repo.unresolved_jiras("STORM").items() if y.get_status() == 'Open'))
-        jira_in_progress = JiraReport(dict((x, y) for x, y in self.jira_repo.in_progress_jiras("STORM").items() if y.get_status() == 'In Progress'),
+        jira_open = JiraReport(dict((x, y) for x, y in self.jira_repo.unresolved_jiras("STORM").items()
+                                    if y.get_status().lower() == 'open'))
+        jira_in_progress = JiraReport(dict((x, y) for x, y in self.jira_repo.in_progress_jiras("STORM").items()
+                                           if y.get_status() == 'In Progress'),
                                       "\nIN PROGRESS JIRA ISSUES")
 
         for pull in github_open.pull_requests:
diff --git a/dev-tools/storm-merge.py b/dev-tools/storm-merge.py
index 33690c2..c96a71b 100755
--- a/dev-tools/storm-merge.py
+++ b/dev-tools/storm-merge.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 #	 Licensed under the Apache License, Version 2.0 (the "License");
 #	 you may not use this file except in compliance with the License.
 #	 You may obtain a copy of the License at
@@ -14,18 +14,20 @@
 from github import GitHub
 from optparse import OptionParser
 
-def main():
-	parser = OptionParser(usage="usage: %prog [options] [pull number]")
-	parser.add_option("-g", "--github-user", dest="gituser",
-			type="string", help="github user, if not supplied no auth is used", metavar="USER")
-	
-	(options, args) = parser.parse_args()
-	github = GitHub(options)
 
-        for pullNumber in args:
-		pull = github.pull("apache", "storm", pullNumber)
-		print "git pull --no-ff "+pull.from_repo()+" "+pull.from_branch()
+def main():
+    parser = OptionParser(usage="usage: %prog [options] [pull number]")
+    parser.add_option("-g", "--github-user", dest="gituser",
+                      type="string", help="github user, if not supplied no auth is used", metavar="USER")
+
+    (options, args) = parser.parse_args()
+    github = GitHub(options)
+
+    for pullNumber in args:
+        pull = github.pull("apache", "storm", pullNumber)
+        print("git pull --no-ff " + pull.from_repo() + " " + pull.from_branch())
+
 
 if __name__ == "__main__":
-	main()
+    main()
 
diff --git a/dev-tools/travis/ratprint.py b/dev-tools/travis/ratprint.py
index 5031045..70e0d95 100755
--- a/dev-tools/travis/ratprint.py
+++ b/dev-tools/travis/ratprint.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -14,13 +14,13 @@
 import sys
 import re
 
-p = re.compile('Unapproved licenses:\s*([^\s\*]*).*\*\*\*')
+p = re.compile(r'Unapproved licenses:\s*([^\s\*]*).*\*\*\*')
 
 with open (sys.argv[1]) as ratfile:
-  rat = ratfile.read().replace('\n','')
+    rat = ratfile.read().replace('\n', '')
 
 matches = p.search(rat)
-failed =  matches.group(1)
+failed = matches.group(1)
 
-if re.search('\S', failed):
-  print failed
+if re.search(r'\S', failed):
+    print(failed)
diff --git a/dev-tools/travis/save-logs.py b/dev-tools/travis/save-logs.py
index 5f4ad28..a3a5215 100755
--- a/dev-tools/travis/save-logs.py
+++ b/dev-tools/travis/save-logs.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 #  Licensed under the Apache License, Version 2.0 (the "License");
 #  you may not use this file except in compliance with the License.
@@ -16,13 +16,14 @@
 import subprocess
 from datetime import datetime, timedelta
 
+
 def main(file, cmd):
-    print cmd, "writing to", file
+    print(cmd, "writing to", file)
     out = open(file, "w")
     count = 0
     process = subprocess.Popen(cmd,
-                           stderr=subprocess.STDOUT,
-                           stdout=subprocess.PIPE)
+                               stderr=subprocess.STDOUT,
+                               stdout=subprocess.PIPE)
 
     start = datetime.now()
     nextPrint = datetime.now() + timedelta(seconds=1)
@@ -30,10 +31,11 @@
     pout = process.stdout
     line = pout.readline()
     while line:
+        line = line.decode('utf-8')
         count = count + 1
         if datetime.now() > nextPrint:
             diff = datetime.now() - start
-            sys.stdout.write("\r%d seconds %d log lines"%(diff.seconds, count))
+            sys.stdout.write(f"\r{diff.seconds} seconds {count} log lines")
             sys.stdout.flush()
             nextPrint = datetime.now() + timedelta(seconds=10)
         out.write(line)
@@ -41,14 +43,15 @@
     out.close()
     errcode = process.wait()
     diff = datetime.now() - start
-    sys.stdout.write("\r%d seconds %d log lines"%(diff.seconds, count))
-    print
-    print cmd, "done", errcode
+    sys.stdout.write(f"\r{diff.seconds} seconds {count} log lines")
+    print()
+    print(cmd, "done", errcode)
     return errcode
 
+
 if __name__ == "__main__":
-    if sys.argv < 1:
-        print "Usage: %s [file info]" % sys.argv[0]
+    if len(sys.argv) < 3:
+        print(f"Usage: {sys.argv[0]} <file-path> <cmd>")
         sys.exit(1)
 
     sys.exit(main(sys.argv[1], sys.argv[2:]))
diff --git a/dev-tools/travis/travis-install.sh b/dev-tools/travis/travis-install.sh
index d4ca821..6080625 100755
--- a/dev-tools/travis/travis-install.sh
+++ b/dev-tools/travis/travis-install.sh
@@ -11,41 +11,40 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-echo "Python version :  " `python -V 2>&1`
-echo "Python3 version :  " `python3 -V 2>&1`
-echo "Pip2 version :  " `pip2 --version 2>&1`
-echo "Pip3 version :  " `pip3 --version 2>&1`
+# shellcheck disable=SC2006
+echo "Python version :  $(python -V 2>&1)  (note python2 is not supported) "
+echo "Python3 version :  $(python3 -V 2>&1) "
+echo "Pip3 version :  $(pip3 --version 2>&1) "
 
 
-echo "Maven version  :  " `mvn -v`
+echo "Maven version  :  $(mvn -v)"
 
 STORM_SRC_ROOT_DIR=$1
 
 TRAVIS_SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 
-pip2 install --user -r ${TRAVIS_SCRIPT_DIR}/requirements.txt
-pip3 install --user -r ${TRAVIS_SCRIPT_DIR}/requirements.txt
+pip3 install --user -r "${TRAVIS_SCRIPT_DIR}"/requirements.txt
 
-python ${TRAVIS_SCRIPT_DIR}/save-logs.py "storm-shaded-deps/install-shade.txt" mvn clean install --batch-mode -pl storm-shaded-deps -am
+python3 "${TRAVIS_SCRIPT_DIR}"/save-logs.py "storm-shaded-deps/install-shade.txt" mvn clean install --batch-mode -pl storm-shaded-deps -am
 BUILD_RET_VAL=$?
 if [[ "$BUILD_RET_VAL" != "0" ]];
 then
-  cat "storm-shaded-deps/install-shade.txt"
-  exit ${BUILD_RET_VAL}
+    cat "storm-shaded-deps/install-shade.txt"
+    exit ${BUILD_RET_VAL}
 fi
 
-cd ${STORM_SRC_ROOT_DIR}
-python ${TRAVIS_SCRIPT_DIR}/save-logs.py "install.txt" mvn clean install -DskipTests -Pnative,examples,externals -pl '!storm-shaded-deps' --batch-mode
+cd "${STORM_SRC_ROOT_DIR}" || ( echo "Cannot cd to ${STORM_SRC_ROOT_DIR}"; exit 1 )
+python3 "${TRAVIS_SCRIPT_DIR}"/save-logs.py "install.txt" mvn clean install -DskipTests -Pnative,examples,externals -pl '!storm-shaded-deps' --batch-mode
 BUILD_RET_VAL=$?
 
 if [[ "$BUILD_RET_VAL" != "0" ]];
 then
-  cat "install.txt"
-  echo "Looking for unapproved licenses"
-  for rat in `find . -name rat.txt`;
-  do
-    python ${TRAVIS_SCRIPT_DIR}/ratprint.py "${rat}"
-  done
+    cat "install.txt"
+    echo "Looking for unapproved licenses"
+    for rat in $(find . -name rat.txt)
+    do
+        python3 "${TRAVIS_SCRIPT_DIR}"/ratprint.py "${rat}"
+    done
 fi
 
 
diff --git a/dev-tools/travis/travis-script.sh b/dev-tools/travis/travis-script.sh
index b743688..6908596 100755
--- a/dev-tools/travis/travis-script.sh
+++ b/dev-tools/travis/travis-script.sh
@@ -11,10 +11,10 @@
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-echo "Python version :  " `python -V 2>&1`
-echo "Ruby version   :  " `ruby -v`
-echo "NodeJs version :  " `node -v`
-echo "Maven version  :  " `mvn -v`
+echo "Python3 version:  " $(python3 -V 2>&1)
+echo "Ruby version   :  " $(ruby -v)
+echo "NodeJs version :  " $(node -v)
+echo "Maven version  :  " $(mvn -v)
 
 set -x
 
@@ -22,31 +22,31 @@
 
 TRAVIS_SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
 
-cd ${STORM_SRC_ROOT_DIR}
+cd "${STORM_SRC_ROOT_DIR}" || (echo "Cannot cd to ${STORM_SRC_ROOT_DIR}"; exit 1)
 
 if [ "$2" == "Integration-Test" ]
-  then
-  exec ./integration-test/run-it.sh
+    then
+    exec ./integration-test/run-it.sh
 elif [ "$2" == "Check-Updated-License-Files" ]
-  then
-  exec python3.6 dev-tools/validate-license-files.py --skip-build-storm
+    then
+    exec python3 dev-tools/validate-license-files.py --skip-build-storm
 elif [ "$2" == "Client" ]
 then
-  TEST_MODULES=storm-client
+    TEST_MODULES=storm-client
 elif [ "$2" == "Server" ]
 then
-  TEST_MODULES=storm-server,storm-webapp
+    TEST_MODULES=storm-server,storm-webapp
 elif [ "$2" == "Core" ]
 then
-  TEST_MODULES=storm-core
+    TEST_MODULES=storm-core
 elif [ "$2" == "External" ]
 then
-  if [ "$TRAVIS_JDK_VERSION" == "openjdk11" ]
-  then 
-    TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps,!external/storm-cassandra,!external/storm-hive,!external/storm-hdfs,!external/storm-hbase,!sql/storm-sql-external/storm-sql-hdfs,!external/storm-hdfs-blobstore'
-  else
-    TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps'
-  fi
+    if [ "$TRAVIS_JDK_VERSION" == "openjdk11" ]
+    then
+        TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps,!external/storm-cassandra,!external/storm-hive,!external/storm-hdfs,!external/storm-hbase,!sql/storm-sql-external/storm-sql-hdfs,!external/storm-hdfs-blobstore'
+    else
+        TEST_MODULES='!storm-client,!storm-server,!storm-core,!storm-webapp,!storm-shaded-deps'
+    fi
 fi
 # We should be concerned that Travis CI could be very slow because it uses VM
 export STORM_TEST_TIMEOUT_MS=150000
@@ -56,10 +56,10 @@
 mvn --batch-mode test -fae -Pnative,all-tests,examples,externals -Prat -pl "$TEST_MODULES"
 BUILD_RET_VAL=$?
 
-for dir in `find . -type d -and -wholename \*/target/\*-reports`;
+for dir in $(find . -type d -and -wholename \*/target/\*-reports)
 do
   echo "Looking for errors in ${dir}"
-  python ${TRAVIS_SCRIPT_DIR}/print-errors-from-test-reports.py "${dir}"
+  python3 "${TRAVIS_SCRIPT_DIR}"/print-errors-from-test-reports.py "${dir}"
 done
 
 exit ${BUILD_RET_VAL}
diff --git a/dev-tools/validate-license-files.py b/dev-tools/validate-license-files.py
index 66da6c0..98e6d0e 100755
--- a/dev-tools/validate-license-files.py
+++ b/dev-tools/validate-license-files.py
@@ -18,13 +18,11 @@
 
 
 from contextlib import contextmanager
-from random import randint
 from pathlib import Path
 import os
 import subprocess
 import shlex
 import filecmp
-import shutil
 import re
 import itertools
 import argparse
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
index 0af6e8b..b11a7f0 100755
--- a/storm-multilang/python/src/main/resources/resources/storm.py
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -1,3 +1,4 @@
+#!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
 # Licensed to the Apache Software Foundation (ASF) under one
@@ -29,8 +30,9 @@
 json_encode = lambda x: json.dumps(x)
 json_decode = lambda x: json.loads(x)
 
-#reads lines and reconstructs newlines appropriately
+
 def readMsg():
+    """reads lines and reconstructs newlines appropriately"""
     msg = ""
     while True:
         line = sys.stdin.readline()
@@ -41,12 +43,14 @@
         msg = msg + line
     return json_decode(msg[0:-1])
 
+
 MODE = None
 ANCHOR_TUPLE = None
 
-#queue up commands we read while trying to read taskids
+# queue up commands we read while trying to read taskids
 pending_commands = deque()
 
+
 def readTaskIds():
     if pending_taskids:
         return pending_taskids.popleft()
@@ -57,9 +61,11 @@
             msg = readMsg()
         return msg
 
+
 #queue up taskids we read while trying to read commands/tuples
 pending_taskids = deque()
 
+
 def readCommand():
     if pending_commands:
         return pending_commands.popleft()
@@ -70,31 +76,38 @@
             msg = readMsg()
         return msg
 
+
 def readTuple():
     cmd = readCommand()
     return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
 
+
 def sendMsgToParent(msg):
     print(json_encode(msg))
     print("end")
     sys.stdout.flush()
 
+
 def sync():
     sendMsgToParent({'command':'sync'})
 
+
 def sendpid(heartbeatdir):
     pid = os.getpid()
     sendMsgToParent({'pid':pid})
     open(heartbeatdir + "/" + str(pid), "w").close()
 
+
 def emit(*args, **kwargs):
     __emit(*args, **kwargs)
     return readTaskIds()
 
+
 def emitDirect(task, *args, **kwargs):
     kwargs["directTask"] = task
     __emit(*args, **kwargs)
 
+
 def __emit(*args, **kwargs):
     global MODE
     if MODE == Bolt:
@@ -102,7 +115,8 @@
     elif MODE == Spout:
         emitSpout(*args, **kwargs)
 
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
+
+def emitBolt(tup, stream=None, anchors=[], directTask=None):
     global ANCHOR_TUPLE
     if ANCHOR_TUPLE is not None:
         anchors = [ANCHOR_TUPLE]
@@ -115,6 +129,7 @@
     m["tuple"] = tup
     sendMsgToParent(m)
 
+
 def emitSpout(tup, stream=None, id=None, directTask=None):
     m = {"command": "emit"}
     if id is not None:
@@ -126,41 +141,53 @@
     m["tuple"] = tup
     sendMsgToParent(m)
 
+
 def ack(tup):
     sendMsgToParent({"command": "ack", "id": tup.id})
 
+
 def fail(tup):
     sendMsgToParent({"command": "fail", "id": tup.id})
 
+
 def reportError(msg):
     sendMsgToParent({"command": "error", "msg": msg})
 
+
 def log(msg, level=2):
     sendMsgToParent({"command": "log", "msg": msg, "level":level})
 
+
 def logTrace(msg):
     log(msg, 0)
 
+
 def logDebug(msg):
     log(msg, 1)
 
+
 def logInfo(msg):
     log(msg, 2)
 
+
 def logWarn(msg):
     log(msg, 3)
 
+
 def logError(msg):
     log(msg, 4)
 
+
 def rpcMetrics(name, params):
     sendMsgToParent({"command": "metrics", "name": name, "params": params})
 
+
 def initComponent():
     setupInfo = readMsg()
     sendpid(setupInfo['pidDir'])
     return [setupInfo['conf'], setupInfo['context']]
 
+
 class Tuple(object):
     def __init__(self, id, component, stream, task, values):
         self.id = id
@@ -177,6 +204,7 @@
     def is_heartbeat_tuple(self):
         return self.task == -1 and self.stream == "__heartbeat"
 
+
 class Bolt(object):
     def initialize(self, stormconf, context):
         pass
@@ -199,6 +227,7 @@
         except Exception:
             reportError(traceback.format_exc())
 
+
 class BasicBolt(object):
     def initialize(self, stormconf, context):
         pass
@@ -228,6 +257,7 @@
         except Exception:
             reportError(traceback.format_exc())
 
+
 class Spout(object):
     def initialize(self, conf, context):
         pass