PIG-3038: Support for Credentials for UDF, Loader and Storer (satishsaley via rohini)
git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1842040 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index fe18b14..aa1a1a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-3038: Support for Credentials for UDF, Loader and Storer (satishsaley via rohini)
+
PIG-5358: Remove hive-contrib jar from lib directory (szita)
PIG-5343: Upgrade developer build environment (nielsbasjes via szita)
diff --git a/src/org/apache/pig/EvalFunc.java b/src/org/apache/pig/EvalFunc.java
index fd139a8..9da85c6 100644
--- a/src/org/apache/pig/EvalFunc.java
+++ b/src/org/apache/pig/EvalFunc.java
@@ -20,6 +20,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
import org.apache.pig.builtin.OutputSchema;
@@ -382,4 +384,16 @@
return null;
}
+ /**
+ * Allows adding secrets or custom credentials that can be used to
+ * talk to external systems, e.g. keys to decrypt encrypted data,
+ * database passwords, HCatalog/HBase delegation tokens, etc.
+ * This will be called once on the front end before the job is submitted.
+ * The added credentials can be accessed in the backend
+ * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}.
+ * @param credentials Credentials object to which delegation tokens and secrets can be added
+ * @param conf job Configuration, available for consultation while adding credentials
+ */
+ public void addCredentials(Credentials credentials, Configuration conf) {
+ }
}
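For illustration, a minimal sketch of a UDF using the new hook, in the spirit of the CredentialsEvalFunc test case added at the end of this patch (class name, key alias and secret value are illustrative; imports assumed: org.apache.hadoop.io.Text, org.apache.hadoop.security.Credentials, org.apache.hadoop.security.UserGroupInformation, org.apache.hadoop.conf.Configuration, org.apache.pig.EvalFunc, org.apache.pig.data.Tuple):

    public static class DecryptingFunc extends EvalFunc<String> {
        // Illustrative alias for the secret; any Text key works.
        private static final Text KEY_ALIAS = new Text("my.secret");

        @Override
        public void addCredentials(Credentials credentials, Configuration conf) {
            // Called once on the front end, before job submission.
            credentials.addSecretKey(KEY_ALIAS, "s3cr3t".getBytes());
        }

        @Override
        public String exec(Tuple input) throws IOException {
            // On the backend the secret travels with the job credentials
            // and is reachable through the current UGI.
            byte[] secret = UserGroupInformation.getCurrentUser()
                    .getCredentials().getSecretKey(KEY_ALIAS);
            return new String(secret);
        }
    }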
diff --git a/src/org/apache/pig/LoadFunc.java b/src/org/apache/pig/LoadFunc.java
index 83e89a3..c36de94 100644
--- a/src/org/apache/pig/LoadFunc.java
+++ b/src/org/apache/pig/LoadFunc.java
@@ -34,6 +34,7 @@
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.security.Credentials;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.LoadPushDown.RequiredFieldList;
@@ -362,4 +363,16 @@
return null;
}
+ /**
+ * Allows adding secrets or custom credentials that can be used to
+ * talk to external systems, e.g. keys to decrypt encrypted data,
+ * database passwords, HCatalog/HBase delegation tokens, etc.
+ * This will be called once on the front end before the job is submitted.
+ * The added credentials can be accessed in the backend
+ * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}.
+ * @param credentials Credentials object to which delegation tokens and secrets can be added
+ * @param conf job Configuration, available for consultation while adding credentials
+ */
+ public void addCredentials(Credentials credentials, Configuration conf) {
+ }
}
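The load-side pattern is symmetric: credentials added on the front end can be consulted per record on the backend. A trimmed sketch modeled on the CredPigStorage test class at the end of this patch (names illustrative; imports as in the UDF sketch above, plus org.apache.pig.builtin.PigStorage):

    public static class SecretAwareStorage extends PigStorage {
        private static final Text KEY_ALIAS = new Text("my.secret"); // illustrative

        @Override
        public void addCredentials(Credentials credentials, Configuration conf) {
            // Front end: ship the secret with the job.
            credentials.addSecretKey(KEY_ALIAS, "s3cr3t".getBytes());
        }

        @Override
        public Tuple getNext() throws IOException {
            Tuple t = super.getNext();
            if (t != null) {
                // Backend: fetch the secret, e.g. to decrypt a field.
                byte[] secret = UserGroupInformation.getCurrentUser()
                        .getCredentials().getSecretKey(KEY_ALIAS);
                t.append(new String(secret));
            }
            return t;
        }
    }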
diff --git a/src/org/apache/pig/StoreFuncInterface.java b/src/org/apache/pig/StoreFuncInterface.java
index c590084..a3ae2ab 100644
--- a/src/org/apache/pig/StoreFuncInterface.java
+++ b/src/org/apache/pig/StoreFuncInterface.java
@@ -19,11 +19,12 @@
import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
-
+import org.apache.hadoop.security.Credentials;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.pig.classification.InterfaceStability;
import org.apache.pig.data.Tuple;
@@ -152,4 +153,17 @@
* any runtime job information.
*/
void cleanupOnSuccess(String location, Job job) throws IOException;
+
+ /**
+ * Allows adding secrets or custom credentials that can be used to
+ * talk to external systems, e.g. keys to decrypt encrypted data,
+ * database passwords, HCatalog/HBase delegation tokens, etc.
+ * This will be called once on the front end before the job is submitted.
+ * The added credentials can be accessed in the backend
+ * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}.
+ * @param credentials Credentials object to which delegation tokens and secrets can be added
+ * @param conf job Configuration, available for consultation while adding credentials
+ */
+ default void addCredentials(Credentials credentials, Configuration conf) {
+ }
}
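Note that addCredentials is declared default here, so existing StoreFuncInterface implementations keep compiling unchanged. A storer talking to a kerberized service would typically obtain a delegation token on the front end and merge it into the job credentials, the same pattern the HBaseStorage change below follows; in this sketch obtainServiceToken() is a hypothetical stand-in for a service-specific API such as HBase's TokenUtil.obtainTokenForJob():

    @Override
    public void addCredentials(Credentials credentials, Configuration conf) {
        // Work on a private copy so service-specific settings don't leak back.
        JobConf localConf = new JobConf(conf);
        obtainServiceToken(localConf);   // hypothetical service-specific call
        // Merge whatever tokens/secrets the service API collected.
        credentials.addAll(localConf.getCredentials());
    }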
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
index 4d3ab50..31528dc 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -82,6 +82,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
@@ -551,13 +552,20 @@
for (POLoad ld : lds) {
LoadFunc lf = ld.getLoadFunc();
lf.setLocation(ld.getLFile().getFileName(), nwJob);
-
+ lf.addCredentials(nwJob.getCredentials(), conf);
ld.setParentPlan(null);
//Store the inp filespecs
inp.add(ld);
}
}
+ // Process the POUserFuncs so UDFs can add credentials
+ List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(mro.mapPlan, POUserFunc.class);
+ userFuncs.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, POUserFunc.class));
+ for (POUserFunc userFunc : userFuncs) {
+ userFunc.getFunc().addCredentials(nwJob.getCredentials(), conf);
+ }
+
if(!mro.reducePlan.isEmpty()){
log.info("Reduce phase detected, estimating # of required reducers.");
adjustNumReducers(plan, mro, nwJob);
@@ -774,6 +782,7 @@
osf.cleanupOutput(st, nwJob);
}
}
+ sFunc.addCredentials(nwJob.getCredentials(), conf);
}
for (POStore st : reduceStores) {
@@ -786,6 +795,7 @@
osf.cleanupOutput(st, nwJob);
}
}
+ sFunc.addCredentials(nwJob.getCredentials(), conf);
}
setOutputFormat(nwJob);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
index 2c8dea6..7ccd393 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
@@ -32,6 +32,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
import org.apache.pig.Accumulator;
import org.apache.pig.Algebraic;
import org.apache.pig.EvalFunc;
@@ -68,6 +69,7 @@
private transient EvalFunc func;
private transient List<String> cacheFiles = null;
private transient List<String> shipFiles = null;
+ private transient Credentials creds = null;
FuncSpec funcSpec;
FuncSpec origFSpec;
@@ -642,4 +644,13 @@
public boolean needEndOfAllInputProcessing() {
return getFunc().needEndOfAllInputProcessing();
}
+
+ public Credentials getCredentials() {
+ return this.creds;
+ }
+
+ public void setCredentials(Credentials creds) {
+ this.creds = creds;
+ }
+
}
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index f292487..0b74ca8 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -84,6 +84,7 @@
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -360,7 +361,6 @@
public void visitTezOp(TezOperator tezOp) throws VisitorException {
TezOperPlan tezPlan = getPlan();
List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
-
// Construct vertex for the current Tez operator
Vertex to = null;
try {
@@ -611,6 +611,7 @@
private Vertex newVertex(TezOperator tezOp) throws IOException,
ClassNotFoundException, InterruptedException {
+
ProcessorDescriptor procDesc = ProcessorDescriptor.create(
tezOp.getProcessorName());
@@ -642,6 +643,9 @@
// Process stores
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
+ // Process UserFuncs
+ processUserFuncs(tezOp, job);
+
Configuration inputPayLoad = null;
Configuration outputPayLoad = null;
@@ -1044,6 +1048,19 @@
return vertex;
}
+ /**
+ * Processes the POUserFuncs in the plan so each UDF can add credentials to the job
+ * @param tezOp Tez operator whose plan is scanned for POUserFuncs
+ * @param job job to whose Credentials the UDFs may add secrets and tokens
+ * @throws VisitorException
+ */
+ private void processUserFuncs(TezOperator tezOp, Job job) throws VisitorException {
+ List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+ for (POUserFunc userFunc : userFuncs) {
+ userFunc.getFunc().addCredentials(job.getCredentials(), job.getConfiguration());
+ }
+ }
+
private LinkedList<POStore> processStores(TezOperator tezOp,
Configuration payloadConf, Job job) throws VisitorException,
IOException {
@@ -1057,6 +1074,7 @@
storeLocations.add(st);
StoreFuncInterface sFunc = st.getStoreFunc();
sFunc.setStoreLocation(st.getSFile().getFileName(), job);
+ sFunc.addCredentials(job.getCredentials(), job.getConfiguration());
}
Path tmpLocation = null;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
index 7a12df7..d984fef 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
@@ -100,7 +100,7 @@
for (POLoad ld : lds) {
LoadFunc lf = ld.getLoadFunc();
lf.setLocation(ld.getLFile().getFileName(), job);
-
+ lf.addCredentials(this.jobConf.getCredentials(), conf);
// Store the inp filespecs
inp.add(ld.getLFile());
}
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 9804038..1953253 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -17,7 +17,6 @@
package org.apache.pig.backend.hadoop.hbase;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -78,6 +77,7 @@
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.pig.CollectableLoadFunc;
import org.apache.pig.LoadCaster;
@@ -766,11 +766,6 @@
job.getConfiguration().setBoolean("pig.noSplitCombination", true);
m_conf = initializeLocalJobConfig(job);
- String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
- if (delegationTokenSet == null) {
- addHBaseDelegationToken(m_conf, job);
- udfProps.setProperty(HBASE_TOKEN_SET, "true");
- }
String tablename = location;
if (location.startsWith("hbase://")) {
@@ -830,6 +825,35 @@
return FuncUtils.getShipFiles(classList);
}
+
+ @Override
+ public void addCredentials(Credentials credentials, Configuration conf) {
+ JobConf jobConf = initializeLocalJobConfig(conf);
+ if ("kerberos".equalsIgnoreCase(jobConf.get(HBASE_SECURITY_CONF_KEY))) {
+ LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
+ try {
+ User currentUser = User.getCurrent();
+ UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
+ if (currentUserGroupInformation.hasKerberosCredentials()) {
+ try (Connection connection = ConnectionFactory.createConnection(jobConf, currentUser)) {
+ TokenUtil.obtainTokenForJob(connection, jobConf, currentUser);
+ LOG.info("Token retrieval succeeded for user " + currentUser.getName());
+ credentials.addAll(jobConf.getCredentials());
+ }
+ } else {
+ LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
+ }
+ } catch (RuntimeException re) {
+ throw re;
+ } catch (Exception e) {
+ throw new UndeclaredThrowableException(e,
+ "Unexpected error calling TokenUtil.obtainTokenForJob()");
+ }
+ } else {
+ LOG.info("hbase is not configured to use kerberos, skipping delegation token");
+ }
+ }
+
private void addClassToList(String className, List<Class> classList) {
try {
Class klass = Class.forName(className);
@@ -839,9 +863,8 @@
}
}
- private JobConf initializeLocalJobConfig(Job job) {
+ private JobConf initializeLocalJobConfig(Configuration jobConf) {
Properties udfProps = getUDFProperties();
- Configuration jobConf = job.getConfiguration();
JobConf localConf = new JobConf(jobConf);
if (udfProps.containsKey(HBASE_CONFIG_SET)) {
for (Entry<Object, Object> entry : udfProps.entrySet()) {
@@ -864,40 +887,8 @@
return localConf;
}
- /**
- * Get delegation token from hbase and add it to the Job
- *
- */
- @SuppressWarnings({ "rawtypes", "unchecked" })
- private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
-
- if (!UDFContext.getUDFContext().isFrontend()) {
- LOG.debug("skipping authentication checks because we're currently in a frontend UDF context");
- return;
- }
-
- if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
- LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
- try {
- User currentUser = User.getCurrent();
- UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
- if (currentUserGroupInformation.hasKerberosCredentials()) {
- try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) {
- TokenUtil.obtainTokenForJob(connection, currentUser, job);
- LOG.info("Token retrieval succeeded for user " + currentUser.getName());
- }
- } else {
- LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
- }
- } catch (RuntimeException re) {
- throw re;
- } catch (Exception e) {
- throw new UndeclaredThrowableException(e,
- "Unexpected error calling TokenUtil.obtainTokenForJob()");
- }
- } else {
- LOG.info("hbase is not configured to use kerberos, skipping delegation token");
- }
+ private JobConf initializeLocalJobConfig(Job job) {
+ return initializeLocalJobConfig(job.getConfiguration());
}
@Override
@@ -1129,11 +1120,6 @@
}
m_conf = initializeLocalJobConfig(job);
- // Not setting a udf property and getting the hbase delegation token
- // only once like in setLocation as setStoreLocation gets different Job
- // objects for each call and the last Job passed is the one that is
- // launched. So we end up getting multiple hbase delegation tokens.
- addHBaseDelegationToken(m_conf, job);
}
@Override
diff --git a/test/org/apache/pig/test/TestCredentials.java b/test/org/apache/pig/test/TestCredentials.java
new file mode 100644
index 0000000..8ec4ee2
--- /dev/null
+++ b/test/org/apache/pig/test/TestCredentials.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestCredentials {
+ private static PigServer pigServer;
+ private static PigContext pc;
+ private static MiniGenericCluster cluster;
+ private static final String INPUT_FILE = "input.txt";
+ private static final String OUTPUT_DIR = "output";
+ private static final Text ALIAS = new Text("testKey");
+ private static final String SECRET = "dummySecret";
+
+ public static class CredentialsEvalFunc extends EvalFunc<String> {
+ @Override
+ public String exec(Tuple input) throws IOException {
+ String val = new String(UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS));
+ if(!SECRET.equals(val)) {
+ throw new IOException("Invalid secret");
+ }
+ return val;
+ }
+
+ @Override
+ public void addCredentials(Credentials credentials, Configuration conf) {
+ Credentials creds = new Credentials();
+ creds.addSecretKey(ALIAS, SECRET.getBytes());
+ credentials.addAll(creds);
+ }
+ }
+
+ public static class CredPigStorage extends PigStorage {
+ @Override
+ public void addCredentials(Credentials credentials, Configuration conf) {
+ Credentials creds = new Credentials();
+ creds.addSecretKey(ALIAS, SECRET.getBytes());
+ credentials.addAll(creds);
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ Tuple ret = super.getNext();
+ if(ret == null) {
+ return ret;
+ }
+ byte[] b = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS);
+ if(b != null) {
+ ret.append(new String(b));
+ }
+ return ret;
+ }
+
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+ if(tuple == null) {
+ return;
+ }
+ byte[] b = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS);
+ if(b != null) {
+ tuple.append(new String(b));
+ }
+ super.putNext(tuple);
+ }
+ }
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ cluster = MiniGenericCluster.buildCluster();
+ pc = new PigContext(cluster.getExecType(), cluster.getProperties());
+ pc.connect();
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ GenPhyOp.setPc(pc);
+ createInput();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ deleteInput();
+ if(pigServer != null) {
+ pigServer.shutdown();
+ }
+ cluster.shutDown();
+ }
+
+ @Test
+ public void testCredentialsEvalFunc() throws IOException {
+ Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')");
+ pigServer.registerQuery("a = load '"+ INPUT_FILE +"' as (i:chararray);");
+ pigServer.registerQuery("d = foreach a generate " + CredentialsEvalFunc.class.getName() + "(i);");
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ assertTrue(it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ public void testCredentialsLoadFunc() throws Exception {
+ Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')");
+ pigServer.registerQuery("a = load '" + INPUT_FILE + "' using " + CredPigStorage.class.getName()
+ + "() as (text:chararray, secstr:chararray);");
+ pigServer.registerQuery("d = foreach a generate secstr;");
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ assertTrue(it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ public void testCredentialsStoreFunc() throws Exception {
+ Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')");
+ pigServer.registerQuery("a = load '" + INPUT_FILE + "' using PigStorage() as (text:chararray);");
+ pigServer.registerQuery("store a into '" + OUTPUT_DIR +"' using " + CredPigStorage.class.getName() + "();");
+ pigServer.registerQuery("c = load '" + OUTPUT_DIR + "' using PigStorage() as (text:chararray, secstr:chararray);");
+ pigServer.registerQuery("d = foreach c generate secstr;");
+ Iterator<Tuple> it = pigServer.openIterator("d");
+ assertTrue(it.hasNext());
+ assertEquals(expectedResult, it.next());
+ assertFalse(it.hasNext());
+ }
+
+ private static void createInput() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("dumb");
+ w.close();
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+ }
+
+ private static void deleteInput() throws IOException {
+ new File(INPUT_FILE).delete();
+ FileUtils.deleteDirectory(new File(OUTPUT_DIR));
+ Util.deleteFile(cluster, INPUT_FILE);
+ Util.deleteFile(cluster, OUTPUT_DIR);
+ }
+
+}
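Assuming Pig's usual Ant test conventions, the new test should be runnable on its own with something like: ant test -Dtestcase=TestCredentials.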