/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.parquet.columnreaders;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.Types;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.vector.NullableIntVector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import com.google.common.collect.Lists;

/**
* Maps the schema of the Parquet file and the columns selected by the query
* to the schema that Drill and the Parquet record reader use.
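* <p>
* A minimal sketch of how a record reader might drive this class; the variable
* names below are illustrative, not part of this API:
* <pre>
* ParquetSchema schema = new ParquetSchema(options, rowGroupIndex, footer, selectedCols);
* schema.buildSchema();
* for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) {
*   // create a column reader for each selected column
* }
* if (!schema.isStarQuery()) {
*   schema.createNonExistentColumns(output, nullFilledVectors);
* }
* </pre>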
*/
public final class ParquetSchema {
/**
* Set of columns specified in the SELECT clause. Will be null for
* a SELECT * query.
*/
private final Collection<SchemaPath> selectedCols;
/**
* Array parallel to the selected columns collection above; used to determine
* the subset of projection push-down columns that do not appear in this file.
*/
private final boolean[] columnsFound;
private final OptionManager options;
private final int rowGroupIndex;
private final ParquetMetadata footer;
/**
* List of metadata for selected columns. This list does two things.
* First, it identifies the Parquet columns we wish to select. Second, it
* provides metadata for those columns. Note that null columns (columns
* in the SELECT clause but not in the file) appear elsewhere.
*/
private List<ParquetColumnMetadata> selectedColumnMetadata = new ArrayList<>();
private int bitWidthAllFixedFields;
private boolean allFieldsFixedLength;
private long groupRecordCount;
/**
* Build the Parquet schema. The schema can be based on a "SELECT *",
* meaning we want all columns defined in the Parquet file. In this case,
* the list of selected columns is null. Or, the query can be based on
* an explicit list of selected columns. In this case, the
* columns need not exist in the Parquet file. If a column does not exist,
* the reader returns null for that column. If no selected column exists
* in the file, then we return "mock" records: records with only null
* values, but repeated for the number of rows in the Parquet file.
*
* @param options session options
* @param rowGroupIndex row group to read
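* @param footer Parquet metadata (footer) for the file being read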
* @param selectedCols columns specified in the SELECT clause, or null if
* this is a SELECT * query
*/
public ParquetSchema(OptionManager options, int rowGroupIndex, ParquetMetadata footer, Collection<SchemaPath> selectedCols) {
this.options = options;
this.rowGroupIndex = rowGroupIndex;
this.selectedCols = selectedCols;
this.footer = footer;
if (selectedCols == null) {
columnsFound = null;
} else {
columnsFound = new boolean[selectedCols.size()];
}
}
/**
* Build the schema for this read as a combination of the schema specified in
* the Parquet footer and the list of columns selected in the query.
*/
public void buildSchema() {
BlockMetaData rowGroupMetadata = getRowGroupMetadata();
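// A rowGroupIndex of -1 means no row group is assigned, so there are no records to read.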
groupRecordCount = rowGroupMetadata == null ? 0 : rowGroupMetadata.getRowCount();
loadParquetSchema();
computeFixedPart();
}
/**
* Scan the Parquet footer and match each Parquet column against the list of
* columns we want to read, keeping metadata only for the selected columns.
*/
private void loadParquetSchema() {
// TODO - figure out how to deal with this better once we add nested reading; note also where this map is used below
// store a map from column name to converted types if they are non-null
Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
// loop to add up the length of the fixed width columns and build the schema
for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
columnMetadata.resolveDrillType(schemaElements, options);
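// Keep metadata only for columns requested by the query; a star query selects every column.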
if (!columnSelected(column)) {
continue;
}
selectedColumnMetadata.add(columnMetadata);
}
}
/**
* Fixed-width fields are the easiest to plan. We know the size of each column,
* making it easy to determine the total length of each vector, once we know
* the target record count. A special reader is used in the fortunate case
* that all fields are fixed width.
*/
private void computeFixedPart() {
allFieldsFixedLength = true;
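// Sum the per-record bit width of the fixed-width columns; a single variable-width column disables the all-fixed-width fast path.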
for (ParquetColumnMetadata colMd : selectedColumnMetadata) {
if (colMd.isFixedLength()) {
bitWidthAllFixedFields += colMd.length;
} else {
allFieldsFixedLength = false;
}
}
}
public boolean isStarQuery() { return selectedCols == null; }
public ParquetMetadata footer() { return footer; }
public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
public List<ParquetColumnMetadata> getColumnMetadata() { return selectedColumnMetadata; }
/**
* Return the number of records in the row group being read.
*
* @return number of records in the Parquet row group
*/
public long getGroupRecordCount() { return groupRecordCount; }
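/**
* Metadata for the row group this reader is assigned to, or null if no
* row group is assigned (rowGroupIndex of -1).
*/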
public BlockMetaData getRowGroupMetadata() {
if (rowGroupIndex == -1) {
return null;
}
return footer.getBlocks().get(rowGroupIndex);
}
/**
* Determine if a Parquet column is selected for the query. It is selected
* if this is a star query (we want all columns) or if the column
* appears in the select list.
*
* @param column the Parquet column expressed as column descriptor
* @return true if the column is to be included in the scan, false
* if not
*/
private boolean columnSelected(ColumnDescriptor column) {
if (isStarQuery()) {
return true;
}
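// Compare the column's full path (case-insensitive, ignoring array indexes) against each projected column,
// and record which projected columns were found in the file.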
int i = 0;
for (SchemaPath expr : selectedCols) {
if (ParquetReaderUtility.getFullColumnPath(column).equalsIgnoreCase(expr.getUnIndexed().toString())) {
columnsFound[i] = true;
return true;
}
i++;
}
return false;
}
/**
* Create "dummy" fields for columns which are selected in the SELECT clause, but not
* present in the Parquet schema.
* @param output the output container
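* @param nullFilledVectors list that receives the null-filled vectors created for the missing columns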
* @throws SchemaChangeException should not occur
*/
public void createNonExistentColumns(OutputMutator output, List<NullableIntVector> nullFilledVectors) throws SchemaChangeException {
List<SchemaPath> projectedColumns = Lists.newArrayList(selectedCols);
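// Any projected column not found in the file (other than the star column) gets a null-filled dummy vector.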
for (int i = 0; i < columnsFound.length; i++) {
SchemaPath col = projectedColumns.get(i);
assert col != null;
if ( ! columnsFound[i] && ! col.equals(SchemaPath.STAR_COLUMN)) {
nullFilledVectors.add(createMissingColumn(col, output));
}
}
}
/**
* Create a "dummy" column for a missing field. The column is of type optional
* int, but will always be null.
*
* @param col the selected, but non-existent, schema path
* @param output the output container
* @return the value vector for the field
* @throws SchemaChangeException should not occur
*/
private NullableIntVector createMissingColumn(SchemaPath col, OutputMutator output) throws SchemaChangeException {
// col.toExpr() is used here as field name since we don't want to see these fields in the existing maps
MaterializedField field = MaterializedField.create(col.toExpr(),
Types.optional(TypeProtos.MinorType.INT));
return (NullableIntVector) output.addField(field,
TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL));
}
Map<String, Integer> buildChunkMap(BlockMetaData rowGroupMetadata) {
// The column chunk metadata is not guaranteed to be in the same order as the columns in the schema,
// so build a map for fast lookup of the column chunk metadata that corresponds to a schema element.
Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
int colChunkIndex = 0;
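// Key each chunk by its full column path (rendered via Arrays.toString) so a schema column can be matched to its chunk index.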
for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex);
colChunkIndex++;
}
return columnChunkMetadataPositionsInList;
}
}