/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.hive;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Functions;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.BatchCreator;
import org.apache.drill.exec.physical.impl.ScanBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.RecordReader;
import org.apache.drill.exec.store.parquet.ParquetDirectByteBufferAllocator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
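
/**
 * Creates a {@link ScanBatch} for a {@link HiveDrillNativeParquetSubScan}, reading the Parquet files
 * that back a Hive table (or its selected partitions) directly with Drill's native
 * {@link ParquetRecordReader} rather than going through the Hive SerDe path.
 */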
@SuppressWarnings("unused")
public class HiveDrillNativeScanBatchCreator implements BatchCreator<HiveDrillNativeParquetSubScan> {
  private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HiveDrillNativeScanBatchCreator.class);

@Override
public ScanBatch getBatch(FragmentContext context, HiveDrillNativeParquetSubScan config, List<RecordBatch> children)
throws ExecutionSetupException {
final Table table = config.getTable();
final List<InputSplit> splits = config.getInputSplits();
final List<Partition> partitions = config.getPartitions();
final List<SchemaPath> columns = config.getColumns();
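    // Label used to name synthetic partition columns (defaults to "dir", giving dir0, dir1, ...).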
final String partitionDesignator = context.getOptions()
.getOption(ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL).string_val;
List<Map<String, String>> implicitColumns = Lists.newLinkedList();
boolean selectAllQuery = AbstractRecordReader.isStarQuery(columns);
final boolean hasPartitions = (partitions != null && partitions.size() > 0);
final List<String[]> partitionColumns = Lists.newArrayList();
final List<Integer> selectedPartitionColumns = Lists.newArrayList();
List<SchemaPath> newColumns = columns;
if (!selectAllQuery) {
// Separate out the partition and non-partition columns. Non-partition columns are passed directly to the
// ParquetRecordReader. Partition columns are passed to ScanBatch.
newColumns = Lists.newArrayList();
Pattern pattern = Pattern.compile(String.format("%s[0-9]+", partitionDesignator));
for (SchemaPath column : columns) {
Matcher m = pattern.matcher(column.getAsUnescapedPath());
if (m.matches()) {
selectedPartitionColumns.add(
Integer.parseInt(column.getAsUnescapedPath().substring(partitionDesignator.length())));
} else {
newColumns.add(column);
}
}
}
final OperatorContext oContext = context.newOperatorContext(config);
int currentPartitionIndex = 0;
final List<RecordReader> readers = Lists.newArrayList();
final HiveConf conf = config.getHiveConf();
    // TODO: In the future we can populate this cache from the Parquet metadata cached on the filesystem.
final Map<String, ParquetMetadata> footerCache = Maps.newHashMap();
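    // Tracks the widest implicit-column map seen so far; used after the loop to pad the other readers' maps.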
Map<String, String> mapWithMaxColumns = Maps.newLinkedHashMap();
try {
for (InputSplit split : splits) {
final FileSplit fileSplit = (FileSplit) split;
final Path finalPath = fileSplit.getPath();
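        // Let Hive push projections and filters for this partition directory into a cloned JobConf.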
final JobConf cloneJob =
new ProjectionPusher().pushProjectionsAndFilters(new JobConf(conf), finalPath.getParent());
final FileSystem fs = finalPath.getFileSystem(cloneJob);
ParquetMetadata parquetMetadata = footerCache.get(finalPath.toString());
        if (parquetMetadata == null) {
parquetMetadata = ParquetFileReader.readFooter(cloneJob, finalPath);
footerCache.put(finalPath.toString(), parquetMetadata);
}
final List<Integer> rowGroupNums = getRowGroupNumbersFromFileSplit(fileSplit, parquetMetadata);
        for (int rowGroupNum : rowGroupNums) {
          // DRILL-5009: Skip the row group if its row count is zero.
if (parquetMetadata.getBlocks().get(rowGroupNum).getRowCount() == 0) {
continue;
}
          // Drill has only ever written a single row group per file, so corruption detection only
          // needs to look at the first row group.
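          // DRILL-4203: older Drill versions wrote corrupted DATE values; detecting that here lets
          // the reader compensate when decoding.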
ParquetReaderUtility.DateCorruptionStatus containsCorruptDates =
ParquetReaderUtility.detectCorruptDates(parquetMetadata, config.getColumns(), true);
logger.info(containsCorruptDates.toString());
readers.add(new ParquetRecordReader(
context,
Path.getPathWithoutSchemeAndAuthority(finalPath).toString(),
rowGroupNum, fs,
CodecFactory.createDirectCodecFactory(fs.getConf(),
new ParquetDirectByteBufferAllocator(oContext.getAllocator()), 0),
parquetMetadata,
newColumns,
containsCorruptDates)
);
Map<String, String> implicitValues = Maps.newLinkedHashMap();
if (hasPartitions) {
List<String> values = partitions.get(currentPartitionIndex).getValues();
for (int i = 0; i < values.size(); i++) {
if (selectAllQuery || selectedPartitionColumns.contains(i)) {
implicitValues.put(partitionDesignator + i, values.get(i));
}
}
}
implicitColumns.add(implicitValues);
if (implicitValues.size() > mapWithMaxColumns.size()) {
mapWithMaxColumns = implicitValues;
}
}
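        // The splits and partitions lists are parallel: each split maps to the partition at the same index.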
currentPartitionIndex++;
}
    } catch (final IOException | RuntimeException e) {
AutoCloseables.close(e, readers);
throw new ExecutionSetupException("Failed to create RecordReaders. " + e.getMessage(), e);
}
    // All readers should expose the same set of implicit columns; pad any that are missing with null values.
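    // For example, a reader that produced only {dir0=a} is padded to {dir0=a, dir1=null} so ScanBatch
    // sees a uniform set of implicit columns across readers.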
mapWithMaxColumns = Maps.transformValues(mapWithMaxColumns, Functions.constant((String) null));
for (Map<String, String> map : implicitColumns) {
map.putAll(Maps.difference(map, mapWithMaxColumns).entriesOnlyOnRight());
}
    // If no readers were created (possible when the table is empty or no row groups matched the splits),
    // create an empty RecordReader so the schema is still produced.
if (readers.size() == 0) {
readers.add(new HiveDefaultReader(table, null, null, columns, context, conf,
ImpersonationUtil.createProxyUgi(config.getUserName(), context.getQueryUserName())));
}
return new ScanBatch(config, context, oContext, readers.iterator(), implicitColumns);
}

  /**
   * Get the list of row group numbers for the given file input split. The logic used here is the same as
   * how Hive's Parquet input format finds the row group numbers for an input split.
   */
private List<Integer> getRowGroupNumbersFromFileSplit(final FileSplit split,
final ParquetMetadata footer) throws IOException {
final List<BlockMetaData> blocks = footer.getBlocks();
final long splitStart = split.getStart();
final long splitLength = split.getLength();
final List<Integer> rowGroupNums = Lists.newArrayList();
int i = 0;
for (final BlockMetaData block : blocks) {
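      // A row group belongs to this split if the offset of its first data page falls within the
      // split's byte range.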
final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (firstDataPage >= splitStart && firstDataPage < splitStart + splitLength) {
rowGroupNums.add(i);
}
i++;
}
return rowGroupNums;
}
}