Merge pull request #3246 from agresch/agresch_storm_3618

STORM-3618 add meter to track scheduling errors
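A minimal sketch, assuming the Dropwizard Metrics API that Storm's metrics are built on, of how a meter can track an error count and rate; the registry, metric name, and class below are illustrative placeholders rather than the code this PR adds.

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

// Illustrative only: a Dropwizard Meter tracking an error rate. The registry
// and metric name are placeholders, not what this PR registers in Storm.
public class SchedulingErrorMeterSketch {
    public static void main(String[] args) {
        MetricRegistry registry = new MetricRegistry();
        Meter schedulingErrors = registry.meter("scheduler.errors");

        // Mark the meter wherever a scheduling round fails.
        try {
            throw new RuntimeException("simulated scheduling failure");
        } catch (RuntimeException e) {
            schedulingErrors.mark();
        }

        System.out.println("count=" + schedulingErrors.getCount()
            + ", meanRate=" + schedulingErrors.getMeanRate());
    }
}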
diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
index 6d7e16b..f7fdda7 100644
--- a/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
+++ b/storm-client/src/jvm/org/apache/storm/security/auth/kerberos/AutoTGT.java
@@ -12,7 +12,9 @@
 
 package org.apache.storm.security.auth.kerberos;
 
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
+import java.security.Principal;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -207,19 +209,37 @@
                 return;
             }
 
-            LOG.info("Invoking Hadoop UserGroupInformation.loginUserFromSubject.");
-            Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
-            login.invoke(null, subject);
+            // We are just trying to do the following:
+            //
+            // Configuration conf = new Configuration();
+            // HadoopKerberosName.setConfiguration(conf);
+            // subject.getPrincipals().add(new User(tgt.getClient().toString(), AuthenticationMethod.KERBEROS, null));
 
-            //Refer to STORM-3606 for details
-            LOG.warn("UserGroupInformation.loginUserFromSubject will spawn a TGT renewal thread (\"TGT Renewer for <username>\") "
-                + "to execute \"kinit -R\" command some time before the current TGT expires. "
-                + "It will fail because TGT is not in the local TGT cache and the thread will eventually abort. "
-                + "Exceptions from this TGT renewal thread can be ignored. Note: TGT for the Worker is kept in memory. "
-                + "Please refer to STORM-3606 for detailed explanations");
+            Class<?> confClass = Class.forName("org.apache.hadoop.conf.Configuration");
+            Constructor<?> confCons = confClass.getConstructor();
+            Object conf = confCons.newInstance();
+            Class<?> hknClass = Class.forName("org.apache.hadoop.security.HadoopKerberosName");
+            Method hknSetConf = hknClass.getMethod("setConfiguration", confClass);
+            hknSetConf.invoke(null, conf);
+
+            Class<?> authMethodClass = Class.forName("org.apache.hadoop.security.UserGroupInformation$AuthenticationMethod");
+            Object kerbAuthMethod = null;
+            for (Object authMethod : authMethodClass.getEnumConstants()) {
+                if ("KERBEROS".equals(authMethod.toString())) {
+                    kerbAuthMethod = authMethod;
+                    break;
+                }
+            }
+
+            Class<?> userClass = Class.forName("org.apache.hadoop.security.User");
+            Constructor<?> userCons = userClass.getConstructor(String.class, authMethodClass, LoginContext.class);
+            userCons.setAccessible(true);
+            String name = getTGT(subject).getClient().toString();
+            Object user = userCons.newInstance(name, kerbAuthMethod, null);
+            subject.getPrincipals().add((Principal) user);
 
         } catch (Exception e) {
-            LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop "
+            LOG.error("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop "
                      + "may not be compatible.", e);
         }
     }
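The comment block above the reflection code spells out the plain-Java equivalent; the reflection itself follows a generic pattern: load a class by name, invoke a static method, pick an enum constant by its string form, and instantiate through a reflected constructor. Below is a minimal, self-contained sketch of that same pattern against JDK stand-in classes, so it compiles without Hadoop on the classpath; none of the names in it are Storm or Hadoop APIs.

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;

// Sketch of the reflection pattern used in AutoTGT, run against JDK classes
// so it needs no Hadoop dependency. Integer, DayOfWeek and StringBuilder
// stand in for the Hadoop types; nothing here is a Storm or Hadoop API.
public class ReflectionPatternSketch {
    public static void main(String[] args) throws Exception {
        // Invoke a static method reflectively, as done for
        // HadoopKerberosName.setConfiguration(conf).
        Class<?> intClass = Class.forName("java.lang.Integer");
        Method valueOf = intClass.getMethod("valueOf", String.class);
        Object fortyTwo = valueOf.invoke(null, "42");

        // Pick an enum constant by its string form, as done for
        // AuthenticationMethod.KERBEROS.
        Class<?> enumClass = Class.forName("java.time.DayOfWeek");
        Object monday = null;
        for (Object constant : enumClass.getEnumConstants()) {
            if ("MONDAY".equals(constant.toString())) {
                monday = constant;
                break;
            }
        }

        // Instantiate through a reflected constructor, as done for
        // org.apache.hadoop.security.User.
        Class<?> sbClass = Class.forName("java.lang.StringBuilder");
        Constructor<?> sbCons = sbClass.getConstructor(String.class);
        Object sb = sbCons.newInstance("resolved: ");

        System.out.println(sb.toString() + monday + " / " + fortyTwo);
    }
}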
diff --git a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
index 8896fd4..e68e07c 100644
--- a/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
+++ b/storm-server/src/main/java/org/apache/storm/healthcheck/HealthChecker.java
@@ -22,6 +22,7 @@
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.channels.ClosedByInterruptException;
@@ -119,14 +120,16 @@
             curThread.interrupted();
 
             if (process.exitValue() != 0) {
-                String str;
-                InputStream stdin = process.getInputStream();
-                BufferedReader reader = new BufferedReader(new InputStreamReader(stdin));
-                while ((str = reader.readLine()) != null) {
-                    if (str.startsWith("ERROR")) {
-                        LOG.warn("The healthcheck process {} exited with code {}", script, process.exitValue());
-                        return FAILED;
-                    }
+                String outMessage = readFromStream(process.getInputStream());
+                String errMessage = readFromStream(process.getErrorStream());
+
+                LOG.warn("The healthcheck process {} exited with code: {}; output: {}; err: {}.",
+                    script, process.exitValue(), outMessage, errMessage);
+
+                // Keep this for backwards compatibility.
+                // It relies on "ERROR" at the beginning of stdout to determine FAILED status.
+                if (outMessage.startsWith("ERROR")) {
+                    return FAILED;
                 }
                 return FAILED_WITH_EXIT_CODE;
             }
@@ -144,4 +147,14 @@
         }
     }
 
+    private static String readFromStream(InputStream is) throws IOException {
+        StringBuilder stringBuilder = new StringBuilder();
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
+            String str;
+            while ((str = reader.readLine()) != null) {
+                stringBuilder.append(str).append("\n");
+            }
+        }
+        return stringBuilder.toString().trim();
+    }
 }
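The reworked check drains both stdout and stderr before logging, and keeps the old contract that a leading "ERROR" on stdout means FAILED. A small, self-contained sketch of that read-and-check pattern follows; the echo command (assumed available on a Unix-like PATH) and the class name are illustrative, not part of Storm's health check scripts.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

// Standalone sketch of draining a process's streams and applying the
// "ERROR"-prefix convention. The real HealthChecker only does this when the
// configured healthcheck script exits with a non-zero code.
public class HealthCheckSketch {
    public static void main(String[] args) throws Exception {
        Process process = new ProcessBuilder("echo", "ERROR something is wrong").start();
        process.waitFor();

        String outMessage = readFromStream(process.getInputStream());
        String errMessage = readFromStream(process.getErrorStream());
        System.out.println("exit=" + process.exitValue()
            + "; out=" + outMessage + "; err=" + errMessage);

        // Backwards-compatible convention: "ERROR" at the start of stdout marks failure.
        if (outMessage.startsWith("ERROR")) {
            System.out.println("status: FAILED");
        }
    }

    private static String readFromStream(InputStream is) throws IOException {
        StringBuilder sb = new StringBuilder();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(is))) {
            String line;
            while ((line = reader.readLine()) != null) {
                sb.append(line).append("\n");
            }
        }
        return sb.toString().trim();
    }
}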