/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.store.hive.readers;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Properties;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.store.hive.HivePartition;
import org.apache.drill.exec.store.hive.HiveTableWithColumnCache;
import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.drill.exec.store.hive.readers.inspectors.SkipFooterRecordsInspector;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Reading a Hive table stored in text format may require skipping a few header and/or footer records.
 * This class extends the default reader to add that functionality.
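 * <p>
 * For example (a hypothetical DDL, shown only for illustration), a table declared with the
 * standard Hive skip properties would have its first and last line ignored by this reader:
 * <pre>
 * CREATE TABLE example_text_table (col1 STRING)
 * ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 * STORED AS TEXTFILE
 * TBLPROPERTIES ("skip.header.line.count"="1", "skip.footer.line.count"="1");
 * </pre>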
 */
public class HiveTextRecordReader extends HiveDefaultRecordReader {

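  /**
   * Buffers incoming records and withholds the last N of them so that footer rows are never
   * written into the record batch; remains null when the table has no footer rows to skip.
   */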
  private SkipFooterRecordsInspector skipFooterValueHolder;

  /**
   * Constructor matching super.
   *
   * @param table metadata about the Hive table being read
   * @param partition holder of metadata about table partitioning
   * @param inputSplits input splits for reading data from distributed storage
   * @param projectedColumns target columns for the scan
   * @param context fragment context of the executing fragment
   * @param hiveConf Hive configuration
   * @param proxyUgi user/group info to be used for initialization
   */
  public HiveTextRecordReader(HiveTableWithColumnCache table, HivePartition partition,
                              Collection<InputSplit> inputSplits, List<SchemaPath> projectedColumns,
                              FragmentContext context, HiveConf hiveConf, UserGroupInformation proxyUgi) {
    super(table, partition, inputSplits, projectedColumns, context, hiveConf, proxyUgi);
  }

  @Override
  protected void internalInit(Properties hiveTableProperties) {
    int skipHeaderCount = HiveUtilities.retrieveIntProperty(hiveTableProperties, serdeConstants.HEADER_COUNT, -1);
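    // a negative default means the property is absent, so the loop below is not entered and no rows are skipped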
    // skip the first N records to apply the skip header policy
    try {
      for (int i = 0; i < skipHeaderCount; i++) {
        if (!hasNextValue(valueHolder)) {
          // no more records to skip, the table is drained
          empty = true;
          break;
        }
      }
    } catch (IOException | ExecutionSetupException e) {
      throw new DrillRuntimeException(e.getMessage(), e);
    }

    // if the table was drained while skipping the first N records, there is no need to apply the skip footer logic
    if (!empty) {
      int skipFooterCount = HiveUtilities.retrieveIntProperty(hiveTableProperties, serdeConstants.FOOTER_COUNT, -1);
      // if the last N records must be skipped, use skipFooterValueHolder, which buffers records while reading
      if (skipFooterCount > 0) {
        skipFooterValueHolder = new SkipFooterRecordsInspector(mapredReader, skipFooterCount);
      }
    }
  }

  /**
   * Reads a batch of records, skipping footer rows when necessary.
   *
   * @return count of read records
   */
  @Override
  public int next() {
    if (skipFooterValueHolder == null) {
      return super.next();
    } else {
      try {
        // starting a new batch, reset the processed records count
        skipFooterValueHolder.reset();
        while (!skipFooterValueHolder.isBatchFull() && hasNextValue(skipFooterValueHolder.getValueHolder())) {
          Object value = skipFooterValueHolder.getNextValue();
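          // getNextValue is expected to return null while the inspector is still filling its buffer,
          // i.e. until enough records are held back to guarantee the last N (footer) rows are dropped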
          if (value != null) {
            Object deSerializedValue = partitionToTableSchemaConverter.convert(partitionDeserializer.deserialize((Writable) value));
            outputWriter.setPosition(skipFooterValueHolder.getProcessedRecordCount());
            readHiveRecordAndInsertIntoRecordBatch(deSerializedValue);
            skipFooterValueHolder.incrementProcessedRecordCount();
          }
        }
        outputWriter.setValueCount(skipFooterValueHolder.getProcessedRecordCount());
        return skipFooterValueHolder.getProcessedRecordCount();
      } catch (ExecutionSetupException | IOException | SerDeException e) {
        throw new DrillRuntimeException(e.getMessage(), e);
      }
    }
  }
}