PIG-3038: Support for Credentials for UDF, Loader and Storer (satishsaley via rohini)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1842040 13f79535-47bb-0310-9956-ffa450edef68
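
The hook is meant to be used roughly as in the sketch below (illustrative only, not part of this patch: the class name DecryptingLookup, the alias "db.password" and the literal secret are placeholders). addCredentials() is called once on the front end before the job is submitted; on the backend the secret is read back through UserGroupInformation, which is what the TestCredentials test added in this patch exercises.

    // Illustrative sketch only: class name, alias and secret value are placeholders.
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.Credentials;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    public class DecryptingLookup extends EvalFunc<String> {
        private static final Text DB_PASSWORD = new Text("db.password");

        // Front end: called once before job submission; add secrets/tokens here.
        @Override
        public void addCredentials(Credentials credentials, Configuration conf) {
            credentials.addSecretKey(DB_PASSWORD, "s3cr3t".getBytes());
        }

        // Back end: the secret travels with the job and is read from the task's UGI credentials.
        @Override
        public String exec(Tuple input) throws IOException {
            byte[] secret = UserGroupInformation.getCurrentUser()
                    .getCredentials().getSecretKey(DB_PASSWORD);
            return secret == null ? null : new String(secret);
        }
    }

In a script such a UDF is used like any other, e.g. b = foreach a generate DecryptingLookup(name);
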
diff --git a/CHANGES.txt b/CHANGES.txt
index fe18b14..aa1a1a4 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -26,6 +26,8 @@
  
 IMPROVEMENTS
 
+PIG-3038: Support for Credentials for UDF, Loader and Storer (satishsaley via rohini)
+
 PIG-5358: Remove hive-contrib jar from lib directory (szita)
 
 PIG-5343: Upgrade developer build environment (nielsbasjes via szita)
diff --git a/src/org/apache/pig/EvalFunc.java b/src/org/apache/pig/EvalFunc.java
index fd139a8..9da85c6 100644
--- a/src/org/apache/pig/EvalFunc.java
+++ b/src/org/apache/pig/EvalFunc.java
@@ -20,6 +20,8 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigLogger;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PigProgressable;
 import org.apache.pig.builtin.OutputSchema;
@@ -382,4 +384,16 @@
         return null;
     }
 
+    /**
+     * Allows adding secrets or custom credentials that can be used to
+     * talk to external systems. For example: keys to decrypt encrypted data,
+     * database passwords, hcatalog/hbase delegation tokens, etc.
+     * This will be called once on the front end before the job is submitted.
+     * The added credentials can be accessed in the backend
+     * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}.
+     * @param credentials Credentials object to which delegation tokens and secrets can be added
+     * @param conf job configuration
+     */
+    public void addCredentials(Credentials credentials, Configuration conf) {
+    }
 }
diff --git a/src/org/apache/pig/LoadFunc.java b/src/org/apache/pig/LoadFunc.java
index 83e89a3..c36de94 100644
--- a/src/org/apache/pig/LoadFunc.java
+++ b/src/org/apache/pig/LoadFunc.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.security.Credentials;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.LoadPushDown.RequiredFieldList;
@@ -362,4 +363,16 @@
         return null;
     }
 
+    /**
+     * Allows adding secrets or custom credentials that can be used to
+     * talk to external systems. For example: keys to decrypt encrypted data,
+     * database passwords, hcatalog/hbase delegation tokens, etc.
+     * This will be called once on the front end before the job is submitted.
+     * The added credentials can be accessed in the backend
+     * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}.
+     * @param credentials Credentials object to which delegation tokens and secrets can be added
+     * @param conf job configuration
+     */
+    public void addCredentials(Credentials credentials, Configuration conf) {
+    }
 }
diff --git a/src/org/apache/pig/StoreFuncInterface.java b/src/org/apache/pig/StoreFuncInterface.java
index c590084..a3ae2ab 100644
--- a/src/org/apache/pig/StoreFuncInterface.java
+++ b/src/org/apache/pig/StoreFuncInterface.java
@@ -19,11 +19,12 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
-
+import org.apache.hadoop.security.Credentials;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.Tuple;
@@ -152,4 +153,17 @@
      * any runtime job information. 
      */
     void cleanupOnSuccess(String location, Job job) throws IOException;
+
+    /**
+     * Allows adding secrets or custom credentials that can be used to
+     * talk to external systems. For example: keys to decrypt encrypted data,
+     * database passwords, hcatalog/hbase delegation tokens, etc.
+     * This will be called once on the front end before the job is submitted.
+     * The added credentials can be accessed in the backend
+     * via {@link org.apache.hadoop.security.UserGroupInformation#getCredentials()}.
+     * @param credentials Credentials object to which delegation tokens and secrets can be added
+     * @param conf job configuration
+     */
+    default void addCredentials(Credentials credentials, Configuration conf) {
+    }
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
index 4d3ab50..31528dc 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -82,6 +82,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.UdfCacheShipFilesVisitor;
@@ -551,13 +552,20 @@
                 for (POLoad ld : lds) {
                     LoadFunc lf = ld.getLoadFunc();
                     lf.setLocation(ld.getLFile().getFileName(), nwJob);
-
+                    lf.addCredentials(nwJob.getCredentials(), conf);
                     ld.setParentPlan(null);
                     //Store the inp filespecs
                     inp.add(ld);
                 }
             }
 
+            // Process POUserFuncs to add credentials
+            List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(mro.mapPlan, POUserFunc.class);
+            userFuncs.addAll(PlanHelper.getPhysicalOperators(mro.reducePlan, POUserFunc.class));
+            for (POUserFunc userFunc : userFuncs) {
+                userFunc.getFunc().addCredentials(nwJob.getCredentials(), conf);
+            }
+
             if(!mro.reducePlan.isEmpty()){
                 log.info("Reduce phase detected, estimating # of required reducers.");
                 adjustNumReducers(plan, mro, nwJob);
@@ -774,6 +782,7 @@
                         osf.cleanupOutput(st, nwJob);
                     }
                 }
+                sFunc.addCredentials(nwJob.getCredentials(), conf);
             }
 
             for (POStore st : reduceStores) {
@@ -786,6 +795,7 @@
                         osf.cleanupOutput(st, nwJob);
                     }
                 }
+                sFunc.addCredentials(nwJob.getCredentials(), conf);
             }
 
             setOutputFormat(nwJob);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
index 2c8dea6..7ccd393 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
@@ -32,6 +32,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.Credentials;
 import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
@@ -68,6 +69,7 @@
     private transient EvalFunc func;
     private transient List<String> cacheFiles = null;
     private transient List<String> shipFiles = null;
+    private transient Credentials creds = null;
 
     FuncSpec funcSpec;
     FuncSpec origFSpec;
@@ -642,4 +644,13 @@
     public boolean needEndOfAllInputProcessing() {
         return getFunc().needEndOfAllInputProcessing();
     }
+
+    public Credentials getCredentials() {
+        return this.creds;
+    }
+
+    public void setCredentials(Credentials creds) {
+        this.creds = creds;
+    }
+
 }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index f292487..0b74ca8 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -84,6 +84,7 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -360,7 +361,6 @@
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
         TezOperPlan tezPlan = getPlan();
         List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
-
         // Construct vertex for the current Tez operator
         Vertex to = null;
         try {
@@ -611,6 +611,7 @@
 
     private Vertex newVertex(TezOperator tezOp) throws IOException,
             ClassNotFoundException, InterruptedException {
+
         ProcessorDescriptor procDesc = ProcessorDescriptor.create(
                 tezOp.getProcessorName());
 
@@ -642,6 +643,9 @@
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
 
+        // Process UserFuncs
+        processUserFuncs(tezOp, job);
+
         Configuration inputPayLoad = null;
         Configuration outputPayLoad = null;
 
@@ -1044,6 +1048,19 @@
         return vertex;
     }
 
+    /**
+     * Processes the POUserFuncs in the given Tez operator's plan so they can add credentials.
+     * @param tezOp Tez operator whose plan is scanned for POUserFuncs
+     * @param job job whose Credentials the secrets and delegation tokens are added to
+     * @throws VisitorException if the plan cannot be traversed
+     */
+    private void processUserFuncs(TezOperator tezOp, Job job) throws VisitorException {
+        List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+        for (POUserFunc userFunc : userFuncs) {
+            userFunc.getFunc().addCredentials(job.getCredentials(), job.getConfiguration());
+        }
+    }
+
     private LinkedList<POStore> processStores(TezOperator tezOp,
             Configuration payloadConf, Job job) throws VisitorException,
             IOException {
@@ -1057,6 +1074,7 @@
                 storeLocations.add(st);
                 StoreFuncInterface sFunc = st.getStoreFunc();
                 sFunc.setStoreLocation(st.getSFile().getFileName(), job);
+                sFunc.addCredentials(job.getCredentials(), job.getConfiguration());
             }
 
             Path tmpLocation = null;
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
index 7a12df7..d984fef 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
@@ -100,7 +100,7 @@
                 for (POLoad ld : lds) {
                     LoadFunc lf = ld.getLoadFunc();
                     lf.setLocation(ld.getLFile().getFileName(), job);
-
+                    lf.addCredentials(this.jobConf.getCredentials(), conf);
                     // Store the inp filespecs
                     inp.add(ld.getLFile());
                 }
diff --git a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
index 9804038..1953253 100644
--- a/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
+++ b/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
@@ -17,7 +17,6 @@
 package org.apache.pig.backend.hadoop.hbase;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -78,6 +77,7 @@
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.LoadCaster;
@@ -766,11 +766,6 @@
         job.getConfiguration().setBoolean("pig.noSplitCombination", true);
 
         m_conf = initializeLocalJobConfig(job);
-        String delegationTokenSet = udfProps.getProperty(HBASE_TOKEN_SET);
-        if (delegationTokenSet == null) {
-            addHBaseDelegationToken(m_conf, job);
-            udfProps.setProperty(HBASE_TOKEN_SET, "true");
-        }
 
         String tablename = location;
         if (location.startsWith("hbase://")) {
@@ -830,6 +825,35 @@
         return FuncUtils.getShipFiles(classList);
     }
 
+
+    @Override
+    public void addCredentials(Credentials credentials, Configuration conf) {
+        JobConf jobConf = initializeLocalJobConfig(conf);
+        if ("kerberos".equalsIgnoreCase(jobConf.get(HBASE_SECURITY_CONF_KEY))) {
+            LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
+            try {
+                User currentUser = User.getCurrent();
+                UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
+                if (currentUserGroupInformation.hasKerberosCredentials()) {
+                    try (Connection connection = ConnectionFactory.createConnection(jobConf, currentUser)) {
+                        TokenUtil.obtainTokenForJob(connection, jobConf, currentUser);
+                        LOG.info("Token retrieval succeeded for user " + currentUser.getName());
+                        credentials.addAll(jobConf.getCredentials());
+                    }
+                } else {
+                    LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
+                }
+            } catch (RuntimeException re) {
+                throw re;
+            } catch (Exception e) {
+                throw new UndeclaredThrowableException(e,
+                        "Unexpected error calling TokenUtil.obtainTokenForJob()");
+            }
+        } else {
+            LOG.info("hbase is not configured to use kerberos, skipping delegation token");
+        }
+    }
+
     private void addClassToList(String className, List<Class> classList) {
         try {
             Class klass = Class.forName(className);
@@ -839,9 +863,8 @@
         }
     }
 
-    private JobConf initializeLocalJobConfig(Job job) {
+    private JobConf initializeLocalJobConfig(Configuration jobConf) {
         Properties udfProps = getUDFProperties();
-        Configuration jobConf = job.getConfiguration();
         JobConf localConf = new JobConf(jobConf);
         if (udfProps.containsKey(HBASE_CONFIG_SET)) {
             for (Entry<Object, Object> entry : udfProps.entrySet()) {
@@ -864,40 +887,8 @@
         return localConf;
     }
 
-    /**
-     * Get delegation token from hbase and add it to the Job
-     *
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private void addHBaseDelegationToken(Configuration hbaseConf, Job job) {
-
-        if (!UDFContext.getUDFContext().isFrontend()) {
-            LOG.debug("skipping authentication checks because we're currently in a frontend UDF context");
-            return;
-        }
-
-        if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
-            LOG.info("hbase is configured to use Kerberos, attempting to fetch delegation token.");
-            try {
-                User currentUser = User.getCurrent();
-                UserGroupInformation currentUserGroupInformation = currentUser.getUGI();
-                if (currentUserGroupInformation.hasKerberosCredentials()) {
-                    try (Connection connection = ConnectionFactory.createConnection(hbaseConf, currentUser)) {
-                        TokenUtil.obtainTokenForJob(connection, currentUser, job);
-                        LOG.info("Token retrieval succeeded for user " + currentUser.getName());
-                    }
-                } else {
-                    LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available for user " + currentUser.getName());
-                }
-            } catch (RuntimeException re) {
-                throw re;
-            } catch (Exception e) {
-                throw new UndeclaredThrowableException(e,
-                        "Unexpected error calling TokenUtil.obtainTokenForJob()");
-            }
-        } else {
-            LOG.info("hbase is not configured to use kerberos, skipping delegation token");
-        }
+    private JobConf initializeLocalJobConfig(Job job) {
+        return initializeLocalJobConfig(job.getConfiguration());
     }
 
     @Override
@@ -1129,11 +1120,6 @@
         }
 
         m_conf = initializeLocalJobConfig(job);
-        // Not setting a udf property and getting the hbase delegation token
-        // only once like in setLocation as setStoreLocation gets different Job
-        // objects for each call and the last Job passed is the one that is
-        // launched. So we end up getting multiple hbase delegation tokens.
-        addHBaseDelegationToken(m_conf, job);
     }
 
     @Override
diff --git a/test/org/apache/pig/test/TestCredentials.java b/test/org/apache/pig/test/TestCredentials.java
new file mode 100644
index 0000000..8ec4ee2
--- /dev/null
+++ b/test/org/apache/pig/test/TestCredentials.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.utils.GenPhyOp;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestCredentials {
+    private static PigServer pigServer;
+    private static PigContext pc;
+    private static MiniGenericCluster cluster;
+    private static String INPUT_FILE = "input.txt";
+    private static String OUTPUT_DIR = "output";
+    private static final Text ALIAS = new Text("testKey");
+    private static final String SECRET = "dummySecret";
+
+    public static class CredentialsEvalFunc extends EvalFunc<String> {
+        @Override
+        public String exec(Tuple input) throws IOException {
+            String val = new String(UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS));
+            if(!SECRET.equals(val)) {
+                throw new IOException("Invalid secret");
+            }
+            return val;
+        }
+
+        @Override
+        public void addCredentials(Credentials credentials, Configuration conf) {
+            Credentials creds = new Credentials();
+            creds.addSecretKey(ALIAS, SECRET.getBytes());
+            credentials.addAll(creds);
+        }
+    }
+
+    public static class CredPigStorage extends PigStorage {
+        @Override
+        public void addCredentials(Credentials credentials, Configuration conf) {
+            Credentials creds = new Credentials();
+            creds.addSecretKey(ALIAS, SECRET.getBytes());
+            credentials.addAll(creds);
+        }
+
+        @Override
+        public Tuple getNext() throws IOException {
+            Tuple ret = super.getNext();
+            if(ret == null) {
+                return ret;
+            }
+            byte[] b = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS);
+            if(b != null) {
+                ret.append(new String(b));
+            }
+            return ret;
+        }
+
+        @Override
+        public void putNext(Tuple tuple) throws IOException {
+            if(tuple == null) {
+                return;
+            }
+            byte[] b = UserGroupInformation.getCurrentUser().getCredentials().getSecretKey(ALIAS);
+            if(b != null) {
+                tuple.append(new String(b));
+            }
+            super.putNext(tuple);
+        }
+    }
+
+    @BeforeClass
+    public static void setup() throws IOException {
+        cluster = MiniGenericCluster.buildCluster();
+        pc = new PigContext(cluster.getExecType(), cluster.getProperties());
+        pc.connect();
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        GenPhyOp.setPc(pc);
+        createInput();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        deleteInput();
+        if(pigServer != null) {
+            pigServer.shutdown();
+        }
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testCredentialsEvalFunc() throws IOException {
+        Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')");
+        pigServer.registerQuery("a = load '" + INPUT_FILE + "' as (i:chararray);");
+        pigServer.registerQuery("d = foreach a generate " + CredentialsEvalFunc.class.getName() + "(i);");
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void testCredentialsLoadFunc() throws Exception {
+        Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')");
+        pigServer.registerQuery("a = load '" + INPUT_FILE + "' using " + CredPigStorage.class.getName()
+                + "() as (text:chararray, secstr:chararray);");
+        pigServer.registerQuery("d = foreach a generate secstr;");
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void testCredentialsStoreFunc() throws Exception {
+        Tuple expectedResult = (Tuple)Util.getPigConstant("('" + SECRET + "')");
+        pigServer.registerQuery("a = load '" + INPUT_FILE + "' using PigStorage() as (text:chararray);");
+        pigServer.registerQuery("store a into '" + OUTPUT_DIR + "' using " + CredPigStorage.class.getName() + "();");
+        pigServer.registerQuery("c = load '" + OUTPUT_DIR + "' using PigStorage() as (text:chararray, secstr:chararray);");
+        pigServer.registerQuery("d = foreach c generate secstr;");
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(expectedResult, it.next());
+        assertFalse(it.hasNext());
+    }
+
+    private static void createInput() throws IOException {
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w.println("dumb");
+        w.close();
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+    }
+
+    private static void deleteInput() throws IOException {
+        new File(INPUT_FILE).delete();
+        FileUtils.deleteDirectory(new File(OUTPUT_DIR));
+        Util.deleteFile(cluster, INPUT_FILE);
+        Util.deleteFile(cluster, OUTPUT_DIR);
+    }
+
+}