/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;

import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.work.ExecErrorConstants;
import org.apache.parquet.SemanticVersion;
import org.apache.parquet.VersionParser;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.format.ConvertedType;
import org.apache.parquet.format.FileMetaData;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.OriginalType;
import org.joda.time.Chronology;
import org.joda.time.DateTimeConstants;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Utility class where we can capture common logic between the two parquet readers
*/
public class ParquetReaderUtility {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetReaderUtility.class);
/**
* Number of days between Julian day epoch (January 1, 4713 BC) and Unix day epoch (January 1, 1970).
* The value of this constant is {@value}.
*/
public static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
  /**
   * All old parquet files (those lacking the "is.date.correct=true" property in their metadata)
   * have a corrupt date shift of {@value} days, i.e. 2 * {@value #JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH}.
   */
public static final long CORRECT_CORRUPT_DATE_SHIFT = 2 * JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH;
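  // For illustration: a corrupt value is larger than the true value by 2 * 2440588 = 4881176 days,
  // so 1970-01-01 (day 0 of the Unix epoch) would be stored as 4881176, and
  // autoCorrectCorruptedDate(4881176) below restores it to 0.
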
  // The year 5000 (day 1106685 since the Unix epoch) is chosen as the threshold for auto-detecting
  // date corruption. This balances two possible cases of bad auto-correction. External tools writing
  // dates in the future will not be shifted unless they are past this threshold (and we cannot
  // identify them as external files based on the metadata). On the other hand, historical dates
  // written with Drill would not risk being incorrectly shifted unless they were something like
  // 10,000 years in the past.
private static final Chronology UTC = org.joda.time.chrono.ISOChronology.getInstanceUTC();
public static final int DATE_CORRUPTION_THRESHOLD =
(int) (UTC.getDateTimeMillis(5000, 1, 1, 0) / DateTimeConstants.MILLIS_PER_DAY);
/**
* For most recently created parquet files, we can determine if we have corrupted dates (see DRILL-4203)
* based on the file metadata. For older files that lack statistics we must actually test the values
* in the data pages themselves to see if they are likely corrupt.
*/
  public enum DateCorruptionStatus {
    META_SHOWS_CORRUPTION {
      @Override
      public String toString() {
        return "It is determined from metadata that the date values are definitely CORRUPT";
      }
    },
    META_SHOWS_NO_CORRUPTION {
      @Override
      public String toString() {
        return "It is determined from metadata that the date values are definitely CORRECT";
      }
    },
    META_UNCLEAR_TEST_VALUES {
      @Override
      public String toString() {
        return "Not enough info in metadata, parquet reader will test individual date values";
      }
    }
  }
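
  /**
   * Checks that the decimal data type has been enabled in the given set of options;
   * if not, throws a {@link UserException} indicating that decimal support is disabled.
   */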
public static void checkDecimalTypeEnabled(OptionManager options) {
    if (!options.getOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY).bool_val) {
throw UserException.unsupportedError()
.message(ExecErrorConstants.DECIMAL_DISABLE_ERR_MSG)
.build(logger);
}
}
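
  /**
   * Reads a 4-byte little-endian integer from {@code input} starting at offset {@code start}.
   * For example, bytes {0x78, 0x56, 0x34, 0x12} decode to 0x12345678.
   */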
public static int getIntFromLEBytes(byte[] input, int start) {
int out = 0;
int shiftOrder = 0;
for (int i = start; i < start + 4; i++) {
out |= (((input[i]) & 0xFF) << shiftOrder);
shiftOrder += 8;
}
return out;
}
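
  /**
   * Builds a map from column name to {@link SchemaElement} by converting the given footer back
   * to its thrift {@link FileMetaData} form. Used below to look up a column's converted type
   * (e.g. DATE) by name.
   */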
public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) {
HashMap<String, SchemaElement> schemaElements = new HashMap<>();
FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
for (SchemaElement se : fileMetaData.getSchema()) {
schemaElements.put(se.getName(), se);
}
return schemaElements;
}
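
  /**
   * Corrects a date value that was written with the corrupt double epoch shift by subtracting
   * {@link #CORRECT_CORRUPT_DATE_SHIFT}. For example, 1970-01-01 (day 0 of the Unix epoch),
   * stored corrupt as 4881176, is restored to 0.
   */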
public static int autoCorrectCorruptedDate(int corruptedDate) {
return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
}
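
  /**
   * Rewrites min/max DATE statistics in a parquet metadata cache file that was written before
   * the date corruption fix (see DRILL-4203), so the cached statistics match the corrected
   * values produced at read time.
   */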
public static void correctDatesInMetadataCache(Metadata.ParquetTableMetadataBase parquetTableMetadata) {
boolean isDateCorrect = parquetTableMetadata.isDateCorrect();
DateCorruptionStatus cacheFileContainsCorruptDates = isDateCorrect ?
DateCorruptionStatus.META_SHOWS_NO_CORRUPTION : DateCorruptionStatus.META_SHOWS_CORRUPTION;
if (cacheFileContainsCorruptDates == DateCorruptionStatus.META_SHOWS_CORRUPTION) {
      // Collect the names of all DATE columns recorded in the metadata cache file
      // ("metadata_version" : "v2"). Collecting all of them (rather than keeping only the
      // last one found) ensures that every DATE column is corrected below.
      Set<List<String>> dateColumnNames = new HashSet<>();
      if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2) {
        for (Metadata.ColumnTypeMetadata_v2 columnTypeMetadata :
            ((Metadata.ParquetTableMetadata_v2) parquetTableMetadata).columnTypeInfo.values()) {
          if (OriginalType.DATE.equals(columnTypeMetadata.originalType)) {
            dateColumnNames.add(Arrays.asList(columnTypeMetadata.name));
          }
        }
      }
for (Metadata.ParquetFileMetadata file : parquetTableMetadata.getFiles()) {
        // Drill has only ever written a single row group per file, so we only need to correct
        // the statistics on the first row group
Metadata.RowGroupMetadata rowGroupMetadata = file.getRowGroups().get(0);
for (Metadata.ColumnMetadata columnMetadata : rowGroupMetadata.getColumns()) {
// Setting Min/Max values for ParquetTableMetadata_v1
if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v1) {
OriginalType originalType = columnMetadata.getOriginalType();
if (OriginalType.DATE.equals(originalType) && columnMetadata.hasSingleValue() &&
(Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
int newMinMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer)columnMetadata.getMaxValue());
columnMetadata.setMax(newMinMax);
columnMetadata.setMin(newMinMax);
}
}
// Setting Max values for ParquetTableMetadata_v2
          else if (parquetTableMetadata instanceof Metadata.ParquetTableMetadata_v2 &&
              columnMetadata.getName() != null &&
              dateColumnNames.contains(Arrays.asList(columnMetadata.getName())) &&
              columnMetadata.hasSingleValue() &&
              (Integer) columnMetadata.getMaxValue() > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
int newMax = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) columnMetadata.getMaxValue());
columnMetadata.setMax(newMax);
}
}
}
}
}
/**
   * Check for corrupted dates in a parquet file. See DRILL-4203.
*/
public static DateCorruptionStatus detectCorruptDates(ParquetMetadata footer,
List<SchemaPath> columns,
boolean autoCorrectCorruptDates) {
    // Old Drill files have "parquet-mr" as the created-by string and no Drill version, so we need
    // to check the min/max values to see if they look corrupt.
    //  - there is an option to disable this auto-correction based on the date values, in case users
    //    are storing these dates intentionally
    // Migrated parquet files have parquet-mr version 1.8.1 with "drill-r0" in the part of the
    // created-by string that usually contains "SNAPSHOT".
    // New parquet files, generated with the "is.date.correct" property, have no corrupt dates.
String createdBy = footer.getFileMetaData().getCreatedBy();
String drillVersion = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.DRILL_VERSION_PROPERTY);
String isDateCorrect = footer.getFileMetaData().getKeyValueMetaData().get(ParquetRecordWriter.IS_DATE_CORRECT_PROPERTY);
if (drillVersion != null) {
return Boolean.valueOf(isDateCorrect) ? DateCorruptionStatus.META_SHOWS_NO_CORRUPTION
: DateCorruptionStatus.META_SHOWS_CORRUPTION;
} else {
      // Possibly an old, un-migrated Drill file. Check the column statistics to see if the min/max
      // values look corrupt; this only applies if a date column is selected.
if (createdBy == null || createdBy.equals("parquet-mr")) {
// loop through parquet column metadata to find date columns, check for corrupt values
return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
} else {
      // check the created-by string to see if this is a migrated Drill file
try {
VersionParser.ParsedVersion parsedCreatedByVersion = VersionParser.parse(createdBy);
// check if this is a migrated Drill file, lacking a Drill version number, but with
// "drill" in the parquet created-by string
if (parsedCreatedByVersion.hasSemanticVersion()) {
SemanticVersion semVer = parsedCreatedByVersion.getSemanticVersion();
          String pre = String.valueOf(semVer.pre);
if (semVer.major == 1 && semVer.minor == 8 && semVer.patch == 1 && pre.contains("drill")) {
return DateCorruptionStatus.META_SHOWS_CORRUPTION;
}
}
        // written by a tool other than Drill; the dates are not corrupted
return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
} catch (VersionParser.VersionParseException e) {
// If we couldn't parse "created by" field, check column metadata of date columns
return checkForCorruptDateValuesInStatistics(footer, columns, autoCorrectCorruptDates);
}
}
}
}
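
  // A minimal usage sketch (illustrative, not a method of this class): a reader would typically
  // call detectCorruptDates once per file and branch on the result, e.g.
  //
  //   DateCorruptionStatus status =
  //       ParquetReaderUtility.detectCorruptDates(footer, columns, autoCorrectCorruptDates);
  //   switch (status) {
  //     case META_SHOWS_CORRUPTION:    /* shift every date value while reading */ break;
  //     case META_SHOWS_NO_CORRUPTION: /* read date values as-is */ break;
  //     case META_UNCLEAR_TEST_VALUES: /* test the first values read, then decide */ break;
  //   }
  //
  // Here "footer", "columns" and "autoCorrectCorruptDates" stand in for values the caller
  // already has; the case bodies are placeholders.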
/**
* Detect corrupt date values by looking at the min/max values in the metadata.
*
   * This should only be used when a file does not have enough metadata to determine whether
   * the data was written by an older version of Drill or by an external tool. Drill versions
   * 1.3 and beyond record enough metadata to confirm that the data was written by Drill.
*
* This method only checks the first Row Group, because Drill has only ever written
* a single Row Group per file.
*
   * @param footer  the footer (metadata) of the parquet file to check
   * @param columns the columns being requested from the file
* @param autoCorrectCorruptDates user setting to allow enabling/disabling of auto-correction
* of corrupt dates. There are some rare cases (storing dates thousands
* of years into the future, with tools other than Drill writing files)
* that would result in the date values being "corrected" into bad values.
*/
public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(ParquetMetadata footer,
List<SchemaPath> columns,
boolean autoCorrectCorruptDates) {
    // Users can turn off date correction in cases where we would be detecting corruption based on
    // date values that are unlikely to appear in common datasets. In this case, report that no
    // correction needs to happen during the file read.
    if (!autoCorrectCorruptDates) {
return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
}
    // Drill-produced files have only ever had a single row group; if this changes in the future it
    // won't matter, as we will know from the Drill version written in the files that the dates are correct.
int rowGroupIndex = 0;
Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    findDateColWithStatsLoop: for (SchemaPath schemaPath : columns) {
List<ColumnDescriptor> parquetColumns = footer.getFileMetaData().getSchema().getColumns();
for (int i = 0; i < parquetColumns.size(); ++i) {
ColumnDescriptor column = parquetColumns.get(i);
        // This reader only supports flat data (enforced in ParquetScanBatchCreator).
        // Creating a NameSegment makes sure we use the standard code for comparing names;
        // currently all name comparisons are case-insensitive.
        if (AbstractRecordReader.isStarQuery(columns)
            || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
int colIndex = -1;
ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns();
for (int j = 0; j < colChunkList.size(); j++) {
if (colChunkList.get(j).getPath().equals(ColumnPath.get(column.getPath()))) {
colIndex = j;
break;
}
}
}
if (colIndex == -1) {
// column does not appear in this file, skip it
continue;
}
          Statistics statistics = footer.getBlocks().get(rowGroupIndex).getColumns().get(colIndex).getStatistics();
          if (statistics.hasNonNullValue()) {
            // only read the max once we know the statistics actually contain a value
            Integer max = (Integer) statistics.genericGetMax();
            if (max > ParquetReaderUtility.DATE_CORRUPTION_THRESHOLD) {
              return DateCorruptionStatus.META_SHOWS_CORRUPTION;
            }
          } else {
            // no statistics; fall back to testing the date values read from the first page
            return DateCorruptionStatus.META_UNCLEAR_TEST_VALUES;
          }
}
}
}
return DateCorruptionStatus.META_SHOWS_NO_CORRUPTION;
}
}