/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.mapreduce;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.LazyHCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** The HCat wrapper for the underlying RecordReader;
* it ensures that initialize() on
* the underlying record reader is called with the underlying split,
* not with the HCatSplit.
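*
* A rough usage sketch (MyMapper and its output types are illustrative, not
* part of this class): with HCatInputFormat configured on the job, each
* HCatSplit is handed to this reader and a Mapper consumes HCatRecord values:
* <pre>
*   public class MyMapper
*       extends Mapper&lt;WritableComparable, HCatRecord, Text, IntWritable&gt; {
*     protected void map(WritableComparable key, HCatRecord value, Context context)
*         throws IOException, InterruptedException {
*       // "value" is the HCatRecord assembled by this reader from the split's data
*     }
*   }
* </pre>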
*/
class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> {
private static final Logger LOG = LoggerFactory.getLogger(HCatRecordReader.class);
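/** Tracks bad records so a configurable fraction of undeserializable input can be skipped. */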
private InputErrorTracker errorTracker;
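/** Key/value last read from the wrapped reader, and the HCatRecord built from them. */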
WritableComparable currentKey;
Writable currentValue;
HCatRecord currentHCatRecord;
/** The underlying record reader to delegate to. */
private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader;
/** The storage handler used */
private final HCatStorageHandler storageHandler;
private Deserializer deserializer;
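/** Values for output columns that are not stored in the data itself (typically partition key values), keyed by field name. */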
private Map<String, String> valuesNotInDataCols;
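/** Schema of the records handed to the caller; falls back to the table schema if none was set on the job. */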
private HCatSchema outputSchema = null;
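/** Schema of the data as physically stored in this split's partition. */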
private HCatSchema dataSchema = null;
/**
* Instantiates a new HCatRecordReader.
* @param storageHandler the storage handler for the table being read
* @param valuesNotInDataCols values for output columns that are not present in the stored data
*/
public HCatRecordReader(HCatStorageHandler storageHandler,
Map<String, String> valuesNotInDataCols) {
this.storageHandler = storageHandler;
this.valuesNotInDataCols = valuesNotInDataCols;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#initialize(
* org.apache.hadoop.mapreduce.InputSplit,
* org.apache.hadoop.mapreduce.TaskAttemptContext)
*/
@Override
public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
TaskAttemptContext taskContext) throws IOException, InterruptedException {
HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split);
baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext);
createDeserializer(hcatSplit, storageHandler, taskContext);
// Pull the output schema out of the TaskAttemptContext
outputSchema = (HCatSchema) HCatUtil.deserialize(
taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
if (outputSchema == null) {
outputSchema = hcatSplit.getTableSchema();
}
// Pull the table schema out of the Split info
// TODO This should be passed in the TaskAttemptContext instead
dataSchema = hcatSplit.getDataSchema();
errorTracker = new InputErrorTracker(taskContext.getConfiguration());
}
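/**
* Creates the storage handler's native (mapred) RecordReader for the wrapped split,
* after copying the partition's job properties into the JobConf.
*/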
private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit,
HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException {
JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext);
HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), jobConf);
org.apache.hadoop.mapred.InputFormat inputFormat =
HCatInputFormat.getMapRedInputFormat(jobConf, storageHandler.getInputFormatClass());
return inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf,
InternalUtil.createReporter(taskContext));
}
private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler,
TaskAttemptContext taskContext) throws IOException {
deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(),
taskContext.getConfiguration());
try {
InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(),
hcatSplit.getPartitionInfo().getTableInfo(),
hcatSplit.getPartitionInfo().getPartitionSchema());
} catch (SerDeException e) {
throw new IOException("Failed initializing deserializer "
+ storageHandler.getSerDeClass().getName(), e);
}
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey()
*/
@Override
public WritableComparable getCurrentKey()
throws IOException, InterruptedException {
return currentKey;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue()
*/
@Override
public HCatRecord getCurrentValue() throws IOException, InterruptedException {
return currentHCatRecord;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#getProgress()
*/
@Override
public float getProgress() {
try {
return baseRecordReader.getProgress();
} catch (IOException e) {
LOG.warn("Exception in HCatRecord reader", e);
}
return 0.0f; // errored
}
/**
* Check if the wrapped RecordReader has another record, and if so convert it into an
* HCatRecord. Checking and converting are done together here so that a configurable
* percentage of bad records can be tolerated.
*
* @return true if there is a next record
* @throws IOException on error
* @throws InterruptedException on error
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (currentKey == null) {
currentKey = baseRecordReader.createKey();
currentValue = baseRecordReader.createValue();
}
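// Keep pulling raw records from the base reader until one deserializes cleanly;
// bad records are counted and, within the configured threshold, skipped.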
while (baseRecordReader.next(currentKey, currentValue)) {
HCatRecord r = null;
Throwable t = null;
errorTracker.incRecords();
try {
Object o = deserializer.deserialize(currentValue);
r = new LazyHCatRecord(o, deserializer.getObjectInspector());
} catch (Throwable throwable) {
t = throwable;
}
if (r == null) {
errorTracker.incErrors(t);
continue;
}
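// Project the lazily deserialized row onto the output schema; columns that are
// not present in the stored data (e.g. partition keys) come from valuesNotInDataCols.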
DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
int i = 0;
for (String fieldName : outputSchema.getFieldNames()) {
if (dataSchema.getPosition(fieldName) != null) {
dr.set(i, r.get(fieldName, dataSchema));
} else {
dr.set(i, valuesNotInDataCols.get(fieldName));
}
i++;
}
currentHCatRecord = dr;
return true;
}
return false;
}
/* (non-Javadoc)
* @see org.apache.hadoop.mapreduce.RecordReader#close()
*/
@Override
public void close() throws IOException {
baseRecordReader.close();
}
/**
* Tracks the number of errors in the input and throws a RuntimeException
* if the rate of errors crosses a limit.
* <br/>
* The intention is to skip over very rare file corruption or incorrect
* input, but to catch programmer errors (incorrect format, incorrect
* deserializers, etc.).
*
* This class was largely copied from Elephant-Bird (thanks @rangadi!)
* https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java
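*
* A configuration sketch (the property keys are the HCatConstants constants
* read below; the numbers are only an example):
* <pre>
*   conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.01f); // skip up to 1% bad records
*   conf.setLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 2L);           // but only after 2 errors
* </pre>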
*/
static class InputErrorTracker {
long numRecords;
long numErrors;
double errorThreshold; // max fraction of errors allowed
long minErrors; // throw error only after this many errors
InputErrorTracker(Configuration conf) {
errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY,
HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT);
minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY,
HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT);
numRecords = 0;
numErrors = 0;
}
void incRecords() {
numRecords++;
}
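/**
* Records one bad record. Throws a RuntimeException immediately if no errors are
* tolerated (threshold <= 0), or once at least minErrors errors have occurred and
* the error rate exceeds errorThreshold.
*/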
void incErrors(Throwable cause) {
numErrors++;
if (numErrors > numRecords) {
// incorrect use of this class
throw new RuntimeException("Forgot to invoke incRecords()?");
}
if (cause == null) {
cause = new Exception("Unknown error");
}
if (errorThreshold <= 0) { // no errors are tolerated
throw new RuntimeException("error while reading input records", cause);
}
LOG.warn("Error while reading an input record ("
+ numErrors + " out of " + numRecords + " so far ): ", cause);
double errRate = numErrors / (double) numRecords;
// This will always excuse the first error. We could decide whether a single
// error crosses the threshold inside close() if we wanted to.
if (numErrors >= minErrors && errRate > errorThreshold) {
LOG.error(numErrors + " out of " + numRecords
+ " crosses configured threshold (" + errorThreshold + ")");
throw new RuntimeException("error rate while reading input records crossed threshold", cause);
}
}
}
}