[STORM-3271] enable supervisors to launch a worker inside a docker container (#3287)

diff --git a/SECURITY.md b/SECURITY.md
index 5e321e5..e41c31a 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -405,7 +405,7 @@
 
 There are several files that go along with this that need to be configured properly to make storm secure.
 
-The `worker-launcher` executable is a special program that allows the supervisor to launch workers as different users.  For this to work, `worker-launcher` needs to be owned by root, but with the group set to be a group that only the supervisor headless user is a part of.  `worker-launcher` also needs to have `6550` octal permissions.  There is also a `worker-launcher.cfg` file, usually located under /etc/, that should look something like the following:
+The `worker-launcher` executable is a special program that allows the supervisor to launch workers as different users.  For this to work, `worker-launcher` needs to be owned by root, but with the group set to be a group that only the supervisor headless user is a part of.  `worker-launcher` also needs to have `6550` octal permissions.  There is also a `worker-launcher.cfg` file, usually located under `/etc/storm`, that should look something like the following:
 
 ```ini
 storm.worker-launcher.group=$(worker_launcher_group)
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 9d78303..8b59c0a 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -398,6 +398,9 @@
 storm.supervisor.medium.memory.threshold.mb: 1536
 storm.supervisor.medium.memory.grace.period.ms: 30000
 
+storm.oci.cgroup.root: "/sys/fs/cgroup"
+storm.oci.cgroup.parent: "/storm"
+storm.oci.nscd.dir: "/var/run/nscd"
 storm.worker.min.cpu.pcore.percent: 0.0
 
 storm.topology.classpath.beginning.enabled: false
@@ -406,6 +409,7 @@
     "CGroupMemoryLimit": "org.apache.storm.metric.cgroup.CGroupMemoryLimit"
     "CGroupCpu": "org.apache.storm.metric.cgroup.CGroupCpu"
     "CGroupCpuGuarantee": "org.apache.storm.metric.cgroup.CGroupCpuGuarantee"
+    "CGroupCpuGuaranteeByCfsQuota": "org.apache.storm.metric.cgroup.CGroupCpuGuaranteeByCfsQuota"
 
 # The number of buckets for running statistics
 num.stat.buckets: 20
diff --git a/conf/seccomp.json.example b/conf/seccomp.json.example
new file mode 100644
index 0000000..3c1f550
--- /dev/null
+++ b/conf/seccomp.json.example
@@ -0,0 +1,407 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+#json format doesn't support comments. Please remove this line and above before using this file.
+
+{
+	"defaultAction": "SCMP_ACT_ERRNO",
+	"syscalls": [
+		{
+			"names": [
+				"accept",
+				"accept4",
+				"access",
+				"alarm",
+				"alarm",
+				"bind",
+				"brk",
+				"capget",
+				"capset",
+				"chdir",
+				"chmod",
+				"chown",
+				"chown32",
+				"clock_getres",
+				"clock_gettime",
+				"clock_nanosleep",
+				"close",
+				"connect",
+				"copy_file_range",
+				"creat",
+				"dup",
+				"dup2",
+				"dup3",
+				"epoll_create",
+				"epoll_create1",
+				"epoll_ctl",
+				"epoll_ctl_old",
+				"epoll_pwait",
+				"epoll_wait",
+				"epoll_wait_old",
+				"eventfd",
+				"eventfd2",
+				"execve",
+				"execveat",
+				"exit",
+				"exit_group",
+				"faccessat",
+				"fadvise64",
+				"fadvise64_64",
+				"fallocate",
+				"fanotify_mark",
+				"fchdir",
+				"fchmod",
+				"fchmodat",
+				"fchown",
+				"fchown32",
+				"fchownat",
+				"fcntl",
+				"fcntl64",
+				"fdatasync",
+				"fgetxattr",
+				"flistxattr",
+				"flock",
+				"fork",
+				"fremovexattr",
+				"fsetxattr",
+				"fstat",
+				"fstat64",
+				"fstatat64",
+				"fstatfs",
+				"fstatfs64",
+				"fsync",
+				"ftruncate",
+				"ftruncate64",
+				"futex",
+				"futimesat",
+				"getcpu",
+				"getcwd",
+				"getdents",
+				"getdents64",
+				"getegid",
+				"getegid32",
+				"geteuid",
+				"geteuid32",
+				"getgid",
+				"getgid32",
+				"getgroups",
+				"getgroups32",
+				"getitimer",
+				"getpeername",
+				"getpgid",
+				"getpgrp",
+				"getpid",
+				"getppid",
+				"getpriority",
+				"getrandom",
+				"getresgid",
+				"getresgid32",
+				"getresuid",
+				"getresuid32",
+				"getrlimit",
+				"get_robust_list",
+				"getrusage",
+				"getsid",
+				"getsockname",
+				"getsockopt",
+				"get_thread_area",
+				"gettid",
+				"gettimeofday",
+				"getuid",
+				"getuid32",
+				"getxattr",
+				"inotify_add_watch",
+				"inotify_init",
+				"inotify_init1",
+				"inotify_rm_watch",
+				"io_cancel",
+				"ioctl",
+				"io_destroy",
+				"io_getevents",
+				"ioprio_get",
+				"ioprio_set",
+				"io_setup",
+				"io_submit",
+				"ipc",
+				"kill",
+				"lchown",
+				"lchown32",
+				"lgetxattr",
+				"link",
+				"linkat",
+				"listen",
+				"listxattr",
+				"llistxattr",
+				"_llseek",
+				"lremovexattr",
+				"lseek",
+				"lsetxattr",
+				"lstat",
+				"lstat64",
+				"madvise",
+				"mbind",
+				"memfd_create",
+				"mincore",
+				"mkdir",
+				"mkdirat",
+				"mknod",
+				"mknodat",
+				"mlock",
+				"mlock2",
+				"mlockall",
+				"mmap",
+				"mmap2",
+				"mprotect",
+				"mq_getsetattr",
+				"mq_notify",
+				"mq_open",
+				"mq_timedreceive",
+				"mq_timedsend",
+				"mq_unlink",
+				"mremap",
+				"msgctl",
+				"msgget",
+				"msgrcv",
+				"msgsnd",
+				"msync",
+				"munlock",
+				"munlockall",
+				"munmap",
+				"nanosleep",
+				"newfstatat",
+				"_newselect",
+				"open",
+				"openat",
+				"pause",
+				"pipe",
+				"pipe2",
+				"poll",
+				"ppoll",
+				"prctl",
+				"pread64",
+				"preadv",
+				"prlimit64",
+				"pselect6",
+				"pwrite64",
+				"pwritev",
+				"read",
+				"readahead",
+				"readlink",
+				"readlinkat",
+				"readv",
+				"recv",
+				"recvfrom",
+				"recvmmsg",
+				"recvmsg",
+				"remap_file_pages",
+				"removexattr",
+				"rename",
+				"renameat",
+				"renameat2",
+				"restart_syscall",
+				"rmdir",
+				"rt_sigaction",
+				"rt_sigpending",
+				"rt_sigprocmask",
+				"rt_sigqueueinfo",
+				"rt_sigreturn",
+				"rt_sigsuspend",
+				"rt_sigtimedwait",
+				"rt_tgsigqueueinfo",
+				"sched_getaffinity",
+				"sched_getattr",
+				"sched_getparam",
+				"sched_get_priority_max",
+				"sched_get_priority_min",
+				"sched_getscheduler",
+				"sched_rr_get_interval",
+				"sched_setaffinity",
+				"sched_setattr",
+				"sched_setparam",
+				"sched_setscheduler",
+				"sched_yield",
+				"seccomp",
+				"select",
+				"semctl",
+				"semget",
+				"semop",
+				"semtimedop",
+				"send",
+				"sendfile",
+				"sendfile64",
+				"sendmmsg",
+				"sendmsg",
+				"sendto",
+				"setfsgid",
+				"setfsgid32",
+				"setfsuid",
+				"setfsuid32",
+				"setgid",
+				"setgid32",
+				"setgroups",
+				"setgroups32",
+				"setitimer",
+				"setpgid",
+				"setpriority",
+				"setregid",
+				"setregid32",
+				"setresgid",
+				"setresgid32",
+				"setresuid",
+				"setresuid32",
+				"setreuid",
+				"setreuid32",
+				"setrlimit",
+				"set_robust_list",
+				"setsid",
+				"setsockopt",
+				"set_thread_area",
+				"set_tid_address",
+				"setuid",
+				"setuid32",
+				"setxattr",
+				"shmat",
+				"shmctl",
+				"shmdt",
+				"shmget",
+				"shutdown",
+				"sigaltstack",
+				"signalfd",
+				"signalfd4",
+				"sigreturn",
+				"socket",
+				"socketcall",
+				"socketpair",
+				"splice",
+				"stat",
+				"stat64",
+				"statfs",
+				"statfs64",
+				"symlink",
+				"symlinkat",
+				"sync",
+				"sync_file_range",
+				"syncfs",
+				"sysinfo",
+				"syslog",
+				"tee",
+				"tgkill",
+				"time",
+				"timer_create",
+				"timer_delete",
+				"timerfd_create",
+				"timerfd_gettime",
+				"timerfd_settime",
+				"timer_getoverrun",
+				"timer_gettime",
+				"timer_settime",
+				"times",
+				"tkill",
+				"truncate",
+				"truncate64",
+				"ugetrlimit",
+				"umask",
+				"uname",
+				"unlink",
+				"unlinkat",
+				"utime",
+				"utimensat",
+				"utimes",
+				"vfork",
+				"vmsplice",
+				"wait4",
+				"waitid",
+				"waitpid",
+				"write",
+				"writev",
+				"mount",
+				"umount2",
+				"reboot",
+				"name_to_handle_at",
+				"unshare"
+			],
+			"action": "SCMP_ACT_ALLOW"
+		},
+		{
+			"names": [
+				"personality"
+			],
+			"action": "SCMP_ACT_ALLOW",
+			"args": [
+				{
+					"index": 0,
+					"value": 0,
+					"valueTwo": 0,
+					"op": "SCMP_CMP_EQ"
+				}
+			]
+		},
+		{
+			"names": [
+				"personality"
+			],
+			"action": "SCMP_ACT_ALLOW",
+			"args": [
+				{
+					"index": 0,
+					"value": 8,
+					"valueTwo": 0,
+					"op": "SCMP_CMP_EQ"
+				}
+			]
+		},
+		{
+			"names": [
+				"personality"
+			],
+			"action": "SCMP_ACT_ALLOW",
+			"args": [
+				{
+					"index": 0,
+					"value": 4294967295,
+					"valueTwo": 0,
+					"op": "SCMP_CMP_EQ"
+				}
+			]
+		},
+		{
+			"names": [
+				"arch_prctl"
+			],
+			"action": "SCMP_ACT_ALLOW"
+		},
+		{
+			"names": [
+				"modify_ldt"
+			],
+			"action": "SCMP_ACT_ALLOW"
+		},
+		{
+			"names": [
+				"clone"
+			],
+			"action": "SCMP_ACT_ALLOW",
+			"args": [
+				{
+					"index": 0,
+					"value": 2080505856,
+					"valueTwo": 0,
+					"op": "SCMP_CMP_MASKED_EQ"
+				}
+			]
+		}
+	]
+}
\ No newline at end of file
diff --git a/docs/Docker-support.md b/docs/Docker-support.md
new file mode 100644
index 0000000..a12532b
--- /dev/null
+++ b/docs/Docker-support.md
@@ -0,0 +1,135 @@
+---
+title: Docker Support
+layout: documentation
+documentation: true
+---
+
+# Docker Support
+
+This page describes how storm supervisor launches the worker in a docker container. 
+
+Note: This has only been tested on RHEL7.
+
+## Motivation
+
+This feature is mostly about security and portability. With workers running inside of docker containers, we isolate running user code from each other and from the hosted machine so that the whole system is less vulnerable to attack. 
+It also allows users to run their topologies on different os versions using different docker images.
+
+## Implementation
+
+Essentially, `DockerManager` composes a docker-run command and uses `worker-launcher` executable to execute the command 
+to launch a container. The `storm-worker-script.sh` script is the actual command to launch the worker process and logviewer in the container.
+One container ID is mapped to one worker ID conceptually. When the worker process dies, the container exits. 
+
+For security, when the supervisor launches the docker container, it makes the whole container read-only except some explicit bind mount locations.
+It also drops all the kernel capabilities and disables container processes from gaining new privileges. 
+
+For security reasons, we can drop privileges of containers like PTRACE. Consequently, `jmap`, `strace` and some other debugging tools cannot be used directly in the container when entered with docker-exec command. 
+We need to install `nscd` and have it running in the system. Storm will bind mount nscd directory when it launches the container. 
+And `nsenter` will be used to enter the docker container without losing privileges. This functionality is also implemented in `worker-launcher` executable.
+
+The command that will be run by `worker-launcher` executable to launch a container will be something like:
+
+```bash
+run --name=8198e1f0-f323-4b9d-8625-e4fd640cd058 \
+--user=<uid>:<gid> \
+-d \
+--net=host \
+--read-only \
+-v /sys/fs/cgroup:/sys/fs/cgroup:ro \
+-v /usr/share/apache-storm-2.3.0:/usr/share/apache-storm-2.3.0:ro \
+-v /<storm-local-dir>/supervisor:/<storm-local-dir>/supervisor:ro \
+-v /<storm-local-dir>/workers/8198e1f0-f323-4b9d-8625-e4fd640cd058:/<storm-local-dir>/workers/8198e1f0-f323-4b9d-8625-e4fd640cd058 \
+-v /<workers-artifacts-dir>/workers-artifacts/word-count-1-1591895933/6703:/<workers-artifacts-dir>/workers-artifacts/word-count-1-1591895933/6703 \
+-v /<storm-local-dir>/workers-users/8198e1f0-f323-4b9d-8625-e4fd640cd058:/<storm-local-dir>/workers-users/8198e1f0-f323-4b9d-8625-e4fd640cd058 \
+-v /var/run/nscd:/var/run/nscd \
+-v /<storm-local-dir>/supervisor/stormdist/word-count-1-1591895933/shared_by_topology:/<storm-local-dir>/supervisor/stormdist/word-count-1-1591895933/shared_by_topology \
+-v /<storm-local-dir>/workers/8198e1f0-f323-4b9d-8625-e4fd640cd058/tmp:/tmp \
+-v /etc/storm:/etc/storm:ro \
+--cgroup-parent=/storm \
+--group-add <gid> \
+--workdir=/<storm-local-dir>/workers/8198e1f0-f323-4b9d-8625-e4fd640cd058 \
+--cidfile=/<storm-local-dir>/workers/8198e1f0-f323-4b9d-8625-e4fd640cd058/container.cid \
+--cap-drop=ALL \
+--security-opt no-new-privileges \
+--security-opt seccomp=/usr/share/apache-storm-2.3.0/conf/seccomp.json \
+--cpus=2.6 xxx.xxx.com:8080/storm/storm/rhel7:latest \
+bash /<storm-local-dir>/workers/8198e1f0-f323-4b9d-8625-e4fd640cd058/storm-worker-script.sh
+```
+
+
+## Setup
+
+To make supervisor work with docker, you need to configure related settings correctly following the instructions below.
+
+### Settings Related To Docker Support in Storm
+
+| Setting                                   | Description                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |
+|-------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `storm.resource.isolation.plugin.enable`  | set to `true` to enable isolation plugin. `storm.resource.isolation.plugin` determines which plugin to use. If this is set to `false`, `org.apache.storm.container.DefaultResourceIsolationManager` will be used.                                                                                                                                                                                                                                           |
+| `storm.resource.isolation.plugin`         | set to `"org.apache.storm.container.docker.DockerManager"` to enable docker support                                                                                                                                                                                                                                                                                                                                                                                                                              |
+| `storm.oci.allowed.images`             | A whitelist of docker images that can be used. Users can only choose a docker image from the list.
+| `storm.oci.image`                      | The default docker image to be used if user doesn't specify which image to use. And it must belong to the `storm.oci.allowed.images` 
+| `topology.oci.image`                   | Topologies can specify which image to use. It must belong to the `storm.oci.allowed.images` |
+| `storm.oci.cgroup.root`                | The root path of cgroup for docker to use. On RHEL7, it should be "/sys/fs/cgroup".
+| `storm.oci.cgroup.parent`              | --cgroup-parent config for docker command. It must follow the constraints of docker commands. The path will be made as absolute path if it's a relative path because we saw some weird bugs ((the cgroup memory directory disappears after a while) when a relative path is used.
+| `storm.oci.readonly.bindmounts`        | A list of read only bind mounted directories.
+| `storm.oci.readwrite.bindmounts`        | A list of read write bind mounted directories.
+| `storm.oci.nscd.dir`                   | The directory of nscd (name service cache daemon), e.g. "/var/run/nscd/". nscd must be running so that profiling can work properly.
+| `storm.oci.seccomp.profile`            | White listed syscalls seccomp Json file to be used as a seccomp filter
+| `supervisor.worker.launcher`              | Full path to the worker-launcher executable. Details explained at [How to set up worker-launcher](#how-to-set-up-worker-launcher)
+
+Note that we only support cgroupfs cgroup driver because of some issues with `systemd` cgroup driver; restricting to `cgroupfs` also makes cgroup paths simpler. Please make sure to use `cgroupfs` before setting up docker support.
+
+#### Example
+
+Below is a simple configuration example for storm on Rhel7. In this example, storm is deployed at `/usr/share/apache-storm-2.3.0`.
+
+```bash
+storm.resource.isolation.plugin.enable: true
+storm.resource.isolation.plugin: "org.apache.storm.container.docker.DockerManager"
+storm.oci.allowed.images: ["xxx.xxx.com:8080/storm/storm/rhel7:latest"]
+storm.oci.image: "xxx.xxx.com:8080/storm/storm/rhel7:latest"
+storm.oci.cgroup.root: "/storm"
+storm.oci.cgroup.parent: "/sys/fs/cgroup"
+storm.oci.readonly.bindmounts:
+    - "/etc/storm"
+storm.oci.nscd.dir: "/var/run/nscd"
+supervisor.worker.launcher: "/usr/share/apache-storm-2.3.0/bin/worker-launcher"
+```
+
+### How to set up worker-launcher
+
+The `worker-launcher` executable is a special program that is used to launch docker containers, run `docker` and `nsenter` commands.
+For this to work, `worker-launcher` needs to be owned by root, but with the group set to be a group that only the supervisor headless user is a part of. 
+`worker-launcher` also needs to have `6550` octal permissions. There is also a `worker-launcher.cfg` file, usually located under `/etc/storm`, that should look something like the following:
+```
+storm.worker-launcher.group=$(worker_launcher_group)
+min.user.id=$(min_user_id)
+worker.profiler.script.path=$(profiler_script_path)
+```
+where `storm.worker-launcher.group` is the same group the supervisor user is a part of, and `min.user.id` is set to the first real user id on the system. This config file also needs to be owned by root and not have world nor group write permissions. 
+`worker.profiler.script.path` points to the profiler script. For security, the script should be only writable by root. Note that it's the only profiler script that will be used and `DaemonConfig.WORKER_PROFILER_COMMAND` will be ignored.
+
+There are two optional configs that will be used by docker support: `docker.binary` and `nsenter.binary`. By default, they are set to
+```
+docker.binary=/usr/bin/docker
+nsenter.binary=/usr/bin/nsenter
+```
+and you don't need to set them in the worker-launcher.cfg unless you need to change them.
+
+## Profile the processes inside the container
+You can profile your worker processes by clicking on the profiling buttons (jstack, heap, etc) on storm UI.
+If you have sudo permission, you can also run `sudo nsenter --target <container-pid> --pid --mount --setuid <uid> --setgid <gid>` to enter the container. 
+Then you can run `jstack`, `jmap` etc inside the container. `<container-pid>` is the pid of the container process on the host.
+`<container-pid>` can be obtained by running `sudo docker inspect --format '{{.State.Pid}}' <container-id>` command. 
+`<uid>` and `<gid>` are the user id and group id of the container owner, respectively.
+
+## Seccomp security profiles
+
+You can set `storm.oci.seccomp.profile` to restrict the actions available within the container. If it's not set, the [default docker seccomp profile](https://github.com/moby/moby/blob/master/profiles/seccomp/default.json)
+is used. You can use `conf/seccomp.json.example` provided or you can specify our own `seccomp.json` file.
+
+## CGroup Metrics
+
+Docker internally uses cgroups to control resources for containers. The CGroup Metrics described at [cgroups_in_storm.md](cgroups_in_storm.md#CGroup-Metrics) still apply except CGroupCpuGuarantee. To get CGroup cpu guarantee, use CGroupCpuGuaranteeByCfsQuota instead.
\ No newline at end of file
diff --git a/docs/SECURITY.md b/docs/SECURITY.md
index 6e4d7e1..77cd23f 100644
--- a/docs/SECURITY.md
+++ b/docs/SECURITY.md
@@ -433,7 +433,7 @@
 
 The worker-launcher executable is a special program that allows the supervisor to launch workers as different users.  For this to work it needs to be owned by root, but with the group set to be a group that only the supervisor headless user is a part of.
 It also needs to have 6550 permissions.
-There is also a worker-launcher.cfg file, usually located under /etc/ that should look something like the following
+There is also a worker-launcher.cfg file, usually located under `/etc/storm` that should look something like the following
 
 ```
 storm.worker-launcher.group=$(worker_launcher_group)
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index d2ce7c1..027b532 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -1741,6 +1741,17 @@
     @IsPositiveNumber
     public static final String NUM_STAT_BUCKETS = "num.stat.buckets";
     /**
+     * The root of cgroup for oci to use. On RHEL7, it should be "/sys/fs/cgroup".
+     */
+    @IsString
+    @NotNull
+    public static String STORM_OCI_CGROUP_ROOT = "storm.oci.cgroup.root";
+    /**
+     * Specify the oci image to use.
+     */
+    @IsString
+    public static String TOPOLOGY_OCI_IMAGE = "topology.oci.image";
+    /**
      * Interval to check for the worker to check for updated blobs and refresh worker state accordingly. The default is 10 seconds
      */
     @IsInteger
diff --git a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
index 73f8776..d83048b 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuarantee.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
  * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
@@ -20,6 +20,8 @@
 
 /**
  * Report the guaranteed number of ms this worker has requested.
+ * It gets the result from cpu.shares.
+ * Use this when org.apache.storm.container.cgroup.CgroupManager is used as the storm.resource.isolation.plugin.
  */
 public class CGroupCpuGuarantee extends CGroupMetricsBase<Long> {
     long previousTime = -1;
diff --git a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java
new file mode 100644
index 0000000..46f97b9
--- /dev/null
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupCpuGuaranteeByCfsQuota.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.metric.cgroup;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.storm.container.cgroup.SubSystemType;
+import org.apache.storm.container.cgroup.core.CgroupCore;
+import org.apache.storm.container.cgroup.core.CpuCore;
+
+/**
+ * Report the guaranteed number of ms this worker has requested.
+ * It gets the result from cpu.cfs_period_us and cpu.cfs_quota_us.
+ * Use this when org.apache.storm.container.docker.DockerManager is used as the storm.resource.isolation.plugin.
+ */
+public class CGroupCpuGuaranteeByCfsQuota extends CGroupMetricsBase<Long> {
+    long previousTime = 0;
+
+    public CGroupCpuGuaranteeByCfsQuota(Map<String, Object> conf) {
+        super(conf, SubSystemType.cpu);
+    }
+
+    @Override
+    public Long getDataFrom(CgroupCore core) throws IOException {
+        CpuCore cpu = (CpuCore) core;
+        Long msGuarantee = null;
+        long now = System.currentTimeMillis();
+        if (previousTime > 0) {
+            long cpuCfsQuotaUs = cpu.getCpuCfsQuotaUs();
+            if (cpuCfsQuotaUs == -1) {
+                //cpu.cfs_quota_us = -1 indicates that the cgroup does not adhere to any CPU time restrictions.
+                msGuarantee = -1L;
+            } else {
+                long cpuCfsPeriodUs = cpu.getCpuCfsPeriodUs();
+                double percentage = cpuCfsQuotaUs * 1.0 / cpuCfsPeriodUs;
+                msGuarantee = Math.round(percentage * (now - previousTime));
+            }
+        }
+        previousTime = now;
+        return msGuarantee;
+    }
+}
\ No newline at end of file
diff --git a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
index 5166fa5..ad4cc16 100644
--- a/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
+++ b/storm-client/src/jvm/org/apache/storm/metric/cgroup/CGroupMetricsBase.java
@@ -23,6 +23,7 @@
 import org.apache.storm.container.cgroup.SubSystemType;
 import org.apache.storm.container.cgroup.core.CgroupCore;
 import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.shade.org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,17 +54,6 @@
             return;
         }
 
-        String hierarchyDir = (String) conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
-        if (hierarchyDir == null || hierarchyDir.isEmpty()) {
-            LOG.warn("{} is disabled {} is not set", simpleName, Config.STORM_CGROUP_HIERARCHY_DIR);
-            return;
-        }
-
-        if (!new File(hierarchyDir).exists()) {
-            LOG.warn("{} is disabled {} does not exist", simpleName, hierarchyDir);
-            return;
-        }
-
         //Good so far, check if we are in a CGroup
         File cgroupFile = new File("/proc/self/cgroup");
         if (!cgroupFile.exists()) {
@@ -71,6 +61,7 @@
             return;
         }
 
+        String cgroupPath;
         try (BufferedReader reader = new BufferedReader(new FileReader(cgroupFile))) {
             //There can be more then one line if cgroups are mounted in more then one place, but we assume the first is good enough
             String line = reader.readLine();
@@ -78,14 +69,36 @@
             String[] parts = line.split(":");
             //parts[0] == 0 for CGroup V2, else maps to hierarchy in /proc/cgroups
             //parts[1] is empty for CGroups V2 else what is mapped that we are looking for
-            String cgroupPath = parts[2];
-            core = CgroupCoreFactory.getInstance(type, new File(hierarchyDir, cgroupPath).getAbsolutePath());
+            cgroupPath = parts[2];
         } catch (Exception e) {
             LOG.warn("{} is disabled error trying to read or parse {}", simpleName, cgroupFile);
             return;
         }
+
+        //Storm on Rhel6 and Rhel7 use different cgroup settings.
+        //On Rhel6, the cgroup of the worker is under
+        // "Config.STORM_CGROUP_HIERARCHY_DIR/DaemonConfig.STORM_SUPERVISOR_CGROUP_ROOTDIR/<worker-id>"
+        //On Rhel7, the cgroup of the worker is under
+        // "Config.STORM_OCI_CGROUP_ROOT/<subsystem>/DaemonConfig.STORM_OCI_CGROUP_PARENT/<container-id>"
+        // This block of code is a workaround for the CGroupMetrics to work on both system
+        String hierarchyDir = (String) conf.get(Config.STORM_CGROUP_HIERARCHY_DIR);
+        if (StringUtils.isEmpty(hierarchyDir) || !new File(hierarchyDir, cgroupPath).exists()) {
+            LOG.info("{} is not set or does not exist. checking {}", Config.STORM_CGROUP_HIERARCHY_DIR,
+                Config.STORM_OCI_CGROUP_ROOT);
+
+            String ociCgroupRoot = (String) conf.get(Config.STORM_OCI_CGROUP_ROOT);
+            hierarchyDir = ociCgroupRoot + File.separator + type;
+            if (StringUtils.isEmpty(ociCgroupRoot) || !new File(hierarchyDir, cgroupPath).exists()) {
+                LOG.info("{} is not set or does not exist", Config.STORM_OCI_CGROUP_ROOT);
+                LOG.warn("{} is disabled", simpleName);
+                return;
+            }
+        }
+
+        core = CgroupCoreFactory.getInstance(type, new File(hierarchyDir, cgroupPath).getAbsolutePath());
+
         enabled = true;
-        LOG.info("{} is ENABLED {} exists...", simpleName);
+        LOG.info("{} is ENABLED {} exists...", simpleName, hierarchyDir);
     }
 
     @Override
@@ -105,4 +118,4 @@
     }
 
     public abstract T getDataFrom(CgroupCore core) throws Exception;
-}
+}
\ No newline at end of file
diff --git a/storm-core/src/native/worker-launcher/impl/configuration.c b/storm-core/src/native/worker-launcher/impl/configuration.c
index 7b7a3c1..3ef9b6b 100644
--- a/storm-core/src/native/worker-launcher/impl/configuration.c
+++ b/storm-core/src/native/worker-launcher/impl/configuration.c
@@ -295,13 +295,19 @@
  */
 char ** get_values(const char * key) {
   char *value = get_value(key);
-  return extract_values(value);
+  return extract_values_delim(value, ",");
 }
 
+
 /**
- * Extracts array of values from the comma separated list of values.
+ * Extracts array of values from the delim separated list of values.
+ * A sequence of two or more contiguous delimiter bytes in the parsed string
+ * is considered to be a single delimiter, and that delimiter bytes at the
+ * start or end of the string are ignored.
+ * For example, extract_values_delim("aaa;;bbb,", ";,") would
+ * return the strings "aaa" and "bbb", and then a NULL pointer.
  */
-char ** extract_values(char *value) {
+char ** extract_values_delim(char *value, const char *delim) {
   char ** toPass = NULL;
   char *tempTok = NULL;
   char *tempstr = NULL;
@@ -311,14 +317,14 @@
   //first allocate any array of 10
   if(value != NULL) {
     toPass = (char **) malloc(sizeof(char *) * toPassSize);
-    tempTok = strtok_r((char *)value, ",", &tempstr);
+    tempTok = strtok_r((char *)value, delim, &tempstr);
     while (tempTok != NULL) {
       toPass[size++] = tempTok;
       if(size == toPassSize) {
         toPassSize += MAX_SIZE;
         toPass = (char **) realloc(toPass,(sizeof(char *) * toPassSize));
       }
-      tempTok = strtok_r(NULL, ",", &tempstr);
+      tempTok = strtok_r(NULL, delim, &tempstr);
     }
   }
   if (size > 0) {
diff --git a/storm-core/src/native/worker-launcher/impl/configuration.h b/storm-core/src/native/worker-launcher/impl/configuration.h
index b0d4814..98a77eb 100644
--- a/storm-core/src/native/worker-launcher/impl/configuration.h
+++ b/storm-core/src/native/worker-launcher/impl/configuration.h
@@ -34,8 +34,15 @@
 //comma seperated strings.
 char ** get_values(const char* key);
 
-// Extracts array of values from the comma separated list of values.
-char ** extract_values(char *value);
+/**
+ * Extracts array of values from the delim separated list of values.
+ * A sequence of two or more contiguous delimiter bytes in the parsed string
+ * is considered to be a single delimiter, and that delimiter bytes at the
+ * start or end of the string are ignored.
+ * For example, extract_values_delim("aaa;;bbb,", ";,") would
+ * return the strings "aaa" and "bbb", and then a NULL pointer.
+ */
+char ** extract_values_delim(char *value, const char *delim);
 
 // free the memory returned by get_values
 void free_values(char** values);
diff --git a/storm-core/src/native/worker-launcher/impl/main.c b/storm-core/src/native/worker-launcher/impl/main.c
index 450fd3d..7746ea9 100644
--- a/storm-core/src/native/worker-launcher/impl/main.c
+++ b/storm-core/src/native/worker-launcher/impl/main.c
@@ -39,16 +39,18 @@
 #endif
 
 void display_usage(FILE *stream) {
-  fprintf(stream,
-          "Usage: worker-launcher --checksetup\n");
-  fprintf(stream,
-      "Usage: worker-launcher user command command-args\n");
+  fprintf(stream, "Usage: worker-launcher --checksetup\n");
+  fprintf(stream, "Usage: worker-launcher user command command-args\n");
   fprintf(stream, "Commands:\n");
   fprintf(stream, "   initialize stormdist dir: code-dir <code-directory>\n");
+  fprintf(stream, "   initialize artifacts dir: artifacts-dir <directory>\n");
   fprintf(stream, "   remove a file/directory: rmr <directory>\n");
   fprintf(stream, "   launch a worker: worker <working-directory> <script-to-run>\n");
   fprintf(stream, "   launch a profiler: profiler <working-directory> <script-to-run>\n");
   fprintf(stream, "   signal a worker: signal <pid> <signal>\n");
+  fprintf(stream, "   launch a docker container: launch-docker-container <working-directory> <script-to-run>\n");
+  fprintf(stream, "   run a docker command: run-docker-cmd <working-directory> <script-to-run>\n");
+  fprintf(stream, "   profile a docker container: profile-docker-container <worker-id> <script-to-run>\n");
 }
 
 int main(int argc, char **argv) {
@@ -127,6 +129,7 @@
     return INVALID_CONTAINER_EXEC_PERMISSIONS;
   }
 
+  free(executable_file);
   free(conf_file);
   conf_file = NULL;
 
@@ -138,12 +141,13 @@
   }
 
   //checks done for user name
-  if (argv[optind] == NULL) {
+  const char *user_name = argv[optind];
+  if (user_name == NULL) {
     fprintf(ERRORFILE, "Invalid user name.\n");
     return INVALID_USER_NAME;
   }
 
-  int ret = set_user(argv[optind]);
+  int ret = set_user(user_name);
   if (ret != 0) {
     return ret;
   }
@@ -162,7 +166,7 @@
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     }
-    exit_code = setup_dir_permissions(argv[optind], 0);
+    exit_code = setup_dir_permissions(argv[optind], 0, TRUE);
   } else if (strcasecmp("artifacts-dir", command) == 0) {
     if (argc != 4) {
       fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for artifacts-dir\n",
@@ -170,7 +174,7 @@
       fflush(ERRORFILE);
       return INVALID_ARGUMENT_NUMBER;
     }
-    exit_code = setup_dir_permissions(argv[optind], 1);
+    exit_code = setup_dir_permissions(argv[optind], 1, TRUE);
   } else if (strcasecmp("blob", command) == 0) {
       if (argc != 4) {
           fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for blob\n",
@@ -178,7 +182,7 @@
           fflush(ERRORFILE);
           return INVALID_ARGUMENT_NUMBER;
       }
-      exit_code = setup_dir_permissions(argv[optind], 0);
+      exit_code = setup_dir_permissions(argv[optind], 0, TRUE);
   } else if (strcasecmp("rmr", command) == 0) {
     if (argc != 4) {
       fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 4) for rmr\n",
@@ -195,11 +199,45 @@
       return INVALID_ARGUMENT_NUMBER;
     }
     working_dir = argv[optind++];
-    exit_code = setup_dir_permissions(working_dir, 1);
+    exit_code = setup_dir_permissions(working_dir, 1, TRUE);
     if (exit_code == 0) {
-      exit_code = exec_as_user(working_dir, argv[optind]);
+      exit_code = setup_worker_tmp_permissions(working_dir);
+      if (exit_code == 0) {
+        exit_code = exec_as_user(working_dir, argv[optind]);
+      }
     }
-   } else if (strcasecmp("profiler", command) == 0) {
+  } else if (strcasecmp("launch-docker-container", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for launch-docker-container\n", argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    working_dir = argv[optind++];
+    exit_code = setup_dir_permissions(working_dir, 1, TRUE);
+    if (exit_code == 0) {
+      exit_code = setup_worker_tmp_permissions(working_dir);
+      if (exit_code == 0) {
+        exit_code = run_docker_cmd(working_dir, argv[optind]);
+      }
+    }
+  } else if (strcasecmp("run-docker-cmd", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for run-docker-cmd\n", argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    working_dir = argv[optind++];
+    exit_code = run_docker_cmd(working_dir, argv[optind]);
+  } else if (strcasecmp("profile-docker-container", command) == 0) {
+    if (argc != 5) {
+      fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for profile-docker-container\n", argc);
+      fflush(ERRORFILE);
+      return INVALID_ARGUMENT_NUMBER;
+    }
+    const char * worker_id = argv[optind++];
+    int pid = get_docker_container_pid(worker_id);
+    exit_code = profile_oci_container(pid, argv[optind]);
+  } else if (strcasecmp("profiler", command) == 0) {
     if (argc != 5) {
       fprintf(ERRORFILE, "Incorrect number of arguments (%d vs 5) for profiler\n",
 	      argc);
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.c b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
index 5155ed8..2eca7d3 100644
--- a/storm-core/src/native/worker-launcher/impl/worker-launcher.c
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.c
@@ -30,15 +30,31 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include <limits.h>
 #include <sys/stat.h>
+#include <getopt.h>
+#include <regex.h>
 
 static const int DEFAULT_MIN_USERID = 1000;
 
 static const char* DEFAULT_BANNED_USERS[] = {"bin", 0};
 
+static const char* DEFAULT_DOCKER_BINARY_PATH = "/usr/bin/docker";
+/**
+ * For security reasons, we can drop the privileges of containers including PTRACE.
+ * In that case, debugging tools (like jmap, strace, jstack -F) inside the container won't work
+ * when entered via docker-exec.
+ * So we need to use nsenter to enter the filesystem and pid namespace without losing privileges.
+ */
+static const char* DEFAULT_NSENTER_BINARY_PATH = "/usr/bin/nsenter";
+
 //struct to store the user details
 struct passwd *user_detail = NULL;
 
+//Docker container related constants.
+static const char* DOCKER_CLIENT_CONFIG_ARG = "--config=";
+static const char* DOCKER_PULL_COMMAND = "pull";
+
 FILE* LOGFILE = NULL;
 FILE* ERRORFILE = NULL;
 
@@ -382,10 +398,11 @@
  * is able to read and execute, and in certain directories, write. The setGID bit is set
  * to make sure any files created under the directory will be accessible to storm's user for
  * cleanup purposes.
+ * If setgid_on_dir is FALSE, don't set sticky bit on group permission on the directory.
  */
-static int setup_permissions(FTSENT* entry, uid_t euser, int user_write) {
+static int setup_permissions(FTSENT* entry, uid_t euser, int user_write, boolean setgid_on_dir) {
   if (lchown(entry->fts_path, euser, launcher_gid) != 0) {
-    fprintf(ERRORFILE, "Failure to exec app initialization process - %s, fts_path=%s\n",
+    fprintf(ERRORFILE, "ERROR: Failure to exec app initialization process - %s, fts_path=%s\n",
             strerror(errno), entry->fts_path);
      return -1;
   }
@@ -397,10 +414,14 @@
   }
   // If the entry is a directory, Add group execute and setGID bits.
   if ((mode & S_IFDIR) == S_IFDIR) {
-    new_mode = new_mode | S_IXGRP | S_ISGID;
+    new_mode = new_mode | S_IXGRP;
+    if (setgid_on_dir)
+    {
+      new_mode = new_mode | S_ISGID;
+    }
   }
   if (chmod(entry->fts_path, new_mode) != 0) {
-    fprintf(ERRORFILE, "Failure to exec app initialization process - %s, fts_path=%s\n",
+    fprintf(ERRORFILE, "ERROR: Failure to exec app initialization process - %s, fts_path=%s\n",
             strerror(errno), entry->fts_path);
     return -1;
   }
@@ -408,7 +429,7 @@
 }
 
 
-int setup_dir_permissions(const char* local_dir, int user_writable) {
+int setup_dir_permissions(const char* local_dir, int user_writable, boolean setgid_on_dir) {
   //This is the same as
   //> chmod g+rwX -R $local_dir
   //> chmod g+s -R $local_dir
@@ -423,13 +444,13 @@
   } else {
     char *(paths[]) = {strndup(local_dir,PATH_MAX), 0};
     if (paths[0] == NULL) {
-      fprintf(ERRORFILE, "Malloc failed in setup_dir_permissions\n");
+      fprintf(ERRORFILE, "ERROR: Malloc failed in setup_dir_permissions\n");
       return -1;
     }
     // check to make sure the directory exists
     if (access(local_dir, F_OK) != 0) {
       if (errno == ENOENT) {
-        fprintf(ERRORFILE, "Path does not exist %s\n", local_dir);
+        fprintf(ERRORFILE, "ERROR: Path does not exist %s\n", local_dir);
         free(paths[0]);
         paths[0] = NULL;
         return UNABLE_TO_BUILD_PATH;
@@ -441,7 +462,7 @@
 
     if (tree == NULL) {
       fprintf(ERRORFILE,
-              "Cannot open file traversal structure for the path %s:%s.\n", 
+              "ERROR: Cannot open file traversal structure for the path %s:%s.\n",
               local_dir, strerror(errno));
       free(paths[0]);
       paths[0] = NULL;
@@ -449,7 +470,7 @@
     }
 
     if (seteuid(0) != 0) {
-      fprintf(ERRORFILE, "Could not become root\n");
+      fprintf(ERRORFILE, "ERROR: Could not become root\n");
       return -1;
     }
 
@@ -464,7 +485,7 @@
         fprintf(LOGFILE, "NOOP: %s\n", entry->fts_path); break;
       case FTS_D:         // A directory in pre-order
       case FTS_F:         // A regular file
-        if (setup_permissions(entry, euser, user_writable) != 0) {
+        if (setup_permissions(entry, euser, user_writable, setgid_on_dir) != 0) {
             exit_code = -1;
         }
         break;
@@ -475,22 +496,47 @@
       case FTS_NSOK:      // No stat information requested
       case FTS_ERR:       // Error return
       default:
-        fprintf(LOGFILE, "Unexpected...\n");
+        fprintf(LOGFILE, "ERROR: Unexpected...\n");
         exit_code = -1;
         break;
       }
     }
     ret = fts_close(tree);
     if (exit_code == 0 && ret != 0) {
-      fprintf(LOGFILE, "Error in fts_close while setting up %s\n", local_dir);
+      fprintf(LOGFILE, "ERROR: Error in fts_close while setting up %s\n", local_dir);
       exit_code = -1;
     }
     free(paths[0]);
     paths[0] = NULL;
+
+    if (seteuid(euser) != 0)
+    {
+      fprintf(ERRORFILE, "ERROR: Could not switch euid back to %d\n", euser);
+      return -1;
+    }
   }
   return exit_code;
 }
 
+/**
+ * /tmp inside the container is bind mounted to worker-id/tmp directory
+ * remove setgid on worker-id/tmp directory so that java profiling can work
+ * This is not required for non-container workers. But better to keep them consistent
+ */ 
+int setup_worker_tmp_permissions(const char *worker_dir)
+{
+  char* worker_tmp = concatenate("%s/tmp", "worker tmp dir", 1, worker_dir);
+  if (worker_tmp != NULL) {
+    int exit_code = setup_dir_permissions(worker_tmp, 1, FALSE);
+    if (exit_code != 0) {
+      fprintf(ERRORFILE, "ERROR: setup_dir_permissions on %s failed\n", worker_tmp);
+      fflush(ERRORFILE);
+    } 
+    return exit_code;
+  } else {
+    return -1;
+  }
+}
 
 int signal_container_as_user(const char *user, int pid, int sig) {
   if(pid <= 0) {
@@ -738,3 +784,512 @@
   //Unreachable
   return -1;
 }
+
+//functions below are docker related.
+
+char *get_docker_binary()
+{
+  char *docker_binary = get_value(DOCKER_BINARY_KEY);
+  if (docker_binary == NULL)
+  {
+    docker_binary = strdup(DEFAULT_DOCKER_BINARY_PATH);
+  }
+  return docker_binary;
+}
+
+char **tokenize_docker_command(const char *input, int *split_counter)
+{
+  char *line = (char *)calloc(strlen(input) + 1, sizeof(char));
+  char **linesplit = (char **)malloc(sizeof(char *));
+  char *p = NULL;
+  *split_counter = 0;
+  strncpy(line, input, strlen(input));
+
+  p = strtok(line, " ");
+  while (p != NULL)
+  {
+    linesplit[*split_counter] = p;
+    (*split_counter)++;
+    linesplit = realloc(linesplit, (sizeof(char *) * (*split_counter + 1)));
+    if (linesplit == NULL)
+    {
+      fprintf(ERRORFILE, "ERROR: Cannot allocate memory to parse docker command %s",
+              strerror(errno));
+      fflush(ERRORFILE);
+      exit(OUT_OF_MEMORY);
+    }
+    p = strtok(NULL, " ");
+  }
+  linesplit[*split_counter] = NULL;
+  return linesplit;
+}
+
+int execute_regex_match(const char *regex_str, const char *input)
+{
+  regex_t regex;
+  int regex_match;
+  if (0 != regcomp(&regex, regex_str, REG_EXTENDED | REG_NOSUB))
+  {
+    fprintf(LOGFILE, "ERROR: Unable to compile regex.");
+    fflush(LOGFILE);
+    exit(ERROR_COMPILING_REGEX);
+  }
+  regex_match = regexec(&regex, input, (size_t)0, NULL, 0);
+  regfree(&regex);
+  if (0 == regex_match)
+  {
+    return 0;
+  }
+  return 1;
+}
+
+int validate_docker_image_name(const char *image_name)
+{
+  char *regex_str = "^(([a-zA-Z0-9.-]+)(:[0-9]+)?/)?([a-z0-9_./-]+)(:[a-zA-Z0-9_.-]+)?$";
+  return execute_regex_match(regex_str, image_name);
+}
+
+/**
+ * Only allow certain options for any docker commands.
+ * Since most options are from docker-run command, we don't have
+ * separate checks for other docker-xx (e.g docker-inspect) commands.
+ */
+char *sanitize_docker_command(const char *line)
+{
+  static struct option long_options[] = {
+      {"name", required_argument, 0, 'n'},
+      {"user", required_argument, 0, 'u'},
+      {"rm", no_argument, 0, 'r'},
+      {"workdir", required_argument, 0, 'w'},
+      {"net", required_argument, 0, 'e'},
+      {"cgroup-parent", required_argument, 0, 'g'},
+      {"cap-add", required_argument, 0, 'a'},
+      {"cap-drop", required_argument, 0, 'o'},
+      {"device", required_argument, 0, 'i'},
+      {"detach", required_argument, 0, 't'},
+      {"group-add", required_argument, 0, 'x'},
+      {"read-only", no_argument, 0, 'R'},
+      {"security-opt", required_argument, 0, 'S'},
+      {"cpu-shares", required_argument, 0, 'c'},
+      {"cpus", required_argument, 0, 'C'},
+      {"cpuset-cpus", required_argument, 0, 's'},
+      {"cpuset-mems", required_argument, 0, 'm'},
+      {"cidfile", required_argument, 0, 'I'},
+      {"format", required_argument, 0, 'f'}, //belongs to docker-inspect command
+      {"force", no_argument, 0, 'F'},        //belongs to docker-rm command
+      {"time", required_argument, 0, 'T'},   //belongs to docker-stop command
+      {"filter", required_argument, 0, 'l'}, //belongs to docker-ps command
+      {"quiet", required_argument, 0, 'q'},  //belongs to docker-ps command
+      {0, 0, 0, 0}};
+
+  int c = 0;
+  int option_index = 0;
+  char *output = NULL;
+  size_t output_size = 0;
+  char **linesplit;
+  int split_counter = 0;
+  int len = strlen(line);
+
+  linesplit = tokenize_docker_command(line, &split_counter);
+
+  output_size = len * 2;
+  output = (char *)calloc(output_size, sizeof(char));
+  if (output == NULL)
+  {
+    exit(OUT_OF_MEMORY);
+  }
+
+  // Handle docker client config option.
+  if (0 == strncmp(linesplit[0], DOCKER_CLIENT_CONFIG_ARG, strlen(DOCKER_CLIENT_CONFIG_ARG)))
+  {
+    strcat(output, linesplit[0]);
+    strcat(output, " ");
+    long index = 0;
+    while (index < split_counter)
+    {
+      linesplit[index] = linesplit[index + 1];
+      if (linesplit[index] == NULL)
+      {
+        split_counter--;
+        break;
+      }
+      index++;
+    }
+  }
+
+  // Handle docker pull and image name validation.
+  if (0 == strncmp(linesplit[0], DOCKER_PULL_COMMAND, strlen(DOCKER_PULL_COMMAND)))
+  {
+    if (0 != validate_docker_image_name(linesplit[1]))
+    {
+      fprintf(ERRORFILE, "ERROR: Invalid Docker image name, exiting.");
+      fflush(ERRORFILE);
+      exit(DOCKER_IMAGE_INVALID);
+    }
+    strcat(output, linesplit[0]);
+    strcat(output, " ");
+    strcat(output, linesplit[1]);
+    return output;
+  }
+
+  strcat(output, linesplit[0]);
+  strcat(output, " ");
+  optind = 1;
+  while ((c = getopt_long(split_counter, linesplit, "dv:", long_options, &option_index)) != -1)
+  {
+    switch (c)
+    {
+    case 'n':
+      strcat(output, "--name=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'w':
+      strcat(output, "--workdir=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'u':
+      strcat(output, "--user=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'e':
+      strcat(output, "--net=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'v':
+      strcat(output, "-v ");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'a':
+      strcat(output, "--cap-add=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'o':
+      strcat(output, "--cap-drop=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'd':
+      strcat(output, "-d ");
+      break;
+    case 'r':
+      strcat(output, "--rm ");
+      break;
+    case 'R':
+      strcat(output, "--read-only ");
+      break;
+    case 'S':
+      strcat(output, "--security-opt ");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'g':
+      strcat(output, "--cgroup-parent=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'c':
+      strcat(output, "--cpu-shares=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'C':
+      strcat(output, "--cpus=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 's':
+      strcat(output, "--cpuset-cpus=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'm':
+      strcat(output, "--cpuset-mems=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'i':
+      strcat(output, "--device=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 't':
+      strcat(output, "--detach=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'x':
+      strcat(output, "--group-add ");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'I':
+      strcat(output, "--cidfile=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'f':
+      strcat(output, "--format=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'F':
+      strcat(output, "--force ");
+      break;
+    case 'T':
+      strcat(output, "--time=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'l':
+      strcat(output, "--filter=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    case 'q':
+      strcat(output, "--quiet=");
+      strcat(output, optarg);
+      strcat(output, " ");
+      break;
+    default:
+      fprintf(LOGFILE, "Unknown option in docker command, character %d %c, optionindex = %d\n", c, c, optind);
+      fflush(LOGFILE);
+      return NULL;
+      break;
+    }
+  }
+
+  while (optind < split_counter)
+  {
+    strcat(output, linesplit[optind++]);
+    strcat(output, " ");
+  }
+
+  return output;
+}
+
+char *parse_docker_command_file(const char *command_file)
+{
+
+  size_t len = 0;
+  char *line = NULL;
+  ssize_t read;
+  FILE *stream;
+
+  uid_t user = geteuid();
+  gid_t group = getegid();
+  if (change_effective_user(launcher_uid, launcher_gid) != 0)
+  {
+    fprintf(ERRORFILE, "ERROR: Cannot change effective user to supervisor user");
+    fflush(ERRORFILE);
+    exit(ERROR_CHANGING_USER);
+  }
+
+  stream = fopen(command_file, "r");
+  if (stream == NULL)
+  {
+    fprintf(ERRORFILE, "ERROR: Cannot open file %s - %s in parse_docker_command",
+            command_file, strerror(errno));
+    fflush(ERRORFILE);
+    exit(ERROR_OPENING_FILE);
+  }
+  if ((read = getline(&line, &len, stream)) == -1)
+  {
+    fprintf(ERRORFILE, "ERROR: Failed reading command_file %s in parse_docker_command\n", command_file);
+    fflush(ERRORFILE);
+    exit(ERROR_READING_FILE);
+  }
+  fclose(stream);
+  if (change_effective_user(user, group))
+  {
+    fprintf(ERRORFILE, "ERROR: Cannot change effective user from supervisor user back to original in parse_docker_command");
+    fflush(ERRORFILE);
+    exit(ERROR_CHANGING_USER);
+  }
+
+  char *ret = sanitize_docker_command(line);
+  if (ret == NULL)
+  {
+    exit(ERROR_SANITIZING_DOCKER_COMMAND);
+  }
+
+  return ret;
+}
+
+int run_docker_cmd(const char *working_dir, const char *command_file)
+{
+  char *docker_command = parse_docker_command_file(command_file);
+  char *docker_binary = get_docker_binary();
+  size_t command_size = MIN(sysconf(_SC_ARG_MAX), 128 * 1024);
+
+  char *docker_command_with_binary = calloc(sizeof(char), command_size);
+  snprintf(docker_command_with_binary, command_size, "%s %s", docker_binary, docker_command);
+
+  fprintf(LOGFILE, "command: %s\n", docker_command_with_binary);
+  fflush(LOGFILE);
+
+  setsid();
+
+  char **args = extract_values_delim(docker_command_with_binary, " ");
+
+  if (execvp(docker_binary, args) != 0)
+  {
+    fprintf(ERRORFILE, "ERROR: Couldn't execute the container launch with args %s - %s",
+            docker_binary, strerror(errno));
+    fflush(LOGFILE);
+    fflush(ERRORFILE);
+    free(docker_binary);
+    free(args);
+    free(docker_command_with_binary);
+    free(docker_command);
+    return DOCKER_RUN_FAILED;
+  }
+  //Unreachable
+  return -1;
+}
+
+//functions below are nsenter related.
+//Used for running profiling inside docker container through nsenter.
+
+char *get_nsenter_binary()
+{
+  char *nsenter_binary = get_value(NSENTER_BINARY_KEY);
+  if (nsenter_binary == NULL)
+  {
+    nsenter_binary = strdup(DEFAULT_NSENTER_BINARY_PATH);
+  }
+  return nsenter_binary;
+}
+
+int get_docker_container_pid(const char *worker_id) {
+  size_t command_size = MIN(sysconf(_SC_ARG_MAX), 128 * 1024);
+  char *docker_inspect_command = calloc(sizeof(char), command_size);
+  char *docker_binary = get_docker_binary();
+  snprintf(docker_inspect_command, command_size, "%s inspect --format {{.State.Pid}} %s", docker_binary, worker_id);
+
+  fprintf(LOGFILE, "Inspecting docker container...\n");
+  fflush(LOGFILE);
+  FILE *inspect_docker = popen(docker_inspect_command, "r");
+  int pid = -1;
+  int res = fscanf(inspect_docker, "%d", &pid);
+  if (pclose(inspect_docker) != 0 || res <= 0)
+  {
+    fprintf(ERRORFILE,
+            "ERROR: Could not inspect docker to get pid %s in get_docker_container_pid\n", docker_inspect_command);
+    fflush(ERRORFILE);
+    pid = -1;
+    goto cleanup;
+  }
+
+  fprintf(LOGFILE, "The pid of the container is %d.\n", pid);
+  fflush(LOGFILE);
+
+cleanup:
+  free(docker_inspect_command);
+  free(docker_binary);
+  return pid;
+}
+
+int profile_oci_container(int container_pid, const char* command_file) {
+  if (container_pid <= 0) {
+    fprintf(ERRORFILE, "ERROR: The container pid is %d in profile_oci_container\n", container_pid);
+    fflush(ERRORFILE);
+    return UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
+  }
+
+  char *profiler_path = get_value(WORKER_PROFILER_SCRIPT_PATH);
+  if (profiler_path == NULL)
+  {
+    fprintf(ERRORFILE, "ERROR: ATTENTION: %s is not set. worker profiling won't work!\n", WORKER_PROFILER_SCRIPT_PATH);
+    fflush(ERRORFILE);
+    return -1;
+  }
+
+  size_t len = 0;
+  char *line = NULL;
+  ssize_t read;
+  FILE *stream = fopen(command_file, "r");
+  if (stream == NULL)
+  {
+    fprintf(ERRORFILE, "ERROR: Cannot open file %s - %s in profile_oci_container", command_file, strerror(errno));
+    fflush(ERRORFILE);
+    exit(ERROR_OPENING_FILE);
+  }
+  if ((read = getline(&line, &len, stream)) == -1)
+  {
+    fprintf(ERRORFILE, "ERROR: Failed reading command_file %s in profile_oci_container\n", command_file);
+    fflush(ERRORFILE);
+    exit(ERROR_READING_FILE);
+  }
+  fclose(stream);
+
+  if (seteuid(0) != 0)
+  {
+    fprintf(ERRORFILE, "ERROR: Could not become root in profile_oci_container\n");
+    fflush(LOGFILE);
+    return -1;
+  }
+
+  //run profiling command
+  size_t command_size = MIN(sysconf(_SC_ARG_MAX), 128 * 1024);
+  char *nsenter_binary = get_nsenter_binary();
+  char *nsenter_command_with_binary = calloc(sizeof(char), command_size);
+  snprintf(nsenter_command_with_binary, command_size, 
+    "%s --target %d --mount --pid --setuid %d --setgid %d", 
+    nsenter_binary, container_pid, user_detail->pw_uid, user_detail->pw_gid);
+
+  fprintf(LOGFILE, "command is %s\n", nsenter_command_with_binary);
+  fflush(LOGFILE);
+
+  FILE *fp = popen(nsenter_command_with_binary, "w");
+  fprintf(fp, "umask 0027; %s %s\nexit\n", profiler_path, line);
+  pclose(fp);
+
+  free(nsenter_binary);
+  free(nsenter_command_with_binary);
+  return 0;
+}
+
+/**
+ * Utility function to concatenate argB to argA using the concat_pattern.
+ */
+char *concatenate(char *concat_pattern, char *return_path_name,
+                  int numArgs, ...)
+{
+  va_list ap;
+  va_start(ap, numArgs);
+  int strlen_args = 0;
+  char *arg = NULL;
+  int j;
+  for (j = 0; j < numArgs; j++)
+  {
+    arg = va_arg(ap, char *);
+    if (arg == NULL)
+    {
+      fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
+              return_path_name);
+      return NULL;
+    }
+    strlen_args += strlen(arg);
+  }
+  va_end(ap);
+
+  char *return_path = NULL;
+  int str_len = strlen(concat_pattern) + strlen_args + 1;
+
+  return_path = (char *)malloc(str_len);
+  if (return_path == NULL)
+  {
+    fprintf(LOGFILE, "Unable to allocate memory for %s.\n", return_path_name);
+    return NULL;
+  }
+  va_start(ap, numArgs);
+  vsnprintf(return_path, str_len, concat_pattern, ap);
+  va_end(ap);
+  return return_path;
+}
\ No newline at end of file
diff --git a/storm-core/src/native/worker-launcher/impl/worker-launcher.h b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
index 353d57a..a45f01c 100644
--- a/storm-core/src/native/worker-launcher/impl/worker-launcher.h
+++ b/storm-core/src/native/worker-launcher/impl/worker-launcher.h
@@ -19,6 +19,8 @@
 #include <stdio.h>
 #include <sys/types.h>
 
+typedef enum { FALSE, TRUE } boolean;
+
 enum errorcodes {
   INVALID_ARGUMENT_NUMBER = 1,
   INVALID_USER_NAME, //2
@@ -45,19 +47,32 @@
   // PREPARE_JOB_LOGS_FAILED (NOT USED) 23
   INVALID_CONFIG_FILE =  24,
   SETSID_OPER_FAILED = 25,
-  WRITE_PIDFILE_FAILED = 26
+  WRITE_PIDFILE_FAILED = 26,
+  DOCKER_RUN_FAILED=29,
+  ERROR_OPENING_FILE = 30,
+  ERROR_READING_FILE = 31,
+  ERROR_SANITIZING_DOCKER_COMMAND = 39,
+  DOCKER_IMAGE_INVALID = 40,
+  DOCKER_CONTAINER_NAME_INVALID = 41,
+  ERROR_COMPILING_REGEX = 42,
+  ERROR_CHANGING_USER = 43
 };
 
 #define LAUNCHER_GROUP_KEY "storm.worker-launcher.group"
-
-#define USER_DIR_PATTERN "%s/usercache/%s"
-#define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s"
-#define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s"
 #define CONTAINER_SCRIPT "launch_container.sh"
-#define CREDENTIALS_FILENAME "container_tokens"
 #define MIN_USERID_KEY "min.user.id"
 #define BANNED_USERS_KEY "banned.users"
-#define TMP_DIR "tmp"
+#define DOCKER_BINARY_KEY "docker.binary"
+#define NSENTER_BINARY_KEY "nsenter.binary"
+#define WORKER_PROFILER_SCRIPT_PATH "worker.profiler.script.path"
+
+/* Macros for min/max. */
+#ifndef MIN
+#define MIN(a,b) (((a)<(b))?(a):(b))
+#endif /* MIN */
+#ifndef MAX
+#define MAX(a,b) (((a)>(b))?(a):(b))
+#endif  /* MAX */
 
 extern struct passwd *user_detail;
 
@@ -66,7 +81,14 @@
 // the log file for error messages
 extern FILE *ERRORFILE;
 
-int setup_dir_permissions(const char* local_dir, int for_blob_permission);
+int setup_dir_permissions(const char* local_dir, int for_blob_permission, boolean setgid_on_dir);
+
+/**
+ * /tmp inside the container is bind mounted to worker-id/tmp directory
+ * remove setgid on worker-id/tmp directory so that java profiling can work
+ * This is not required for non-container workers. But better to keep them consistent
+ */ 
+int setup_worker_tmp_permissions(const char *worker_dir);
 
 int exec_as_user(const char * working_dir, const char * args);
 
@@ -127,3 +149,30 @@
 char *get_container_launcher_file(const char* work_dir);
 
 int change_user(uid_t user, gid_t group);
+
+/**
+ * Get the docker binary path.
+ */
+char *get_docker_binary();
+
+/**
+ * Run a docker command passing the command file as an argument
+ */
+int run_docker_cmd(const char * working_dir, const char * command_file);
+
+/**
+ * Get the nsenter binary path.
+ */
+char *get_nsenter_binary();
+
+/**
+ * Get the pid of the docker container
+ */
+int get_docker_container_pid(const char *worker_id);
+
+/**
+ * Utility function to concatenate argB to argA using the concat_pattern.
+ */
+char *concatenate(char *concat_pattern, char *return_path_name, int numArgs, ...);
+
+int profile_oci_container(int container_pid, const char* command_file);
\ No newline at end of file
diff --git a/storm-core/src/native/worker-launcher/test/test-worker-launcher.c b/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
index 7294511..431f0d9 100644
--- a/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
+++ b/storm-core/src/native/worker-launcher/test/test-worker-launcher.c
@@ -298,7 +298,7 @@
   log_dirs = (char *) malloc (sizeof(char) * ARRAY_SIZE);
   strcpy(log_dirs, NM_LOG_DIRS);
 
-  create_nm_roots(extract_values(local_dirs));
+  create_nm_roots(extract_values_delim(local_dirs, ","));
 
   if (getuid() == 0 && argc == 2) {
     username = argv[1];
diff --git a/storm-dist/binary/final-package/src/main/assembly/binary.xml b/storm-dist/binary/final-package/src/main/assembly/binary.xml
index 9450de2..9a9297f 100644
--- a/storm-dist/binary/final-package/src/main/assembly/binary.xml
+++ b/storm-dist/binary/final-package/src/main/assembly/binary.xml
@@ -331,6 +331,12 @@
             <fileMode>0755</fileMode>
         </file>
         <file>
+            <source>${project.basedir}/../../../conf/seccomp.json.example</source>
+            <outputDirectory>/conf</outputDirectory>
+            <destName>seccomp.json.example</destName>
+            <fileMode>0755</fileMode>
+        </file>
+        <file>
             <source>${project.basedir}/../../../VERSION</source>
             <outputDirectory>.</outputDirectory>
             <destName>RELEASE</destName>
diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
index ad7ce23..4de3cf5 100644
--- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
+++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java
@@ -1231,6 +1231,58 @@
     @IsPositiveNumber
     public static String STORM_WORKER_TOKEN_LIFE_TIME_HOURS = "storm.worker.token.life.time.hours";
 
+    /**
+     * The directory of nscd - name service cache daemon, e.g. "/var/run/nscd/".
+     * nscd must be running so that profiling can work properly.
+     */
+    @IsString
+    @NotNull
+    public static String STORM_OCI_NSCD_DIR = "storm.oci.nscd.dir";
+
+    /**
+     * A list of read only bind mounted directories.
+     */
+    @IsStringList
+    public static String STORM_OCI_READONLY_BINDMOUNTS = "storm.oci.readonly.bindmounts";
+
+    /**
+     * A list of read-write bind mounted directories.
+     */
+    @IsStringList
+    public static String STORM_OCI_READWRITE_BINDMOUNTS = "storm.oci.readwrite.bindmounts";
+
+    /**
+     * The cgroup root for oci container. (Also a --cgroup-parent config for docker command)
+     * Must follow the constraints of the docker command.
+     * The path will be made as absolute path if it's a relative path
+     * because we saw some weird bugs (the cgroup memory directory disappears after a while) when a relative path is used.
+     * Note that we only support cgroupfs cgroup driver because of some issues with systemd; restricting to `cgroupfs`
+     * also makes cgroup paths simple.
+     */
+    @IsString
+    @NotNull
+    public static String STORM_OCI_CGROUP_PARENT = "storm.oci.cgroup.parent";
+
+    /**
+     * Default oci image to use if the topology doesn't specify which oci image to use.
+     */
+    @IsString
+    public static String STORM_OCI_IMAGE = "storm.oci.image";
+
+    /**
+     * A list of oci image that are allowed.
+     * A special entry of asterisk(*) means any image is allowed, but the image has to pass other checks.
+     * Storm currently assumes OCI container is not supported on the cluster if this is not configured.
+     */
+    @IsStringList
+    public static String STORM_OCI_ALLOWED_IMAGES = "storm.oci.allowed.images";
+
+    /**
+     * White listed syscalls seccomp Json file to be used as a seccomp filter.
+     */
+    @IsString
+    public static String STORM_OCI_SECCOMP_PROFILE = "storm.oci.seccomp.profile";
+
     public static String getCgroupRootDir(Map<String, Object> conf) {
         return (String) conf.get(STORM_SUPERVISOR_CGROUP_ROOTDIR);
     }
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerCommand.java
new file mode 100644
index 0000000..ebfe895
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerCommand.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+public abstract class DockerCommand  {
+    private final String command;
+    private final List<String> commandWithArguments;
+
+    protected DockerCommand(String command) {
+        this.command = command;
+        this.commandWithArguments = new ArrayList<>();
+        commandWithArguments.add(command);
+    }
+
+    /** Returns the docker sub-command string being used, e.g 'run'.
+     * @return the sub-command
+     */
+    public final String getCommandOption() {
+        return this.command;
+    }
+
+    /** Add command commandWithArguments. This method is only meant for use by sub-classes.
+     * @param arguments to be added
+     */
+    protected final void addCommandArguments(String... arguments) {
+        this.commandWithArguments.addAll(Arrays.asList(arguments));
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command
+     */
+    public String getCommandWithArguments() {
+        return StringUtils.join(commandWithArguments, " ");
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerExecCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerExecCommand.java
new file mode 100644
index 0000000..dfe3686
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerExecCommand.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Encapsulates the docker exec command and its command line arguments.
+ */
+public class DockerExecCommand extends DockerCommand {
+    private static final String EXEC_COMMAND = "exec";
+    private String containerName;
+    private List<String> commandInContainer;
+
+    public DockerExecCommand(String containerName) {
+        super(EXEC_COMMAND);
+        this.containerName = containerName;
+    }
+
+    /**
+     * Add the command to run from inside container.
+     * @param commandInContainer the command to run from inside container
+     * @return the self
+     */
+    public DockerExecCommand addExecCommand(List<String> commandInContainer) {
+        this.commandInContainer = commandInContainer;
+        return this;
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command
+     */
+    @Override
+    public String getCommandWithArguments() {
+        List<String> argList = new ArrayList<>();
+        argList.add(super.getCommandWithArguments());
+        argList.add(containerName);
+        argList.addAll(commandInContainer);
+        return StringUtils.join(argList, " ");
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerInspectCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerInspectCommand.java
new file mode 100644
index 0000000..d4d4b34
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerInspectCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Encapsulates the docker inspect command and its command line arguments.
+ */
+public class DockerInspectCommand extends DockerCommand {
+    private static final String INSPECT_COMMAND = "inspect";
+    private String containerName;
+
+    public DockerInspectCommand(String containerName) {
+        super(INSPECT_COMMAND);
+        this.containerName = containerName;
+    }
+
+    public DockerInspectCommand withGettingContainerStatus() {
+        super.addCommandArguments("--format='{{.State.Status}}'");
+        return this;
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command.
+     */
+    @Override
+    public String getCommandWithArguments() {
+        List<String> argList = new ArrayList<>();
+        argList.add(super.getCommandWithArguments());
+        argList.add(containerName);
+        return StringUtils.join(argList, " ");
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerManager.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerManager.java
new file mode 100644
index 0000000..47bb458
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerManager.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.container.oci.OciContainerManager;
+import org.apache.storm.daemon.supervisor.ClientSupervisorUtils;
+import org.apache.storm.daemon.supervisor.ExitCodeCallback;
+import org.apache.storm.shade.com.google.common.io.Files;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ServerUtils;
+import org.apache.storm.utils.ShellCommandRunnerImpl;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * For security, we can launch worker processes inside the docker container.
+ * This class manages the interaction with docker containers including launching, stopping, profiling and etc.
+ */
+public class DockerManager extends OciContainerManager {
+    private static final Logger LOG = LoggerFactory.getLogger(DockerManager.class);
+    private Map<String, String> workerToCid = new ConcurrentHashMap<>();
+
+    @Override
+    public void prepare(Map<String, Object> conf) throws IOException {
+        super.prepare(conf);
+    }
+
+    private String[] getGroupIdInfo(String userName)
+        throws IOException {
+        String[] groupIds;
+        try {
+            String output = new ShellCommandRunnerImpl().execCommand("id", "--groups", userName);
+            groupIds = output.trim().split(" ");
+        } catch (IOException e) {
+            LOG.error("Can't get group IDs of the user {}", userName);
+            throw new IOException(e);
+        }
+        return groupIds;
+    }
+
+    private String getUserIdInfo(String userName) throws IOException {
+        String uid = "";
+        try {
+            uid = new ShellCommandRunnerImpl().execCommand("id", "--user", userName).trim();
+        } catch (IOException e) {
+            LOG.error("Can't get uid of the user {}", userName);
+            throw e;
+        }
+        return uid;
+    }
+
+    @Override
+    public void launchWorkerProcess(String user, String topologyId, Map<String, Object> topoConf,
+                                    int port, String workerId, List<String> command, Map<String, String> env,
+                                    String logPrefix, ExitCodeCallback processExitCallback,
+                                    File targetDir) throws IOException {
+        String dockerImage = getImageName(topoConf);
+        if (dockerImage == null) {
+            LOG.error("Image name for {} is not configured properly; will not continue to launch the worker", topologyId);
+            return;
+        }
+
+        String workerDir = targetDir.getAbsolutePath();
+
+        String uid = getUserIdInfo(user);
+        String[] groups = getGroupIdInfo(user);
+        String gid = groups[0];
+        String dockerUser = uid + ":" + gid;
+
+        DockerRunCommand dockerRunCommand = new DockerRunCommand(workerId, dockerUser, dockerImage);
+
+        //set of locations to be bind mounted
+        String workerRootDir = ConfigUtils.workerRoot(conf, workerId);
+        String workerArtifactsRoot = ConfigUtils.workerArtifactsRoot(conf, topologyId, port);
+        String workerUserFile = ConfigUtils.workerUserFile(conf, workerId);
+        String sharedByTopologyDir = ConfigUtils.sharedByTopologyDir(conf, topologyId);
+
+        // Theoretically we only need to mount ConfigUtils.supervisorStormDistRoot directory.
+        // But if supervisorLocalDir is not mounted, the worker will try to create it and fail.
+        String supervisorLocalDir = ConfigUtils.supervisorLocalDir(conf);
+        String workerTmpRoot = ConfigUtils.workerTmpRoot(conf, workerId);
+
+        dockerRunCommand.detachOnRun()
+            .setNetworkType("host")
+            //The whole file system of the container will be read-only except specific read-write bind mounts
+            .setReadonly()
+            .addReadOnlyMountLocation(cgroupRootPath, cgroupRootPath, false)
+            .addReadOnlyMountLocation(stormHome, stormHome, false)
+            .addReadOnlyMountLocation(supervisorLocalDir, supervisorLocalDir, false)
+            .addReadWriteMountLocation(workerRootDir, workerRootDir, false)
+            .addReadWriteMountLocation(workerArtifactsRoot, workerArtifactsRoot, false)
+            .addReadWriteMountLocation(workerUserFile, workerUserFile, false)
+            //nscd must be running so that profiling can work properly
+            .addReadWriteMountLocation(nscdPath, nscdPath, false)
+            .addReadWriteMountLocation(sharedByTopologyDir, sharedByTopologyDir, false)
+            //This is to make /tmp directory in container writable. This is very important.
+            // For example
+            // 1. jvm needs to write to /tmp/hsperfdata_<user> directory so that jps can work
+            // 2. jstack needs to create a socket under /tmp directory.
+            //Otherwise profiling will not work properly.
+            .addReadWriteMountLocation(workerTmpRoot, TMP_DIR, false)
+            //a list of read-only bind mount locations
+            .addAllReadOnlyMountLocations(readonlyBindmounts, false)
+            .addAllReadWriteMountLocations(readwriteBindmounts, false);
+
+        if (workerToCores.containsKey(workerId)) {
+            dockerRunCommand.addCpuSetBindings(
+                    workerToCores.get(workerId), workerToMemoryZone.get(workerId)
+            );
+        }
+
+        dockerRunCommand.setCGroupParent(cgroupParent)
+            .groupAdd(groups)
+            .setContainerWorkDir(workerDir)
+            .setCidFile(dockerCidFilePath(workerId))
+            .setCapabilities(Collections.emptySet())
+            .setNoNewPrivileges();
+
+        if (seccompJsonFile != null) {
+            dockerRunCommand.setSeccompProfile(seccompJsonFile);
+        }
+
+        if (workerToCpu.containsKey(workerId)) {
+            dockerRunCommand.setCpus(workerToCpu.get(workerId) / 100.0);
+        }
+
+        if (workerToMemoryMb.containsKey(workerId)) {
+            dockerRunCommand.setMemoryMb(workerToMemoryMb.get(workerId));
+        }
+
+        dockerRunCommand.setOverrideCommandWithArgs(Arrays.asList("bash", ServerUtils.writeScript(workerDir, command, env, "0027")));
+
+        //run docker-run command and launch container in background (-d option).
+        runDockerCommandWaitFor(conf, user, CmdType.LAUNCH_DOCKER_CONTAINER,
+            dockerRunCommand.getCommandWithArguments(), null, logPrefix, null, targetDir, "docker-run");
+
+        //docker-wait for the container in another thread. processExitCallback will get the container's exit code.
+        String threadName = "DockerWait_SLOT_" + port;
+        Utils.asyncLoop(new Callable<Long>() {
+            @Override
+            public Long call() throws IOException {
+                DockerWaitCommand dockerWaitCommand = new DockerWaitCommand(workerId);
+                try {
+                    runDockerCommandWaitFor(conf, user,  CmdType.RUN_DOCKER_CMD,
+                        dockerWaitCommand.getCommandWithArguments(), null, logPrefix, processExitCallback, targetDir, "docker-wait");
+                } catch (IOException e) {
+                    LOG.error("IOException on running docker wait command:", e);
+                    throw e;
+                }
+                return null; // Run only once.
+            }
+        }, threadName, null);
+
+    }
+
+    @Override
+    public void releaseResourcesForWorker(String workerId) {
+        super.releaseResourcesForWorker(workerId);
+        workerToCid.remove(workerId);
+    }
+
+    //Get the container ID of the worker
+    private String getContainerId(String workerId) throws IOException {
+        String cid = workerToCid.get(workerId);
+        if (cid == null) {
+            File cidFile = new File(dockerCidFilePath(workerId));
+            if (cidFile.exists()) {
+                List<String> lines = Files.readLines(cidFile, Charset.defaultCharset());
+                if (lines.isEmpty()) {
+                    LOG.error("cid file {} is empty.", cidFile);
+                } else {
+                    cid = lines.get(0);
+                }
+            } else {
+                LOG.error("cid file {} doesn't exist.", cidFile);
+            }
+
+            if (cid == null) {
+                LOG.error("Couldn't get container id of the worker {}", workerId);
+                throw new IOException("Couldn't get container id of the worker " + workerId);
+            } else {
+                workerToCid.put(workerId, cid);
+            }
+        }
+        return cid;
+    }
+
+    @Override
+    public long getMemoryUsage(String user, String workerId, int port) throws IOException {
+        String memoryCgroupPath = memoryCgroupRootPath + File.separator + getContainerId(workerId);
+        MemoryCore memoryCore = new MemoryCore(memoryCgroupPath);
+        return memoryCore.getPhysicalUsage();
+    }
+
+    @Override
+    public void kill(String user, String workerId) throws IOException {
+        String workerDir = ConfigUtils.workerRoot(conf, workerId);
+        DockerStopCommand dockerStopCommand = new DockerStopCommand(workerId);
+        runDockerCommandWaitFor(conf, user, CmdType.RUN_DOCKER_CMD, dockerStopCommand.getCommandWithArguments(),
+            null, null, null, new File(workerDir), "docker-stop");
+
+        DockerRmCommand dockerRmCommand = new DockerRmCommand(workerId);
+        runDockerCommandWaitFor(conf, user, CmdType.RUN_DOCKER_CMD, dockerRmCommand.getCommandWithArguments(),
+            null, null, null, new File(workerDir), "docker-rm");
+    }
+
+    @Override
+    public void forceKill(String user, String workerId) throws IOException {
+        String workerDir = ConfigUtils.workerRoot(conf, workerId);
+        DockerRmCommand dockerRmCommand = new DockerRmCommand(workerId);
+        dockerRmCommand.withForce();
+        runDockerCommandWaitFor(conf, user,  CmdType.RUN_DOCKER_CMD, dockerRmCommand.getCommandWithArguments(),
+            null, null, null, new File(workerDir), "docker-force-rm");
+    }
+
+    /**
+     * Currently it only checks if the container is alive.
+     * If the worker process inside the container dies, the container will exit.
+     * So we only need to check if the container is running to know if the worker process is still alive.
+     *
+     * @param user     the user of the processes
+     * @param workerId the id of the worker to kill
+     * @return true if all processes are dead
+     * @throws IOException on I/O exception
+     */
+    @Override
+    public boolean areAllProcessesDead(String user, String workerId) throws IOException {
+        String workerDir = ConfigUtils.workerRoot(conf, workerId);
+        DockerPsCommand dockerPsCommand = new DockerPsCommand();
+        dockerPsCommand.withNameFilter(workerId);
+        dockerPsCommand.withQuietOption();
+
+        String command = dockerPsCommand.getCommandWithArguments();
+
+        Process p = runDockerCommand(conf, user, CmdType.RUN_DOCKER_CMD, command,
+            null, null, null, new File(workerDir), "docker-ps");
+
+        try {
+            p.waitFor();
+        } catch (InterruptedException e) {
+            LOG.error("running docker command is interrupted", e);
+        }
+
+        if (p.exitValue() != 0) {
+            String errorMessage = "The exitValue of the docker command [" + command + "] is non-zero: " + p.exitValue();
+            LOG.error(errorMessage);
+            throw new IOException(errorMessage);
+        }
+
+        String output = IOUtils.toString(p.getInputStream(), Charset.forName("UTF-8"));
+        LOG.debug("The output of the docker command [{}] is: [{}]; the exitValue is {}", command, output, p.exitValue());
+        //The output might include some things else
+        //The real output of the docker-ps command is either empty or the container's short ID
+        output = output.trim();
+        String[] lines = output.split("\n");
+        if (lines.length == 0) {
+            //output is empty, the container is not running
+            return true;
+        }
+        String lastLine = lines[lines.length - 1].trim();
+        if (lastLine.isEmpty()) {
+            return true;
+        }
+
+        try {
+            String containerId = getContainerId(workerId);
+            return !containerId.startsWith(lastLine);
+        } catch (IOException e) {
+            LOG.error("Failed to find Container ID for {}, assuming dead", workerId, e);
+            return true;
+        }
+    }
+
+
+    /**
+     * Run profiling command in the container.
+     * @param user the user that the worker is running as
+     * @param workerId the id of the worker
+     * @param command the command to run.
+     *                The profiler to be used is configured in worker-launcher.cfg.
+     * @param env the environment to run the command
+     * @param logPrefix the prefix to include in the logs
+     * @param targetDir the working directory to run the command in
+     * @return true if the command succeeds, false otherwise.
+     * @throws IOException on I/O exception
+     * @throws InterruptedException if interrupted
+     */
+    @Override
+    public boolean runProfilingCommand(String user, String workerId, List<String> command, Map<String, String> env,
+                                       String logPrefix, File targetDir) throws IOException, InterruptedException {
+        String workerDir = targetDir.getAbsolutePath();
+
+        String profilingArgs = StringUtils.join(command, " ");
+
+        //run nsenter
+        String nsenterScriptPath = writeToCommandFile(workerDir, profilingArgs, "profile");
+
+        List<String> args = Arrays.asList(CmdType.PROFILE_DOCKER_CONTAINER.toString(), workerId, nsenterScriptPath);
+
+        Process process = ClientSupervisorUtils.processLauncher(
+                conf, user, null, args, env, logPrefix, null, targetDir
+        );
+
+        process.waitFor();
+
+        int exitCode = process.exitValue();
+        LOG.debug("WorkerId {} : exitCode from {}: {}", workerId, CmdType.PROFILE_DOCKER_CONTAINER.toString(), exitCode);
+
+        return exitCode == 0;
+    }
+
+    @Override
+    public void cleanup(String user, String workerId, int port) throws IOException {
+        //NO OP
+    }
+
+    private String dockerCidFilePath(String workerId) {
+        return ConfigUtils.workerRoot(conf, workerId) + File.separator + "container.cid";
+    }
+
+    @Override
+    public boolean isResourceManaged() {
+        return true;
+    }
+
+    /**
+     * Run docker command using {@link Config#SUPERVISOR_WORKER_LAUNCHER}.
+     *
+     * @param conf             the storm conf
+     * @param dockerCommand    the docker command to run
+     * @param environment      the environment
+     * @param logPrefix        the prefix of logs
+     * @param exitCodeCallback the exit call back
+     * @param targetDir        the working directory
+     * @return the Process
+     * @throws IOException on I/O exception
+     */
+    private Process runDockerCommand(Map<String, Object> conf, String user,
+                                     CmdType cmdType, String dockerCommand,
+                                     Map<String, String> environment, final String logPrefix,
+                                     final ExitCodeCallback exitCodeCallback, File targetDir, String commandTag) throws IOException {
+        String workerDir = targetDir.getAbsolutePath();
+
+        String dockerScriptPath = writeToCommandFile(workerDir, dockerCommand, commandTag);
+
+        List<String> args = Arrays.asList(cmdType.toString(), workerDir, dockerScriptPath);
+
+        return ClientSupervisorUtils.processLauncher(conf, user, null, args, environment,
+            logPrefix, exitCodeCallback, targetDir);
+    }
+
+    /**
+     * Run docker command using {@link Config#SUPERVISOR_WORKER_LAUNCHER}.
+     *
+     * @param conf             the storm conf
+     * @param dockerCommand    the docker command to run
+     * @param environment      the environment
+     * @param logPrefix        the prefix of logs
+     * @param exitCodeCallback the exit call back
+     * @param targetDir        the working directory
+     * @return the Process
+     * @throws IOException on I/O exception
+     */
+    private int runDockerCommandWaitFor(Map<String, Object> conf, String user,
+                                        CmdType cmdType, String dockerCommand,
+                                        Map<String, String> environment, final String logPrefix,
+                                        final ExitCodeCallback exitCodeCallback, File targetDir, String commandTag) throws IOException {
+        Process p = runDockerCommand(
+                conf, user, cmdType, dockerCommand, environment, logPrefix,
+                exitCodeCallback, targetDir, commandTag
+        );
+
+        try {
+            p.waitFor();
+        } catch (InterruptedException e) {
+            LOG.error("running docker command is interrupted", e);
+        }
+        return p.exitValue();
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerPsCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerPsCommand.java
new file mode 100644
index 0000000..6288f2a
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerPsCommand.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+public class DockerPsCommand extends DockerCommand {
+    private static final String PS_COMMAND = "ps";
+
+    public DockerPsCommand() {
+        super(PS_COMMAND);
+    }
+
+    public DockerPsCommand withQuietOption() {
+        super.addCommandArguments("--quiet=true");
+        return this;
+    }
+
+    public DockerPsCommand withNameFilter(String containerName) {
+        super.addCommandArguments("--filter=name=" + containerName);
+        return this;
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerRmCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerRmCommand.java
new file mode 100644
index 0000000..60d3f1c
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerRmCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Encapsulates the docker rm command and its command line arguments.
+ */
+public class DockerRmCommand extends DockerCommand {
+    private static final String RM_COMMAND = "rm";
+    private String containerName;
+
+    public DockerRmCommand(String containerName) {
+        super(RM_COMMAND);
+        this.containerName = containerName;
+    }
+
+    public DockerRmCommand withForce() {
+        super.addCommandArguments("--force");
+        return this;
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command
+     */
+    @Override
+    public String getCommandWithArguments() {
+        List<String> argList = new ArrayList<>();
+        argList.add(super.getCommandWithArguments());
+        argList.add(containerName);
+        return StringUtils.join(argList, " ");
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java
new file mode 100644
index 0000000..0c5a087
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the docker run command and its command line arguments.
+ */
+public class DockerRunCommand extends DockerCommand {
+    private static final Logger LOG = LoggerFactory.getLogger(DockerRunCommand.class);
+    private static final String RUN_COMMAND = "run";
+    private final String image;
+    private List<String> overrideCommandWithArgs;
+
+    /**
+     * The Construction function.
+     * @param containerName the container name
+     * @param userInfo the info of the user, e.g. "uid:gid"
+     * @param image the container image
+     */
+    public DockerRunCommand(String containerName, String userInfo, String image) {
+        super(RUN_COMMAND);
+        super.addCommandArguments("--name=" + containerName, "--user=" + userInfo);
+        this.image = image;
+    }
+
+    /**
+     * Add --rm option.
+     * @return the self
+     */
+    public DockerRunCommand removeContainerOnExit() {
+        super.addCommandArguments("--rm");
+        return this;
+    }
+
+    /**
+     * Add -d option.
+     * @return the self
+     */
+    public DockerRunCommand detachOnRun() {
+        super.addCommandArguments("-d");
+        return this;
+    }
+
+    /**
+     * Set --workdir option.
+     * @param workdir the working directory
+     * @return the self
+     */
+    public DockerRunCommand setContainerWorkDir(String workdir) {
+        super.addCommandArguments("--workdir=" + workdir);
+        return this;
+    }
+
+    /**
+     * Set --net option.
+     * @param type the network type
+     * @return the self
+     */
+    public DockerRunCommand setNetworkType(String type) {
+        super.addCommandArguments("--net=" + type);
+        return this;
+    }
+
+    /**
+     * Add bind mount locations.
+     * @param sourcePath the source path
+     * @param destinationPath the destination path
+     * @param createSource if createSource is false and the source path doesn't exist, do nothing
+     * @return the self
+     */
+    public DockerRunCommand addReadWriteMountLocation(String sourcePath, String
+        destinationPath, boolean createSource) throws IOException {
+        if (!createSource) {
+            boolean sourceExists = new File(sourcePath).exists();
+            if (!sourceExists) {
+                throw new IOException("SourcePath " + sourcePath + " doesn't exit.");
+            }
+        }
+        super.addCommandArguments("-v", sourcePath + ":" + destinationPath);
+        return this;
+    }
+
+    public DockerRunCommand addReadWriteMountLocation(String sourcePath, String
+        destinationPath) throws IOException {
+        return addReadWriteMountLocation(sourcePath, destinationPath, true);
+    }
+
+    /**
+     * Add all the rw bind mount locations.
+     * @param paths the locations
+     * @return the self
+     */
+    public DockerRunCommand addAllReadWriteMountLocations(List<String> paths) throws IOException {
+        return addAllReadWriteMountLocations(paths, true);
+    }
+
+    /**
+     * Add all the rw bind mount locations.
+     * @param paths the locations
+     * @param createSource if createSource is false and the source path doesn't exist, do nothing
+     * @return the self
+     */
+    public DockerRunCommand addAllReadWriteMountLocations(List<String> paths,
+                                                          boolean createSource) throws IOException {
+        for (String dir: paths) {
+            this.addReadWriteMountLocation(dir, dir, createSource);
+        }
+        return this;
+    }
+
+    /**
+     * Add readonly bind mount location.
+     * @param sourcePath the source path
+     * @param destinationPath the destination path
+     * @param createSource if createSource is false and the source path doesn't exist, do nothing
+     * @return the self
+     */
+    public DockerRunCommand addReadOnlyMountLocation(String sourcePath, String destinationPath,
+                                                     boolean createSource) throws IOException {
+        if (!createSource) {
+            boolean sourceExists = new File(sourcePath).exists();
+            if (!sourceExists) {
+                throw new IOException("SourcePath " + sourcePath + " doesn't exit.");
+            }
+        }
+        super.addCommandArguments("-v", sourcePath + ":" + destinationPath + ":ro");
+        return this;
+    }
+
+    /**
+     * Add readonly bind mout location.
+     * @param sourcePath the source path
+     * @param destinationPath the destination path
+     * @return the self
+     */
+    public DockerRunCommand addReadOnlyMountLocation(String sourcePath,
+                                                     String destinationPath) throws IOException {
+        return addReadOnlyMountLocation(sourcePath, destinationPath, true);
+    }
+
+    /**
+     * Add all readonly locations.
+     * @param paths the locations
+     * @return the self
+     */
+    public DockerRunCommand addAllReadOnlyMountLocations(List<String> paths) throws IOException {
+        return addAllReadOnlyMountLocations(paths, true);
+    }
+
+    /**
+     * Add all readonly locations.
+     * @param paths the locations
+     * @param createSource if createSource is false and the source path doesn't exist, do nothing
+     * @return the self
+     */
+    public DockerRunCommand addAllReadOnlyMountLocations(List<String> paths,
+                                                         boolean createSource) throws IOException {
+        for (String dir: paths) {
+            this.addReadOnlyMountLocation(dir, dir, createSource);
+        }
+        return this;
+    }
+
+    public DockerRunCommand addCpuSetBindings(List<String> cores, String memoryNode) {
+        if (!cores.isEmpty()) {
+            super.addCommandArguments("--cpuset-cpus=" + StringUtils.join(cores, ","));
+        }
+
+        if (memoryNode != null) {
+            super.addCommandArguments("--cpuset-mems=" + memoryNode);
+        }
+        return this;
+    }
+
+    /**
+     * Set --cgroup-parent option.
+     * @param parentPath the cgroup parent path
+     * @return the self
+     */
+    public DockerRunCommand setCGroupParent(String parentPath) {
+        super.addCommandArguments("--cgroup-parent=" + parentPath);
+        return this;
+    }
+
+    /**
+     * Set --privileged option to run a privileged container. Use with extreme care.
+     * @return the self.
+     */
+    public DockerRunCommand setPrivileged() {
+        super.addCommandArguments("--privileged");
+        return this;
+    }
+
+    /**
+     * Set capabilities of the container.
+     * @param capabilities the capabilities to be added
+     * @return the self
+     */
+    public DockerRunCommand setCapabilities(Set<String> capabilities) {
+        //first, drop all capabilities
+        super.addCommandArguments("--cap-drop=ALL");
+
+        //now, add the capabilities supplied
+        for (String capability : capabilities) {
+            super.addCommandArguments("--cap-add=" + capability);
+        }
+
+        return this;
+    }
+
+    /**
+     * Set --device option.
+     * @param sourceDevice the source device
+     * @param destinationDevice the destination device
+     * @return the self
+     */
+    public DockerRunCommand addDevice(String sourceDevice, String destinationDevice) {
+        super.addCommandArguments("--device=" + sourceDevice + ":" + destinationDevice);
+        return this;
+    }
+
+    /**
+     * Enable detach.
+     * @return the self
+     */
+    public DockerRunCommand enableDetach() {
+        super.addCommandArguments("--detach=true");
+        return this;
+    }
+
+    /**
+     * Disable detach.
+     * @return the self
+     */
+    public DockerRunCommand disableDetach() {
+        super.addCommandArguments("--detach=false");
+        return this;
+    }
+
+    /**
+     * Set --group-add option.
+     * @param groups the groups to be added
+     * @return the self
+     */
+    public DockerRunCommand groupAdd(String[] groups) {
+        for (int i = 0; i < groups.length; i++) {
+            super.addCommandArguments("--group-add " + groups[i]);
+        }
+        return this;
+    }
+
+    /**
+     * Set extra commands and args. It can override the existing commands.
+     * @param overrideCommandWithArgs the extra commands and args
+     * @return the self
+     */
+    public DockerRunCommand setOverrideCommandWithArgs(
+        List<String> overrideCommandWithArgs) {
+        this.overrideCommandWithArgs = overrideCommandWithArgs;
+        return this;
+    }
+
+    /**
+     * Add --read-only option.
+     * @return the self
+     */
+    public DockerRunCommand setReadonly() {
+        super.addCommandArguments("--read-only");
+        return this;
+    }
+
+    /**
+     * Set --security-opt option.
+     * @param jsonPath the path to the json file
+     * @return the self
+     */
+    public DockerRunCommand setSeccompProfile(String jsonPath) {
+        super.addCommandArguments("--security-opt seccomp=" + jsonPath);
+        return this;
+    }
+
+    /**
+     * Set no-new-privileges option.
+     * @return the self
+     */
+    public DockerRunCommand setNoNewPrivileges() {
+        super.addCommandArguments("--security-opt no-new-privileges");
+        return this;
+    }
+
+    /**
+     * Set cpuShares.
+     * @param cpuShares the cpu shares
+     * @return the self
+     */
+    public DockerRunCommand setCpuShares(int cpuShares) {
+        // Zero sets to default of 1024.  2 is the minimum value otherwise
+        if (cpuShares > 0 && cpuShares < 2) {
+            cpuShares = 2;
+        }
+        super.addCommandArguments("--cpu-shares=" + String.valueOf(cpuShares));
+        return this;
+    }
+
+    /**
+     * Set the number of cpus to use.
+     * @param cpus the number of cpus
+     * @return the self
+     */
+    public DockerRunCommand setCpus(double cpus) {
+        super.addCommandArguments("--cpus=" + cpus);
+        return this;
+    }
+
+    /**
+     * Set the number of memory in MB to use.
+     * @param memoryMb the number of memory in MB
+     * @return the self
+     */
+    public DockerRunCommand setMemoryMb(int memoryMb) {
+        super.addCommandArguments("--memory=" + memoryMb + "m");
+        return this;
+    }
+
+    /**
+     * Set the output container id file location.
+     * @param cidFile the container id file
+     * @return the self
+     */
+    public DockerRunCommand setCidFile(String cidFile) {
+        super.addCommandArguments("--cidfile=" + cidFile);
+        return this;
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command
+     */
+    @Override
+    public String getCommandWithArguments() {
+        List<String> argList = new ArrayList<>();
+
+        argList.add(super.getCommandWithArguments());
+        argList.add(image);
+
+        if (overrideCommandWithArgs != null) {
+            argList.addAll(overrideCommandWithArgs);
+        }
+
+        return StringUtils.join(argList, " ");
+    }
+}
+
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerStopCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerStopCommand.java
new file mode 100644
index 0000000..dcd59a4
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerStopCommand.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Encapsulates the docker stop command and its command line arguments.
+ */
+public class DockerStopCommand extends DockerCommand {
+    private static final String STOP_COMMAND = "stop";
+    private String containerName;
+
+    public DockerStopCommand(String containerName) {
+        super(STOP_COMMAND);
+        this.containerName = containerName;
+    }
+
+    public DockerStopCommand setGracePeriod(int value) {
+        super.addCommandArguments("--time=" + Integer.toString(value));
+        return this;
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command
+     */
+    @Override
+    public String getCommandWithArguments() {
+        List<String> argList = new ArrayList<>();
+        argList.add(super.getCommandWithArguments());
+        argList.add(containerName);
+        return StringUtils.join(argList, " ");
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerWaitCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerWaitCommand.java
new file mode 100644
index 0000000..da8b691
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerWaitCommand.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Encapsulates the docker wait command and its command line arguments.
+ */
+public class DockerWaitCommand extends DockerCommand {
+    private static final String WAIT_COMMAND = "wait";
+    private String containerName;
+
+    public DockerWaitCommand(String containerName) {
+        super(WAIT_COMMAND);
+        this.containerName = containerName;
+    }
+
+    /**
+     * Get the full command.
+     * @return the full command
+     */
+    @Override
+    public String getCommandWithArguments() {
+        List<String> argList = new ArrayList<>();
+        argList.add(super.getCommandWithArguments());
+        argList.add(containerName);
+        return StringUtils.join(argList, " ");
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/oci/OciContainerManager.java b/storm-server/src/main/java/org/apache/storm/container/oci/OciContainerManager.java
new file mode 100644
index 0000000..1f3f41b
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/oci/OciContainerManager.java
@@ -0,0 +1,177 @@
+/*
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.storm.container.oci;
+
+import static org.apache.storm.ServerConstants.NUMA_CORES;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.container.cgroup.core.MemoryCore;
+import org.apache.storm.daemon.supervisor.SupervisorUtils;
+import org.apache.storm.utils.ConfigUtils;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.ServerUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class OciContainerManager implements ResourceIsolationInterface {
+    private static final Logger LOG = LoggerFactory.getLogger(OciContainerManager.class);
+
+    protected Map<String, Object> conf;
+    protected List<String> readonlyBindmounts;
+    protected List<String> readwriteBindmounts;
+    protected String seccompJsonFile;
+    protected String nscdPath;
+    protected static final String TMP_DIR = File.separator + "tmp";
+    protected String stormHome;
+    protected String cgroupRootPath;
+    protected String cgroupParent;
+
+    protected String memoryCgroupRootPath;
+    protected MemoryCore memoryCoreAtRoot;
+
+    protected Map<String, Integer> workerToCpu = new ConcurrentHashMap<>();
+    protected Map<String, Integer> workerToMemoryMb = new ConcurrentHashMap<>();
+    protected Map<String, Object> validatedNumaMap = new ConcurrentHashMap();
+    protected Map<String, List<String>> workerToCores = new ConcurrentHashMap<>();
+    protected Map<String, String> workerToMemoryZone = new ConcurrentHashMap<>();
+
+    @Override
+    public void prepare(Map<String, Object> conf) throws IOException {
+        this.conf = conf;
+
+        readonlyBindmounts = ObjectReader.getStrings(conf.get(DaemonConfig.STORM_OCI_READONLY_BINDMOUNTS));
+
+        readwriteBindmounts = ObjectReader.getStrings(conf.get(DaemonConfig.STORM_OCI_READWRITE_BINDMOUNTS));
+
+        seccompJsonFile = (String) conf.get(DaemonConfig.STORM_OCI_SECCOMP_PROFILE);
+
+        nscdPath = ObjectReader.getString(conf.get(DaemonConfig.STORM_OCI_NSCD_DIR));
+
+        stormHome = System.getProperty(ConfigUtils.STORM_HOME);
+
+        cgroupRootPath = ObjectReader.getString(conf.get(Config.STORM_OCI_CGROUP_ROOT));
+
+        cgroupParent = ObjectReader.getString(conf.get(DaemonConfig.STORM_OCI_CGROUP_PARENT));
+
+        if (!cgroupParent.startsWith(File.separator)) {
+            cgroupParent = File.separator + cgroupParent;
+            LOG.warn("{} is not an absolute path. Changing it to be absolute: {}", DaemonConfig.STORM_OCI_CGROUP_PARENT, cgroupParent);
+        }
+
+        memoryCgroupRootPath = cgroupRootPath + File.separator + "memory" + File.separator + cgroupParent;
+        memoryCoreAtRoot = new MemoryCore(memoryCgroupRootPath);
+        validatedNumaMap = SupervisorUtils.getNumaMap(conf);
+    }
+
+    @Override
+    public void reserveResourcesForWorker(String workerId, Integer workerMemoryMb, Integer workerCpu, String numaId) {
+        // The manually set STORM_WORKER_CGROUP_CPU_LIMIT config on supervisor will overwrite resources assigned by
+        // RAS (Resource Aware Scheduler)
+        if (conf.get(DaemonConfig.STORM_WORKER_CGROUP_CPU_LIMIT) != null) {
+            workerCpu = ((Number) conf.get(DaemonConfig.STORM_WORKER_CGROUP_CPU_LIMIT)).intValue();
+        }
+        workerToCpu.put(workerId, workerCpu);
+
+        if ((boolean) this.conf.get(DaemonConfig.STORM_CGROUP_MEMORY_ENFORCEMENT_ENABLE)) {
+            workerToMemoryMb.put(workerId, workerMemoryMb);
+        }
+
+        if (numaId != null) {
+            Map<String, Object> numaIdEntry = (Map<String, Object>) validatedNumaMap.get(numaId);
+            List<String> rawCores = ((List<Integer>) numaIdEntry.get(NUMA_CORES)).stream()
+                .map(rawCore -> String.valueOf(rawCore)).collect(Collectors.toList());
+            workerToCores.put(workerId, rawCores);
+            workerToMemoryZone.put(workerId, numaId);
+        }
+    }
+
+    @Override
+    public void releaseResourcesForWorker(String workerId) {
+        workerToCpu.remove(workerId);
+        workerToMemoryMb.remove(workerId);
+    }
+
+    @Override
+    public long getSystemFreeMemoryMb() throws IOException {
+        long rootCgroupLimitFree = Long.MAX_VALUE;
+
+        try {
+            //For cgroups no limit is max long.
+            long limit = memoryCoreAtRoot.getPhysicalUsageLimit();
+            long used = memoryCoreAtRoot.getMaxPhysicalUsage();
+            rootCgroupLimitFree = (limit - used) / 1024 / 1024;
+        } catch (FileNotFoundException e) {
+            //Ignored if cgroups is not setup don't do anything with it
+        }
+
+        return Long.min(rootCgroupLimitFree, ServerUtils.getMemInfoFreeMb());
+    }
+
+    /**
+     * Get image name from topology Conf.
+     * @param topoConf topology configuration
+     * @return the image name
+     */
+    protected String getImageName(Map<String, Object> topoConf) {
+        return (String) topoConf.get(Config.TOPOLOGY_OCI_IMAGE);
+    }
+
+    protected String commandFilePath(String dir, String commandTag) {
+        return dir + File.separator + commandTag + ".sh";
+    }
+
+    protected String writeToCommandFile(String workerDir, String command, String commandTag) throws IOException {
+        String scriptPath = commandFilePath(workerDir, commandTag);
+        try (BufferedWriter out = new BufferedWriter(new FileWriter(scriptPath))) {
+            out.write(command);
+        }
+        LOG.debug("command : {}; location: {}", command, scriptPath);
+        return scriptPath;
+    }
+
+    protected enum CmdType {
+        LAUNCH_DOCKER_CONTAINER("launch-docker-container"),
+        RUN_DOCKER_CMD("run-docker-cmd"),
+        PROFILE_DOCKER_CONTAINER("profile-docker-container");
+
+        private final String name;
+
+        CmdType(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String toString() {
+            return this.name;
+        }
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/container/oci/OciUtils.java b/storm-server/src/main/java/org/apache/storm/container/oci/OciUtils.java
new file mode 100644
index 0000000..9f0b5cf
--- /dev/null
+++ b/storm-server/src/main/java/org/apache/storm/container/oci/OciUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.oci;
+
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.utils.ObjectReader;
+import org.apache.storm.utils.WrappedInvalidTopologyException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OciUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(OciUtils.class);
+
+    /**
+     * Adjust the image config for the topology.
+     * If OCI container is not supported, remove the oci image setting from the topoConf;
+     * otherwise, set it to the default image if it's null.
+     * @param conf the daemon conf
+     * @param topoConf the topology conf
+     * @param topoId the topology Id
+     * @throws InvalidTopologyException if image config is invalid
+     */
+    public static void adjustImageConfigForTopo(Map<String, Object> conf, Map<String, Object> topoConf, String topoId)
+        throws InvalidTopologyException {
+
+        //don't need sanity check here as we assume it's already done during daemon startup
+        List<String> allowedImages = getAllowedImages(conf, false);
+        String topoImage = (String) topoConf.get(Config.TOPOLOGY_OCI_IMAGE);
+
+        if (allowedImages.isEmpty()) {
+            if (topoImage != null) {
+                LOG.warn("{} is not configured; this indicates OCI container is not supported; "
+                        + "{} config for topology {} will be removed",
+                    DaemonConfig.STORM_OCI_ALLOWED_IMAGES, Config.TOPOLOGY_OCI_IMAGE, topoId);
+                topoConf.remove(Config.TOPOLOGY_OCI_IMAGE);
+            }
+        } else {
+            if (topoImage == null) {
+                //we assume the default image is already validated during daemon startup
+                String defaultImage = (String) conf.get(DaemonConfig.STORM_OCI_IMAGE);
+                topoImage = defaultImage;
+                topoConf.put(Config.TOPOLOGY_OCI_IMAGE, topoImage);
+                LOG.info("{} is not set for topology {}; set it to the default image {} configured in {}",
+                    Config.TOPOLOGY_OCI_IMAGE, topoId, defaultImage, DaemonConfig.STORM_OCI_IMAGE);
+            } else {
+                try {
+                    validateImage(allowedImages, topoImage, Config.TOPOLOGY_OCI_IMAGE);
+                } catch (IllegalArgumentException e) {
+                    throw new WrappedInvalidTopologyException(e.getMessage());
+                }
+            }
+        }
+    }
+
+    /**
+     * Validates the image setting in the daemon conf.
+     * This will be skipped if {@link DaemonConfig#STORM_OCI_ALLOWED_IMAGES} not configured.
+     * @param conf the daemon conf
+     */
+    public static void validateImageInDaemonConf(Map<String, Object> conf) {
+        List<String> allowedImages = getAllowedImages(conf, true);
+        if (allowedImages.isEmpty()) {
+            LOG.debug("{} is not configured; skip image validation", DaemonConfig.STORM_OCI_ALLOWED_IMAGES);
+        } else {
+            String defaultImage = (String) conf.get(DaemonConfig.STORM_OCI_IMAGE);
+            validateImage(allowedImages, defaultImage, DaemonConfig.STORM_OCI_IMAGE);
+        }
+    }
+
+    private static final String OCI_IMAGE_PATTERN = "^(([a-zA-Z0-9.-]+)(:\\d+)?/)?([a-z0-9_./-]+)(:[\\w.-]+)?$";
+    private static final Pattern ociImagePattern = Pattern.compile(OCI_IMAGE_PATTERN);
+    /**
+     * special case for allowing all images; should only be used in {@link DaemonConfig#STORM_OCI_ALLOWED_IMAGES}.
+     */
+    private static final String ASTERISK = "*";
+
+    /**
+     * This is a helper function to validate the image.
+     * @param allowedImages the allowed image list
+     * @param imageToValidate the image to be validated
+     * @param imageConfigKey the config where this image comes from; this is for logging purpose.
+     */
+    private static void validateImage(List<String> allowedImages, String imageToValidate, String imageConfigKey) {
+        if (imageToValidate == null) {
+            throw new IllegalArgumentException(imageConfigKey + " is null");
+        }
+
+        if (!allowedImages.contains(ASTERISK) && !allowedImages.contains(imageToValidate)) {
+            throw new IllegalArgumentException(imageConfigKey + "=" + imageToValidate
+                + " is not in the list of " + DaemonConfig.STORM_OCI_ALLOWED_IMAGES + ": "
+                + allowedImages);
+        }
+
+        if (!ociImagePattern.matcher(imageToValidate).matches()) {
+            throw new IllegalArgumentException(imageConfigKey + "=" + imageToValidate
+                + " doesn't match the pattern " + OCI_IMAGE_PATTERN);
+        }
+    }
+
+    private static List<String> getAllowedImages(Map<String, Object> conf, boolean validationEnforced) {
+        List<String> allowedImages = ObjectReader.getStrings(conf.get(DaemonConfig.STORM_OCI_ALLOWED_IMAGES));
+
+        if (validationEnforced) {
+            //check if image name matches the required pattern
+            for (String image : allowedImages) {
+                if (!image.equals(ASTERISK) && !ociImagePattern.matcher(image).matches()) {
+                    throw new IllegalArgumentException(image + " in the list of "
+                        + DaemonConfig.STORM_OCI_ALLOWED_IMAGES
+                        + " doesn't match the pattern " + OCI_IMAGE_PATTERN
+                        + " or is not " + ASTERISK);
+                }
+            }
+        }
+        return allowedImages;
+    }
+}
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
index 8c2c348..bdb65b6 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java
@@ -76,6 +76,7 @@
 import org.apache.storm.cluster.ClusterUtils;
 import org.apache.storm.cluster.DaemonType;
 import org.apache.storm.cluster.IStormClusterState;
+import org.apache.storm.container.oci.OciUtils;
 import org.apache.storm.daemon.DaemonCommon;
 import org.apache.storm.daemon.Shutdownable;
 import org.apache.storm.daemon.StormCommon;
@@ -1481,6 +1482,7 @@
     private static Nimbus launchServer(Map<String, Object> conf, INimbus inimbus) throws Exception {
         StormCommon.validateDistributedMode(conf);
         validatePortAvailable(conf);
+        OciUtils.validateImageInDaemonConf(conf);
         StormMetricsRegistry metricsRegistry = new StormMetricsRegistry();
         final Nimbus nimbus = new Nimbus(conf, inimbus, metricsRegistry);
         nimbus.launchServer();
@@ -3188,6 +3190,8 @@
             topoConf.put(Config.TOPOLOGY_NAME, topoName);
             topoConf = normalizeConf(conf, topoConf, topology);
 
+            OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+
             ReqContext req = ReqContext.context();
             Principal principal = req.principal();
             String submitterPrincipal = principal == null ? null : principal.toString();
diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
index 51fef1a..caee4c0 100644
--- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
+++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/BasicContainer.java
@@ -39,6 +39,7 @@
 import org.apache.storm.DaemonConfig;
 import org.apache.storm.ServerConstants;
 import org.apache.storm.container.ResourceIsolationInterface;
+import org.apache.storm.container.oci.OciContainerManager;
 import org.apache.storm.generated.LocalAssignment;
 import org.apache.storm.generated.ProfileAction;
 import org.apache.storm.generated.ProfileRequest;
@@ -151,11 +152,20 @@
             createNewWorkerId();
         }
 
-        if (profileCmd == null) {
-            profileCmd = stormHome + File.separator + "bin" + File.separator
-                         + conf.get(DaemonConfig.WORKER_PROFILER_COMMAND);
+        if (resourceIsolationManager instanceof OciContainerManager) {
+            //When we use OciContainerManager, we will only use the profiler configured in worker-launcher.cfg due to security reasons
+            LOG.debug("Supervisor is using {} as the {}."
+                    + "The profiler set at worker.profiler.script.path in worker-launcher.cfg is the only profiler to be used. "
+                    + "Please make sure it is configured properly",
+                resourceIsolationManager.getClass().getName(), ResourceIsolationInterface.class.getName());
+            this.profileCmd = "";
+        } else {
+            if (profileCmd == null) {
+                profileCmd = stormHome + File.separator + "bin" + File.separator
+                    + conf.get(DaemonConfig.WORKER_PROFILER_COMMAND);
+            }
+            this.profileCmd = profileCmd;
         }
-        this.profileCmd = profileCmd;
 
         hardMemoryLimitMultiplier =
             ObjectReader.getDouble(conf.get(DaemonConfig.STORM_SUPERVISOR_HARD_MEMORY_LIMIT_MULTIPLIER), 2.0);
diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index fd9983e..7868dcf 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -286,6 +286,20 @@
      */
     public static String writeScript(String dir, List<String> command,
                                      Map<String, String> environment) throws IOException {
+        return writeScript(dir, command, environment, null);
+    }
+
+    /**
+     * Writes a posix shell script file to be executed in its own process.
+     *
+     * @param dir         the directory under which the script is to be written
+     * @param command     the command the script is to execute
+     * @param environment optional environment variables to set before running the script's command. May be  null.
+     * @param umask umask to be set. It can be null.
+     * @return the path to the script that has been written
+     */
+    public static String writeScript(String dir, List<String> command,
+                                     Map<String, String> environment, String umask) throws IOException {
         String path = scriptFilePath(dir);
         try (BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
             out.write("#!/bin/bash");
@@ -304,6 +318,10 @@
                 }
             }
             out.newLine();
+            if (umask != null) {
+                out.write("umask " + umask);
+                out.newLine();
+            }
             out.write("exec " + shellCmd(command) + ";");
         }
         return path;
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerExecCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerExecCommandTest.java
new file mode 100644
index 0000000..43e419a
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerExecCommandTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerExecCommandTest {
+
+    private DockerExecCommand dockerExecCommand;
+    private static final String CONTAINER_NAME = "container_name";
+
+    @Before
+    public void setup() {
+        dockerExecCommand = new DockerExecCommand(CONTAINER_NAME);
+    }
+
+    @Test
+    public void getCommandOption() {
+        assertEquals("exec", dockerExecCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        dockerExecCommand.addExecCommand(Arrays.asList("ls", "-l"));
+        assertEquals("exec container_name ls -l",
+            dockerExecCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerInspectCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerInspectCommandTest.java
new file mode 100644
index 0000000..d29af23
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerInspectCommandTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerInspectCommandTest {
+
+    private DockerInspectCommand dockerInspectCommand;
+    private static final String CONTAINER_NAME = "container_name";
+
+    @Before
+    public void setup() {
+        dockerInspectCommand = new DockerInspectCommand(CONTAINER_NAME);
+    }
+
+    @Test
+    public void getCommandOption() {
+        assertEquals("inspect", dockerInspectCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        dockerInspectCommand.withGettingContainerStatus();
+        assertEquals("inspect --format='{{.State.Status}}' container_name",
+            dockerInspectCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerPsCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerPsCommandTest.java
new file mode 100644
index 0000000..72cd220
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerPsCommandTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerPsCommandTest {
+
+    private DockerPsCommand dockerPsCommand;
+
+    @Before
+    public void setup() {
+        dockerPsCommand = new DockerPsCommand();
+    }
+
+    @Test
+    public void getCommandOption() {
+        assertEquals("ps", dockerPsCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        dockerPsCommand.withNameFilter("container_name");
+        dockerPsCommand.withQuietOption();
+        assertEquals("ps --filter=name=container_name --quiet=true",
+            dockerPsCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerRmCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerRmCommandTest.java
new file mode 100644
index 0000000..13363f5
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerRmCommandTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerRmCommandTest {
+    private DockerRmCommand dockerRmCommand;
+    private static final String CONTAINER_NAME = "container_name";
+
+    @Before
+    public void setup() {
+        dockerRmCommand = new DockerRmCommand(CONTAINER_NAME);
+    }
+
+    @Test
+    public void getCommandOption() {
+        assertEquals("rm", dockerRmCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        dockerRmCommand.withForce();
+        assertEquals("rm --force container_name",
+            dockerRmCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerRunCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerRunCommandTest.java
new file mode 100644
index 0000000..35f5f4e
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerRunCommandTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerRunCommandTest {
+    private DockerRunCommand dockerRunCommand;
+
+    private static final String CONTAINER_NAME = "foo";
+    private static final String USER_INFO = "user_id:group_id";
+    private static final String IMAGE_NAME = "image_name";
+
+    @Before
+    public void setUp() {
+        dockerRunCommand = new DockerRunCommand(CONTAINER_NAME, USER_INFO, IMAGE_NAME);
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        assertEquals("run", dockerRunCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandOption() throws IOException {
+        String sourcePath = "source";
+        String destPath = "dest";
+        dockerRunCommand.detachOnRun()
+            .addReadWriteMountLocation(sourcePath, destPath);
+        List<String> commands = Arrays.asList("bash", "launch_command");
+        dockerRunCommand.setOverrideCommandWithArgs(commands);
+        dockerRunCommand.removeContainerOnExit();
+        assertEquals("run --name=foo --user=user_id:group_id -d -v source:dest --rm "
+                + "image_name bash launch_command",
+            dockerRunCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerStopCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerStopCommandTest.java
new file mode 100644
index 0000000..f23f327
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerStopCommandTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerStopCommandTest {
+
+    private DockerStopCommand dockerStopCommand;
+    private static final String CONTAINER_NAME = "container_name";
+
+    @Before
+    public void setUp() throws Exception {
+        dockerStopCommand = new DockerStopCommand(CONTAINER_NAME);
+    }
+
+    @Test
+    public void getCommandOption() {
+        assertEquals("stop", dockerStopCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        dockerStopCommand.setGracePeriod(3);
+        assertEquals("stop --time=3 container_name",
+            dockerStopCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/docker/DockerWaitCommandTest.java b/storm-server/src/test/java/org/apache/storm/container/docker/DockerWaitCommandTest.java
new file mode 100644
index 0000000..848df8a
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/docker/DockerWaitCommandTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
+ * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
+ * and limitations under the License.
+ */
+
+package org.apache.storm.container.docker;
+
+import static org.junit.Assert.assertEquals;
+
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class DockerWaitCommandTest {
+    private DockerWaitCommand dockerWaitCommand;
+    private static final String CONTAINER_NAME = "container_name";
+
+    @Before
+    public void setup() {
+        dockerWaitCommand = new DockerWaitCommand(CONTAINER_NAME);
+    }
+
+    @Test
+    public void getCommandOption() {
+        assertEquals("wait", dockerWaitCommand.getCommandOption());
+    }
+
+    @Test
+    public void getCommandWithArguments() {
+        assertEquals("wait container_name",
+            dockerWaitCommand.getCommandWithArguments());
+    }
+}
\ No newline at end of file
diff --git a/storm-server/src/test/java/org/apache/storm/container/oci/OciUtilsTest.java b/storm-server/src/test/java/org/apache/storm/container/oci/OciUtilsTest.java
new file mode 100644
index 0000000..74daba6
--- /dev/null
+++ b/storm-server/src/test/java/org/apache/storm/container/oci/OciUtilsTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.container.oci;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.Config;
+import org.apache.storm.DaemonConfig;
+import org.apache.storm.generated.InvalidTopologyException;
+import org.apache.storm.utils.WrappedInvalidTopologyException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OciUtilsTest {
+
+    @Test
+    public void validateImageInDaemonConfSkipped() {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, "storm/rhel7:dev_test");
+        //this is essentially a no-op
+        OciUtils.validateImageInDaemonConf(conf);
+    }
+
+    @Test
+    public void validateImageInDaemonConfTest() {
+        Map<String, Object> conf = new HashMap<>();
+        List<String> allowedImages = new ArrayList<>();
+        allowedImages.add("storm/rhel7:dev_test");
+        allowedImages.add("storm/rhel7:dev_current");
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, allowedImages);
+
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, "storm/rhel7:dev_test");
+        OciUtils.validateImageInDaemonConf(conf);
+
+        allowedImages.add("*");
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, "storm/rhel7:wow");
+        OciUtils.validateImageInDaemonConf(conf);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void validateImageInDaemonConfNotInAllowedList() {
+        Map<String, Object> conf = new HashMap<>();
+        List<String> allowedImages = new ArrayList<>();
+        allowedImages.add("storm/rhel7:dev_test");
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, allowedImages);
+
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, "storm/rhel7:wow");
+        OciUtils.validateImageInDaemonConf(conf);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void validateImageInDaemonConfWithNullDefault() {
+        Map<String, Object> conf = new HashMap<>();
+        List<String> allowedImages = new ArrayList<>();
+        allowedImages.add("storm/rhel7:dev_test");
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, allowedImages);
+
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, null); //or not set
+        OciUtils.validateImageInDaemonConf(conf);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void validateImageInDaemonConfWrongPattern() {
+        Map<String, Object> conf = new HashMap<>();
+        List<String> allowedImages = new ArrayList<>();
+        allowedImages.add("*");
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, allowedImages);
+
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, "a-strange@image-name");
+        OciUtils.validateImageInDaemonConf(conf);
+    }
+
+    @Test
+    public void adjustImageConfigForTopoTest() throws InvalidTopologyException {
+        Map<String, Object> conf = new HashMap<>();
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, null); //or not set
+
+        Map<String, Object> topoConf = new HashMap<>();
+        String topoId = "topo1";
+
+        //case 1: nothing is not; nothing will happen
+        OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+
+        String image1 = "storm/rhel7:dev_test";
+        String defaultImage = "storm/rhel7:dev_current";
+
+        //case 2: allowed list is not set; topology oci image will be set to null
+        topoConf.put(Config.TOPOLOGY_OCI_IMAGE, image1);
+        OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+        Assert.assertNull(Config.TOPOLOGY_OCI_IMAGE + " is not removed", topoConf.get(Config.TOPOLOGY_OCI_IMAGE));
+
+        //set up daemon conf properly
+        List<String> allowedImages = new ArrayList<>();
+        allowedImages.add(image1);
+        allowedImages.add(defaultImage);
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, allowedImages);
+        conf.put(DaemonConfig.STORM_OCI_IMAGE, defaultImage);
+
+        //case 3: configs are set properly; nothing will happen
+        topoConf.put(Config.TOPOLOGY_OCI_IMAGE, image1);
+        OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+        Assert.assertEquals(image1, topoConf.get(Config.TOPOLOGY_OCI_IMAGE));
+
+        //case 4: topology oci image is not set; will be set to default image
+        topoConf.remove(Config.TOPOLOGY_OCI_IMAGE);
+        OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+        Assert.assertEquals(defaultImage, topoConf.get(Config.TOPOLOGY_OCI_IMAGE));
+
+        //case 5: any topology oci image is allowed
+        allowedImages.add("*");
+        String image2 = "storm/rhel7:dev_wow";
+        topoConf.put(Config.TOPOLOGY_OCI_IMAGE, image2);
+        OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+        Assert.assertEquals(image2, topoConf.get(Config.TOPOLOGY_OCI_IMAGE));
+    }
+
+    @Test(expected = WrappedInvalidTopologyException.class)
+    public void adjustImageConfigForTopoNotInAllowedList() throws InvalidTopologyException {
+        String image1 = "storm/rhel7:dev_test";
+        String image2 = "storm/rhel7:dev_current";
+
+        Map<String, Object> conf = new HashMap<>();
+        List<String> allowedImages = new ArrayList<>();
+        allowedImages.add(image1);
+        conf.put(DaemonConfig.STORM_OCI_ALLOWED_IMAGES, allowedImages);
+
+        Map<String, Object> topoConf = new HashMap<>();
+        String topoId = "topo1";
+        topoConf.put(Config.TOPOLOGY_OCI_IMAGE, image2);
+
+        OciUtils.adjustImageConfigForTopo(conf, topoConf, topoId);
+    }
+}
\ No newline at end of file