/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.EasyOutOfMemory;
import org.apache.drill.categories.HiveStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.expr.fn.impl.DateUtility;
import org.apache.drill.exec.hive.HiveTestBase;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;

@Category({SlowTest.class, HiveStorageTest.class, EasyOutOfMemory.class})
public class TestHiveDrillNativeParquetReader extends HiveTestBase {

  @BeforeClass
  public static void init() {
    setSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER, true);
    setSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY, true);
  }

  @AfterClass
  public static void cleanup() {
    resetSessionOption(ExecConstants.HIVE_OPTIMIZE_PARQUET_SCAN_WITH_NATIVE_READER);
    resetSessionOption(PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY);
  }

  @Rule
  public ExpectedException thrown = ExpectedException.none();
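
  // Filter push down on a managed table: the plan should use the native parquet scan
  // and read only a single file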
  @Test
  public void testFilterPushDownForManagedTable() throws Exception {
    String query = "select * from hive.kv_native where key > 1";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 2, actualRowCount);

    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
  }
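
  // Same filter push down check for an external table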
  @Test
  public void testFilterPushDownForExternalTable() throws Exception {
    String query = "select * from hive.kv_native_ext where key = 1";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 1, actualRowCount);

    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
  }

  @Test
  public void testManagedPartitionPruning() throws Exception {
    String query = "select * from hive.readtest_parquet where tinyint_part = 64";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 2, actualRowCount);

    // Hive partition pruning is applied during the logical planning stage,
    // while conversion to the Drill native parquet reader happens during the physical stage,
    // thus the plan should not contain a Filter
    testPlanMatchingPatterns(query,
        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{"Filter"});
  }

  @Test
  public void testExternalPartitionPruning() throws Exception {
    String query = "select `key` from hive.kv_native_ext where part_key = 2";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 2, actualRowCount);

    // Hive partition pruning is applied during the logical planning stage,
    // while conversion to the Drill native parquet reader happens during the physical stage,
    // thus the plan should not contain a Filter
    testPlanMatchingPatterns(query,
        new String[]{"HiveDrillNativeParquetScan", "numFiles=1"}, new String[]{"Filter"});
  }
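
  // Filter push down should work through nested star sub-queries as well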
  @Test
  public void testSimpleStarSubQueryFilterPushDown() throws Exception {
    String query = "select * from (select * from (select * from hive.kv_native)) where key > 1";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 2, actualRowCount);

    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
  }
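
  // A full scan of a partitioned external table should read both partition files
  // and return rows from every partition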
  @Test
  public void testPartitionedExternalTable() throws Exception {
    String query = "select * from hive.kv_native_ext";
    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=2");

    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("key", "part_key")
        .baselineValues(1, 1)
        .baselineValues(2, 1)
        .baselineValues(3, 2)
        .baselineValues(4, 2)
        .go();
  }

  @Test
  public void testEmptyTable() throws Exception {
    String query = "select * from hive.empty_table";
    // Hive reader should be chosen to output the schema
    testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
  }

  @Test
  public void testEmptyPartition() throws Exception {
    String query = "select * from hive.kv_native_ext where part_key = 3";
    // Hive reader should be chosen to output the schema
    testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
  }

  @Test
  public void testPhysicalPlanSubmission() throws Exception {
    // checks only the group scan
    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native");
    PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.kv_native_ext");
    try {
      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
      PlanTestBase.testPhysicalPlanExecutionBasedOnQuery("select * from hive.sub_dir_table");
    } finally {
      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
    }
  }

  @Test
  public void testProjectPushDownOptimization() throws Exception {
    String query = "select boolean_field, int_part from hive.readtest_parquet";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 2, actualRowCount);

    testPlanMatchingPatterns(query,
        // during the scan the partition column is named like a Drill partition column (dir9);
        // it is renamed to its actual name in the subsequent Project
        new String[]{"Project\\(boolean_field=\\[\\$0\\], int_part=\\[CAST\\(\\$1\\):INTEGER\\]\\)",
            "HiveDrillNativeParquetScan",
            "columns=\\[`boolean_field`, `dir9`\\]"},
        new String[]{});
  }
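
  // Limit push down: the plan should keep the native parquet scan and read a single file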
  @Test
  public void testLimitPushDownOptimization() throws Exception {
    String query = "select * from hive.kv_native limit 2";

    int actualRowCount = testSql(query);
    assertEquals("Expected and actual row count should match", 2, actualRowCount);

    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numFiles=1");
  }
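
  // count(1) over a parquet-backed table should be answered from metadata via a
  // direct scan (DynamicPojoRecordReader) instead of reading the data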
  @Test
  public void testConvertCountToDirectScanOptimization() throws Exception {
    String query = "select count(1) as cnt from hive.kv_native";

    testPlanMatchingPatterns(query, "DynamicPojoRecordReader");

    testPhysicalPlanExecutionBasedOnQuery(query);

    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("cnt")
        .baselineValues(8L)
        .go();
  }
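
  // Implicit file columns (filename, fqn, filepath, suffix) are not supported
  // for Hive tables, so the query should fail validation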
  @Test
  public void testImplicitColumns() throws Exception {
    thrown.expect(UserRemoteException.class);
    thrown.expectMessage(CoreMatchers.allOf(containsString("VALIDATION ERROR"), containsString("not found in any table")));

    test("select *, filename, fqn, filepath, suffix from hive.kv_native");
  }
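
  // Reading from a storage-handler-based table is expected to return an empty result set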
  @Test // DRILL-3739
  public void testReadingFromStorageHandleBasedTable() throws Exception {
    testBuilder()
        .sqlQuery("select * from hive.kv_sh order by key limit 2")
        .ordered()
        .baselineColumns("key", "value")
        .expectsEmptyResultSet()
        .go();
  }

  @Test
  public void testReadAllSupportedHiveDataTypesNativeParquet() throws Exception {
    String query = "select * from hive.readtest_parquet";

    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan");

    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("binary_field", "boolean_field", "tinyint_field", "decimal0_field", "decimal9_field",
            "decimal18_field", "decimal28_field", "decimal38_field", "double_field", "float_field", "int_field",
            "bigint_field", "smallint_field", "string_field", "varchar_field", "timestamp_field", "char_field",
            // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
            //"binary_part",
            "boolean_part", "tinyint_part", "decimal0_part", "decimal9_part", "decimal18_part", "decimal28_part",
            "decimal38_part", "double_part", "float_part", "int_part", "bigint_part", "smallint_part",
            "string_part", "varchar_part", "timestamp_part", "date_part", "char_part")
        .baselineValues("binaryfield".getBytes(StandardCharsets.UTF_8), false, 34, new BigDecimal("66"),
            new BigDecimal("2347.92"), new BigDecimal("2758725827.99990"), new BigDecimal("29375892739852.8"),
            new BigDecimal("89853749534593985.783"), 8.345d, 4.67f, 123456, 234235L, 3455, "stringfield",
            "varcharfield", DateUtility.parseBest("2013-07-05 17:01:00"), "charfield",
            // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
            //"binary",
            true, 64, new BigDecimal("37"), new BigDecimal("36.90"), new BigDecimal("3289379872.94565"),
            new BigDecimal("39579334534534.4"), new BigDecimal("363945093845093890.900"), 8.345d, 4.67f, 123456,
            234235L, 3455, "string", "varchar", DateUtility.parseBest("2013-07-05 17:01:00"),
            DateUtility.parseLocalDate("2013-07-05"), "char")
        // All fields are null, but partition fields have non-null values
        .baselineValues(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null,
            null, null,
            // There is a regression in Hive 1.2.1 in binary and boolean partition columns. Disable for now.
            //"binary",
            true, 64, new BigDecimal("37"), new BigDecimal("36.90"), new BigDecimal("3289379872.94565"),
            new BigDecimal("39579334534534.4"), new BigDecimal("363945093845093890.900"), 8.345d, 4.67f, 123456,
            234235L, 3455, "string", "varchar", DateUtility.parseBest("2013-07-05 17:01:00"),
            DateUtility.parseLocalDate("2013-07-05"), "char")
        .go();
  }

  @Test // DRILL-3938
  public void testNativeReaderIsDisabledForAlteredPartitionedTable() throws Exception {
    String query = "select key, `value`, newcol from hive.kv_parquet order by key limit 1";

    // Make sure the HiveScan in the plan has no native parquet reader
    testPlanMatchingPatterns(query, new String[]{"HiveScan"}, new String[]{"HiveDrillNativeParquetScan"});
  }
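
  // Hive config properties set at session level should make a table stored
  // in subdirectories readable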
  @Test
  public void testHiveConfPropertiesAtSessionLevel() throws Exception {
    String query = "select * from hive.sub_dir_table";
    try {
      alterSession(ExecConstants.HIVE_CONF_PROPERTIES, "hive.mapred.supports.subdirectories=true\nmapred.input.dir.recursive=true");
      test(query);
    } finally {
      resetSessionOption(ExecConstants.HIVE_CONF_PROPERTIES);
    }
  }
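
  // Varchar filter push down: when string min/max statistics are trusted
  // (option set to "true") row groups are pruned, otherwise all four are read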
  @Test
  public void testHiveVarcharPushDown() throws Exception {
    String query = "select int_key from hive.kv_native where var_key = 'var_1'";

    Map<String, String> properties = new HashMap<>();
    properties.put("true", "numRowGroups=1");
    properties.put("false", "numRowGroups=4"); // Hive creates parquet files using a Parquet lib older than 1.10.0

    try {
      for (Map.Entry<String, String> property : properties.entrySet()) {
        alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey());

        testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", property.getValue());

        testBuilder()
            .sqlQuery(query)
            .unOrdered()
            .baselineColumns("int_key")
            .baselineValues(1)
            .go();
      }
    } finally {
      resetSessionOption(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX);
    }
  }

  @Test
  public void testHiveDecimalPushDown() throws Exception {
    String query = "select int_key from hive.kv_native where dec_key = cast(1.11 as decimal(5, 2))";
    // Hive generates parquet files using a parquet lib older than 1.10.0,
    // thus statistics for decimals are not available and no row groups can be pruned
    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numRowGroups=4");

    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("int_key")
        .baselineValues(1)
        .go();
  }
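
  // With the option enabled, INT96 values should be read as timestamps by the native reader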
  @Test
  public void testInt96TimestampConversionWithNativeReader() throws Exception {
    String query = "select timestamp_field from hive.readtest_parquet";

    try {
      setSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP, true);

      testBuilder()
          .sqlQuery(query)
          .unOrdered()
          .baselineColumns("timestamp_field")
          .baselineValues(DateUtility.parseBest("2013-07-05 17:01:00"))
          .baselineValues(new Object[]{null})
          .go();
    } finally {
      resetSessionOption(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP);
    }
  }
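
  // A table containing an empty parquet file should return an empty result set
  // while still being planned with the native parquet scan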
  @Test
  public void testEmptyParquetTable() throws Exception {
    String query = "select * from hive.`table_with_empty_parquet`";

    testBuilder()
        .sqlQuery(query)
        .expectsEmptyResultSet()
        .go();

    testPlanMatchingPatterns(query, "HiveDrillNativeParquetScan", "numRowGroups=1");
  }
}