HCATALOG-525 Remove dead classes from HCatalog Source Tree
git-svn-id: https://svn.apache.org/repos/asf/incubator/hcatalog/trunk@1398595 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/CHANGES.txt b/CHANGES.txt
index f7ae00d..66432d3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -44,6 +44,8 @@
HCAT-427 Document storage-based authorization (lefty via gates)
IMPROVEMENTS
+ HCAT-525 Remove dead classes from HCatalog Source Tree (amalakar via traviscrawford)
+
HCAT-521 Ignore .reviewboardrc in git (nitay via traviscrawford)
HCAT-519 Migrate pig-adapter/webhcat to maven dependencies and continue build cleanup (traviscrawford)
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
index 031afda..66caae6 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseLoader.java
@@ -46,7 +46,7 @@
* Base class for HCatLoader and HCatEximLoader
*/
-public abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
+abstract class HCatBaseLoader extends LoadFunc implements LoadMetadata, LoadPushDown {
protected static final String PRUNE_PROJECTION_INFO = "prune.projection.info";
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
index b3ba293..a3f9007 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatBaseStorer.java
@@ -58,7 +58,7 @@
*
*/
-public abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
+abstract class HCatBaseStorer extends StoreFunc implements StoreMetadata {
private static final List<Type> SUPPORTED_INTEGER_CONVERSIONS =
Lists.newArrayList(Type.TINYINT, Type.SMALLINT, Type.INT);
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
index 817866f..69d56cf 100644
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
+++ b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/PigHCatUtil.java
@@ -58,7 +58,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PigHCatUtil {
+class PigHCatUtil {
private static final Logger LOG = LoggerFactory.getLogger(PigHCatUtil.class);
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken
deleted file mode 100644
index 1505dd0..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputDriver.java.broken
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.data.DefaultHCatRecord;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
-import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.data.Tuple;
-
-
-/**
- * This is a base class which wraps a Load func in HCatInputStorageDriver.
- * If you already have a LoadFunc, then this class along with LoadFuncBasedInputFormat
- * is doing all the heavy lifting. For a new HCat Input Storage Driver just extend it
- * and override the initialize(). {@link PigStorageInputDriver} illustrates
- * that well.
- */
-public class LoadFuncBasedInputDriver extends HCatInputStorageDriver{
-
- private LoadFuncBasedInputFormat inputFormat;
- private HCatSchema dataSchema;
- private Map<String,String> partVals;
- private List<String> desiredColNames;
- protected LoadFunc lf;
-
- @Override
- public HCatRecord convertToHCatRecord(WritableComparable baseKey, Writable baseValue)
- throws IOException {
-
- List<Object> data = ((Tuple)baseValue).getAll();
- List<Object> hcatRecord = new ArrayList<Object>(desiredColNames.size());
-
- /* Iterate through columns asked for in output schema, look them up in
- * original data schema. If found, put it. Else look up in partition columns
- * if found, put it. Else, its a new column, so need to put null. Map lookup
- * on partition map will return null, if column is not found.
- */
- for(String colName : desiredColNames){
- Integer idx = dataSchema.getPosition(colName);
- hcatRecord.add( idx != null ? data.get(idx) : partVals.get(colName));
- }
- return new DefaultHCatRecord(hcatRecord);
- }
-
- @Override
- public InputFormat<? extends WritableComparable, ? extends Writable> getInputFormat(
- Properties hcatProperties) {
-
- return inputFormat;
- }
-
- @Override
- public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema) throws IOException {
-
- dataSchema = hcatSchema;
- }
-
- @Override
- public void setOutputSchema(JobContext jobContext, HCatSchema hcatSchema) throws IOException {
-
- desiredColNames = hcatSchema.getFieldNames();
- }
-
- @Override
- public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues)
- throws IOException {
-
- partVals = partitionValues;
- }
-
- @Override
- public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
- String loaderString = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER);
- if (loaderString==null) {
- throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Don't know how to instantiate loader, " + HCatConstants.HCAT_PIG_LOADER + " property is not defined for table ");
- }
- String loaderArgs = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_LOADER_ARGS);
-
- String[] args;
- if (loaderArgs!=null) {
- String delimit = storageDriverArgs.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
- if (delimit==null) {
- delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
- }
- args = loaderArgs.split(delimit);
- } else {
- args = new String[0];
- }
-
- try {
- Class loaderClass = Class.forName(loaderString);
-
- Constructor[] constructors = loaderClass.getConstructors();
- for (Constructor constructor : constructors) {
- if (constructor.getParameterTypes().length==args.length) {
- lf = (LoadFunc)constructor.newInstance(args);
- break;
- }
- }
- } catch (Exception e) {
- throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString, e);
- }
-
- if (lf==null) {
- throw new HCatException(ErrorType.ERROR_INIT_LOADER, "Cannot instantiate " + loaderString + " with construct args " + loaderArgs);
- }
-
- // Need to set the right signature in setLocation. The original signature is used by HCatLoader
- // and it does use this signature to access UDFContext, so we need to invent a new signature for
- // the wrapped loader.
- // As for PigStorage/JsonStorage, set signature right before setLocation seems to be good enough,
- // we may need to set signature more aggressively if we support more loaders
- String innerSignature = context.getConfiguration().get(HCatLoader.INNER_SIGNATURE);
- lf.setUDFContextSignature(innerSignature);
- lf.setLocation(location, new Job(context.getConfiguration()));
- inputFormat = new LoadFuncBasedInputFormat(lf, PigHCatUtil.getResourceSchema(dataSchema), location, context.getConfiguration());
- }
-
- private String location;
-
- @Override
- public void setInputPath(JobContext jobContext, String location) throws IOException {
-
- this.location = location;
- super.setInputPath(jobContext, location);
- }
-}
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
deleted file mode 100644
index 7028865..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/LoadFuncBasedInputFormat.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.pig.LoadCaster;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceSchema.ResourceFieldSchema;
-import org.apache.pig.data.DataByteArray;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
-
-/**
- * based on {@link org.apache.pig.builtin.PigStorage}
- */
-public class LoadFuncBasedInputFormat extends InputFormat<BytesWritable, Tuple> {
-
- private final LoadFunc loadFunc;
- private static ResourceFieldSchema[] fields;
-
- public LoadFuncBasedInputFormat(LoadFunc loadFunc, ResourceSchema dataSchema, String location, Configuration conf) throws IOException {
-
- this.loadFunc = loadFunc;
- fields = dataSchema.getFields();
-
- // Simulate the frontend call sequence for LoadFunc, in case LoadFunc need to store something into UDFContext (as JsonLoader does)
- if (loadFunc instanceof LoadMetadata) {
- ((LoadMetadata) loadFunc).getSchema(location, new Job(conf));
- }
- }
-
- @Override
- public RecordReader<BytesWritable, Tuple> createRecordReader(
- InputSplit split, TaskAttemptContext taskContext) throws IOException,
- InterruptedException {
- RecordReader<BytesWritable, Tuple> reader = loadFunc.getInputFormat().createRecordReader(split, taskContext);
- return new LoadFuncBasedRecordReader(reader, loadFunc);
- }
-
- @Override
- public List<InputSplit> getSplits(JobContext jobContext) throws IOException,
- InterruptedException {
- try {
- InputFormat<BytesWritable, Tuple> inpFormat = loadFunc.getInputFormat();
- return inpFormat.getSplits(jobContext);
-
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- static class LoadFuncBasedRecordReader extends RecordReader<BytesWritable, Tuple> {
-
- private Tuple tupleFromDisk;
- private final RecordReader<BytesWritable, Tuple> reader;
- private final LoadFunc loadFunc;
- private final LoadCaster caster;
-
- /**
- * @param reader
- * @param loadFunc
- * @throws IOException
- */
- public LoadFuncBasedRecordReader(RecordReader<BytesWritable, Tuple> reader, LoadFunc loadFunc) throws IOException {
- this.reader = reader;
- this.loadFunc = loadFunc;
- this.caster = loadFunc.getLoadCaster();
- }
-
- @Override
- public void close() throws IOException {
- reader.close();
- }
-
- @Override
- public BytesWritable getCurrentKey() throws IOException,
- InterruptedException {
- return null;
- }
-
- @Override
- public Tuple getCurrentValue() throws IOException, InterruptedException {
-
- for (int i = 0; i < tupleFromDisk.size(); i++) {
-
- Object data = tupleFromDisk.get(i);
-
- // We will do conversion for bytes only for now
- if (data instanceof DataByteArray) {
-
- DataByteArray dba = (DataByteArray) data;
-
- if (dba == null) {
- // PigStorage will insert nulls for empty fields.
- tupleFromDisk.set(i, null);
- continue;
- }
-
- switch (fields[i].getType()) {
-
- case DataType.CHARARRAY:
- tupleFromDisk.set(i, caster.bytesToCharArray(dba.get()));
- break;
-
- case DataType.INTEGER:
- tupleFromDisk.set(i, caster.bytesToInteger(dba.get()));
- break;
-
- case DataType.FLOAT:
- tupleFromDisk.set(i, caster.bytesToFloat(dba.get()));
- break;
-
- case DataType.LONG:
- tupleFromDisk.set(i, caster.bytesToLong(dba.get()));
- break;
-
- case DataType.DOUBLE:
- tupleFromDisk.set(i, caster.bytesToDouble(dba.get()));
- break;
-
- case DataType.MAP:
- tupleFromDisk.set(i, caster.bytesToMap(dba.get()));
- break;
-
- case DataType.BAG:
- tupleFromDisk.set(i, caster.bytesToBag(dba.get(), fields[i]));
- break;
-
- case DataType.TUPLE:
- tupleFromDisk.set(i, caster.bytesToTuple(dba.get(), fields[i]));
- break;
-
- default:
- throw new IOException("Unknown Pig type in data: " + fields[i].getType());
- }
- }
- }
-
- return tupleFromDisk;
- }
-
-
- @Override
- public void initialize(InputSplit split, TaskAttemptContext ctx)
- throws IOException, InterruptedException {
-
- reader.initialize(split, ctx);
- loadFunc.prepareToRead(reader, null);
- }
-
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
-
- // even if we don't need any data from disk, we will need to call
- // getNext() on pigStorage() so we know how many rows to emit in our
- // final output - getNext() will eventually return null when it has
- // read all disk data and we will know to stop emitting final output
- tupleFromDisk = loadFunc.getNext();
- return tupleFromDisk != null;
- }
-
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return 0;
- }
-
- }
-}
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken
deleted file mode 100644
index 2cbde2f..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/PigStorageInputDriver.java.broken
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.pig.builtin.PigStorage;
-
-public class PigStorageInputDriver extends LoadFuncBasedInputDriver {
-
- public static final String delim = "hcat.pigstorage.delim";
-
- @Override
- public void initialize(JobContext context, Properties storageDriverArgs) throws IOException {
-
- lf = storageDriverArgs.containsKey(delim) ?
- new PigStorage(storageDriverArgs.getProperty(delim)) : new PigStorage();
- super.initialize(context, storageDriverArgs);
- }
-}
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken
deleted file mode 100644
index 27e28a4..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputDriver.java.broken
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hcatalog.common.ErrorType;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.FileOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.pig.HCatLoader;
-import org.apache.hcatalog.pig.HCatStorer;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.StoreFunc;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.data.DefaultTupleFactory;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
-public class StoreFuncBasedOutputDriver extends FileOutputStorageDriver {
-
- protected StoreFuncInterface sf;
- private TupleFactory factory = TupleFactory.getInstance();
- private HCatSchema schema;
- private String location;
-
- @Override
- public void initialize(JobContext jobContext, Properties hcatProperties) throws IOException {
- String storerString = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER);
- if (storerString==null) {
- throw new HCatException(ErrorType.ERROR_INIT_STORER, "Don't know how to instantiate storer, " + HCatConstants.HCAT_PIG_STORER + " property is not defined for table ");
- }
- String storerArgs = hcatProperties.getProperty(HCatConstants.HCAT_PIG_STORER_ARGS);
-
- String[] args;
- if (storerArgs!=null) {
- String delimit = hcatProperties.getProperty(HCatConstants.HCAT_PIG_ARGS_DELIMIT);
- if (delimit==null) {
- delimit = HCatConstants.HCAT_PIG_ARGS_DELIMIT_DEFAULT;
- }
- args = storerArgs.split(delimit);
- } else {
- args = new String[0];
- }
-
- try {
- Class storerClass = Class.forName(storerString);
-
- Constructor[] constructors = storerClass.getConstructors();
- for (Constructor constructor : constructors) {
- if (constructor.getParameterTypes().length==args.length) {
- sf = (StoreFuncInterface)constructor.newInstance(args);
- break;
- }
- }
- } catch (Exception e) {
- throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString, e);
- }
-
- if (sf==null) {
- throw new HCatException(ErrorType.ERROR_INIT_STORER, "Cannot instantiate " + storerString + " with construct args " + storerArgs);
- }
-
- super.initialize(jobContext, hcatProperties);
-
- Job job = new Job(jobContext.getConfiguration());
- String innerSignature = jobContext.getConfiguration().get(HCatStorer.INNER_SIGNATURE);
-
- // Set signature before invoking StoreFunc methods, see comment in
- // see comments in LoadFuncBasedInputDriver.initialize
- sf.setStoreFuncUDFContextSignature(innerSignature);
- sf.checkSchema(PigHCatUtil.getResourceSchema(schema));
-
- sf.setStoreLocation(location, job);
- ConfigurationUtil.mergeConf(jobContext.getConfiguration(),
- job.getConfiguration());
- }
-
- @Override
- public OutputFormat<? extends WritableComparable<?>, ? extends Writable> getOutputFormat()
- throws IOException {
- StoreFuncBasedOutputFormat outputFormat = new StoreFuncBasedOutputFormat(sf);
- return outputFormat;
- }
-
- @Override
- public void setOutputPath(JobContext jobContext, String location)
- throws IOException {
- this.location = location;
- }
-
- @Override
- public void setSchema(JobContext jobContext, HCatSchema schema)
- throws IOException {
- this.schema = schema;
- }
-
- @Override
- public void setPartitionValues(JobContext jobContext,
- Map<String, String> partitionValues) throws IOException {
- // Doing nothing, partition keys are not stored along with the data, so ignore it
- }
-
- @Override
- public WritableComparable<?> generateKey(HCatRecord value)
- throws IOException {
- return null;
- }
-
- @Override
- public Writable convertValue(HCatRecord value) throws IOException {
- Tuple t = factory.newTupleNoCopy(value.getAll());
- return t;
- }
-
-}
diff --git a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java b/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
deleted file mode 100644
index ba603eb..0000000
--- a/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/drivers/StoreFuncBasedOutputFormat.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hcatalog.pig.drivers;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
-import org.apache.hcatalog.pig.PigHCatUtil;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.StoreFuncInterface;
-import org.apache.pig.StoreMetadata;
-import org.apache.pig.data.Tuple;
-
-public class StoreFuncBasedOutputFormat extends
- OutputFormat<BytesWritable, Tuple> {
-
- private final StoreFuncInterface storeFunc;
-
- public StoreFuncBasedOutputFormat(StoreFuncInterface storeFunc) {
-
- this.storeFunc = storeFunc;
- }
-
- @Override
- public void checkOutputSpecs(JobContext jobContext) throws IOException,
- InterruptedException {
- OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
- outputFormat.checkOutputSpecs(jobContext);
- }
-
- @Override
- public OutputCommitter getOutputCommitter(TaskAttemptContext ctx)
- throws IOException, InterruptedException {
- String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
- OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
- ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
- String location = outputJobInfo.getLocation();
- OutputFormat<BytesWritable, Tuple> outputFormat = storeFunc.getOutputFormat();
- return new StoreFuncBasedOutputCommitter(storeFunc, outputFormat.getOutputCommitter(ctx), location, rs);
- }
-
- @Override
- public RecordWriter<BytesWritable, Tuple> getRecordWriter(
- TaskAttemptContext ctx) throws IOException, InterruptedException {
- RecordWriter<BytesWritable, Tuple> writer = storeFunc.getOutputFormat().getRecordWriter(ctx);
- String serializedJobInfo = ctx.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
- OutputJobInfo outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(serializedJobInfo);
- ResourceSchema rs = PigHCatUtil.getResourceSchema(outputJobInfo.getOutputSchema());
- String location = outputJobInfo.getLocation();
- return new StoreFuncBasedRecordWriter(writer, storeFunc, location, rs);
- }
-
- static class StoreFuncBasedRecordWriter extends RecordWriter<BytesWritable, Tuple> {
- private final RecordWriter<BytesWritable, Tuple> writer;
- private final StoreFuncInterface storeFunc;
- private final ResourceSchema schema;
- private final String location;
-
- public StoreFuncBasedRecordWriter(RecordWriter<BytesWritable, Tuple> writer, StoreFuncInterface sf, String location, ResourceSchema rs) throws IOException {
- this.writer = writer;
- this.storeFunc = sf;
- this.schema = rs;
- this.location = location;
- storeFunc.prepareToWrite(writer);
- }
-
- @Override
- public void close(TaskAttemptContext ctx) throws IOException,
- InterruptedException {
- writer.close(ctx);
- }
-
- @Override
- public void write(BytesWritable key, Tuple value) throws IOException,
- InterruptedException {
- storeFunc.putNext(value);
- }
- }
-
- static class StoreFuncBasedOutputCommitter extends OutputCommitter {
- StoreFuncInterface sf;
- OutputCommitter wrappedOutputCommitter;
- String location;
- ResourceSchema rs;
-
- public StoreFuncBasedOutputCommitter(StoreFuncInterface sf, OutputCommitter outputCommitter, String location, ResourceSchema rs) {
- this.sf = sf;
- this.wrappedOutputCommitter = outputCommitter;
- this.location = location;
- this.rs = rs;
- }
-
- @Override
- public void abortTask(TaskAttemptContext context) throws IOException {
- wrappedOutputCommitter.abortTask(context);
- }
-
- @Override
- public void commitTask(TaskAttemptContext context) throws IOException {
- wrappedOutputCommitter.commitTask(context);
- }
-
- @Override
- public boolean needsTaskCommit(TaskAttemptContext context)
- throws IOException {
- return wrappedOutputCommitter.needsTaskCommit(context);
- }
-
- @Override
- public void setupJob(JobContext context) throws IOException {
- wrappedOutputCommitter.setupJob(context);
- }
-
- @Override
- public void setupTask(TaskAttemptContext context) throws IOException {
- wrappedOutputCommitter.setupTask(context);
- }
-
- public void commitJob(JobContext context) throws IOException {
- wrappedOutputCommitter.commitJob(context);
- if (sf instanceof StoreMetadata) {
- if (rs != null) {
- ((StoreMetadata) sf).storeSchema(
- rs, location, new Job(context.getConfiguration()));
- }
- }
- }
-
- @Override
- public void cleanupJob(JobContext context) throws IOException {
- wrappedOutputCommitter.cleanupJob(context);
- if (sf instanceof StoreMetadata) {
- if (rs != null) {
- ((StoreMetadata) sf).storeSchema(
- rs, location, new Job(context.getConfiguration()));
- }
- }
- }
-
- public void abortJob(JobContext context, JobStatus.State state) throws IOException {
- wrappedOutputCommitter.abortJob(context, state);
- }
- }
-}