/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hcatalog.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.reflect.Constructor;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;

import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.schema.HCatSchema;

/**
 * HCatSplit is a wrapper around the InputSplit returned by the underlying
 * InputFormat. It also carries the partition info and the table-level schema
 * needed to interpret the records of the wrapped split.
 *
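 * <p>A minimal round-trip sketch, illustrative only and not part of the
 * original source: it assumes Hadoop's {@code DataOutputBuffer} and
 * {@code DataInputBuffer} helpers plus pre-built {@code partInfo},
 * {@code baseSplit} and {@code tableSchema} objects.
 * <pre>{@code
 * HCatSplit split = new HCatSplit(partInfo, baseSplit, tableSchema);
 * DataOutputBuffer out = new DataOutputBuffer();
 * split.write(out);                              // serialize the split
 * DataInputBuffer in = new DataInputBuffer();
 * in.reset(out.getData(), out.getLength());
 * HCatSplit copy = new HCatSplit();              // no-arg constructor required
 * copy.readFields(in);                           // rebuild the split
 * }</pre>
 */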
class HCatSplit extends InputSplit implements Writable {

    /** The partition info for the split. */
    private PartInfo partitionInfo;

    /** The split returned by the underlying InputFormat split. */
    private InputSplit baseSplit;

    /** The schema for the HCatTable. */
    private HCatSchema tableSchema;

    /**
     * Instantiates a new, empty HCatSplit. A no-argument constructor is
     * needed so that the split can be created reflectively and then
     * populated via {@link #readFields(DataInput)}.
     */
    public HCatSplit() {
    }

    /**
     * Instantiates a new HCatSplit.
     *
     * @param partitionInfo the partition info
     * @param baseSplit the base split
     * @param tableSchema the table level schema
     */
    public HCatSplit(PartInfo partitionInfo, InputSplit baseSplit, HCatSchema tableSchema) {
        this.partitionInfo = partitionInfo;
        this.baseSplit = baseSplit;
        this.tableSchema = tableSchema;
    }

    /**
     * Gets the partition info.
     * @return the partitionInfo
     */
    public PartInfo getPartitionInfo() {
        return partitionInfo;
    }

    /**
     * Gets the underlying InputSplit.
     * @return the baseSplit
     */
    public InputSplit getBaseSplit() {
        return baseSplit;
    }

    /**
     * Sets the table schema.
     * @param tableSchema the new table schema
     */
    public void setTableSchema(HCatSchema tableSchema) {
        this.tableSchema = tableSchema;
    }

    /**
     * Gets the table schema.
     * @return the table schema
     */
    public HCatSchema getTableSchema() {
        return tableSchema;
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.InputSplit#getLength()
     */
    @Override
    public long getLength() throws IOException, InterruptedException {
        return baseSplit.getLength();
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.InputSplit#getLocations()
     */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        return baseSplit.getLocations();
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    @SuppressWarnings("unchecked")
    @Override
    public void readFields(DataInput input) throws IOException {
        // read and deserialize the partition info
        String partitionInfoString = WritableUtils.readString(input);
        partitionInfo = (PartInfo) HCatUtil.deserialize(partitionInfoString);

        // re-create the underlying split from its class name
        String baseSplitClassName = WritableUtils.readString(input);
        InputSplit split;
        try {
            Class<? extends InputSplit> splitClass =
                (Class<? extends InputSplit>) Class.forName(baseSplitClassName);

            // Class.forName().newInstance() does not work if the underlying
            // InputSplit has package visibility
            Constructor<? extends InputSplit> constructor =
                splitClass.getDeclaredConstructor(new Class[]{});
            constructor.setAccessible(true);
            split = constructor.newInstance();

            // read baseSplit from input
            ((Writable) split).readFields(input);
            this.baseSplit = split;
        } catch (Exception e) {
            // wrap and rethrow, keeping the original exception as the cause
            throw new IOException("Exception from " + baseSplitClassName + " : " + e.getMessage(), e);
        }

        // read and deserialize the table schema
        String tableSchemaString = WritableUtils.readString(input);
        tableSchema = (HCatSchema) HCatUtil.deserialize(tableSchemaString);
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    @Override
    public void write(DataOutput output) throws IOException {
        // write partitionInfo into output
        String partitionInfoString = HCatUtil.serialize(partitionInfo);
        WritableUtils.writeString(output, partitionInfoString);

        // write the base split's class name followed by the baseSplit itself,
        // so that readFields() can re-create the correct InputSplit subclass
        WritableUtils.writeString(output, baseSplit.getClass().getName());
        Writable baseSplitWritable = (Writable) baseSplit;
        baseSplitWritable.write(output);

        // write the table schema into output
        String tableSchemaString = HCatUtil.serialize(tableSchema);
        WritableUtils.writeString(output, tableSchemaString);
    }
}