/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.writer;
import static java.lang.String.format;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDate;
import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.ParquetTest;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.store.parquet.metadata.Metadata;
import org.apache.drill.test.TestBuilder;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Tests for compatibility when reading old parquet files after the date corruption
 * issue was fixed in DRILL-4203.
 *
 * Drill could write non-standard dates into parquet files. The issue affects all
 * Drill releases where {@link org.apache.drill.exec.store.parquet.ParquetRecordWriter#WRITER_VERSION_PROPERTY} <
 * {@link org.apache.drill.exec.store.parquet.ParquetReaderUtility#DRILL_WRITER_VERSION_STD_DATE_FORMAT}.
 * Drill itself read the values back correctly, but external tools such as Spark
 * saw corrupted values for all dates that had been written by Drill.
*
 * This change corrects the Drill parquet writer so that dates are stored in the
 * format given in the parquet specification.
 *
 * To maintain compatibility with old files, the parquet reader code has been
 * updated to check for the old format and automatically shift the corrupted
 * values into corrected ones.
*
 * The test cases included here should ensure that all files produced by
 * historical versions of Drill will continue to return the same values they
 * had in previous releases. For compatibility with external tools, any old
 * files with corrupted dates can be re-written using the CREATE TABLE AS
 * command, as the writer now produces only specification-compliant values,
 * even when reading from older corrupt files.
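 *
 * For example, a corrupted table can be fixed for external tools with a rewrite
 * such as the following (a sketch; both table names are hypothetical):
 * <pre>
 *   CREATE TABLE dfs.tmp.`rewritten_table` AS SELECT * FROM dfs.`old_corrupt_table`;
 * </pre>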
*
 * While the old behavior was a consistent shift into a range unlikely to be
 * used in a modern database (over 10,000 years in the future; for example,
 * 1970-01-01 was stored as 15334-03-17), these are still valid date values.
 * In the case where such values may have been written into files intentionally,
 * an option is included to turn off the auto-correction. Use of this option is
 * assumed to be extremely unlikely, but it is included for completeness.
*/
@Category({ParquetTest.class, UnlikelyTest.class})
public class TestCorruptParquetDateCorrection extends PlanTestBase {
private static final Path PARQUET_4203 = Paths.get("parquet", "4203_corrupt_dates");
// 4 files are in the directory:
//   - one created with the parquet-writer version number of "2"
//     - these files have an extra meta field: parquet-writer.version = 2
//   - one from an old version of Drill, before we put a proper created-by in the metadata
//     - this is read properly by looking at a max value in the file statistics, to see that
//       it is way off from a typical date value
//     - this behavior can be turned off, but is on by default
//   - one from the 0.6 version of Drill, before files had min/max statistics
//     - detecting corrupt values must be deferred to actual data page reading
//   - one from 1.4, where there is a proper created-by, but the corruption is present
private static final Path MIXED_CORRUPTED_AND_CORRECT_DATES_PATH = PARQUET_4203.resolve("mixed_drill_versions");
// partitioned with 1.2.0, no certain metadata that these were written with Drill;
// the values will be checked to see that they look corrupt, and they will be corrected
// by default. Users can use the format plugin option autoCorrectCorruptDates to disable
// this behavior if they have foreign parquet files with valid but rare date values that
// fall in the same range as Drill's corrupt values, as in the sketch below.
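// A minimal sketch of disabling the correction for one query, mirroring the test
// queries in this class (the path is hypothetical):
//   select date_col from table(dfs.`my_foreign_files` (type => 'parquet', autoCorrectCorruptDates => false))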
private static final Path PARTITIONED_1_2_FOLDER = Paths.get("partitioned_with_corruption_4203_1_2");
private static final Path CORRUPTED_PARTITIONED_DATES_1_2_PATH = PARQUET_4203.resolve(PARTITIONED_1_2_FOLDER);
// partitioned with 1.4.0, no certain metadata regarding the date corruption status;
// corrupt date values are detected in the same way as for the files partitioned with 1.2.0
private static final Path PARTITIONED_1_4_FOLDER = Paths.get("partitioned_with_corruption_4203");
private static final Path CORRUPTED_PARTITIONED_DATES_1_4_0_PATH = PARQUET_4203.resolve(PARTITIONED_1_4_FOLDER);
private static final Path PARQUET_DATE_FILE_WITH_NULL_FILLED_COLS = PARQUET_4203.resolve("null_date_cols_with_corruption_4203.parquet");
private static final Path PARTITIONED_1_9_FOLDER = Paths.get("1_9_0_partitioned_no_corruption");
private static final Path CORRECT_PARTITIONED_DATES_1_9_PATH = PARQUET_4203.resolve(PARTITIONED_1_9_FOLDER);
private static final Path VARCHAR_PARTITIONED = PARQUET_4203.resolve("fewtypes_varcharpartition");
private static final Path DATE_PARTITIONED = PARQUET_4203.resolve("fewtypes_datepartition");
private static final Path EXCEPTION_WHILE_PARSING_CREATED_BY_META = PARQUET_4203.resolve("hive1dot2_fewtypes_null");
private static final Path CORRECT_DATES_1_6_0_PATH = PARQUET_4203.resolve("correct_dates_and_old_drill_parquet_writer.parquet");
private static final Path MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER = Paths.get("mixed_partitioned");
@BeforeClass
public static void initFs() throws Exception {
// Move files into temp directory, rewrite the metadata cache file to contain the appropriate absolute path
dirTestWatcher.copyResourceToRoot(PARQUET_4203);
dirTestWatcher.copyResourceToRoot(CORRUPTED_PARTITIONED_DATES_1_2_PATH, PARTITIONED_1_2_FOLDER);
dirTestWatcher.copyResourceToRoot(CORRUPTED_PARTITIONED_DATES_1_4_0_PATH, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER);
dirTestWatcher.copyResourceToRoot(CORRUPTED_PARTITIONED_DATES_1_2_PATH, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER.resolve(PARTITIONED_1_2_FOLDER));
dirTestWatcher.copyResourceToRoot(CORRECT_PARTITIONED_DATES_1_9_PATH, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER.resolve(PARTITIONED_1_9_FOLDER));
dirTestWatcher.copyResourceToRoot(CORRUPTED_PARTITIONED_DATES_1_4_0_PATH, MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER.resolve(PARTITIONED_1_4_FOLDER));
File metaData = dirTestWatcher.copyResourceToRoot(PARQUET_4203.resolve("drill.parquet.metadata_1_2.requires_replace.txt"),
PARTITIONED_1_2_FOLDER.resolve(Metadata.OLD_METADATA_FILENAME));
dirTestWatcher.replaceMetaDataContents(metaData, dirTestWatcher.getRootDir(), null);
}
/**
 * Test reading a directory full of partitioned parquet files with dates. These files have a drill version
 * number of "1.9.0-SNAPSHOT" and a parquet-writer version number of "2" in their footers, so we can be
 * certain they do not have corruption. The option to disable the correction is passed, but it will not
 * change the result in the case where we are certain correction is NOT needed. For more info see DRILL-4203.
 */
@Test
public void testReadPartitionedOnCorrectDates() throws Exception {
try {
for (String selection : new String[]{"*", "date_col"}) {
// for sanity, try reading all partitions without a filter
TestBuilder builder = testBuilder()
.sqlQuery("select %s from table(dfs.`%s` (type => 'parquet', autoCorrectCorruptDates => false))",
selection, CORRECT_PARTITIONED_DATES_1_9_PATH)
.unOrdered()
.baselineColumns("date_col");
addDateBaselineValues(builder);
builder.go();
String query = format("select %s from table(dfs.`%s` (type => 'parquet', autoCorrectCorruptDates => false))" +
" where date_col = date '1970-01-01'", selection, CORRECT_PARTITIONED_DATES_1_9_PATH);
// verify that pruning is actually taking place
testPlanMatchingPatterns(query, new String[]{"numFiles=1"}, null);
// read with a filter on the partition column
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("date_col")
.baselineValues(LocalDate.of(1970, 1, 1))
.go();
}
} finally {
resetAllSessionOptions();
}
}
@Test
public void testVarcharPartitionedReadWithCorruption() throws Exception {
testBuilder()
.sqlQuery("select date_col from dfs.`%s` where length(varchar_col) = 12", VARCHAR_PARTITIONED)
.baselineColumns("date_col")
.unOrdered()
.baselineValues(LocalDate.of(2039, 4, 9))
.baselineValues(LocalDate.of(1999, 1, 8))
.go();
}
@Test
public void testDatePartitionedReadWithCorruption() throws Exception {
testBuilder()
.sqlQuery("select date_col from dfs.`%s` where date_col = '1999-04-08'", DATE_PARTITIONED)
.baselineColumns("date_col")
.unOrdered()
.baselineValues(LocalDate.of(1999, 4, 8))
.go();
String query = format("select date_col from dfs.`%s` where date_col > '1999-04-08'", DATE_PARTITIONED);
testPlanMatchingPatterns(query, new String[]{"numFiles=6"}, null);
}
@Test
public void testCorrectDatesAndExceptionWhileParsingCreatedBy() throws Exception {
testBuilder()
.sqlQuery("select date_col from dfs.`%s` where to_date(date_col, 'yyyy-mm-dd') < '1997-01-02'",
EXCEPTION_WHILE_PARSING_CREATED_BY_META)
.baselineColumns("date_col")
.unOrdered()
.baselineValues(LocalDate.of(1996, 1, 29))
.baselineValues(LocalDate.of(1996, 3, 1))
.baselineValues(LocalDate.of(1996, 3, 2))
.baselineValues(LocalDate.of(1997, 3, 1))
.go();
}
// according to the SQL spec, '4.4.3.5 Datetime types', the year must not exceed 9999
@Test(expected = UserRemoteException.class)
public void testQueryWithCorruptedDates() throws Exception {
try {
TestBuilder builder = testBuilder()
.sqlQuery("select * from table(dfs.`%s` (type => 'parquet', autoCorrectCorruptDates => false))",
CORRUPTED_PARTITIONED_DATES_1_2_PATH)
.unOrdered()
.baselineColumns("date_col");
addCorruptedDateBaselineValues(builder);
builder.go();
String query = "select * from table(dfs.`%s` (type => 'parquet', " +
"autoCorrectCorruptDates => false)) where date_col = cast('15334-03-17' as date)";
test(query, CORRUPTED_PARTITIONED_DATES_1_2_PATH);
} catch (UserRemoteException e) {
Assert.assertTrue(e.getMessage()
.contains("Year out of range"));
throw e;
} finally {
resetAllSessionOptions();
}
}
@Test
public void testCorruptValueDetectionDuringPruning() throws Exception {
try {
for (String selection : new String[]{"*", "date_col"}) {
for (Path table : new Path[]{CORRUPTED_PARTITIONED_DATES_1_2_PATH, CORRUPTED_PARTITIONED_DATES_1_4_0_PATH}) {
// for sanity, try reading all partitions without a filter
TestBuilder builder = testBuilder()
.sqlQuery("select %s from dfs.`%s`", selection, table)
.unOrdered()
.baselineColumns("date_col");
addDateBaselineValues(builder);
builder.go();
String query = format("select %s from dfs.`%s`" +
" where date_col = date '1970-01-01'", selection, table);
// verify that pruning is actually taking place
testPlanMatchingPatterns(query, new String[]{"numFiles=1"}, null);
// read with a filter on the partition column
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("date_col")
.baselineValues(LocalDate.of(1970, 1, 1))
.go();
}
}
} finally {
resetAllSessionOptions();
}
}
/**
 * Fixing some of the dates corrupted as part of DRILL-4203 requires actually
 * looking at the values stored in the file. A column with actual date values
 * must be located to check a value. Finding one column where all of the values
 * are null does not mean we can safely skip auto-correction when reading the
 * date columns: although null values do not need fixing, other columns may
 * contain genuinely corrupt date values.
 *
 * This test checks the case where the first columns in the file are all null-filled
 * and a later column must be found to identify that the file is corrupt.
 */
@Test
public void testReadCorruptDatesWithNullFilledColumns() throws Exception {
testBuilder()
.sqlQuery("select null_dates_1, null_dates_2, non_existent_field, date_col from dfs.`%s`",
PARQUET_DATE_FILE_WITH_NULL_FILLED_COLS)
.unOrdered()
.baselineColumns("null_dates_1", "null_dates_2", "non_existent_field", "date_col")
.baselineValues(null, null, null, LocalDate.of(1970, 1, 1))
.baselineValues(null, null, null, LocalDate.of(1970, 1, 2))
.baselineValues(null, null, null, LocalDate.of(1969, 12, 31))
.baselineValues(null, null, null, LocalDate.of(1969, 12, 30))
.baselineValues(null, null, null, LocalDate.of(1900, 1, 1))
.baselineValues(null, null, null, LocalDate.of(2015, 1, 1))
.go();
}
@Test
public void testUserOverrideDateCorrection() throws Exception {
// read once with the flat reader
readFilesWithUserDisabledAutoCorrection();
try {
test("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER);
// read all of the types with the complex reader
readFilesWithUserDisabledAutoCorrection();
} finally {
resetAllSessionOptions();
}
}
/**
* Test reading a directory full of parquet files with dates, some of which have corrupted values
* due to DRILL-4203.
*
* Tests reading the files with both the vectorized and complex parquet readers.
*
* @throws Exception
*/
@Test
public void testReadMixedOldAndNewBothReaders() throws Exception {
// read once with the flat reader
readMixedCorruptedAndCorrectDates();
try {
// read all of the types with the complex reader
test("alter session set %s = true", ExecConstants.PARQUET_NEW_RECORD_READER);
readMixedCorruptedAndCorrectDates();
} finally {
test("alter session set %s = false", ExecConstants.PARQUET_NEW_RECORD_READER);
}
}
@Test
public void testReadOldMetadataCacheFile() throws Exception {
// for sanity, try reading all partitions without a filter
String query = format("select date_col from dfs.`%s`", PARTITIONED_1_2_FOLDER);
TestBuilder builder = testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("date_col");
addDateBaselineValues(builder);
builder.go();
testPlanMatchingPatterns(query, new String[]{"usedMetadataFile=true"}, null);
}
@Test
public void testReadOldMetadataCacheFileWithPruning() throws Exception {
String query = format("select date_col from dfs.`%s` where date_col = date '1970-01-01'",
PARTITIONED_1_2_FOLDER);
// verify that pruning is actually taking place
testPlanMatchingPatterns(query, new String[]{"numFiles=1", "usedMetadataFile=true"}, null);
// read with a filter on the partition column
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("date_col")
.baselineValues(LocalDate.of(1970, 1, 1))
.go();
}
@Test
public void testReadNewMetadataCacheFileOverOldAndNewFiles() throws Exception {
File meta = dirTestWatcher.copyResourceToRoot(
PARQUET_4203.resolve("mixed_version_partitioned_metadata.requires_replace.txt"),
MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER.resolve(Metadata.OLD_METADATA_FILENAME));
dirTestWatcher.replaceMetaDataContents(meta, dirTestWatcher.getRootDir(), null);
// for sanity, try reading all partitions without a filter
TestBuilder builder = testBuilder()
.sqlQuery("select date_col from dfs.`%s`", MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER)
.unOrdered()
.baselineColumns("date_col");
addDateBaselineValues(builder);
addDateBaselineValues(builder);
addDateBaselineValues(builder);
builder.go();
String query = format("select date_col from dfs.`%s` where date_col = date '1970-01-01'", MIXED_CORRUPTED_AND_CORRECT_PARTITIONED_FOLDER);
// verify that pruning is actually taking place
testPlanMatchingPatterns(query, new String[]{"numFiles=3", "usedMetadataFile=true"}, null);
// read with a filter on the partition column
testBuilder()
.sqlQuery(query)
.unOrdered()
.baselineColumns("date_col")
.baselineValues(LocalDate.of(1970, 1, 1))
.baselineValues(LocalDate.of(1970, 1, 1))
.baselineValues(LocalDate.of(1970, 1, 1))
.go();
}
@Test
public void testCorrectDateValuesGeneratedByOldVersionOfDrill() throws Exception {
testBuilder()
.sqlQuery("select i_rec_end_date from dfs.`%s` limit 1", CORRECT_DATES_1_6_0_PATH)
.baselineColumns("i_rec_end_date")
.unOrdered()
.baselineValues(LocalDate.of(2000, 10, 26))
.go();
}
/**
 * Read a directory with parquet files where some have corrupted dates; see DRILL-4203.
 * @throws Exception
 */
private void readMixedCorruptedAndCorrectDates() throws Exception {
// ensure that selecting the date column explicitly or as part of a star still results
// in checking the file metadata for date columns (when we need to check the statistics
// for bad values) to set the flag that the values are corrupt
for (String selection : new String[] {"*", "date_col"}) {
TestBuilder builder = testBuilder()
.sqlQuery("select %s from dfs.`%s`", selection, MIXED_CORRUPTED_AND_CORRECT_DATES_PATH)
.unOrdered()
.baselineColumns("date_col");
for (int i = 0; i < 4; i++) {
addDateBaselineValues(builder);
}
builder.go();
}
}
private void addDateBaselineValues(TestBuilder builder) {
builder
.baselineValues(LocalDate.of(1970, 1, 1))
.baselineValues(LocalDate.of(1970, 1, 2))
.baselineValues(LocalDate.of(1969, 12, 31))
.baselineValues(LocalDate.of(1969, 12, 30))
.baselineValues(LocalDate.of(1900, 1, 1))
.baselineValues(LocalDate.of(2015, 1, 1));
}
/**
 * These are the same values as those added in {@link #addDateBaselineValues(TestBuilder)},
 * shifted into the corrupt range.
 */
private void addCorruptedDateBaselineValues(TestBuilder builder) {
builder
.baselineValues(LocalDate.of(15334, 3, 17))
.baselineValues(LocalDate.of(15334, 3, 18))
.baselineValues(LocalDate.of(15334, 3, 15))
.baselineValues(LocalDate.of(15334, 3, 16))
.baselineValues(LocalDate.of(15264, 3, 16))
.baselineValues(LocalDate.of(15379, 3, 17));
}
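/**
 * Illustrative sketch only (not used by the tests above): the corrupt values are the
 * correct values shifted forward by twice the Julian day number of the Unix epoch
 * (2 * 2440588 = 4881176 days), which is why 1970-01-01 appears above as 15334-03-17.
 * This mirrors what the reader-side correction in ParquetReaderUtility is based on;
 * treat the exact constant here as an assumption for illustration.
 */
private static LocalDate shiftIntoCorruptRange(LocalDate correctDate) {
// assumed constant: the Julian day number of 1970-01-01
final long julianDayNumberForUnixEpoch = 2440588L;
// e.g. shiftIntoCorruptRange(LocalDate.of(1970, 1, 1)) yields 15334-03-17
return correctDate.plusDays(2 * julianDayNumberForUnixEpoch);
}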
private void readFilesWithUserDisabledAutoCorrection() throws Exception {
// ensure that selecting the date column explicitly or as part of a star still results
// in checking the file metadata for date columns (when we need to check the statistics
// for bad values) to set the flag that the values are corrupt
for (String selection : new String[] {"*", "date_col"}) {
TestBuilder builder = testBuilder()
.sqlQuery("select %s from table(dfs.`%s` (type => 'parquet', autoCorrectCorruptDates => false))",
selection, MIXED_CORRUPTED_AND_CORRECT_DATES_PATH)
.unOrdered()
.baselineColumns("date_col");
addDateBaselineValues(builder);
addCorruptedDateBaselineValues(builder);
addCorruptedDateBaselineValues(builder);
addCorruptedDateBaselineValues(builder);
builder.go();
}
}
}