SLIDER-1264 killing another processes by slider-agent, when commandScript timed out
diff --git a/slider-agent/src/main/python/agent/process_utils.py b/slider-agent/src/main/python/agent/process_utils.py
new file mode 100644
index 0000000..50ffd02
--- /dev/null
+++ b/slider-agent/src/main/python/agent/process_utils.py
@@ -0,0 +1,100 @@
+# !/usr/bin/env python
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import subprocess
+import time
+
+check_time_delay = 0.1 # seconds between checks of process killed
+
+
+def get_children(pid):
+ PSCMD = ["ps", "-o", "pid", "--no-headers", "--ppid", str(pid)]
+ ps_process = subprocess.Popen(PSCMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = ps_process.communicate()
+ if ps_process.returncode != 0:
+ return []
+ return stdout.split()
+
+
+def get_flat_process_tree(pid):
+ """
+ :param pid: process id of parent process
+ :return: list of child process pids. Resulting list also includes parent pid
+ """
+ res = [str(pid)]
+ children = get_children(pid)
+ for child in children:
+ res += get_flat_process_tree(child)
+ return res
+
+
+def kill_pids(pids, signal):
+ from resource_management.core.exceptions import Fail
+ CMD = ["kill", "-" + str(signal)]
+ CMD.extend(pids)
+ process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ if process.returncode != 0:
+ raise Fail("Unable to kill PIDs {0} : {1}".format(str(pids),stderr))
+
+
+def get_command_by_pid(pid):
+ CMD = ["ps", "-p", str(pid), "-o", "command", "--no-headers"]
+ process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ if process.returncode != 0:
+ return "NOT_FOUND[%s]" % pid
+ return stdout
+
+
+def wait_for_entire_process_tree_death(pids):
+ for child in pids:
+ wait_for_process_death(child)
+
+
+def wait_for_process_death(pid, timeout=5):
+ start = time.time()
+ current_time = start
+ while is_process_running(pid) and current_time < start + timeout:
+ time.sleep(check_time_delay)
+ current_time = time.time()
+
+
+def is_process_running(pid):
+ CMD = ["ps", "-p", str(pid), "-o", "pid", "--no-headers"]
+ process = subprocess.Popen(CMD, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ stdout, stderr = process.communicate()
+ if process.returncode != 0:
+ return False
+ return pid in stdout
+
+
+
+def get_processes_running(process_pids):
+ """
+ Checks what processes are still running
+ :param process_pids: list of process pids
+ :return: list of pids for processes that are still running
+ """
+ result = []
+ for pid in process_pids:
+ if is_process_running(pid):
+ result.append(pid)
+ return result
diff --git a/slider-agent/src/main/python/agent/shell.py b/slider-agent/src/main/python/agent/shell.py
index 446dde9..f22d535 100644
--- a/slider-agent/src/main/python/agent/shell.py
+++ b/slider-agent/src/main/python/agent/shell.py
@@ -19,16 +19,13 @@
'''
import logging
-import subprocess
import os
-import tempfile
import signal
-import sys
+import subprocess
import threading
-import time
-import traceback
-import pprint
import platform
+from process_utils import get_flat_process_tree, kill_pids, wait_for_entire_process_tree_death, \
+ get_processes_running, get_command_by_pid
if platform.system() != "Windows":
try:
@@ -42,7 +39,6 @@
shellRunner = None
threadLocal = threading.local()
-gracefull_kill_delay = 5 # seconds between SIGTERM and SIGKILL
tempFiles = []
def noteTempFile(filename):
tempFiles.append(filename)
@@ -92,40 +88,36 @@
logger.debug("Exitcode for %s is %d" % (cmd, code))
return _dict_to_object({'exitCode': code, 'output': out, 'error': err})
-
#linux specific code
def _kill_process_with_children_linux(parent_pid):
- def kill_tree_function(pid, signal):
- '''
+ """
Kills process tree starting from a given pid.
- '''
- # The command below starts 'ps' linux utility and then parses it's
- # output using 'awk'. AWK recursively extracts PIDs of all children of
- # a given PID and then passes list of "kill -<SIGNAL> PID" commands to 'sh'
- # shell.
- CMD = """ps xf | awk -v PID=""" + str(pid) + \
- """ ' $1 == PID { P = $1; next } P && /_/ { P = P " " $1;""" + \
- """K=P } P && !/_/ { P="" } END { print "kill -""" \
- + str(signal) + """ "K }' | sh """
- process = subprocess.Popen(CMD, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE, shell=True)
- process.communicate()
- _run_kill_function(kill_tree_function, parent_pid)
+ :param parent_pid: head of tree
+ :param graceful_kill_delays: map <command name, custom delay between SIGTERM and SIGKILL>
+ :return:
+ """
+
+ pids = get_flat_process_tree(parent_pid)
+ logger.info("Process tree: %s" % ','.join(pids))
+ try:
+ kill_pids(pids, signal.SIGTERM)
+ except Exception, e:
+ logger.warn("Failed to kill PID %d" % parent_pid)
+ logger.warn("Reported error: " + repr(e))
+
+ wait_for_entire_process_tree_death(pids)
+
+ try:
+ running_processes = get_processes_running(pids)
+ if running_processes:
+ process_names = map(lambda x: get_command_by_pid(x), running_processes)
+ logger.warn("These PIDs %s did not die after SIGTERM, sending SIGKILL. Exact commands to be killed:\n %s" %
+ (", ".join(running_processes), "\n".join(process_names)))
+ kill_pids(running_processes, signal.SIGKILL)
+ except Exception, e:
+ logger.error("Failed to send SIGKILL to PID %d. Process exited?" % parent_pid)
+ logger.error("Reported error: " + repr(e))
-def _run_kill_function(kill_function, pid):
- try:
- kill_function(pid, signal.SIGTERM)
- except Exception, e:
- logger.warn("Failed to kill PID %d" % (pid))
- logger.warn("Reported error: " + repr(e))
-
- time.sleep(gracefull_kill_delay)
-
- try:
- kill_function(pid, signal.SIGKILL)
- except Exception, e:
- logger.error("Failed to send SIGKILL to PID %d. Process exited?" % (pid))
- logger.error("Reported error: " + repr(e))
def _changeUid():
try:
diff --git a/slider-agent/src/test/python/agent/TestProcessUtils.py b/slider-agent/src/test/python/agent/TestProcessUtils.py
new file mode 100644
index 0000000..d3c708e
--- /dev/null
+++ b/slider-agent/src/test/python/agent/TestProcessUtils.py
@@ -0,0 +1,221 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+'''
+
+import unittest
+import signal
+import subprocess, time
+from mock.mock import patch, MagicMock, PropertyMock, call
+from agent import process_utils
+
+process_tree = {"111": "222\n 22",
+ "222": "333\n 33",
+ "22": "44\n 444",}
+
+
+class TestProcessUtils(unittest.TestCase):
+ @patch("subprocess.Popen")
+ def test_kill(self, popen_mock):
+ process_mock = MagicMock()
+ process_mock.communicate.return_value = (None, None)
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 0
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+ process_utils.kill_pids(["12321113230", "2312415453"], signal.SIGTERM)
+ expected = [call(['kill', '-15', '12321113230', '2312415453'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_get_children(self, popen_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.return_value = ("123 \n \n 321\n", None)
+ popen_mock.return_value = process_mock
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 0
+ type(process_mock).returncode = returncode_mock
+ result = process_utils.get_children("2312415453")
+
+ self.assertEquals(result, ["123", "321"])
+
+ expected = [
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '2312415453'], stderr=subprocess.PIPE, stdout=subprocess.PIPE)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_get_flat_process_tree(self, popen_mock):
+ def side_effect(*args, **kwargs):
+ process_mock = MagicMock()
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 0
+ type(process_mock).returncode = returncode_mock
+ if args[0][5] in process_tree.keys():
+ process_mock.communicate.return_value = (process_tree[args[0][5]], None)
+ else:
+ process_mock.communicate.return_value = ("", None)
+ return process_mock
+
+ popen_mock.side_effect = side_effect
+ result = process_utils.get_flat_process_tree("111")
+ self.assertEquals(result, ['111', '222', '333', '33', '22', '44', '444'])
+
+ expected = [call(['ps', '-o', 'pid', '--no-headers', '--ppid', '111'], stderr=-1, stdout=-1),
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '222'], stderr=-1, stdout=-1),
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '333'], stderr=-1, stdout=-1),
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '33'], stderr=-1, stdout=-1),
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '22'], stderr=-1, stdout=-1),
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '44'], stderr=-1, stdout=-1),
+ call(['ps', '-o', 'pid', '--no-headers', '--ppid', '444'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_get_command_by_pid(self, popen_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.return_value = ("yum something", None)
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 0
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+
+ result = process_utils.get_command_by_pid("2312415453")
+
+ self.assertEquals(result, "yum something")
+
+ expected = [call(['ps', '-p', '2312415453', '-o', 'command', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_get_command_by_pid_not_exist(self, popen_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.return_value = ("", None)
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 1
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+
+ result = process_utils.get_command_by_pid("2312415453")
+
+ self.assertEquals(result, "NOT_FOUND[2312415453]")
+
+ expected = [call(['ps', '-p', '2312415453', '-o', 'command', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_is_process_running(self, popen_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.return_value = ("2312415453", None)
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 0
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+
+ result = process_utils.is_process_running("2312415453")
+
+ self.assertEquals(result, True)
+
+ expected = [call(['ps', '-p', '2312415453', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_is_process_not_running(self, popen_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.return_value = ("", None)
+ returncode_mock = PropertyMock()
+ returncode_mock.return_value = 1
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+
+ result = process_utils.is_process_running("2312415453")
+
+ self.assertEquals(result, False)
+
+ expected = [call(['ps', '-p', '2312415453', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("subprocess.Popen")
+ def test_get_processes_running(self, popen_mock):
+ def side_effect(*args, **kwargs):
+ process_mock = MagicMock()
+ returncode_mock = PropertyMock()
+ if args[0][2] == "4321":
+ returncode_mock.return_value = 0
+ process_mock.communicate.return_value = ("4321", None)
+ else:
+ returncode_mock.return_value = 1
+ process_mock.communicate.return_value = (None, None)
+ type(process_mock).returncode = returncode_mock
+ return process_mock
+
+ popen_mock.side_effect = side_effect
+
+ result = process_utils.get_processes_running(["1234", "4321"])
+
+ self.assertEquals(result, ["4321"])
+
+ expected = [call(['ps', '-p', '1234', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+
+ @patch("time.sleep")
+ @patch("subprocess.Popen")
+ def test_wait_for_process_death(self, popen_mock, sleep_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.side_effect = [("4321", None),("4321", None),(None, None)]
+ returncode_mock = PropertyMock()
+ returncode_mock.side_effect = [0, 0, 1]
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+
+ process_utils.wait_for_process_death("4321")
+
+ expected = [call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+ expected = [call(0.1), call(0.1)]
+ self.assertEquals(sleep_mock.call_args_list, expected)
+
+ @patch("time.sleep")
+ @patch("subprocess.Popen")
+ def test_wait_for_entire_process_tree_death(self, popen_mock, sleep_mock):
+
+ process_mock = MagicMock()
+ process_mock.communicate.side_effect = [("1234", None), (None, None), ("4321", None), ("4321", None), (None, None)]
+ returncode_mock = PropertyMock()
+ returncode_mock.side_effect = [0, 1, 0, 0, 1]
+ type(process_mock).returncode = returncode_mock
+ popen_mock.return_value = process_mock
+
+ process_utils.wait_for_entire_process_tree_death(["1234", "4321"])
+
+ expected = [call(['ps', '-p', '1234', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '1234', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1),
+ call(['ps', '-p', '4321', '-o', 'pid', '--no-headers'], stderr=-1, stdout=-1)]
+ self.assertEquals(popen_mock.call_args_list, expected)
+ expected = [call(0.1), call(0.1), call(0.1)]
+ self.assertEquals(sleep_mock.call_args_list, expected)
diff --git a/slider-agent/src/test/python/agent/TestShell.py b/slider-agent/src/test/python/agent/TestShell.py
index 8caed7b..a707e09 100644
--- a/slider-agent/src/test/python/agent/TestShell.py
+++ b/slider-agent/src/test/python/agent/TestShell.py
@@ -50,8 +50,6 @@
return
if _platform == "linux" or _platform == "linux2": # Test is Linux-specific
- gracefull_kill_delay_old = shell.gracefull_kill_delay
- shell.gracefull_kill_delay = 0.1
sleep_cmd = "sleep 10"
test_cmd = """ (({0}) & ({0} & {0})) """.format(sleep_cmd)
# Starting process tree (multiple process groups)
@@ -69,7 +67,6 @@
ps_process = subprocess.Popen(ps_cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True)
(out, err) = ps_process.communicate()
self.assertFalse(sleep_cmd in out)
- shell.gracefull_kill_delay = gracefull_kill_delay_old
else:
# Do not run under other systems
pass