HCATALOG-525 Remove dead classes from HCatalog Source Tree

git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1398595 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f7ae00d..66432d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,8 @@
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-525 Remove dead classes from HCatalog Source Tree (amalakar via traviscrawford)
+
   HCAT-521 Ignore .reviewboardrc in git (nitay via traviscrawford)
 
   HCAT-519 Migrate pig-adapter/webhcat to maven dependencies and continue build cleanup (traviscrawford)
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
index 031afda..66caae6 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
@@ -46,7 +46,7 @@
  * Base class for HCatLoader and HCatEximLoader
  */
 
-public abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
 
     protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
 
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
index b3ba293..a3f9007 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
@@ -58,7 +58,7 @@
  *
  */
 
-public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
+abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
 
     private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
         Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
index 817866f..69d56cf 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
@@ -58,7 +58,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PigHCatUtil {
+class PigHCatUtil {
 
     private static final Logger LOG = LoggerFactory.getLogger(PigHCatUtil.class);
 
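The three hunks above all make the same change: HCatBaseLoader, HCatBaseStorer, and PigHCatUtil drop their public modifier and become package-private, so they are visible only inside org.apache.hcatalog.pig and are no longer part of the adapter's public API. As a minimal sketch of the Java visibility rule in play (InternalBase and PublicEntryPoint are hypothetical names, not from this patch):

    package org.apache.hcatalog.pig;

    // Package-private: with no access modifier, the class is visible only within
    // org.apache.hcatalog.pig, so external code cannot reference or extend it.
    abstract class InternalBase {
        abstract void doWork();
    }

    // A public subclass in the same package can still extend the hidden base;
    // callers outside the package see only PublicEntryPoint.
    public class PublicEntryPoint extends InternalBase {
        @Override
        void doWork() {
            System.out.println("work");
        }
    }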
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken
deleted file mode 100644
index 1505dd0..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
-import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.Tuple;
-
-
-/**
- * Base class that wraps a LoadFunc in an HCatInputStorageDriver.
- * If you already have a LoadFunc, this class together with LoadFuncBasedInputFormat
- * does all the heavy lifting. To write a new HCat input storage driver, just extend
- * it and override initialize(). {@link PigStorageInputDriver} illustrates this well.
- */
-public class LoadFuncBasedInputDriver extends HCatInputStorageDriver {
-
-  private LoadFuncBasedInputFormat inputFormat;
-  private HCatSchema dataSchema;
-  private Map<String,String> partVals;
-  private List<String> desiredColNames;
-  protected LoadFunc lf;
-
-  @Override
-  public HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue)
-      throws IOException {
-
-    List<Object> data = ((Tuple)baseValue).getAll();
-    List<Object> hcatRecord = new ArrayList<Object>(desiredColNames.size());
-
-    /* Iterate through the columns requested in the output schema and look each one
-     * up in the original data schema. If found, emit that value; otherwise look it
-     * up in the partition columns and emit that. Failing both, it's a new column,
-     * so emit null. The partition-map lookup returns null when the column is absent.
-     */
-    for(String colName : desiredColNames){
-      Integer idx = dataSchema.getPosition(colName);
-      hcatRecord.add( idx != null ? data.get(idx) : partVals.get(colName));
-    }
-    return new DefaultHCatRecord(hcatRecord);
-  }
-
-  @Override
-  public InputFormat<? extends WritableComparable, ? extends Writable> getInputFormat(
-      Properties hcatProperties) {
-
-    return inputFormat;
-  }
-
-  @Override
-  public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema) throws IOException {
-
-    dataSchema = hcatSchema;
-  }
-
-  @Override
-  public void setOutputSchema(JobContext jobContext, HCatSchema hcatSchema) throws IOException {
-
-    desiredColNames = hcatSchema.getFieldNames();
-  }
-
-  @Override
-  public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues)
-      throws IOException {
-
-    partVals = partitionValues;
-  }
-
-  @Override
-  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-    
-    String loaderString = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER);
-    if (loaderString==null) {
-        throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Don't know how to instantiate loader, " + HCatConstants.HCAT_PIG_LOADER + " property is not defined for table ");
-    }
-    String loaderArgs = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER_ARGS);
-    
-    String[] args;
-    if (loaderArgs!=null) {
-        String delimit = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
-        if (delimit==null) {
-            delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
-        }
-        args = loaderArgs.split(delimit);
-    } else {
-        args = new String[0];
-    }
-    
-    try {
-        Class loaderClass = Class.forName(loaderString);
-    
-        Constructor[] constructors = loaderClass.getConstructors();
-        for (Constructor constructor : constructors) {
-            if (constructor.getParameterTypes().length==args.length) {
-                lf = (LoadFunc)constructor.newInstance(args);
-                break;
-            }
-        }
-    } catch (Exception e) {
-        throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString, e);
-    }
-    
-    if (lf==null) {
-        throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString + " with construct args " + loaderArgs);
-    }
- 
-    // We need to set the right signature before setLocation. The original signature is used by
-    // HCatLoader to access UDFContext, so we invent a new signature for the wrapped loader.
-    // For PigStorage/JsonStorage, setting the signature right before setLocation seems to be
-    // good enough; we may need to set it more aggressively if we support more loaders.
-    String innerSignature = context.getConfiguration().get(HCatLoader.INNER_SIGNATURE);
-    lf.setUDFContextSignature(innerSignature);
-    lf.setLocation(location, new Job(context.getConfiguration()));
-    inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema), location, context.getConfiguration());
-  }
-
-  private String location;
-
-  @Override
-  public void setInputPath(JobContext jobContext, String location) throws IOException {
-
-    this.location = location;
-    super.setInputPath(jobContext, location);
-  }
-}
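The deleted initialize() above resolved the wrapped LoadFunc by reflection: read the class name from HCAT_PIG_LOADER, split HCAT_PIG_LOADER_ARGS on the configured delimiter, then pick the first public constructor whose arity matches the argument count. A self-contained sketch of that idiom, assuming all-String constructor parameters as the original did (ReflectiveFactory and its demo main are hypothetical, not part of the deleted class):

    import java.lang.reflect.Constructor;

    public class ReflectiveFactory {
        // Pick the first public constructor whose arity matches args.length.
        // Like the original, this assumes the matched constructor takes Strings;
        // otherwise newInstance throws IllegalArgumentException.
        public static Object newInstance(String className, String[] args) throws Exception {
            Class<?> clazz = Class.forName(className);
            for (Constructor<?> c : clazz.getConstructors()) {
                if (c.getParameterTypes().length == args.length) {
                    return c.newInstance((Object[]) args);
                }
            }
            throw new IllegalArgumentException(
                "No public " + args.length + "-arg constructor on " + className);
        }

        public static void main(String[] args) throws Exception {
            // e.g. instantiate a StringBuilder through its single-String constructor
            Object sb = newInstance("java.lang.StringBuilder", new String[] {"seed"});
            System.out.println(sb); // prints: seed
        }
    }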
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
deleted file mode 100644
index 7028865..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-
-/**
- * Based on {@link org.apache.pig.builtin.PigStorage}.
- */
-public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable, Tuple> {
-
-    private final LoadFunc loadFunc;
-    private static ResourceFieldSchema[] fields;
-
-    public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
-
-        this.loadFunc = loadFunc;
-        fields = dataSchema.getFields();
-
-        // Simulate the frontend call sequence for LoadFunc, in case the LoadFunc needs to store something into UDFContext (as JsonLoader does)
-        if (loadFunc instanceof LoadMetadata) {
-            ((LoadMetadata) loadFunc).getSchema(location, new Job(conf));
-        }
-    }
-
-    @Override
-    public RecordReader<BytesWritable, Tuple> createRecordReader(
-        InputSplit split, TaskAttemptContext taskContext) throws IOException,
-        InterruptedException {
-        RecordReader<BytesWritable, Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext);
-        return new LoadFuncBasedRecordReader(reader, loadFunc);
-    }
-
-    @Override
-    public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
-        InterruptedException {
-        try {
-            InputFormat<BytesWritable, Tuple> inpFormat = loadFunc.getInputFormat();
-            return inpFormat.getSplits(jobContext);
-
-        } catch (InterruptedException e) {
-            throw new IOException(e);
-        }
-    }
-
-    static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> {
-
-        private Tuple tupleFromDisk;
-        private final RecordReader<BytesWritable, Tuple> reader;
-        private final LoadFunc loadFunc;
-        private final LoadCaster caster;
-
-        /**
-         * @param reader
-         * @param loadFunc
-         * @throws IOException
-         */
-        public LoadFuncBasedRecordReader(RecordReader<BytesWritable, Tuple> reader, LoadFunc loadFunc) throws IOException {
-            this.reader = reader;
-            this.loadFunc = loadFunc;
-            this.caster = loadFunc.getLoadCaster();
-        }
-
-        @Override
-        public void close() throws IOException {
-            reader.close();
-        }
-
-        @Override
-        public BytesWritable getCurrentKey() throws IOException,
-            InterruptedException {
-            return null;
-        }
-
-        @Override
-        public Tuple getCurrentValue() throws IOException, InterruptedException {
-
-            for (int i = 0; i < tupleFromDisk.size(); i++) {
-
-                Object data = tupleFromDisk.get(i);
-
-                // We will do conversion for bytes only for now
-                if (data instanceof DataByteArray) {
-
-                    DataByteArray dba = (DataByteArray) data;
-
-                    if (dba == null) {
-                        // PigStorage will insert nulls for empty fields.
-                        tupleFromDisk.set(i, null);
-                        continue;
-                    }
-
-                    switch (fields[i].getType()) {
-
-                    case DataType.CHARARRAY:
-                        tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
-                        break;
-
-                    case DataType.INTEGER:
-                        tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
-                        break;
-
-                    case DataType.FLOAT:
-                        tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
-                        break;
-
-                    case DataType.LONG:
-                        tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
-                        break;
-
-                    case DataType.DOUBLE:
-                        tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
-                        break;
-
-                    case DataType.MAP:
-                        tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
-                        break;
-
-                    case DataType.BAG:
-                        tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
-                        break;
-
-                    case DataType.TUPLE:
-                        tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
-                        break;
-
-                    default:
-                        throw new IOException("Unknown Pig type in data: " + fields[i].getType());
-                    }
-                }
-            }
-
-            return tupleFromDisk;
-        }
-
-
-        @Override
-        public void initialize(InputSplit split, TaskAttemptContext ctx)
-            throws IOException, InterruptedException {
-
-            reader.initialize(split, ctx);
-            loadFunc.prepareToRead(reader, null);
-        }
-
-        @Override
-        public boolean nextKeyValue() throws IOException, InterruptedException {
-
-            // Even if we don't need any data from disk, we still need to call
-            // getNext() on the wrapped loader so we know how many rows to emit in
-            // our final output. getNext() eventually returns null once all disk
-            // data has been read, which tells us to stop emitting output.
-            tupleFromDisk = loadFunc.getNext();
-            return tupleFromDisk != null;
-        }
-
-        @Override
-        public float getProgress() throws IOException, InterruptedException {
-            return 0;
-        }
-
-    }
-}
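The heart of the deleted record reader is the getCurrentValue() switch: any field still carried as a raw DataByteArray is decoded by the LoadFunc's LoadCaster according to the type declared in the resource schema. A minimal sketch of that cast, using Utf8StorageConverter, the stock caster PigStorage returns from getLoadCaster() (CasterDemo is a hypothetical name):

    import org.apache.pig.LoadCaster;
    import org.apache.pig.builtin.Utf8StorageConverter;

    public class CasterDemo {
        public static void main(String[] args) throws Exception {
            LoadCaster caster = new Utf8StorageConverter();
            byte[] raw = "42".getBytes("UTF-8");
            // The same bytes decode differently depending on the declared type,
            // mirroring the DataType.INTEGER and DataType.CHARARRAY branches above.
            Integer asInt = caster.bytesToInteger(raw);
            String asChar = caster.bytesToCharArray(raw);
            System.out.println(asInt + " / " + asChar); // 42 / 42
        }
    }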
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken
deleted file mode 100644
index 2cbde2f..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.pig.builtin.PigStorage;
-
-public class PigStorageInputDriver extends LoadFuncBasedInputDriver {
-
-  public static final String delim = "hcat.pigstorage.delim";
-
-  @Override
-  public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
-    lf = storageDriverArgs.containsKey(delim) ?
-        new PigStorage(storageDriverArgs.getProperty(delim)) : new PigStorage();
-    super.initialize(context, storageDriverArgs);
-  }
-}
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken
deleted file mode 100644
index 27e28a4..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.HCatStorer;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-public class StoreFuncBasedOutputDriver extends FileOutputStorageDriver {
-
-    protected StoreFuncInterface sf;
-    private TupleFactory factory = TupleFactory.getInstance();
-    private HCatSchema schema;
-    private String location;
-    
-    @Override
-    public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
-        String storerString = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER);
-        if (storerString==null) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Don't know how to instantiate storer, " + HCatConstants.HCAT_PIG_STORER + " property is not defined for table ");
-        }
-        String storerArgs = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER_ARGS);
-        
-        String[] args;
-        if (storerArgs!=null) {
-            String delimit = hcatProperties.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
-            if (delimit==null) {
-                delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
-            }
-            args = storerArgs.split(delimit);
-        } else {
-            args = new String[0];
-        }
-        
-        try {
-            Class storerClass = Class.forName(storerString);
-        
-            Constructor[] constructors = storerClass.getConstructors();
-            for (Constructor constructor : constructors) {
-                if (constructor.getParameterTypes().length==args.length) {
-                    sf = (StoreFuncInterface)constructor.newInstance(args);
-                    break;
-                }
-            }
-        } catch (Exception e) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString, e);
-        }
-        
-        if (sf==null) {
-            throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString + " with construct args " + storerArgs);
-        }
-     
-        super.initialize(jobContext, hcatProperties);
-        
-        Job job = new Job(jobContext.getConfiguration());
-        String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
-        
-        // Set the signature before invoking StoreFunc methods; see the comments
-        // in LoadFuncBasedInputDriver.initialize.
-        sf.setStoreFuncUDFContextSignature(innerSignature);
-        sf.checkSchema(PigHCatUtil.getResourceSchema(schema));
-
-        sf.setStoreLocation(location, job);
-        ConfigurationUtil.mergeConf(jobContext.getConfiguration(), 
-                job.getConfiguration());
-    }
-    
-    @Override
-    public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat()
-            throws IOException {
-        StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(sf);
-        return outputFormat;
-    }
-
-    @Override
-    public void setOutputPath(JobContext jobContext, String location)
-            throws IOException {
-        this.location = location;
-    }
-
-    @Override
-    public void setSchema(JobContext jobContext, HCatSchema schema)
-            throws IOException {
-        this.schema = schema;
-    }
-
-    @Override
-    public void setPartitionValues(JobContext jobContext,
-            Map<String, String> partitionValues) throws IOException {
-        // Do nothing: partition keys are not stored along with the data, so ignore them.
-    }
-
-    @Override
-    public WritableComparable<?> generateKey(HCatRecord value)
-            throws IOException {
-        return null;
-    }
-
-    @Override
-    public Writable convertValue(HCatRecord value) throws IOException {
-        Tuple t = factory.newTupleNoCopy(value.getAll());
-        return t;
-    }
-
-}
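Apart from mirroring the loader's reflective construction on the storer side, the one piece of real data-path work above is convertValue(): the HCatRecord's field list is handed to Pig as a Tuple without copying, so the wrapped StoreFunc can serialize it. A minimal sketch of that bridge (RecordToTupleDemo is a hypothetical name):

    import java.util.Arrays;
    import java.util.List;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class RecordToTupleDemo {
        public static void main(String[] args) {
            // Stand-in for HCatRecord.getAll(): the record's raw field list.
            List<Object> fields = Arrays.asList((Object) "alice", 7, 3.14);
            // newTupleNoCopy wraps the list directly instead of copying it.
            Tuple t = TupleFactory.getInstance().newTupleNoCopy(fields);
            System.out.println(t); // (alice,7,3.14)
        }
    }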
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
deleted file mode 100644
index ba603eb..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.data.Tuple;
-
-public class StoreFuncBasedOutputFormat extends
-    OutputFormat<BytesWritable, Tuple> {
-
-    private final StoreFuncInterface storeFunc;
-
-    public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
-
-        this.storeFunc = storeFunc;
-    }
-
-    @Override
-    public void checkOutputSpecs(JobContext jobContext) throws IOException,
-        InterruptedException {
-        OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
-        outputFormat.checkOutputSpecs(jobContext);
-    }
-
-    @Override
-    public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
-        throws IOException, InterruptedException {
-        String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
-        ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
-        String location = outputJobInfo.getLocation();
-        OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
-        return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
-    }
-
-    @Override
-    public RecordWriter<BytesWritable, Tuple> getRecordWriter(
-        TaskAttemptContext ctx) throws IOException, InterruptedException {
-        RecordWriter<BytesWritable, Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
-        String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
-        ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
-        String location = outputJobInfo.getLocation();
-        return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
-    }
-
-    static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
-        private final RecordWriter<BytesWritable, Tuple> writer;
-        private final StoreFuncInterface storeFunc;
-        private final ResourceSchema schema;
-        private final String location;
-
-        public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable, Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
-            this.writer = writer;
-            this.storeFunc = sf;
-            this.schema = rs;
-            this.location = location;
-            storeFunc.prepareToWrite(writer);
-        }
-
-        @Override
-        public void close(TaskAttemptContext ctx) throws IOException,
-            InterruptedException {
-            writer.close(ctx);
-        }
-
-        @Override
-        public void write(BytesWritable key, Tuple value) throws IOException,
-            InterruptedException {
-            storeFunc.putNext(value);
-        }
-    }
-
-    static class StoreFuncBasedOutputCommitter extends OutputCommitter {
-        StoreFuncInterface sf;
-        OutputCommitter wrappedOutputCommitter;
-        String location;
-        ResourceSchema rs;
-
-        public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) {
-            this.sf = sf;
-            this.wrappedOutputCommitter = outputCommitter;
-            this.location = location;
-            this.rs = rs;
-        }
-
-        @Override
-        public void abortTask(TaskAttemptContext context) throws IOException {
-            wrappedOutputCommitter.abortTask(context);
-        }
-
-        @Override
-        public void commitTask(TaskAttemptContext context) throws IOException {
-            wrappedOutputCommitter.commitTask(context);
-        }
-
-        @Override
-        public boolean needsTaskCommit(TaskAttemptContext context)
-            throws IOException {
-            return wrappedOutputCommitter.needsTaskCommit(context);
-        }
-
-        @Override
-        public void setupJob(JobContext context) throws IOException {
-            wrappedOutputCommitter.setupJob(context);
-        }
-
-        @Override
-        public void setupTask(TaskAttemptContext context) throws IOException {
-            wrappedOutputCommitter.setupTask(context);
-        }
-
-        public void commitJob(JobContext context) throws IOException {
-            wrappedOutputCommitter.commitJob(context);
-            if (sf instanceof StoreMetadata) {
-                if (rs != null) {
-                    ((StoreMetadata) sf).storeSchema(
-                        rs, location, new Job(context.getConfiguration()));
-                }
-            }
-        }
-
-        @Override
-        public void cleanupJob(JobContext context) throws IOException {
-            wrappedOutputCommitter.cleanupJob(context);
-            if (sf instanceof StoreMetadata) {
-                if (rs != null) {
-                    ((StoreMetadata) sf).storeSchema(
-                        rs, location, new Job(context.getConfiguration()));
-                }
-            }
-        }
-
-        public void abortJob(JobContext context, JobStatus.State state) throws IOException {
-            wrappedOutputCommitter.abortJob(context, state);
-        }
-    }
-}
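StoreFuncBasedOutputCommitter is a straight decorator: every lifecycle call forwards to the committer of the wrapped StoreFunc's OutputFormat, and the job-level commit/cleanup calls additionally invoke StoreMetadata.storeSchema() when the StoreFunc supports it. A minimal sketch of that pattern against the Hadoop OutputCommitter API (ForwardingCommitter and afterCommit are hypothetical names):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class ForwardingCommitter extends OutputCommitter {
        private final OutputCommitter delegate;

        public ForwardingCommitter(OutputCommitter delegate) {
            this.delegate = delegate;
        }

        // Forward every lifecycle call unchanged to the wrapped committer.
        @Override public void setupJob(JobContext c) throws IOException { delegate.setupJob(c); }
        @Override public void setupTask(TaskAttemptContext c) throws IOException { delegate.setupTask(c); }
        @Override public boolean needsTaskCommit(TaskAttemptContext c) throws IOException { return delegate.needsTaskCommit(c); }
        @Override public void commitTask(TaskAttemptContext c) throws IOException { delegate.commitTask(c); }
        @Override public void abortTask(TaskAttemptContext c) throws IOException { delegate.abortTask(c); }

        // Job commit forwards first, then runs one extra side effect, analogous
        // to the storeSchema() call in the deleted committer above.
        @Override
        public void commitJob(JobContext c) throws IOException {
            delegate.commitJob(c);
            afterCommit(c);
        }

        protected void afterCommit(JobContext c) throws IOException {
            // hook: e.g. persist schema or other metadata
        }
    }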