PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1793098 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index c701b2c..9d06f17 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -97,6 +97,8 @@
  
 BUG FIXES
 
+PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)
+
 PIG-5221: More fs.default.name deprecation warnings (wattsinabox via daijy)
 
 PIG-5222: Fix Junit Deprecations (wattsinabox via daijy)
diff --git a/src/org/apache/pig/tools/parameters/PreprocessorContext.java b/src/org/apache/pig/tools/parameters/PreprocessorContext.java
index 69e55de..958c0e1 100644
--- a/src/org/apache/pig/tools/parameters/PreprocessorContext.java
+++ b/src/org/apache/pig/tools/parameters/PreprocessorContext.java
@@ -25,16 +25,22 @@
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.InputStream;
 import java.io.StringReader;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.Shell;
@@ -203,6 +209,27 @@
         paramval_put(key, sub_val);
     }
 
+    /**
+     * Slurp in an entire input stream and close it.
+     */
+    public static class CallableStreamReader implements Callable<String> {
+        private final InputStream inputStream;
+
+        public CallableStreamReader(InputStream stream) {
+            inputStream = stream;
+        }
+
+        @Override
+        public String call() {
+            try {
+                return IOUtils.toString(inputStream);
+            } catch (IOException e) {
+                throw new RuntimeException("IO Exception while executing shell command: " + e.getMessage() , e);
+            } finally {
+                IOUtils.closeQuietly(inputStream);
+            }
+        }
+    }
 
     /*
      * executes the 'cmd' in shell and returns result
@@ -235,40 +262,21 @@
             throw rte;
         }
 
-        BufferedReader br = null;
-        try{
-            InputStreamReader isr = new InputStreamReader(p.getInputStream());
-            br = new BufferedReader(isr);
-            String line=null;
-            StringBuilder sb = new StringBuilder();
-            while ( (line = br.readLine()) != null){
-                sb.append(line);
-                sb.append("\n");
-            }
-            streamData = sb.toString();
-        } catch (IOException e){
-            RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e);
-            throw rte;
-        } finally {
-            if (br != null) try {br.close();} catch(Exception e) {}
-        }
+        // Read stdout and stderr in separate threads to avoid deadlock due to pipe buffer size
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        Future<String> futureOut = executorService.submit(new CallableStreamReader(p.getInputStream()));
+        Future<String> futureErr = executorService.submit(new CallableStreamReader(p.getErrorStream()));
 
         try {
-            InputStreamReader isr = new InputStreamReader(p.getErrorStream());
-            br = new BufferedReader(isr);
-            String line=null;
-            StringBuilder sb = new StringBuilder();
-            while ( (line = br.readLine()) != null ) {
-                sb.append(line);
-                sb.append("\n");
-            }
-            streamError = sb.toString();
+            streamData = futureOut.get();
+            streamError = futureErr.get();
             log.debug("Error stream while executing shell command : " + streamError);
-        } catch (Exception e) {
-            RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e);
-            throw rte;
+        } catch (InterruptedException e) {
+            throw new RuntimeException("InterruptedException while executing shell command : " + e.getMessage() , e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("ExecutionException while executing shell command : " + e.getMessage(), e);
         } finally {
-            if (br != null) try {br.close();} catch(Exception e) {}
+            executorService.shutdownNow();
         }
 
         int exitVal;
diff --git a/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java b/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
new file mode 100644
index 0000000..f6aea7c
--- /dev/null
+++ b/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.parameters;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class TestPreprocessorContext {
+
+    @Test
+    public void testProcessShellCmd() throws ParameterSubstitutionException, FrontendException {
+        PreprocessorContext ctx = new PreprocessorContext(0);
+        String cmd = "echo hello";
+        ctx.processShellCmd("some_value", "`" + cmd + "`");
+
+        Map<String, String> paramVal = ctx.getParamVal();
+        assertEquals("hello", paramVal.get("some_value"));
+    }
+
+    @Test
+    public void testProcessShellCmdBigStderr() throws ParameterSubstitutionException, FrontendException {
+        // This test probably doesn't work on Windows, but should work elsewhere
+        Assume.assumeFalse(Shell.WINDOWS);
+
+        PreprocessorContext ctx = new PreprocessorContext(0);
+        String cmd = "bash -c 'i=0; while [ \"\\$i\" -lt 10000 ]; do echo long-stderr-output >&2; " +
+                "i=\\$((i+1)); done; echo hello'";
+        ctx.processShellCmd("some_value", "`" + cmd + "`");
+
+        Map<String, String> paramVal = ctx.getParamVal();
+        assertEquals("hello", paramVal.get("some_value"));
+    }
+
+    @Test
+    public void testFailingCommand() throws ParameterSubstitutionException, FrontendException {
+        try {
+            PreprocessorContext ctx = new PreprocessorContext(0);
+            String cmd = "exit 1";
+            ctx.processShellCmd("some_value", "`" + cmd + "`");
+        } catch (RuntimeException e) {
+            assertTrue(Pattern.compile("Error executing shell command:.*exit code.*")
+                    .matcher(e.getMessage()).matches()
+            );
+        }
+    }
+}