PIG-5046: Skewed join with auto parallelism hangs when right input also has autoparallelism (rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1779463 13f79535-47bb-0310-9956-ffa450edef68
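Before this change, PartitionerDefinedVertexManager scheduled tasks from onVertexStarted() only if dynamicParallelism had already been set, so a vertex that started before the sample aggregator's VertexManagerEvent arrived (which can happen when the right input also uses auto parallelism) never scheduled any tasks and the DAG hung. The manager now declares vertexReconfigurationPlanned() in initialize(), registers for CONFIGURED updates from every source vertex, and schedules tasks only once the vertex has started, all sources are configured, and the new parallelism has been applied. A standalone sketch of the scheduling gate follows the Java diff below.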
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fb6142..9313fa7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -187,6 +187,8 @@
BUG FIXES
+PIG-5046: Skewed join with auto parallelism hangs when right input also has autoparallelism (rohini)
+
PIG-5108: AvroStorage on Tez with exception on nested records (daijy)
PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
index ae6deaf..344b579 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
@@ -17,23 +17,25 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
/**
@@ -46,8 +48,13 @@
public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
- private boolean isParallelismSet = false;
+ private volatile boolean parallelismSet;
private int dynamicParallelism = -1;
+ private int numConfiguredSources;
+ private int numSources = -1;
+ private volatile boolean configured;
+ private volatile boolean started;
+ private volatile boolean scheduled;
public PartitionerDefinedVertexManager(VertexManagerPluginContext context) {
super(context);
@@ -55,7 +62,31 @@
@Override
public void initialize() {
- // Nothing to do
+        // This prevents the vertex from starting until we notify Tez that reconfiguration is done
+ getContext().vertexReconfigurationPlanned();
+ parallelismSet = false;
+ numConfiguredSources = 0;
+ configured = false;
+ started = false;
+ numSources = getContext().getInputVertexEdgeProperties().size();
+        // Wait for all source vertices to be configured (self start is tracked in onVertexStarted)
+ Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+ for (String entry : edges.keySet()) {
+ getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+ }
+ }
+
+ @Override
+ public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
+ throws Exception {
+ numConfiguredSources++;
+ LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
+ + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+ + " needed: " + numSources);
+ Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName());
+ if (numConfiguredSources == numSources) {
+ configure();
+ }
}
@Override
@@ -73,10 +104,9 @@
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
        // There could be multiple partition vertices sending VertexManagerEvents;
        // we only need to set the vertex parallelism once
- if (isParallelismSet) {
+ if (parallelismSet) {
return;
}
- isParallelismSet = true;
// Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput
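+        // A 4-byte payload is assumed to be the bare serialized int parallelism sent by the
+        // sample aggregator vertex rather than a serialized VertexManagerEventPayloadProto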
if (vmEvent.getUserPayload().limit()==4) {
dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -96,18 +126,50 @@
edgeManagers.put(entry.getKey(), edge);
}
getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers);
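+                // Set the flag only after reconfigureVertex() has succeeded, then try to
+                // complete configuration in case all sources have already reported CONFIGURED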
+ parallelismSet = true;
+ configure();
}
}
}
+ private void configure() {
+        if (parallelismSet && (numSources == numConfiguredSources)) {
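+            // Both conditions met: the new parallelism has been applied and every source
+            // has reported CONFIGURED, so reconfiguration can be declared complete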
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Done reconfiguring vertex " + getContext().getVertexName());
+ }
+ getContext().doneReconfiguringVertex();
+ configured = true;
+ trySchedulingTasks();
+ }
+ }
+
+ private synchronized void trySchedulingTasks() {
+ if (configured && started && !scheduled) {
+ LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName());
+ List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
+ for (int i = 0; i < dynamicParallelism; ++i) {
+ tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
+ }
+ getContext().scheduleVertexTasks(tasksToStart);
+ scheduled = true;
+ }
+ }
+
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
- if (dynamicParallelism != -1) {
- List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
- for (int i=0; i<dynamicParallelism; ++i) {
- tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
- }
- getContext().scheduleVertexTasks(tasksToStart);
+ // This vertex manager will be getting the following calls
+ // 1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex
+ // 2) onVertexStateUpdated - Vertex CONFIGURED status updates from
+ // - Order by Partitioner vertex (1-1) in case of Order by
+ // - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin
+ // 3) onVertexStarted
+ // Calls 2) and 3) can happen in any order. So we should schedule tasks
+ // only after start is called and configuration is also complete
+ started = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Vertex start received for " + getContext().getVertexName());
}
+ trySchedulingTasks();
}
+
}
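Not part of the patch: a self-contained sketch of the configured/started/scheduled gate above, with the Tez callbacks simulated by racing threads and scheduleVertexTasks() replaced by a counter (class and field names are illustrative). Whichever signal arrives last performs the scheduling, and it runs exactly once:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SchedulingGateDemo {
        private volatile boolean configured;
        private volatile boolean started;
        private boolean scheduled;
        private final AtomicInteger scheduleCalls = new AtomicInteger();

        // Mirrors trySchedulingTasks(): act only when both signals have arrived,
        // and never more than once
        private synchronized void trySchedulingTasks() {
            if (configured && started && !scheduled) {
                scheduled = true;
                scheduleCalls.incrementAndGet(); // stand-in for scheduleVertexTasks()
            }
        }

        public static void main(String[] args) throws InterruptedException {
            SchedulingGateDemo gate = new SchedulingGateDemo();
            CountDownLatch done = new CountDownLatch(2);
            // onVertexStarted() and configure() may fire in either order
            new Thread(() -> { gate.started = true; gate.trySchedulingTasks(); done.countDown(); }).start();
            new Thread(() -> { gate.configured = true; gate.trySchedulingTasks(); done.countDown(); }).start();
            done.await();
            System.out.println("scheduled " + gate.scheduleCalls.get() + " time(s)"); // always 1
        }
    }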
diff --git a/test/org/apache/pig/tez/TestTezAutoParallelism.java b/test/org/apache/pig/tez/TestTezAutoParallelism.java
index 90c4f19..dbd4891 100644
--- a/test/org/apache/pig/tez/TestTezAutoParallelism.java
+++ b/test/org/apache/pig/tez/TestTezAutoParallelism.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
@@ -47,6 +48,7 @@
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
+import org.apache.tez.dag.api.TezConfiguration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -62,12 +64,23 @@
private static Properties properties;
private static MiniGenericCluster cluster;
+    private static final PathFilter PART_FILE_FILTER = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            return path.getName().startsWith("part");
+        }
+    };
+
@BeforeClass
public static void oneTimeSetUp() throws Exception {
cluster = MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
properties = cluster.getProperties();
//Test spilling to disk as tests here have multiple splits
properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, "10");
+ properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, "false");
createFiles();
}
@@ -84,6 +97,11 @@
@After
public void tearDown() throws Exception {
+ removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION);
+ removeProperty(MRConfiguration.MAX_SPLIT_SIZE);
+ removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+ removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+ removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL);
pigServer.shutdown();
pigServer = null;
}
@@ -131,23 +149,15 @@
@Test
public void testGroupBy() throws IOException{
// parallelism is 3 originally, reduce to 1
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name;");
pigServer.store("B", "output1");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
assertEquals(files.length, 1);
fs.delete(new Path("output1"), true);
}
@@ -158,9 +168,9 @@
NodeIdGenerator.reset();
PigServer.resetScope();
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
StringWriter writer = new StringWriter();
Util.createLogAppender("testAutoParallelism", writer, TezDagBuilder.class);
@@ -169,15 +179,7 @@
pigServer.registerQuery("B = group A by name;");
pigServer.store("B", "output1");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output1"), PART_FILE_FILTER);
assertEquals(files.length, 10);
String log = writer.toString();
assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
@@ -191,9 +193,9 @@
@Test
public void testOrderbyDecreaseParallelism() throws IOException{
// order by parallelism is 3 originally, reduce to 1
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name parallel 3;");
@@ -201,86 +203,54 @@
pigServer.registerQuery("D = order C by age;");
pigServer.store("D", "output2");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output2"), PART_FILE_FILTER);
assertEquals(files.length, 1);
}
@Test
public void testOrderbyIncreaseParallelism() throws IOException{
// order by parallelism is 3 originally, increase to 4
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name parallel 3;");
pigServer.registerQuery("C = foreach B generate group as name, AVG(A.age) as age;");
pigServer.registerQuery("D = order C by age;");
pigServer.store("D", "output3");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output3"), PART_FILE_FILTER);
assertEquals(files.length, 4);
}
@Test
public void testSkewedJoinDecreaseParallelism() throws IOException{
// skewed join parallelism is 4 originally, reduce to 1
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
pigServer.store("C", "output4");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output4"), PART_FILE_FILTER);
assertEquals(files.length, 1);
}
@Test
public void testSkewedJoinIncreaseParallelism() throws IOException{
// skewed join parallelism is 3 originally, increase to 5
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
pigServer.store("C", "output5");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
assertEquals(files.length, 5);
}
@@ -288,23 +258,15 @@
public void testSkewedFullJoinIncreaseParallelism() throws IOException{
        // skewed full join parallelism takes the initial setting, since the join vertex has a broadcast (sample) dependency,
        // which prevents it from changing parallelism
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name full, B by name using 'skewed';");
pigServer.store("C", "output6");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output5"), PART_FILE_FILTER);
assertEquals(files.length, 5);
}
@@ -312,9 +274,9 @@
public void testSkewedJoinIncreaseParallelismWithScalar() throws IOException{
        // skewed join parallelism takes the initial setting, since the join vertex has a broadcast (scalar) dependency,
        // which prevents it from changing parallelism
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
pigServer.registerQuery("C = join A by name, B by name using 'skewed';");
@@ -324,19 +286,29 @@
pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
pigServer.store("G", "output7");
FileSystem fs = cluster.getFileSystem();
- FileStatus[] files = fs.listStatus(new Path("output7"), new PathFilter(){
- @Override
- public boolean accept(Path path) {
- if (path.getName().startsWith("part")) {
- return true;
- }
- return false;
- }
- });
+ FileStatus[] files = fs.listStatus(new Path("output7"), PART_FILE_FILTER);
assertEquals(files.length, 4);
}
@Test
+ public void testSkewedJoinRightInputAutoParallelism() throws IOException{
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "40000");
+ setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0");
+ setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
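+        // Slowstart of 1.0 delays downstream task launch until all upstream tasks finish,
+        // presumably to widen the window for the configure/start race under test; DEBUG
+        // AM logs make a regression of the hang easier to diagnose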
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = FILTER B by name == 'Noah';");
+ pigServer.registerQuery("B1 = group B by name;");
+ pigServer.registerQuery("C = join A by name, B1 by group using 'skewed';");
+ pigServer.store("C", "output8");
+ FileSystem fs = cluster.getFileSystem();
+ FileStatus[] files = fs.listStatus(new Path("output8"), PART_FILE_FILTER);
+ assertEquals(5, files.length);
+ }
+
+ @Test
public void testFlattenParallelism() throws IOException{
String outputDir = "/tmp/testFlattenParallelism";
String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);"
@@ -423,9 +395,9 @@
        // When a combiner operation is involved, user-specified parallelism is overridden
Util.createLogAppender("testAutoParallelism", writer, classesToLog);
try {
- pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
- pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+ setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+ setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
pigServer.setBatchOn();
pigServer.registerScript(new ByteArrayInputStream(script.getBytes()));
pigServer.executeBatch();
@@ -453,4 +425,12 @@
Util.deleteFile(cluster, outputDir);
}
}
+
+ private void setProperty(String property, String value) {
+ pigServer.getPigContext().getProperties().setProperty(property, value);
+ }
+
+ private void removeProperty(String property) {
+ pigServer.getPigContext().getProperties().remove(property);
+ }
}