blob: c92d6658143c9e7585980a6e905b0881425c9f38 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.mapreduce;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The HCatSplit wrapper around the InputSplit returned by the underlying InputFormat */
/**
 * The HCatSplit wrapper around the InputSplit returned by the underlying
 * InputFormat. Implements both the mapreduce and the mapred InputSplit
 * interfaces so it can be used with either API, and is Writable so it can be
 * shipped to tasks. The wire format is: serialized PartInfo, base split class
 * name, the base split's own Writable payload, then the serialized table schema.
 */
public class HCatSplit extends InputSplit
    implements Writable, org.apache.hadoop.mapred.InputSplit {

    private static final Logger LOG = LoggerFactory.getLogger(HCatSplit.class);

    /** The partition info for the split; also carries the partition (data) schema. */
    private PartInfo partitionInfo;

    /** The split returned by the underlying InputFormat. */
    private org.apache.hadoop.mapred.InputSplit baseMapRedSplit;

    /** The table-level schema for the HCat table. */
    private HCatSchema tableSchema;

    /**
     * Instantiates a new hcat split. Required no-arg constructor for the
     * Writable deserialization path; fields are populated by {@link #readFields}.
     */
    public HCatSplit() {
    }

    /**
     * Instantiates a new hcat split.
     *
     * @param partitionInfo the partition info
     * @param baseMapRedSplit the base mapred split
     * @param tableSchema the table level schema
     */
    public HCatSplit(PartInfo partitionInfo,
                     org.apache.hadoop.mapred.InputSplit baseMapRedSplit,
                     HCatSchema tableSchema) {
        this.partitionInfo = partitionInfo;
        // dataSchema can be obtained from partitionInfo.getPartitionSchema()
        this.baseMapRedSplit = baseMapRedSplit;
        this.tableSchema = tableSchema;
    }

    /**
     * Gets the partition info.
     * @return the partitionInfo
     */
    public PartInfo getPartitionInfo() {
        return partitionInfo;
    }

    /**
     * Gets the underlying InputSplit.
     * @return the baseMapRedSplit
     */
    public org.apache.hadoop.mapred.InputSplit getBaseSplit() {
        return baseMapRedSplit;
    }

    /**
     * Gets the data (partition-level) schema, delegated to the partition info.
     * @return the partition schema
     */
    public HCatSchema getDataSchema() {
        return this.partitionInfo.getPartitionSchema();
    }

    /**
     * Gets the table schema.
     * @return the table schema
     */
    public HCatSchema getTableSchema() {
        return this.tableSchema;
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
     */
    @Override
    public long getLength() {
        try {
            return baseMapRedSplit.getLength();
        } catch (IOException e) {
            // The mapreduce API's getLength() does not declare IOException, so
            // we cannot propagate; log and report an empty split instead.
            LOG.warn("Exception in HCatSplit", e);
        }
        return 0; // we errored
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
     */
    @Override
    public String[] getLocations() {
        try {
            return baseMapRedSplit.getLocations();
        } catch (IOException e) {
            // Same rationale as getLength(): interface does not allow the
            // checked exception, so degrade to "no preferred locations".
            LOG.warn("Exception in HCatSplit", e);
        }
        return new String[0]; // we errored
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    @SuppressWarnings("unchecked")
    @Override
    public void readFields(DataInput input) throws IOException {
        String partitionInfoString = WritableUtils.readString(input);
        partitionInfo = (PartInfo) HCatUtil.deserialize(partitionInfoString);

        String baseSplitClassName = WritableUtils.readString(input);
        org.apache.hadoop.mapred.InputSplit split;
        try {
            Class<? extends org.apache.hadoop.mapred.InputSplit> splitClass =
                (Class<? extends org.apache.hadoop.mapred.InputSplit>) Class.forName(baseSplitClassName);

            //Class.forName().newInstance() does not work if the underlying
            //InputSplit has package visibility
            Constructor<? extends org.apache.hadoop.mapred.InputSplit>
                constructor =
                splitClass.getDeclaredConstructor(new Class[]{});
            constructor.setAccessible(true);

            split = constructor.newInstance();
            // read baseSplit from input
            ((Writable) split).readFields(input);
            this.baseMapRedSplit = split;
        } catch (Exception e) {
            // Wrap reflection/instantiation failures, preserving the cause and
            // naming the offending class for easier diagnosis.
            throw new IOException("Exception from " + baseSplitClassName, e);
        }

        String tableSchemaString = WritableUtils.readString(input);
        tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    @Override
    public void write(DataOutput output) throws IOException {
        String partitionInfoString = HCatUtil.serialize(partitionInfo);
        // write partitionInfo into output
        WritableUtils.writeString(output, partitionInfoString);

        // write the base split's class name so readFields() can reinstantiate it
        WritableUtils.writeString(output, baseMapRedSplit.getClass().getName());
        Writable baseSplitWritable = (Writable) baseMapRedSplit;
        //write baseSplit into output
        baseSplitWritable.write(output);

        //write the table schema into output
        String tableSchemaString = HCatUtil.serialize(tableSchema);
        WritableUtils.writeString(output, tableSchemaString);
    }
}