[STORM-3723] Fix isAnyPosixProcessPidDirAlive for PID reassignment (#3362)

diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
index 7868dcf..612883a 100644
--- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
+++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java
@@ -1295,9 +1295,9 @@
                     LOG.debug("Process {} is alive and owned by expectedUser {}/{}", pid, expectedUser, expectedUid);
                     return true;
                 }
-                LOG.info("Prior process is dead, since directory {} owner {} is not same as expected expectedUser {}/{}, "
-                        + "likely pid {} was reused for a new process for uid {}",
-                        pidDir, actualUser, expectedUser, expectedUid, pid, actualUid);
+                LOG.info("Prior process is dead, since directory {} owner {} is not same as expectedUser {}/{}, "
+                        + "likely pid {} was reused for a new process for uid {}, {}",
+                        pidDir, actualUser, expectedUser, expectedUid, pid, actualUid, getProcessDesc(pidDir));
             } else {
                 // actualUser is a string
                 LOG.debug("Process directory {} owner is {}", pidDir, actualUser);
@@ -1305,14 +1305,37 @@
                     LOG.debug("Process {} is alive and owned by expectedUser {}", pid, expectedUser);
                     return true;
                 }
-                LOG.info("Prior process is dead, since directory {} owner {} is not same as expected expectedUser {}, "
-                        + "likely pid {} was reused for a new process for expectedUser {}",
-                        pidDir, actualUser, expectedUser, pid, actualUser);
+                LOG.info("Prior process is dead, since directory {} owner {} is not same as expectedUser {}, "
+                        + "likely pid {} was reused for a new process for actualUser {}, {}}",
+                        pidDir, actualUser, expectedUser, pid, actualUser, getProcessDesc(pidDir));
             }
-            LOG.debug("Process {} is alive and owned by user {}", pid, expectedUser);
-            return true;
         }
         LOG.info("None of the processes {} are alive AND owned by expectedUser {}", pids, expectedUser);
         return false;
     }
+
+    /**
+     * Support method to obtain additional log info for the process. Use the contents of comm and cmdline
+     * in the process directory. Note that this method works properly only on posix systems with /proc directory.
+     *
+     * @param pidDir PID directory (/proc/<pid>)
+     * @return process description string
+     */
+    private static String getProcessDesc(File pidDir) {
+        String comm = "";
+        Path p = pidDir.toPath().resolve("comm");
+        try {
+            comm = String.join(", ", Files.readAllLines(p));
+        } catch (IOException ex) {
+            LOG.warn("Cannot get contents of " + p, ex);
+        }
+        String cmdline = "";
+        p = pidDir.toPath().resolve("cmdline");
+        try {
+            cmdline = String.join(", ", Files.readAllLines(p)).replace('\0', ' ');
+        } catch (IOException ex) {
+            LOG.warn("Cannot get contents of " + p, ex);
+        }
+        return String.format("process(comm=\"%s\", cmdline=\"%s\")", comm, cmdline);
+    }
 }
diff --git a/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java b/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java
index d0ddd1e..31224d3 100644
--- a/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java
+++ b/storm-server/src/test/java/org/apache/storm/utils/ServerUtilsTest.java
@@ -36,12 +36,14 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.zip.ZipFile;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
 import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
 import org.apache.storm.testing.TmpPath;
@@ -94,10 +96,18 @@
         }
     }
 
-    private Collection<Long> getRunningProcessIds() throws IOException {
+    /**
+     * Get running processes for all or specified user.
+     *
+     * @param user Null for all users, otherwise specify the user. e.g. "root"
+     * @return List of ProcessIds for the user
+     * @throws IOException
+     */
+    private Collection<Long> getRunningProcessIds(String user) throws IOException {
         // get list of few running processes
         Collection<Long> pids = new ArrayList<>();
-        Process p = Runtime.getRuntime().exec(ServerUtils.IS_ON_WINDOWS ? "tasklist" : "ps -e");
+        String cmd = ServerUtils.IS_ON_WINDOWS ? "tasklist" : (user == null) ? "ps -e" : "ps -U " + user ;
+        Process p = Runtime.getRuntime().exec(cmd);
         try (BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()))) {
             String line;
             while ((line = input.readLine()) != null) {
@@ -106,7 +116,15 @@
                     continue;
                 }
                 try {
-                    pids.add(Long.parseLong(line.split("\\s")[0]));
+                    String pidStr = line.split("\\s")[0];
+                    if (pidStr.equalsIgnoreCase("pid")) {
+                        continue; // header line
+                    }
+                    if (!StringUtils.isNumeric(pidStr)) {
+                        LOG.debug("Ignoring line \"{}\" while looking for PIDs in output of \"{}\"", line, cmd);
+                        continue;
+                    }
+                    pids.add(Long.parseLong(pidStr));
                 } catch (Exception ex) {
                     ex.printStackTrace();
                 }
@@ -121,7 +139,7 @@
         String randomUser = RandomStringUtils.randomAlphanumeric(12);
 
         // get list of few running processes
-        Collection<Long> pids = getRunningProcessIds();
+        Collection<Long> pids = getRunningProcessIds(null);
         assertFalse(pids.isEmpty());
         for (long pid: pids) {
             boolean status = ServerUtils.isProcessAlive(pid, randomUser);
@@ -144,7 +162,7 @@
     public void testIsAnyProcessAlive() throws Exception {
         // no process should be alive for a randomly generated user
         String randomUser = RandomStringUtils.randomAlphanumeric(12);
-        Collection<Long> pids = getRunningProcessIds();
+        Collection<Long> pids = getRunningProcessIds(null);
 
         assertFalse(pids.isEmpty());
         boolean status = ServerUtils.isAnyProcessAlive(pids, randomUser);
@@ -206,7 +224,7 @@
         Set<Long> observables = new HashSet<>();
 
         for (int i = 0 ; i < maxPidCnt ; i++) {
-            String cmd = "sleep 2000";
+            String cmd = "sleep 20000";
             Process process = Runtime.getRuntime().exec(cmd);
             long pid = getPidOfPosixProcess(process, errors);
             LOG.info("{}: ({}) ran process \"{}\" with pid={}", testName, i, cmd, pid);
@@ -272,18 +290,28 @@
             LOG.info("Test testIsAnyPosixProcessPidDirAlive is designed to run on systems with /proc directory only, marking as success");
             return;
         }
-        Collection<Long> pids = getRunningProcessIds();
-        assertFalse(pids.isEmpty());
+        Collection<Long> allPids = getRunningProcessIds(null);
+        Collection<Long> rootPids = getRunningProcessIds("root");
+        assertFalse(allPids.isEmpty());
+        assertFalse(rootPids.isEmpty());
 
-        for (int i = 0 ; i < 2 ; i++) {
-            boolean mockFileOwnerToUid = (i % 2 == 0);
+        String currentUser = System.getProperty("user.name");
+
+        for (boolean mockFileOwnerToUid: Arrays.asList(true, false)) {
             // at least one pid will be owned by the current user (doing the testing)
-            String currentUser = System.getProperty("user.name");
-            boolean status = ServerUtils.isAnyPosixProcessPidDirAlive(pids, currentUser, mockFileOwnerToUid);
+            boolean status = ServerUtils.isAnyPosixProcessPidDirAlive(allPids, currentUser, mockFileOwnerToUid);
             String err = String.format("(mockFileOwnerToUid=%s) Expecting user %s to own at least one process",
                     mockFileOwnerToUid, currentUser);
             assertTrue(err, status);
         }
+
+        // simulate reassignment of all process id to a different user (root)
+        for (boolean mockFileOwnerToUid: Arrays.asList(true, false)) {
+            boolean status = ServerUtils.isAnyPosixProcessPidDirAlive(rootPids, currentUser, mockFileOwnerToUid);
+            String err = String.format("(mockFileOwnerToUid=%s) Expecting user %s to own no process",
+                    mockFileOwnerToUid, currentUser);
+            assertFalse(err, status);
+        }
     }
 
     /**