| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hcatalog.mapreduce; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hive.serde2.Deserializer; |
| import org.apache.hadoop.hive.serde2.SerDeException; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.io.WritableComparable; |
| import org.apache.hadoop.mapred.JobConf; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hcatalog.common.HCatConstants; |
| import org.apache.hcatalog.common.HCatUtil; |
| import org.apache.hcatalog.data.DefaultHCatRecord; |
| import org.apache.hcatalog.data.HCatRecord; |
| import org.apache.hcatalog.data.LazyHCatRecord; |
| import org.apache.hcatalog.data.schema.HCatSchema; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** The HCat wrapper around the underlying RecordReader. |
| * It ensures that initialize() on the underlying record reader |
| * is invoked with the underlying split, not with the HCatSplit |
| * wrapper. |
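| * |
| * <p>A minimal usage sketch showing how the framework drives this reader; |
| * {@code split} and {@code context} are placeholders for the InputSplit and |
| * TaskAttemptContext that the framework supplies at runtime: |
| * <pre>{@code |
| * reader.initialize(split, context); // split is an HCatSplit |
| * while (reader.nextKeyValue()) { |
| *   HCatRecord record = reader.getCurrentValue(); |
| *   // ... process record ... |
| * } |
| * reader.close(); |
| * }</pre> |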
| */ |
| class HCatRecordReader extends RecordReader<WritableComparable, HCatRecord> { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(HCatRecordReader.class); |
| |
| /** Tracks the rate of bad input records so rare corruption can be skipped. */ |
| private InputErrorTracker errorTracker; |
| |
| /** Key, value, and converted record at the current position. */ |
| WritableComparable currentKey; |
| Writable currentValue; |
| HCatRecord currentHCatRecord; |
| |
| /** The underlying record reader to delegate to. */ |
| private org.apache.hadoop.mapred.RecordReader<WritableComparable, Writable> baseRecordReader; |
| |
| /** The storage handler used to obtain the input format and the SerDe. */ |
| private final HCatStorageHandler storageHandler; |
| |
| private Deserializer deserializer; |
| |
| /** Values (e.g. partition column values) for output columns that are absent from the stored data. */ |
| private Map<String,String> valuesNotInDataCols; |
| |
| private HCatSchema outputSchema = null; |
| private HCatSchema dataSchema = null; |
| |
| /** |
| * Instantiates a new HCat record reader. |
| * |
| * @param storageHandler the storage handler backing the underlying input format and SerDe |
| * @param valuesNotInDataCols values for output columns that are not present in the stored data |
| */ |
| public HCatRecordReader(HCatStorageHandler storageHandler, |
| Map<String,String> valuesNotInDataCols) { |
| this.storageHandler = storageHandler; |
| this.valuesNotInDataCols = valuesNotInDataCols; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.hadoop.mapreduce.RecordReader#initialize( |
| * org.apache.hadoop.mapreduce.InputSplit, |
| * org.apache.hadoop.mapreduce.TaskAttemptContext) |
| */ |
| @Override |
| public void initialize(org.apache.hadoop.mapreduce.InputSplit split, |
| TaskAttemptContext taskContext) throws IOException, InterruptedException { |
| |
| HCatSplit hcatSplit = InternalUtil.castToHCatSplit(split); |
| |
| baseRecordReader = createBaseRecordReader(hcatSplit, storageHandler, taskContext); |
| createDeserializer(hcatSplit, storageHandler, taskContext); |
| |
| // Pull the output schema out of the TaskAttemptContext |
| outputSchema = (HCatSchema) HCatUtil.deserialize( |
| taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA)); |
| |
| if (outputSchema == null) { |
| outputSchema = hcatSplit.getTableSchema(); |
| } |
| |
| // Pull the table schema out of the Split info |
| // TODO This should be passed in the TaskAttemptContext instead |
| dataSchema = hcatSplit.getDataSchema(); |
| |
| errorTracker = new InputErrorTracker(taskContext.getConfiguration()); |
| } |
| |
| private org.apache.hadoop.mapred.RecordReader createBaseRecordReader(HCatSplit hcatSplit, |
| HCatStorageHandler storageHandler, TaskAttemptContext taskContext) throws IOException { |
| |
| JobConf jobConf = HCatUtil.getJobConfFromContext(taskContext); |
| HCatUtil.copyJobPropertiesToJobConf(hcatSplit.getPartitionInfo().getJobProperties(), jobConf); |
| org.apache.hadoop.mapred.InputFormat inputFormat = |
| HCatInputFormat.getMapRedInputFormat(jobConf, storageHandler.getInputFormatClass()); |
| return inputFormat.getRecordReader(hcatSplit.getBaseSplit(), jobConf, |
| InternalUtil.createReporter(taskContext)); |
| } |
| |
| private void createDeserializer(HCatSplit hcatSplit, HCatStorageHandler storageHandler, |
| TaskAttemptContext taskContext) throws IOException { |
| |
| deserializer = ReflectionUtils.newInstance(storageHandler.getSerDeClass(), |
| taskContext.getConfiguration()); |
| |
| try { |
| InternalUtil.initializeDeserializer(deserializer, storageHandler.getConf(), |
| hcatSplit.getPartitionInfo().getTableInfo(), |
| hcatSplit.getPartitionInfo().getPartitionSchema()); |
| } catch (SerDeException e) { |
| throw new IOException("Failed initializing deserializer " |
| + storageHandler.getSerDeClass().getName(), e); |
| } |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentKey() |
| */ |
| @Override |
| public WritableComparable getCurrentKey() |
| throws IOException, InterruptedException { |
| return currentKey; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.hadoop.mapreduce.RecordReader#getCurrentValue() |
| */ |
| @Override |
| public HCatRecord getCurrentValue() throws IOException, InterruptedException { |
| return currentHCatRecord; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.hadoop.mapreduce.RecordReader#getProgress() |
| */ |
| @Override |
| public float getProgress() { |
| try { |
| return baseRecordReader.getProgress(); |
| } catch (IOException e) { |
| LOG.warn("Exception in HCatRecord reader",e); |
| } |
| return 0.0f; // progress is unknown after an error |
| } |
| |
| /** |
| * Check if the wrapped RecordReader has another record, and if so convert it into an |
| * HCatRecord. Both the check and the conversion happen here so that a configurable |
| * percentage of bad records can be tolerated. |
| * |
| * @return if there is a next record |
| * @throws IOException on error |
| * @throws InterruptedException on error |
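| * |
| * <p>Illustrative sketch of the per-record projection (the schemas here are |
| * hypothetical): with an output schema {@code (id, ds)} and a data schema |
| * {@code (id)}, the conversion loop below effectively performs |
| * <pre>{@code |
| * dr.set(0, r.get("id", dataSchema));       // column present in the data |
| * dr.set(1, valuesNotInDataCols.get("ds")); // e.g. a partition value |
| * }</pre> |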
| */ |
| @Override |
| public boolean nextKeyValue() throws IOException, InterruptedException { |
| if (currentKey == null) { |
| currentKey = baseRecordReader.createKey(); |
| currentValue = baseRecordReader.createValue(); |
| } |
| |
| while (baseRecordReader.next(currentKey, currentValue)) { |
| HCatRecord r = null; |
| Throwable t = null; |
| |
| errorTracker.incRecords(); |
| |
| try { |
| Object o = deserializer.deserialize(currentValue); |
| r = new LazyHCatRecord(o, deserializer.getObjectInspector()); |
| } catch (Throwable throwable) { |
| t = throwable; |
| } |
| |
| if (r == null) { |
| errorTracker.incErrors(t); |
| continue; |
| } |
| |
| DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size()); |
| int i = 0; |
| for (String fieldName : outputSchema.getFieldNames()) { |
| if (dataSchema.getPosition(fieldName) != null) { |
| dr.set(i, r.get(fieldName, dataSchema)); |
| } else { |
| dr.set(i, valuesNotInDataCols.get(fieldName)); |
| } |
| i++; |
| } |
| |
| currentHCatRecord = dr; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /* (non-Javadoc) |
| * @see org.apache.hadoop.mapreduce.RecordReader#close() |
| */ |
| @Override |
| public void close() throws IOException { |
| baseRecordReader.close(); |
| } |
| |
| /** |
| * Tracks the number of errors in the input and throws a RuntimeException |
| * if the error rate crosses a limit. |
| * <br/> |
| * The intention is to skip over very rare file corruption or bad |
| * input, but to catch programmer errors (an incorrect format, an |
| * incorrect deserializer, etc.). |
| * |
| * This class was largely copied from Elephant-Bird (thanks @rangadi!) |
| * https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapreduce/input/LzoRecordReader.java |
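| * |
| * <p>A sketch of tuning the tracker through the job configuration; the |
| * numeric values are illustrative, not the shipped defaults, and {@code job} |
| * is a hypothetical org.apache.hadoop.mapreduce.Job: |
| * <pre>{@code |
| * Configuration conf = job.getConfiguration(); |
| * // fail once at least 10 errors have been seen AND more than 1% of the |
| * // records read so far were bad |
| * conf.setFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, 0.01f); |
| * conf.setLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, 10L); |
| * }</pre> |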
| */ |
| static class InputErrorTracker { |
| long numRecords; |
| long numErrors; |
| |
| double errorThreshold; // max fraction of errors allowed |
| long minErrors; // throw error only after this many errors |
| |
| InputErrorTracker(Configuration conf) { |
| errorThreshold = conf.getFloat(HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_KEY, |
| HCatConstants.HCAT_INPUT_BAD_RECORD_THRESHOLD_DEFAULT); |
| minErrors = conf.getLong(HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_KEY, |
| HCatConstants.HCAT_INPUT_BAD_RECORD_MIN_DEFAULT); |
| numRecords = 0; |
| numErrors = 0; |
| } |
| |
| void incRecords() { |
| numRecords++; |
| } |
| |
| void incErrors(Throwable cause) { |
| numErrors++; |
| if (numErrors > numRecords) { |
| // incorrect use of this class |
| throw new RuntimeException("Forgot to invoke incRecords()?"); |
| } |
| |
| if (cause == null) { |
| cause = new Exception("Unknown error"); |
| } |
| |
| if (errorThreshold <= 0) { // no errors are tolerated |
| throw new RuntimeException("error while reading input records", cause); |
| } |
| |
| LOG.warn("Error while reading an input record (" |
| + numErrors + " out of " + numRecords + " so far ): ", cause); |
| |
| double errRate = numErrors / (double) numRecords; |
| |
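| // Worked example with illustrative settings (minErrors = 10, |
| // errorThreshold = 0.01): 10 errors within the first 500 records give |
| // errRate = 0.02 > 0.01, so reading fails; 10 errors spread over |
| // 5,000 records give errRate = 0.002, so reading continues. |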
| // Errors are always excused until minErrors is reached; a final check |
| // for a single error crossing the threshold could be added in close(). |
| if (numErrors >= minErrors && errRate > errorThreshold) { |
| LOG.error(numErrors + " out of " + numRecords |
| + " crosses configured threshold (" + errorThreshold + ")"); |
| throw new RuntimeException("error rate while reading input records crossed threshold", cause); |
| } |
| } |
| } |
| } |