PIG-4897: Scope of param substitution for run/exec commands (knoguchi)


git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1778200 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 96c373a..e3dd53d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -22,6 +22,8 @@
  
 INCOMPATIBLE CHANGES
 
+PIG-4897: Scope of param substitution for run/exec commands (knoguchi)
+
 PIG-4923: Drop Hadoop 1.x support in Pig 0.17 (szita via rohini)
 
 PIG-5067: Revisit union on numeric type and chararray to bytearray (knoguchi)
diff --git a/src/docs/src/documentation/content/xdocs/cont.xml b/src/docs/src/documentation/content/xdocs/cont.xml
index 62627fe..356d29d 100644
--- a/src/docs/src/documentation/content/xdocs/cont.xml
+++ b/src/docs/src/documentation/content/xdocs/cont.xml
@@ -1348,7 +1348,7 @@
                <p>Preprocessor statement included in a Pig script.</p>
                <p>Use to describe one parameter in terms of other parameters.</p>
                <p>The declare statement is processed prior to running the Pig script. </p>
-               <p>The scope of a parameter value defined using declare is all the lines following the declare statement until the next declare statement that defines the same parameter is encountered.</p>
+               <p>The scope of a parameter value defined using declare is all the lines following the declare statement until the next declare statement that defines the same parameter is encountered. When used with run/exec command, see <a href="#Parameter-Sub-scope">Scope section</a>.</p>
             </td>
          </tr>
          <tr>
@@ -1424,6 +1424,12 @@
          <p>Declare and default preprocessors statements are processed in the order they appear in the Pig script. </p>
       </li>
    </ul>
+   </section>
+
+   <section id="Parameter-Sub-scope">
+   <title>Scope</title>
+   <p>Scope of the parameters is global except when used with run/exec command. Caller would not see the parameters declared within the callee's scripts. See <a href='#Parameter-Sub-scope-example'>example</a> for more details.</p>
+
    </section></section>
   
   
@@ -1532,6 +1538,28 @@
 <em>etc ... </em>
 </source>
    </section>
+   <section id="Parameter-Sub-scope-example">
+   <title>Scoping with run/exec commands </title>
+   <p> In this example, parameters passed to run/exec command or declared within the called scripts are not visible to the caller.</p>
+<source>
+/* main.pig */
+run -param var1=10 script1.pig
+exec script2.pig
+
+A = ...
+B = FOREACH A generate $var1, $var2, ...  --ERROR. unknown parameters var1, var2
+
+</source>
+<source>
+/* script1.pig */
+...
+</source>
+<source>
+/* script2.pig */
+%declare var2 20
+...
+</source>
+   </section>
    </section>
    </section>
 
diff --git a/src/org/apache/pig/tools/grunt/GruntParser.java b/src/org/apache/pig/tools/grunt/GruntParser.java
index 558b745..8800fc0 100644
--- a/src/org/apache/pig/tools/grunt/GruntParser.java
+++ b/src/org/apache/pig/tools/grunt/GruntParser.java
@@ -516,8 +516,13 @@
         ConsoleReader reader;
         boolean interactive;
 
-        mPigServer.getPigContext().setParams(params);
-        mPigServer.getPigContext().setParamFiles(files);
+        PigContext pc = mPigServer.getPigContext();
+
+        if( !loadOnly ) {
+          pc.getPreprocessorContext().paramScopePush();
+        }
+        pc.setParams(params);
+        pc.setParamFiles(files);
 
         try {
             FetchFileRet fetchFile = FileLocalizer.fetchFile(mConf, script);
@@ -560,6 +565,9 @@
         if (interactive) {
             System.out.println("");
         }
+        if( ! loadOnly ) {
+          pc.getPreprocessorContext().paramScopePop();
+        }
     }
 
     @Override
diff --git a/src/org/apache/pig/tools/parameters/PreprocessorContext.java b/src/org/apache/pig/tools/parameters/PreprocessorContext.java
index e5c2ef6..b42ae48 100644
--- a/src/org/apache/pig/tools/parameters/PreprocessorContext.java
+++ b/src/org/apache/pig/tools/parameters/PreprocessorContext.java
@@ -27,6 +27,8 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.StringReader;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
@@ -44,12 +46,23 @@
 
 public class PreprocessorContext {
 
-    private Map<String, String> param_val;
+    private int tableinitsize = 10;
+    private Deque<Map<String,String>> param_val_stack;
 
     private PigContext pigContext;
 
     public Map<String, String> getParamVal() {
-        return param_val;
+        Map <String, String> ret = new Hashtable <String, String>(tableinitsize);
+
+        //stack (deque) iterates LIFO
+        for (Map <String, String> map : param_val_stack ) {
+            for (Map.Entry<String, String> entry : map.entrySet()) {
+                if( ! ret.containsKey(entry.getKey()) ) {
+                    ret.put(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        return ret;
     }
 
     private final Log log = LogFactory.getLog(getClass());
@@ -59,11 +72,9 @@
      *                smaller number only impacts performance
      */
     public PreprocessorContext(int limit) {
-        param_val = new Hashtable<String, String> (limit);
-    }
-
-    public PreprocessorContext(Map<String, String> paramVal) {
-        param_val = paramVal;
+        tableinitsize = limit;
+        param_val_stack = new ArrayDeque<Map<String,String>> ();
+        param_val_stack.push(new Hashtable<String, String> (tableinitsize));
     }
 
     public void setPigContext(PigContext context) {
@@ -91,6 +102,36 @@
         processOrdLine(key, val, true);
     }
 
+    public void paramScopePush() {
+        param_val_stack.push( new Hashtable<String, String> (tableinitsize) );
+    }
+
+    public void paramScopePop() {
+        param_val_stack.pop();
+    }
+
+    public boolean paramval_containsKey(String key) {
+        for (Map <String, String> map : param_val_stack ) {
+            if( map.containsKey(key) ) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public String paramval_get(String key) {
+        for (Map <String, String> map : param_val_stack ) {
+            if( map.containsKey(key) ) {
+                return map.get(key);
+            }
+        }
+        return null;
+    }
+
+    public void paramval_put(String key, String value) {
+        param_val_stack.peek().put(key, value);
+    }
+
     /**
      * This method generates parameter value by running specified command
      *
@@ -103,7 +144,7 @@
             filter.validate(PigCommandFilter.Command.SH);
         }
 
-        if (param_val.containsKey(key) && !overwrite) {
+        if (paramval_containsKey(key) && !overwrite) {
             return;
         }
 
@@ -111,13 +152,13 @@
         String sub_val = substitute(val);
         sub_val = executeShellCommand(sub_val);
 
-        if (param_val.containsKey(key) && !param_val.get(key).equals(sub_val) ) {
+        if (paramval_containsKey(key) && !paramval_get(key).equals(sub_val) ) {
             //(boolean overwrite is always true here)
             log.warn("Warning : Multiple values found for " + key + " command `" + val + "`. "
-                     + "Previous value " + param_val.get(key) + ", now using value " + sub_val);
+                     + "Previous value " + paramval_get(key) + ", now using value " + sub_val);
         }
 
-        param_val.put(key, sub_val);
+        paramval_put(key, sub_val);
     }
 
     public void validate(String preprocessorCmd) throws FrontendException {
@@ -150,17 +191,17 @@
     public  void processOrdLine(String key, String val, Boolean overwrite)  throws ParameterSubstitutionException {
 
         String sub_val = substitute(val, key);
-        if (param_val.containsKey(key)) {
-            if (param_val.get(key).equals(sub_val) || !overwrite) {
+        if (paramval_containsKey(key)) {
+            if (paramval_get(key).equals(sub_val) || !overwrite) {
                 return;
             } else {
                 log.warn("Warning : Multiple values found for " + key
-                         + ". Previous value " + param_val.get(key)
+                         + ". Previous value " + paramval_get(key)
                          + ", now using value " + sub_val);
             }
         }
 
-        param_val.put(key, sub_val);
+        paramval_put(key, sub_val);
     }
 
 
@@ -292,7 +333,7 @@
         while (bracketKeyMatcher.find()) {
             if ( (bracketKeyMatcher.start() == 0) || (line.charAt( bracketKeyMatcher.start() - 1)) != '\\' ) {
                 key = bracketKeyMatcher.group(1);
-                if (!(param_val.containsKey(key))) {
+                if (!(paramval_containsKey(key))) {
                     String message;
                     if (parentKey == null) {
                         message = "Undefined parameter : " + key;
@@ -301,7 +342,7 @@
                     }
                     throw new ParameterSubstitutionException(message);
                 }
-                val = param_val.get(key);
+                val = paramval_get(key);
                 if (val.contains("$")) {
                     val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
                 }
@@ -319,7 +360,7 @@
             // for escaped vars of the form \$<id>
             if ( (keyMatcher.start() == 0) || (line.charAt( keyMatcher.start() - 1)) != '\\' ) {
                 key = keyMatcher.group(1);
-                if (!(param_val.containsKey(key))) {
+                if (!(paramval_containsKey(key))) {
                     String message;
                     if (parentKey == null) {
                         message = "Undefined parameter : " + key;
@@ -328,7 +369,7 @@
                     }
                     throw new ParameterSubstitutionException(message);
                 }
-                val = param_val.get(key);
+                val = paramval_get(key);
                 if (val.contains("$")) {
                     val = val.replaceAll("(?<!\\\\)\\$", "\\\\\\$");
                 }
diff --git a/test/org/apache/pig/test/TestParamSubPreproc.java b/test/org/apache/pig/test/TestParamSubPreproc.java
index 4b9d18c..caeada4 100644
--- a/test/org/apache/pig/test/TestParamSubPreproc.java
+++ b/test/org/apache/pig/test/TestParamSubPreproc.java
@@ -43,6 +43,7 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.tools.parameters.ParseException;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Test;
 
 public class TestParamSubPreproc {
@@ -879,6 +880,213 @@
         assertEquals(resultContent, "daniel\t10\njenny\t20\n");
     }
 
+    @Test
+    public void testCommandLineParamOverwritingDefault() throws Exception {
+        log.info("Starting test testCommandLineParamOverwritingDefault()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File output1 = File.createTempFile("output1_", "");
+        output1.delete();
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%default output /invalidpathThatShouldFail;",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 "-p", "output=" + output1.getAbsolutePath(),
+                                 script1.getAbsolutePath()} , null);
+        Util.deleteDirectory(output1);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertTrue("Default param should be overridden by the commandline param",
+                     output1.getAbsolutePath().endsWith(stats.getOutputNames().get(0)));
+    }
+
+    @Test
+    public void testRunWithParamOverwritingDefault() throws Exception {
+        log.info("Starting test testScopeOfParamWithRunCommand()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File output1 = File.createTempFile("output1_", "");
+        File output2 = File.createTempFile("output2_", "");
+        output1.delete();
+        output2.delete();
+
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%default output '" + output2.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        File mainscript = Util.createFile("mainscript.pig",
+                  new String[] {"run -param output=" + output1.getAbsolutePath()
+                                + " " + script1.getAbsolutePath() + ";"});
+
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 mainscript.getAbsolutePath()} , null);
+        Util.deleteDirectory(output1);
+        Util.deleteDirectory(output2);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertEquals("There should only be 1 output.",
+                     1, stats.getOutputNames().size());
+        assertEquals("Output name should be from output1 and not output2",
+                     output1.getAbsolutePath(),
+                     stats.getOutputLocations().get(0));
+    }
+
+    @Test
+    public void testScopeOfParamWithRunCommand() throws Exception {
+        log.info("Starting test testScopeOfParamWithRunCommand()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File output1 = File.createTempFile("output1_", "");
+        File output2 = File.createTempFile("output2_", "");
+        output1.delete();
+        output2.delete();
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%default output '" + output1.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        File script2 = Util.createFile("runscript2.pig",
+                  new String[] { "%default output '" + output2.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                  "store a into '$output';"});
+
+        File mainscript = Util.createFile("mainscript.pig",
+                  new String[] { "run " + script1.getAbsolutePath() + ";",
+                                 "run " + script2.getAbsolutePath() + ";" });
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 mainscript.getAbsolutePath()} , null);
+        Util.deleteDirectory(output1);
+        Util.deleteDirectory(output2);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertNotEquals("Two output paths should differ",
+               stats.getOutputNames().get(0), stats.getOutputNames().get(1));
+        assertEquals("Each output should contain 2 records",
+                      2, stats.getOutputStats().get(0).getNumberRecords());
+        assertEquals("Each output should contain 2 records",
+                      2, stats.getOutputStats().get(1).getNumberRecords());
+    }
+
+    @Test
+    public void testScopeOfParamWithNestedRunCommand() throws Exception {
+        log.info("Starting test testScopeOfParamWithRunCommand()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        /*
+         * script1 sets a=1, b=2, c=3; calls script2
+         *   script2 sets b=22 (by -param); calls script3
+         *     script3 sets c=333; saves $a$b$c  (122333)
+         *   script2 saves $a$b$c (1223)
+         * script1 saves $a$b$c (123)
+         */
+        File script3 = Util.createFile("runscript3.pig",
+                  new String[] { "%declare c '333';",
+                                 "a = load 'runinput';",
+                                  "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"});
+
+        File script2 = Util.createFile("runscript2.pig",
+                  new String[] { "run " + script3.getAbsolutePath() + ";",
+                                 "a = load 'runinput';",
+                                  "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"});
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%declare a '1';",
+                                 "%declare b '2';",
+                                 "%declare c '3';",
+                                 "run -param b=22 " + script2.getAbsolutePath() + ";",
+                                 "a = load 'runinput';",
+                                  "store a into 'testScopeOfParamWithNestedRunCommand${a}${b}${c}';"});
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 script1.getAbsolutePath()} , null);
+
+        for( String output : stats.getOutputNames() ) {
+            assertTrue(output.contains("testScopeOfParamWithNestedRunCommand"));
+            Util.deleteDirectory(new File(output));
+        }
+        assertTrue("job should succeed", stats.isSuccessful());
+        assertEquals("There should be three outputs.", 3, stats.getOutputNames().size());
+
+        for( String expectedoutput : new String [] {"testScopeOfParamWithNestedRunCommand123",
+                                            "testScopeOfParamWithNestedRunCommand1223",
+                                            "testScopeOfParamWithNestedRunCommand122333"} ) {
+            boolean found=false;
+            for( String output : stats.getOutputNames() ) {
+                if( output.endsWith(expectedoutput) ) {
+                    found=true;
+                }
+            }
+            assertTrue("Output " + expectedoutput + " should exist.", found);
+        }
+    }
+
+    /*  This currently does not work since PigMacro only picks the
+     *  param setting from the root script (script1)
+     *  To revisit after Grunt moves to ANTLR in PIG-2597.
+     *  Tracking in PIG-5028.
+     *
+
+    @Test
+    public void testScopeOfParamWithMacro() throws Exception {
+        log.info("Starting test testScopeOfParamWithMacro()");
+        File inputFile = Util.createFile(
+                  "runinput",
+                  new String[] { "daniel\t10",
+                                 "jenny\t20;"});
+        File macro = Util.createFile("testmacro.pig",
+                  new String[] { "DEFINE mymacro (A) RETURNS void {",
+                                 "store $A into 'testScopeOfParamWithMacro${a}${b}${c}';",
+                                 "};"});
+
+        File script3 = Util.createFile("runscript3.pig",
+                  new String[] { "%declare c '333';"});
+
+        File script2 = Util.createFile("runscript2.pig",
+                  new String[] { "%declare b '22';",
+                                 "import '" + macro.getAbsolutePath() + "';",
+                                 "a = load 'runinput';",
+                                 "mymacro(a);",
+                                 "exec " + script3.getAbsolutePath() + ";"});
+
+        File script1 = Util.createFile("runscript1.pig",
+                  new String[] { "%declare a '1';",
+                                 "%declare b '2';",
+                                 "%declare c '3';",
+                                 "exec " + script2.getAbsolutePath() + ";"});
+
+        PigStats stats = org.apache.pig.PigRunner.run(new String[] {
+                                 "-x", Util.getLocalTestMode().toString(),
+                                 script1.getAbsolutePath()} , null);
+
+        assertTrue("job should succeed", stats.isSuccessful());
+        Util.deleteDirectory(new File(stats.getOutputNames().get(0)));
+        assertEquals("There should be only 1 output.", 1, stats.getOutputNames().size());
+        assertTrue("Expected output testScopeOfParamWithMacro1223 but got " + stats.getOutputNames().get(0),
+                   stats.getOutputNames().get(0).equals("testScopeOfParamWithMacro1223"));
+    }
+    */
+
+
     /*
      *  Test parameter substition when register contains /* globbing
      */
diff --git a/test/org/apache/pig/test/Util.java b/test/org/apache/pig/test/Util.java
index 14c61f0..18b241e 100644
--- a/test/org/apache/pig/test/Util.java
+++ b/test/org/apache/pig/test/Util.java
@@ -833,7 +833,23 @@
     }
 
     public static File createFile(String[] data) throws Exception{
-        File f = File.createTempFile("tmp", "");
+        return createFile(null,data);
+    }
+
+    public static File createFile(String filePath, String[] data) throws Exception {
+        File f;
+        if( null == filePath || filePath.isEmpty() ) {
+          f = File.createTempFile("tmp", "");
+        } else  {
+          f = new File(filePath);
+        }
+
+        if (f.getParent() != null && !(new File(f.getParent())).exists()) {
+            (new File(f.getParent())).mkdirs();
+        }
+
+        f.deleteOnExit();
+
         PrintWriter pw = new PrintWriter(f);
         for (int i=0; i<data.length; i++){
             pw.println(data[i]);