PIG-5216: Customizable Error Handling for Loaders in Pig (chenjunz via daijy)

git-svn-id: https://svn.apache.org/repos/asf/pig/trunk@1797099 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/src/docs/src/documentation/content/xdocs/udf.xml b/src/docs/src/documentation/content/xdocs/udf.xml
index 790ac00..5052c98 100644
--- a/src/docs/src/documentation/content/xdocs/udf.xml
+++ b/src/docs/src/documentation/content/xdocs/udf.xml
@@ -1204,7 +1204,7 @@
 <li id="storeresources"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/StoreResources.java?view=markup">StoreResources:</a> 
 This interface has methods to put hdfs files or local files to distributed cache. </li>
 <li id="errorhandling"><a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandling.java?view=markup">ErrorHandling:</a> 
-This interface allow you to skip bad records in the storer so the storer will not throw exception and terminate the job. You can implement your own error handler by overriding <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?view=markup">ErrorHandler</a> interface, or use predefined error handler: <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?view=markup">CounterBasedErrorHandler</a>. ErrorHandling can be turned on by setting the property pig.error-handling.enabled to true in pig.properties. Default is false.  CounterBasedErrorHandler uses two settings - pig.error-handling.min.error.records (the minimum number of errors to trigger error handling) and pig.error-handling.error.threshold (percentage of the number of records as a fraction exceeding which error is thrown).</li>
+This interface allows you to skip bad records in both the loader and the storer so that they do not throw an exception and terminate the job. You can implement your own error handler by implementing the <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/ErrorHandler.java?view=markup">ErrorHandler</a> interface, or use the predefined error handler <a href="http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/CounterBasedErrorHandler.java?view=markup">CounterBasedErrorHandler</a>. Error handling is turned on by setting the property pig.error-handling.enabled to true in pig.properties; the default is false. CounterBasedErrorHandler uses two settings - pig.error-handling.min.error.records (the minimum number of errors before error handling is triggered) and pig.error-handling.error.threshold (the fraction of records that may fail before an error is thrown).</li>
 </ul>
 
 <p id="storefunc-override">The methods which need to be overridden in StoreFunc are explained below: </p>
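
Illustration (not part of the patch): a UDF opts into this behavior by
implementing ErrorHandling alongside its load or store function. The class
below is a hypothetical sketch that reuses the shipped
CounterBasedErrorHandler, exactly as the tests in this patch do; it requires
pig.error-handling.enabled=true to take effect.

    import org.apache.pig.CounterBasedErrorHandler;
    import org.apache.pig.ErrorHandler;
    import org.apache.pig.ErrorHandling;
    import org.apache.pig.builtin.PigStorage;

    // Hypothetical loader: bad records are reported to the error handler
    // and skipped instead of failing the whole job.
    public class SkippingStorage extends PigStorage implements ErrorHandling {
        @Override
        public ErrorHandler getErrorHandler() {
            // any ErrorHandler implementation may be returned here
            return new CounterBasedErrorHandler();
        }
    }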
diff --git a/src/org/apache/pig/CounterBasedErrorHandler.java b/src/org/apache/pig/CounterBasedErrorHandler.java
index 69b6c13..f5f1579 100644
--- a/src/org/apache/pig/CounterBasedErrorHandler.java
+++ b/src/org/apache/pig/CounterBasedErrorHandler.java
@@ -19,15 +19,14 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counter;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class CounterBasedErrorHandler implements ErrorHandler {
 
-    public static final String STORER_ERROR_HANDLER_COUNTER_GROUP = "storer_Error_Handler";
-    public static final String STORER_ERROR_COUNT = "bad_record_count";
-    public static final String STORER_RECORD_COUNT = "record__count";
+    public static final String ERROR_HANDLER_COUNTER_GROUP = "error_Handler";
+    public static final String ERROR_COUNT = "bad_record_count";
+    public static final String RECORD_COUNT = "record__count";
 
     private final long minErrors;
     private final float errorThreshold; // fraction of errors allowed
@@ -42,18 +41,18 @@
 
     @Override
     public void onSuccess(String uniqueSignature) {
-        incAndGetCounter(uniqueSignature, STORER_RECORD_COUNT);
+        incAndGetCounter(uniqueSignature, RECORD_COUNT);
     }
 
     @Override
-    public void onError(String uniqueSignature, Exception e, Tuple inputTuple) {
-        long numErrors = incAndGetCounter(uniqueSignature, STORER_ERROR_COUNT);
-        long numRecords = incAndGetCounter(uniqueSignature, STORER_RECORD_COUNT);
+    public void onError(String uniqueSignature, Exception e) {
+        long numErrors = incAndGetCounter(uniqueSignature, ERROR_COUNT);
+        long numRecords = incAndGetCounter(uniqueSignature, RECORD_COUNT);
         boolean exceedThreshold = hasErrorExceededThreshold(numErrors,
                 numRecords);
         if (exceedThreshold) {
             throw new RuntimeException(
-                    "Exceeded the error rate while writing records. The latest error seen  ",
+                    "Exceeded the error rate while processing records; the latest error is the cause",
                     e);
         }
     }
@@ -71,37 +70,37 @@
         return false;
     }
 
-    public long getRecordCount(String storeSignature) {
-        Counter counter = getCounter(storeSignature, STORER_RECORD_COUNT);
+    public long getRecordCount(String signature) {
+        Counter counter = getCounter(signature, RECORD_COUNT);
         return counter.getValue();
     }
 
-    private long incAndGetCounter(String storeSignature, String counterName) {
-        Counter counter = getCounter(storeSignature, counterName);
+    private long incAndGetCounter(String signature, String counterName) {
+        Counter counter = getCounter(signature, counterName);
         counter.increment(1);
         return counter.getValue();
     }
 
     /**
-     * Get Counter for a given counterName and Store Signature
+     * Get Counter for a given counterName and signature
      * 
      * @param counterName
-     * @param storeSignature
+     * @param signature
      * @return
      */
-    private Counter getCounter(String storeSignature, String counterName) {
+    private Counter getCounter(String signature, String counterName) {
         PigStatusReporter reporter = PigStatusReporter.getInstance();
         @SuppressWarnings("deprecation")
         Counter counter = reporter.getCounter(
-                STORER_ERROR_HANDLER_COUNTER_GROUP,
-                getCounterNameForStore(counterName, storeSignature));
+                ERROR_HANDLER_COUNTER_GROUP,
+                getCounterName(counterName, signature));
         return counter;
     }
 
-    private String getCounterNameForStore(String counterNamePrefix,
-            String storeSignature) {
+    private String getCounterName(String counterNamePrefix,
+            String signature) {
         StringBuilder counterName = new StringBuilder()
-                .append(counterNamePrefix).append("_").append(storeSignature);
+                .append(counterNamePrefix).append("_").append(signature);
         return counterName.toString();
     }
 }
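
Note: the body of hasErrorExceededThreshold falls outside the context shown
above. A sketch of its semantics as documented (assumed, not the verbatim
method body): errors are tolerated until both the configured minimum error
count and the allowed error fraction are exceeded.

    // Assumed semantics of the threshold check, for illustration only
    static boolean hasExceeded(long numErrors, long numRecords,
            long minErrors, float errorThreshold) {
        // below the minimum count, never fail; past it, fail once the
        // observed error fraction passes the configured threshold
        return numErrors > minErrors
                && ((float) numErrors / numRecords) > errorThreshold;
    }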
diff --git a/src/org/apache/pig/ErrorHandler.java b/src/org/apache/pig/ErrorHandler.java
index 6964c37..1a89d79 100644
--- a/src/org/apache/pig/ErrorHandler.java
+++ b/src/org/apache/pig/ErrorHandler.java
@@ -43,5 +43,3 @@
-     * @param inputTuple
-     *            the tuple to store.
      */
-    public void onError(String uniqueSignature, Exception e, Tuple inputTuple);
+    public void onError(String uniqueSignature, Exception e);
 }
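
Since onError no longer receives the input Tuple, a custom handler works only
from the UDF signature and the exception. A minimal hypothetical
implementation against the revised interface (assuming onSuccess and onError
are its only methods):

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.pig.ErrorHandler;

    public class LenientErrorHandler implements ErrorHandler {
        private final AtomicLong errors = new AtomicLong();

        @Override
        public void onSuccess(String uniqueSignature) {
            // nothing to track for good records in this sketch
        }

        @Override
        public void onError(String uniqueSignature, Exception e) {
            // tolerate up to 100 bad records per task, then fail
            if (errors.incrementAndGet() > 100) {
                throw new RuntimeException("Too many bad records for "
                        + uniqueSignature, e);
            }
        }
    }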
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
index 74f0f32..1b6c2b5 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
@@ -508,7 +508,7 @@
 
         Configuration conf = nwJob.getConfiguration();
 
-        ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
+        ArrayList<POLoad> inp = new ArrayList<POLoad>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
         ArrayList<String> inpSignatureLists = new ArrayList<String>();
         ArrayList<Long> inpLimits = new ArrayList<Long>();
@@ -548,8 +548,9 @@
                     LoadFunc lf = ld.getLoadFunc();
                     lf.setLocation(ld.getLFile().getFileName(), nwJob);
 
+                    ld.setParentPlan(null);
-                    //Store the inp filespecs
-                    inp.add(ld.getLFile());
+                    //Store the POLoads; parent plan detached above to keep serialization small
+                    inp.add(ld);
                 }
             }
 
@@ -704,7 +705,7 @@
             if(Utils.isLocal(pigContext, conf)) {
                 ConfigurationUtil.replaceConfigForLocalMode(conf);
             }
-            conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp));
+            conf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(inp));
             conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
             conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists));
             conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
index aad4926..cf1a076 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigInputFormat.java
@@ -41,6 +41,8 @@
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.Tuple;
@@ -56,7 +58,7 @@
     public static final Log log = LogFactory
             .getLog(PigInputFormat.class);
 
-    public static final String PIG_INPUTS = "pig.inputs";
+    public static final String PIG_LOADS = "pig.loads";
     public static final String PIG_INPUT_TARGETS = "pig.inpTargets";
     public static final String PIG_INPUT_SIGNATURES = "pig.inpSignatures";
     public static final String PIG_INPUT_LIMITS = "pig.inpLimits";
@@ -81,7 +83,7 @@
     protected static class RecordReaderFactory {
         protected InputFormat inputFormat;
         protected PigSplit pigSplit;
-        protected LoadFunc loadFunc;
+        protected LoadFuncDecorator decorator;
         protected TaskAttemptContext context;
         protected long limit;
 
@@ -106,7 +108,9 @@
             PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                     .deserialize(conf.get("udf.import.list")));
             MapRedUtil.setupUDFContext(conf);
-            LoadFunc loadFunc = getLoadFunc(pigSplit.getInputIndex(), conf);
+            POLoad poLoad = getPOLoad(pigSplit.getInputIndex(), conf);
+            LoadFunc loadFunc = poLoad.getLoadFunc();
+            LoadFuncDecorator decorator = poLoad.getLoadFuncDecorator();
             // Pass loader signature to LoadFunc and to InputFormat through
             // the conf
             passLoadSignature(loadFunc, pigSplit.getInputIndex(), conf);
@@ -122,13 +126,13 @@
 
             this.inputFormat = inputFormat;
             this.pigSplit = pigSplit;
-            this.loadFunc = loadFunc;
+            this.decorator = decorator;
             this.context = context;
             this.limit = inpLimitLists.get(pigSplit.getInputIndex());
         }
 
         public org.apache.hadoop.mapreduce.RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
-            return new PigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+            return new PigRecordReader(inputFormat, pigSplit, decorator, context, limit);
         }
     }
 
@@ -159,20 +163,19 @@
      * @throws IOException
      */
     @SuppressWarnings("unchecked")
-    private static LoadFunc getLoadFunc(int inputIndex, Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs =
-                (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                        conf.get(PIG_INPUTS));
-        FuncSpec loadFuncSpec = inputs.get(inputIndex).getFuncSpec();
-        return (LoadFunc) PigContext.instantiateFuncFromSpec(loadFuncSpec);
+    private static POLoad getPOLoad(int inputIndex, Configuration conf) throws IOException {
+        ArrayList<POLoad> inputs =
+                (ArrayList<POLoad>) ObjectSerializer.deserialize(
+                        conf.get(PIG_LOADS));
+        return inputs.get(inputIndex);
     }
 
     @SuppressWarnings("unchecked")
     private static String getLoadLocation(int inputIndex, Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs =
-                (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                        conf.get(PIG_INPUTS));
-        return inputs.get(inputIndex).getFileName();
+        ArrayList<POLoad> inputs =
+                (ArrayList<POLoad>) ObjectSerializer.deserialize(
+                        conf.get(PIG_LOADS));
+        return inputs.get(inputIndex).getLFile().getFileName();
     }
 
     /**
@@ -210,11 +213,11 @@
 
         Configuration conf = jobcontext.getConfiguration();
 
-        ArrayList<FileSpec> inputs;
+        ArrayList<POLoad> inputs;
         ArrayList<ArrayList<OperatorKey>> inpTargets;
         try {
-            inputs = (ArrayList<FileSpec>) ObjectSerializer
-                    .deserialize(conf.get(PIG_INPUTS));
+            inputs = (ArrayList<POLoad>) ObjectSerializer
+                    .deserialize(conf.get(PIG_LOADS));
             inpTargets = (ArrayList<ArrayList<OperatorKey>>) ObjectSerializer
                     .deserialize(conf.get(PIG_INPUT_TARGETS));
             PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(conf.get("udf.import.list")));
@@ -228,7 +231,7 @@
         ArrayList<InputSplit> splits = new ArrayList<InputSplit>();
         for (int i = 0; i < inputs.size(); i++) {
             try {
-                Path path = new Path(inputs.get(i).getFileName());
+                Path path = new Path(inputs.get(i).getLFile().getFileName());
 
                 FileSystem fs;
                 boolean isFsPath = true;
@@ -257,7 +260,7 @@
                 // FileInputFormat stores this in mapred.input.dir in the conf),
                 // then for different inputs, the loader's don't end up
                 // over-writing the same conf.
-                FuncSpec loadFuncSpec = inputs.get(i).getFuncSpec();
+                FuncSpec loadFuncSpec = inputs.get(i).getLFile().getFuncSpec();
                 LoadFunc loadFunc = (LoadFunc) PigContext.instantiateFuncFromSpec(
                         loadFuncSpec);
                 boolean combinable = !(loadFunc instanceof MergeJoinIndexer
@@ -270,7 +273,7 @@
                 // Pass loader signature to LoadFunc and to InputFormat through
                 // the conf
                 passLoadSignature(loadFunc, i, inputSpecificJob.getConfiguration());
-                loadFunc.setLocation(inputs.get(i).getFileName(),
+                loadFunc.setLocation(inputs.get(i).getLFile().getFileName(),
                         inputSpecificJob);
                 // The above setLocation call could write to the conf within
                 // the inputSpecificJob - use this updated conf
@@ -289,7 +292,8 @@
                 throw ee;
             } catch (Exception e) {
                 int errCode = 2118;
-                String msg = "Unable to create input splits for: " + inputs.get(i).getFileName();
+                String msg = "Unable to create input splits for: " +
+                        inputs.get(i).getLFile().getFileName();
                 if(e.getMessage() !=null && (!e.getMessage().isEmpty()) ){
                     throw new ExecException(e.getMessage(), errCode, PigException.BUG, e);
                 }else{
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
index 1004df6..d27bdc4 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
@@ -34,8 +34,9 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
@@ -72,6 +73,9 @@
     // the loader object
     private LoadFunc loadfunc;
 
+    // the LoadFuncDecorator
+    private LoadFuncDecorator decorator;
+
     // the Hadoop counter name
     transient private String counterName = null;
 
@@ -107,10 +111,11 @@
      *
      */
     public PigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit,
-            LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
+            LoadFuncDecorator decorator, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
         this.inputformat = inputformat;
         this.pigSplit = pigSplit;
-        this.loadfunc = loadFunc;
+        this.decorator = decorator;
+        this.loadfunc = decorator.getLoader();
         this.context = context;
         this.reporter = PigStatusReporter.getInstance();
         this.inputSpecificConf = context.getConfiguration();
@@ -121,7 +126,7 @@
         initNextRecordReader();
         doTiming = inputSpecificConf.getBoolean(PIG_UDF_PROFILE, false);
         if (doTiming) {
-            counterGroup = loadFunc.toString();
+            counterGroup = loadfunc.toString();
             timingFrequency = inputSpecificConf.getLong(PIG_UDF_PROFILE_FREQUENCY, 100L);
         }
     }
@@ -201,7 +206,7 @@
         if (timeThis) {
             startNanos = System.nanoTime();
         }
-        while ((curReader == null) || (curValue = loadfunc.getNext()) == null) {
+        while ((curReader == null) || (curValue = decorator.getNext()) == null) {
             if (!initNextRecordReader()) {
               return false;
             }
@@ -217,10 +222,10 @@
     @SuppressWarnings("unchecked")
     private static String getMultiInputsCounerName(PigSplit pigSplit,
             Configuration conf) throws IOException {
-        ArrayList<FileSpec> inputs =
-            (ArrayList<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get(PigInputFormat.PIG_INPUTS));
-        String fname = inputs.get(pigSplit.getInputIndex()).getFileName();
+        ArrayList<POLoad> inputs =
+                (ArrayList<POLoad>) ObjectSerializer.deserialize(
+                        conf.get(PigInputFormat.PIG_LOADS));
+        String fname = inputs.get(pigSplit.getInputIndex()).getLFile().getFileName();
         return PigStatsUtil.getMultiInputsCounterName(fname, pigSplit.getInputIndex());
     }
 
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java
new file mode 100644
index 0000000..04bdb7d
--- /dev/null
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LoadFuncDecorator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.IOException;
+
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * Decorates {@code LoadFunc#getNext()}. It handles errors by calling
+ * {@code ErrorHandler#onError(String, Exception)} if the
+ * {@link LoadFunc} implements {@link ErrorHandling}; otherwise the
+ * exception is rethrown wrapped in an IOException.
+ *
+ */
+
+public class LoadFuncDecorator {
+    private final LoadFunc loader;
+    private final String udfSignature;
+    private boolean shouldHandleErrors;
+    private ErrorHandler errorHandler;
+
+    public LoadFuncDecorator(LoadFunc loader, String udfSignature) {
+        this.loader = loader;
+        this.udfSignature = udfSignature;
+        init();
+    }
+
+    private void init() {
+        // The decorator's work happens on the backend only, so no error
+        // handler is created on the frontend
+        if (UDFContext.getUDFContext().isFrontend()) {
+            return;
+        }
+        if (loader instanceof ErrorHandling && allowErrors()) {
+            errorHandler = ((ErrorHandling) loader).getErrorHandler();
+            shouldHandleErrors = true;
+        }
+    }
+
+    private boolean allowErrors() {
+        return UDFContext.getUDFContext().getJobConf()
+                .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false);
+    }
+
+    /**
+     * Call {@code LoadFunc#getNext()} and handle errors. Returns null at end
+     * of data, or when a bad record was skipped by the error handler.
+     * @throws IOException
+     */
+    public Tuple getNext() throws IOException {
+        Tuple t = null;
+        try {
+            t = loader.getNext();
+            if (shouldHandleErrors) {
+                errorHandler.onSuccess(udfSignature);
+            }
+        } catch (Exception e) {
+            if (shouldHandleErrors) {
+                errorHandler.onError(udfSignature, e);
+            } else {
+                throw new IOException(e);
+            }
+        }
+        return t;
+    }
+
+    public LoadFunc getLoader() {
+        return loader;
+    }
+
+    public boolean getErrorHandling() {
+        return shouldHandleErrors;
+    }
+}
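
The decorator's contract in short: when error handling is enabled and the
wrapped loader implements ErrorHandling, a throwing getNext() is reported to
the handler and surfaces as a null record; otherwise the exception is wrapped
in an IOException and fails the task. A hypothetical sketch of the disabled
path (class and names invented):

    import java.io.IOException;

    import org.apache.pig.LoadFunc;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
    import org.apache.pig.builtin.PigStorage;
    import org.apache.pig.data.Tuple;

    public class DecoratorContractDemo {
        public static void main(String[] args) throws Exception {
            LoadFunc failing = new PigStorage() {
                @Override
                public Tuple getNext() throws IOException {
                    throw new RuntimeException("bad record");
                }
            };
            LoadFuncDecorator d = new LoadFuncDecorator(failing, "sig");
            try {
                d.getNext(); // rethrown, wrapped in an IOException
            } catch (IOException expected) {
                // with pig.error-handling.enabled=true and an ErrorHandling
                // loader, getNext() would instead return null and count it
            }
        }
    }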
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
index 9d86981..d0d96e8 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
@@ -54,6 +54,7 @@
     private static final long serialVersionUID = 1L;
     // The user defined load function or a default load function
     private transient LoadFunc loader = null;
+    private transient LoadFuncDecorator lDecorator;
     // The filespec on which the operator is based
     FileSpec lFile;
     // PigContext passed to us by the operator creator
@@ -100,6 +101,7 @@
                 PigContext.instantiateFuncFromSpec(lFile.getFuncSpec()), 
                 ConfigurationUtil.toConfiguration(pc.getProperties()), 
                 lFile.getFileName(),0, signature);
+        setLoadFuncDecorator(new LoadFuncDecorator(loader, signature));
     }
     
     /**
@@ -134,8 +136,8 @@
         }
         Result res = new Result();
         try {
-            res.result = loader.getNext();
-            if(res.result==null){
+            res.result = lDecorator.getNext();
+            if(res.result==null && !lDecorator.getErrorHandling()){
                 res.returnStatus = POStatus.STATUS_EOP;
                 tearDown();
             }
@@ -213,9 +215,20 @@
             this.loader = (LoadFunc)PigContext.instantiateFuncFromSpec(lFile.getFuncSpec());
             this.loader.setUDFContextSignature(signature);
         }
+        if (lDecorator == null) {
+            setLoadFuncDecorator(new LoadFuncDecorator(loader, signature));
+        }
         return this.loader;
     }
     
+    void setLoadFuncDecorator(LoadFuncDecorator lDecorator) {
+        this.lDecorator = lDecorator;
+    }
+
+    public LoadFuncDecorator getLoadFuncDecorator() {
+        return lDecorator;
+    }
+
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         if(illustrator != null) {
           if (!illustrator.ceilingCheck()) {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
index c7c3bb1..14790b8 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java
@@ -78,7 +78,7 @@
             }
         } catch (Exception e) {
             if (shouldHandleErrors) {
-                errorHandler.onError(udfSignature, e, tuple);
+                errorHandler.onError(udfSignature, e);
             } else {
                 throw new IOException(e);
             }
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
index 375f7f7..9d1cd0f 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkPigRecordReader.java
@@ -23,9 +23,9 @@
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.LoadFunc;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LoadFuncDecorator;
 
 /**
  * Record reader for Spark mode - handles SparkPigSplit
@@ -40,8 +40,8 @@
      * @param context
      * @param limit
      */
-    public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFunc loadFunc, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
-        super(inputformat, pigSplit, loadFunc, context, limit);
+    public SparkPigRecordReader(InputFormat<?, ?> inputformat, PigSplit pigSplit, LoadFuncDecorator decorator, TaskAttemptContext context, long limit) throws IOException, InterruptedException {
+        super(inputformat, pigSplit, decorator, context, limit);
     }
 
     @Override
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
index 67fe7a6..fdde313 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
@@ -212,11 +212,6 @@
 
         loadFunc.setLocation(poLoad.getLFile().getFileName(), job);
 
-        // stolen from JobControlCompiler
-        ArrayList<FileSpec> pigInputs = new ArrayList<FileSpec>();
-        // Store the inp filespecs
-        pigInputs.add(poLoad.getLFile());
-
         ArrayList<List<OperatorKey>> inpTargets = Lists.newArrayList();
         ArrayList<String> inpSignatures = Lists.newArrayList();
         ArrayList<Long> inpLimits = Lists.newArrayList();
@@ -234,7 +229,13 @@
         inpSignatures.add(poLoad.getSignature());
         inpLimits.add(poLoad.getLimit());
 
-        jobConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(pigInputs));
+        // stolen from JobControlCompiler
+        PhysicalPlan pp = poLoad.getParentPlan();
+        ArrayList<POLoad> pigLoads = new ArrayList<POLoad>();
+        poLoad.setParentPlan(null);
+        pigLoads.add(poLoad);
+        jobConf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(pigLoads));
+        poLoad.setParentPlan(pp);
         jobConf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
         jobConf.set(PigInputFormat.PIG_INPUT_SIGNATURES,
                 ObjectSerializer.serialize(inpSignatures));
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java b/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
index 604665e..650a7cb 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
@@ -128,7 +128,7 @@
 
         @Override
         public RecordReader<Text, Tuple> createRecordReader() throws IOException, InterruptedException {
-            return new SparkPigRecordReader(inputFormat, pigSplit, loadFunc, context, limit);
+            return new SparkPigRecordReader(inputFormat, pigSplit, decorator, context, limit);
         }
     }
 }
\ No newline at end of file
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
index 9980c0d..f292487 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
@@ -652,7 +652,7 @@
         }
 
         if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
-            payloadConf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
+            payloadConf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getLoads()));
             payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
             payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
             inputPayLoad = new Configuration(payloadConf);
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
index 5d8ade9..6e99d39 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
@@ -201,15 +201,15 @@
     private boolean isVertexGroup = false;
 
     public static class LoaderInfo implements Serializable {
-        private List<POLoad> loads = null;
+        private ArrayList<POLoad> loads = null;
         private ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         private ArrayList<String> inpSignatureLists = new ArrayList<String>();
         private ArrayList<Long> inpLimits = new ArrayList<Long>();
         private transient InputSplitInfo inputSplitInfo = null;
-        public List<POLoad> getLoads() {
+        public ArrayList<POLoad> getLoads() {
             return loads;
         }
-        public void setLoads(List<POLoad> loads) {
+        public void setLoads(ArrayList<POLoad> loads) {
             this.loads = loads;
         }
         public ArrayList<FileSpec> getInp() {
diff --git a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
index 05a02de..7a12df7 100644
--- a/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
+++ b/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
@@ -81,7 +81,7 @@
      * @throws InterruptedException
      * @throws ClassNotFoundException
      */
-    private List<POLoad> processLoads(TezOperator tezOp
+    private ArrayList<POLoad> processLoads(TezOperator tezOp
             ) throws VisitorException, IOException, ClassNotFoundException, InterruptedException {
         ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
@@ -90,6 +90,7 @@
 
         List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
                 POLoad.class);
+        ArrayList<POLoad> poLoads = new ArrayList<POLoad>();
 
         Job job = Job.getInstance(jobConf);
         Configuration conf = job.getConfiguration();
@@ -140,11 +141,13 @@
                 for (PhysicalOperator sucs : ldSucs) {
                     tezOp.plan.connect(tezLoad, sucs);
                 }
+                poLoads.add(ld);
             }
             UDFContext.getUDFContext().serialize(conf);
             conf.set("udf.import.list",
                     ObjectSerializer.serialize(PigContext.getPackageImportList()));
-            conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp));
+
+            conf.set(PigInputFormat.PIG_LOADS, ObjectSerializer.serialize(poLoads));
             conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets));
             conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists));
             conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits));
@@ -173,7 +176,7 @@
             tezOp.setRequestedParallelism(parallelism);
             tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
         }
-        return lds;
+        return poLoads;
     }
 
     @Override
diff --git a/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java b/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
index e5900c2..f39cf38 100644
--- a/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
+++ b/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
@@ -41,6 +41,8 @@
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.classification.InterfaceStability;
@@ -82,7 +84,7 @@
 
     private List<POStore> reduceStores = null;
 
-    private List<FileSpec> loads = null;
+    private List<POLoad> loads = null;
 
     private Boolean disableCounter = false;
 
@@ -215,8 +217,8 @@
                     .get(JobControlCompiler.PIG_MAP_STORES));
             this.reduceStores = (List<POStore>) ObjectSerializer.deserialize(conf
                     .get(JobControlCompiler.PIG_REDUCE_STORES));
-            this.loads = (ArrayList<FileSpec>) ObjectSerializer.deserialize(conf
-                    .get("pig.inputs"));
+            this.loads = (ArrayList<POLoad>) ObjectSerializer.deserialize(conf
+                    .get(PigInputFormat.PIG_LOADS));
             this.disableCounter = conf.getBoolean("pig.disable.counter", false);
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
@@ -481,7 +483,7 @@
         }
 
         if (loads.size() == 1) {
-            FileSpec fsp = loads.get(0);
+            FileSpec fsp = loads.get(0).getLFile();
             if (!MRPigStatsUtil.isTempFile(fsp.getFileName())) {
                 long records = mapInputRecords;
                 InputStats is = new InputStats(fsp.getFileName(),
@@ -493,7 +495,7 @@
             }
         } else {
             for (int i=0; i<loads.size(); i++) {
-                FileSpec fsp = loads.get(i);
+                FileSpec fsp = loads.get(i).getLFile();
                 if (MRPigStatsUtil.isTempFile(fsp.getFileName())) continue;
                 addOneInputStats(fsp.getFileName(), i);
             }
diff --git a/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java b/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
index 3b01f7e..9012b71 100644
--- a/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
+++ b/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
@@ -36,6 +36,7 @@
 import org.apache.pig.PigCounters;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.impl.io.FileSpec;
@@ -72,7 +73,7 @@
     private Map<String, Map<String, Long>> counters = null;
 
     private List<POStore> stores = null;
-    private List<FileSpec> loads = null;
+    private List<POLoad> loads = null;
 
     private int numTasks = 0;
     private long numInputRecords = 0;
@@ -139,8 +140,8 @@
             // tez. For now, we keep it since it's used in PigOutputFormat.
             this.stores = (List<POStore>) ObjectSerializer.deserialize(
                     conf.get(JobControlCompiler.PIG_REDUCE_STORES));
-            this.loads = (List<FileSpec>) ObjectSerializer.deserialize(
-                    conf.get(PigInputFormat.PIG_INPUTS));
+            this.loads = (List<POLoad>) ObjectSerializer.deserialize(
+                    conf.get(PigInputFormat.PIG_LOADS));
         } catch (IOException e) {
             LOG.warn("Failed to deserialize the store list", e);
         }
@@ -241,13 +242,13 @@
         }
 
         // There is always only one load in a Tez vertex
-        for (FileSpec fs : loads) {
+        for (POLoad ld : loads) {
             long records = -1;
             long hdfsBytesRead = -1;
-            String filename = fs.getFileName();
+            String filename = ld.getLFile().getFileName();
             if (counters != null) {
                 if (mIGroup != null) {
-                    Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getFileName(), 0));
+                    Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(filename, 0));
                     if (n != null) records = n;
                 }
                 if (records == -1) {
diff --git a/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java b/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java
new file mode 100644
index 0000000..42d96dd
--- /dev/null
+++ b/test/org/apache/pig/test/TestErrorHandlingLoadAndStoreFunc.java
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.pig.CounterBasedErrorHandler;
+import org.apache.pig.ErrorHandling;
+import org.apache.pig.ExecType;
+import org.apache.pig.ErrorHandler;
+import org.apache.pig.PigConfiguration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.mock.Storage;
+import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+
+/**
+ * This class contains unit tests for load and store functions which have a
+ * certain error threshold set.
+ *
+ */
+public class TestErrorHandlingLoadAndStoreFunc {
+
+    private static PigServer pigServer;
+    private File tempDir;
+
+    @Before
+    public void setup() throws IOException {
+        pigServer = new PigServer(ExecType.LOCAL);
+        tempDir = Files.createTempDir();
+        tempDir.deleteOnExit();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        pigServer.shutdown();
+        tempDir.delete();
+    }
+
+    public static class TestErroroneousStoreFunc extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            long count = COUNTER.incrementAndGet();
+            super.putNext(f);
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousStoreFunc2 extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            long count = COUNTER.incrementAndGet();
+            super.putNext(f);
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousLoadFunc extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public Tuple getNext() throws IOException {
+            long count = COUNTER.incrementAndGet();
+            Tuple t = super.getNext();
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+            return t;
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    public static class TestErroroneousLoadFunc2 extends PigStorage implements
+            ErrorHandling {
+        protected static AtomicLong COUNTER = new AtomicLong();
+
+        @Override
+        public Tuple getNext() throws IOException {
+            long count = COUNTER.incrementAndGet();
+            Tuple t = super.getNext();
+            if (count % 3 == 0) {
+                throw new RuntimeException("Throw error for test");
+            }
+            return t;
+        }
+
+        @Override
+        public ErrorHandler getErrorHandler() {
+            return new CounterBasedErrorHandler();
+        }
+    }
+
+    /**
+     * Test Pig job succeeds even with errors within threshold
+     *
+     */
+    @Test
+    public void testStorerWithErrorInLimit() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        runTestStore(JOB_STATUS.COMPLETED);
+    }
+
+    /**
+     * Test Pig job fails if errors exceed min errors and threshold
+     */
+    @Test
+    public void testStorerWithErrorOutExceedingLimit() throws Exception {
+        updatePigProperties(true, 2L, 0.3);
+        runTestStore(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job fails on error if the config is set to false
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testStorerWithConfigNotEnabled() throws Exception {
+        updatePigProperties(false, 3L, 0.3);
+        runTestStore(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig job succeeds even with errors within threshold
+     *
+     */
+    @Test
+    public void testLoaderWithErrorInLimit() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        runTestLoad(JOB_STATUS.COMPLETED);
+    }
+
+    /**
+     * Test Pig job fails if errors exceed min errors and threshold
+     */
+    @Test
+    public void testLoaderWithErrorOutExceedingLimit() throws Exception {
+        updatePigProperties(true, 1L, 0.1);
+        runTestLoad(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job fails on error if the config is set to false
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testLoaderWithConfigNotEnabled() throws Exception {
+        updatePigProperties(false, 3L, 0.3);
+        runTestLoad(JOB_STATUS.FAILED);
+    }
+
+    /**
+     * Test Pig Job with multiple stores.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultiStore() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        pigServer.getPigContext().getProperties()
+                .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        // Create input dataset
+        int rows = 10;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int,name:chararray", list);
+        pigServer.setBatchOn();
+        String loadQuery = "A = LOAD 'in' using mock.Storage();";
+        pigServer.registerQuery(loadQuery);
+        String storeAQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using " + TestErroroneousStoreFunc.class.getName()
+                + "();";
+        pigServer.registerQuery(storeAQuery);
+        pigServer.registerQuery("B = FILTER A by id >0;");
+        String storeBQuery = "store B into '" + tempDir.getAbsolutePath()
+                + "/output2' using "
+                + TestErroroneousStoreFunc2.class.getName() + "();";
+        pigServer.registerQuery(storeBQuery);
+        ExecJob job = pigServer.executeBatch().get(0);
+        if (job.getStatus() != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", job.getException());
+        }
+    }
+
+    /**
+     * Test Pig Job with multiple loaders.
+     *
+     * @throws Exception
+     */
+    @Test
+    public void testMultiLoad() throws Exception {
+        updatePigProperties(true, 3L, 0.4);
+        pigServer.getPigContext().getProperties()
+              .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
+        Storage.resetData(pigServer);
+
+        String inputLocation1 = tempDir.getAbsolutePath() + "/ina";
+        String inputLocation2 = tempDir.getAbsolutePath() + "/inb";
+        writeFile(inputLocation1);
+        writeFile(inputLocation2);
+
+        pigServer.setBatchOn();
+        String loadQuery1 = "A = LOAD '" + inputLocation1 + "' USING " + TestErroroneousLoadFunc.class.getName() + "();";
+        pigServer.registerQuery(loadQuery1);
+        String loadQuery2 = "B = LOAD '" + inputLocation2 + "' USING " + TestErroroneousLoadFunc2.class.getName() + "();";
+        pigServer.registerQuery(loadQuery2);
+        String storeQuery1 = "store A into '/output1' using mock.Storage();";
+        pigServer.registerQuery(storeQuery1);
+        String storeQuery2 = "store B into '/output2' using mock.Storage();";
+        pigServer.registerQuery(storeQuery2);
+
+        JOB_STATUS status = pigServer.executeBatch().get(0).getStatus();
+        if (status != JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job did not reach the expected status: " + status);
+        }
+    }
+
+    private void runTestStore(JOB_STATUS expectedJobStatus) throws Exception {
+        Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        // Create input dataset
+        int rows = 10;
+        for (int i = 0; i < rows; i++) {
+            Tuple t = TupleFactory.getInstance().newTuple();
+            t.append(i);
+            t.append("a" + i);
+            list.add(t);
+        }
+        data.set("in", "id:int,name:chararray", list);
+
+        pigServer.setBatchOn();
+        String loadQuery = "A = LOAD 'in' USING mock.Storage();";
+        pigServer.registerQuery(loadQuery);
+        String storeQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using " + TestErroroneousStoreFunc.class.getName()
+                + "();";
+        pigServer.registerQuery(storeQuery);
+
+        JOB_STATUS status = pigServer.executeBatch().get(0).getStatus();
+        if (status != expectedJobStatus) {
+            throw new RuntimeException("Job did not reach the expected status: " + status);
+        }
+    }
+
+    private void runTestLoad(JOB_STATUS expectedJobStatus) throws Exception {
+        Storage.resetData(pigServer);
+        String inputLocation = tempDir.getAbsolutePath() + "/in";
+        writeFile(inputLocation);
+        pigServer.setBatchOn();
+        String loadQuery = "A = LOAD '" + inputLocation + "' USING " + TestErroroneousLoadFunc.class.getName() + "();";
+        pigServer.registerQuery(loadQuery);
+        String storeQuery = "store A into '" + tempDir.getAbsolutePath()
+                + "/output' using mock.Storage();";
+        pigServer.registerQuery(storeQuery);
+
+        JOB_STATUS status = pigServer.executeBatch().get(0).getStatus();
+        if (status != expectedJobStatus) {
+            throw new RuntimeException("Job did not reach the expected status: " + status);
+        }
+    }
+
+    private void updatePigProperties(boolean allowErrors, long minErrors,
+            double errorThreshold) {
+        Properties properties = pigServer.getPigContext().getProperties();
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
+                Boolean.toString(allowErrors));
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
+                Long.toString(minErrors));
+        properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
+                Double.toString(errorThreshold));
+    }
+
+    private void writeFile(String fileLocation) throws IOException {
+        // try-with-resources closes the writer even if a write fails
+        try (PrintWriter writer = new PrintWriter(fileLocation, "UTF-8")) {
+            int rows = 20;
+            for (int i = 0; i < rows; i++) {
+                writer.println("a" + i);
+            }
+        }
+    }
+}
diff --git a/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java b/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
deleted file mode 100644
index d44de5b..0000000
--- a/test/org/apache/pig/test/TestErrorHandlingStoreFunc.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.pig.CounterBasedErrorHandler;
-import org.apache.pig.ErrorHandling;
-import org.apache.pig.ExecType;
-import org.apache.pig.ErrorHandler;
-import org.apache.pig.PigConfiguration;
-import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.mock.Storage;
-import org.apache.pig.builtin.mock.Storage.Data;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-
-/**
- * This class contains Unit tests for store func which has a certain error
- * threshold set.
- * 
- */
-public class TestErrorHandlingStoreFunc {
-
-    private static PigServer pigServer;
-    private File tempDir;
-
-    @Before
-    public void setup() throws IOException {
-        pigServer = new PigServer(ExecType.LOCAL);
-        tempDir = Files.createTempDir();
-        tempDir.deleteOnExit();
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        pigServer.shutdown();
-        tempDir.delete();
-    }
-
-    public static class TestErroroneousStoreFunc extends PigStorage implements
-            ErrorHandling {
-        protected static AtomicLong COUNTER = new AtomicLong();
-
-        @Override
-        public void putNext(Tuple f) throws IOException {
-            long count = COUNTER.incrementAndGet();
-            super.putNext(f);
-            if (count % 3 == 0) {
-                throw new RuntimeException("Throw error for test");
-            }
-        }
-
-        @Override
-        public ErrorHandler getErrorHandler() {
-            return new CounterBasedErrorHandler();
-        }
-    }
-
-    public static class TestErroroneousStoreFunc2 extends PigStorage implements
-            ErrorHandling {
-        protected static AtomicLong COUNTER = new AtomicLong();
-
-        @Override
-        public void putNext(Tuple f) throws IOException {
-            long count = COUNTER.incrementAndGet();
-            super.putNext(f);
-            if (count % 3 == 0) {
-                throw new RuntimeException("Throw error for test");
-            }
-        }
-
-        @Override
-        public ErrorHandler getErrorHandler() {
-            return new CounterBasedErrorHandler();
-        }
-    }
-
-    /**
-     * Test Pig job succeeds even with errors within threshold
-     * 
-     */
-    @Test
-    public void testStorerWithErrorInLimit() throws Exception {
-        updatePigProperties(true, 3L, 0.4);
-        runTest(JOB_STATUS.COMPLETED);
-    }
-
-    /**
-     * Test Pig job fails if errors exceed min errors and threshold
-     */
-    @Test
-    public void testStorerWithErrorOutExceedingLimit() throws Exception {
-        updatePigProperties(true, 2L, 0.3);
-        runTest(JOB_STATUS.FAILED);
-    }
-
-    /**
-     * Test Pig Job fails on error if the config is set to false
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void testStorerWithConfigNotEnabled() throws Exception {
-        updatePigProperties(false, 3L, 0.3);
-        runTest(JOB_STATUS.FAILED);
-    }
-
-    /**
-     * Test Pig Job with multiple stores.
-     * 
-     * @throws Exception
-     */
-    @Test
-    public void testMultiStore() throws Exception {
-        updatePigProperties(true, 3L, 0.4);
-        pigServer.getPigContext().getProperties()
-                .put(PigConfiguration.PIG_OPT_MULTIQUERY, "" + true);
-        Data data = Storage.resetData(pigServer);
-        final Collection<Tuple> list = Lists.newArrayList();
-        // Create input dataset
-        int rows = 10;
-        for (int i = 0; i < rows; i++) {
-            Tuple t = TupleFactory.getInstance().newTuple();
-            t.append(i);
-            t.append("a" + i);
-            list.add(t);
-        }
-        data.set("in", "id:int,name:chararray", list);
-        pigServer.setBatchOn();
-        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
-        String storeAQuery = "store A into '" + tempDir.getAbsolutePath()
-                + "/output' using " + TestErroroneousStoreFunc.class.getName()
-                + "();";
-        pigServer.registerQuery(storeAQuery);
-        pigServer.registerQuery("B = FILTER A by id >0;");
-        String storeBQuery = "store B into '" + tempDir.getAbsolutePath()
-                + "/output2' using "
-                + TestErroroneousStoreFunc2.class.getName() + "();";
-        pigServer.registerQuery(storeBQuery);
-        if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) {
-            throw new RuntimeException("Job failed", pigServer.executeBatch()
-                    .get(0).getException());
-        }
-    }
-
-    private void runTest(JOB_STATUS expectedJobStatus) throws Exception {
-        Data data = Storage.resetData(pigServer);
-        final Collection<Tuple> list = Lists.newArrayList();
-        // Create input dataset
-        int rows = 10;
-        for (int i = 0; i < rows; i++) {
-            Tuple t = TupleFactory.getInstance().newTuple();
-            t.append(i);
-            t.append("a" + i);
-            list.add(t);
-        }
-        data.set("in", "id:int,name:chararray", list);
-
-        pigServer.setBatchOn();
-        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
-        String storeQuery = "store A into '" + tempDir.getAbsolutePath()
-                + "/output' using " + TestErroroneousStoreFunc.class.getName()
-                + "();";
-        pigServer.registerQuery(storeQuery);
-
-        if (pigServer.executeBatch().get(0).getStatus() != expectedJobStatus) {
-            throw new RuntimeException("Job did not reach the expected status"
-                    + pigServer.executeBatch().get(0).getStatus());
-        }
-    }
-
-    private void updatePigProperties(boolean allowErrors, long minErrors,
-            double errorThreshold) {
-        Properties properties = pigServer.getPigContext().getProperties();
-        properties.put(PigConfiguration.PIG_ERROR_HANDLING_ENABLED,
-                Boolean.toString(allowErrors));
-        properties.put(PigConfiguration.PIG_ERROR_HANDLING_MIN_ERROR_RECORDS,
-                Long.toString(minErrors));
-        properties.put(PigConfiguration.PIG_ERROR_HANDLING_THRESHOLD_PERCENT,
-                Double.toString(errorThreshold));
-    }
-}