PIG-5046: Skewed join with auto parallelism hangs when right input also has autoparallelism (rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779463 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fb6142..9313fa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -187,6 +187,8 @@
 
 BUG FIXES
 
+PIG-5046: Skewed join with auto parallelism hangs when right input also has autoparallelism (rohini)
+
 PIG-5108: AvroStorage on Tez with exception on nested records (daijy)
 
 PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
index ae6deaf..344b579 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
@@ -17,23 +17,25 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -46,8 +48,13 @@
 public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
     private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
 
-    private boolean isParallelismSet = false;
+    private volatile boolean parallelismSet;
     private int dynamicParallelism = -1;
+    private int numConfiguredSources;
+    private int numSources = -1;
+    private volatile boolean configured;
+    private volatile boolean started;
+    private volatile boolean scheduled;
 
     public PartitionerDefinedVertexManager(VertexManagerPluginContext context) {
         super(context);
@@ -55,7 +62,31 @@
 
     @Override
     public void initialize() {
-        // Nothing to do
+        // this will prevent vertex from starting until we notify we are done
+        getContext().vertexReconfigurationPlanned();
+        parallelismSet = false;
+        numConfiguredSources = 0;
+        configured = false;
+        started = false;
+        numSources = getContext().getInputVertexEdgeProperties().size();
+        // wait for sources and self to start
+        Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+        for (String entry : edges.keySet()) {
+            getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+        }
+    }
+
+    @Override
+    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
+            throws Exception {
+        numConfiguredSources++;
+        LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
+            + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+            + " needed: " + numSources);
+        Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName());
+        if (numConfiguredSources == numSources) {
+            configure();
+        }
     }
 
     @Override
@@ -73,10 +104,9 @@
     public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
-        if (isParallelismSet) {
+        if (parallelismSet) {
             return;
         }
-        isParallelismSet = true;
         // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput
         if (vmEvent.getUserPayload().limit()==4) {
             dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -96,18 +126,50 @@
                     edgeManagers.put(entry.getKey(), edge);
                 }
                 getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers);
+                parallelismSet = true;
+                configure();
             }
         }
     }
 
+    private void configure() {
+        if(parallelismSet && (numSources == numConfiguredSources)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Done reconfiguring vertex " + getContext().getVertexName());
+            }
+            getContext().doneReconfiguringVertex();
+            configured = true;
+            trySchedulingTasks();
+        }
+    }
+
+    private synchronized void trySchedulingTasks() {
+        if (configured && started && !scheduled) {
+            LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName());
+            List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
+            for (int i = 0; i < dynamicParallelism; ++i) {
+                tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+            }
+            getContext().scheduleVertexTasks(tasksToStart);
+            scheduled = true;
+        }
+    }
+
     @Override
     public void onVertexStarted(Map<String, List<Integer>> completions) {
-        if (dynamicParallelism != -1) {
-            List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
-            for (int i=0; i<dynamicParallelism; ++i) {
-                tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
-            }
-            getContext().scheduleVertexTasks(tasksToStart);
+        // This vertex manager will be getting the following calls
+        //   1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex
+        //   2) onVertexStateUpdated - Vertex CONFIGURED status updates from
+        //       - Order by Partitioner vertex (1-1) in case of Order by
+        //       - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin
+        //   3) onVertexStarted
+        // Calls 2) and 3) can happen in any order. So we should schedule tasks
+        // only after start is called and configuration is also complete
+        started = true;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Vertex start received for " + getContext().getVertexName());
         }
+        trySchedulingTasks();
     }
+
 }
diff --git a/test/org/apache/pig/tez/TestTezAutoParallelism.java b/test/org/apache/pig/tez/TestTezAutoParallelism.java
index 90c4f19..dbd4891 100644
--- a/test/org/apache/pig/tez/TestTezAutoParallelism.java
+++ b/test/org/apache/pig/tez/TestTezAutoParallelism.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
@@ -47,6 +48,7 @@
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -62,12 +64,23 @@
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    private static final PathFilter PART_FILE_FILTER = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            if (path.getName().startsWith("part")) {
+                return true;
+            }
+            return false;
+        }
+    };
+
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
         properties = cluster.getProperties();
         //Test spilling to disk as tests here have multiple splits
         properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10");
+        properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
         createFiles();
     }
 
@@ -84,6 +97,11 @@
 
     @After
     public void tearDown() throws Exception {
+        removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION);
+        removeProperty(MRConfiguration.MAX_SPLIT_SIZE);
+        removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+        removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+        removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL);
         pigServer.shutdown();
         pigServer = null;
     }
@@ -131,23 +149,15 @@
     @Test
     public void testGroupBy() throws IOException{
         // parallelism is 3 originally, reduce to 1
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name;");
         pigServer.store("B", "output1");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
         assertEquals(files.length, 1);
         fs.delete(new Path("output1"), true);
     }
@@ -158,9 +168,9 @@
         NodeIdGenerator.reset();
         PigServer.resetScope();
 
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
 
         StringWriter writer = new StringWriter();
         Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
@@ -169,15 +179,7 @@
             pigServer.registerQuery("B = group A by name;");
             pigServer.store("B", "output1");
             FileSystem fs = cluster.getFileSystem();
-            FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
-                @Override
-                public boolean accept(Path path) {
-                    if (path.getName().startsWith("part")) {
-                        return true;
-                    }
-                    return false;
-                }
-            });
+            FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
             assertEquals(files.length, 10);
             String log = writer.toString();
             assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
@@ -191,9 +193,9 @@
     @Test
     public void testOrderbyDecreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, reduce to 1
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
@@ -201,86 +203,54 @@
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output2");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER);
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testOrderbyIncreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, increase to 4
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
         pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;");
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output3");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER);
         assertEquals(files.length, 4);
     }
 
     @Test
     public void testSkewedJoinDecreaseParallelism() throws IOException{
         // skewed join parallelism is 4 originally, reduce to 1
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
         pigServer.store("C", "output4");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER);
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testSkewedJoinIncreaseParallelism() throws IOException{
         // skewed join parallelism is 3 originally, increase to 5
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
         pigServer.store("C", "output5");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
         assertEquals(files.length, 5);
     }
 
@@ -288,23 +258,15 @@
     public void testSkewedFullJoinIncreaseParallelism() throws IOException{
         // skewed full join parallelism take the initial setting, since the join vertex has a broadcast(sample) dependency,
         // which prevent it changing parallelism
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
         pigServer.store("C", "output6");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
         assertEquals(files.length, 5);
     }
 
@@ -312,9 +274,9 @@
     public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
         // skewed join parallelism take the initial setting, since the join vertex has a broadcast(scalar) dependency,
         // which prevent it changing parallelism
-        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-        pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
@@ -324,19 +286,29 @@
         pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
         pigServer.store("G", "output7");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER);
         assertEquals(files.length, 4);
     }
 
     @Test
+    public void testSkewedJoinRightInputAutoParallelism() throws IOException{
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+        setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0");
+        setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+        pigServer.registerQuery("B = FILTER B by name == 'Noah';");
+        pigServer.registerQuery("B1 = group B by name;");
+        pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';");
+        pigServer.store("C", "output8");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER);
+        assertEquals(5, files.length);
+    }
+
+    @Test
     public void testFlattenParallelism() throws IOException{
         String outputDir = "/tmp/testFlattenParallelism";
         String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
@@ -423,9 +395,9 @@
         // When there is a combiner operation involved user specified parallelism is overriden
         Util.createLogAppender("testAutoParallelism", writer, classesToLog);
         try {
-            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
-            pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
-            pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+            setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+            setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+            setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
             pigServer.setBatchOn();
             pigServer.registerScript(new ByteArrayInputStream(script.getBytes()));
             pigServer.executeBatch();
@@ -453,4 +425,12 @@
             Util.deleteFile(cluster, outputDir);
         }
     }
+
+    private void setProperty(String property, String value) {
+        pigServer.getPigContext().getProperties().setProperty(property, value);
+    }
+
+    private void removeProperty(String property) {
+        pigServer.getPigContext().getProperties().remove(property);
+    }
 }