| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.drill.exec.store.parquet; |
| |
| import org.apache.commons.io.FileUtils; |
| import org.apache.drill.categories.ParquetTest; |
| import org.apache.drill.categories.UnlikelyTest; |
| import org.apache.drill.exec.ExecConstants; |
| import org.apache.drill.test.ClusterFixture; |
| import org.apache.drill.test.ClusterFixtureBuilder; |
| import org.apache.drill.test.ClusterTest; |
| import org.junit.After; |
| import org.junit.BeforeClass; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.math.BigDecimal; |
| import java.nio.file.Paths; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertTrue; |
| |
| @Category({ParquetTest.class, UnlikelyTest.class}) |
| public class TestPushDownAndPruningForDecimal extends ClusterTest { |
| |
| private static File fileStore; |
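| // tables created by a test are registered here and dropped in reset() after each test |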
| private List<String> tablesToDrop = new ArrayList<>(); |
| |
| @BeforeClass |
| public static void setup() throws Exception { |
| ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher); |
| /* |
| Contains two data files generated by Drill 1.13.0 (i.e. before the upgrade to Parquet library 1.10.0). |
| Also contains the .drill.parquet_metadata file generated for these two files. |
| |
| Schema: |
| |
| message root { |
| required int32 part_int_32 (DECIMAL(5,2)); |
| required int32 val_int_32 (DECIMAL(5,2)); |
| required int64 part_int_64 (DECIMAL(16,2)); |
| required int64 val_int_64 (DECIMAL(16,2)); |
| required fixed_len_byte_array(12) part_fixed (DECIMAL(16,2)); |
| required fixed_len_byte_array(12) val_fixed (DECIMAL(16,2)); |
| } |
| |
| Data: |
| |
| 0_0_1.parquet |
| ----------------------------------------------------------------------------- |
| part_int_32 | val_int_32 | part_int_64 | val_int_64 | part_fixed | val_fixed |
| ----------------------------------------------------------------------------- |
| 1.00 | 1.05 | 1.00 | 1.05 | 1.00 | 1.05 |
| 1.00 | 10.00 | 1.00 | 10.00 | 1.00 | 10.00 |
| 1.00 | 10.25 | 1.00 | 10.25 | 1.00 | 10.25 |
| ----------------------------------------------------------------------------- |
| |
| 0_0_2.parquet |
| ----------------------------------------------------------------------------- |
| part_int_32 | val_int_32 | part_int_64 | val_int_64 | part_fixed | val_fixed |
| ----------------------------------------------------------------------------- |
| 2.00 | 2.05 | 2.00 | 2.05 | 2.00 | 2.05 |
| 2.00 | 20.00 | 2.00 | 20.00 | 2.00 | 20.00 |
| 2.00 | 20.25 | 2.00 | 20.25 | 2.00 | 20.25 |
| ----------------------------------------------------------------------------- |
| */ |
| fileStore = dirTestWatcher.copyResourceToRoot(Paths.get("parquet", "decimal_gen_1_13_0")); |
| startCluster(builder); |
| } |
| |
| @After |
| public void reset() { |
| // reset all session options that might have been used in the tests |
| client.resetSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS); |
| client.resetSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS); |
| client.resetSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX); |
| |
| // drop all tables |
| tablesToDrop.forEach( |
| t -> client.runSqlSilently(String.format("drop table if exists %s", t)) |
| ); |
| tablesToDrop.clear(); |
| } |
| |
| /** |
| * Check partition pruning for old and new int_32 and int_64 decimal files |
| * without using a Drill metadata file. |
| */ |
| @Test |
| public void testOldNewIntDecimalPruningNoMeta() throws Exception { |
| String oldTable = createTable("old_int_decimal_pruning_no_meta", true); |
| String newTable = "dfs.`tmp`.new_int_decimal_pruning_no_meta"; |
| tablesToDrop.add(newTable); |
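| // store decimals as int32/int64 primitives (the new decimal format) in the table created below |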
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true); |
| queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", newTable, oldTable)).run(); |
| for (String column : Arrays.asList("part_int_32", "part_int_64")) { |
| for (String table : Arrays.asList(oldTable, newTable)) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.00 as decimal(5, 2))", table, column); |
| |
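| // successful pruning leaves a single row group and removes the Filter operator from the plan |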
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05")) |
| .baselineValues(new BigDecimal("10.00"), new BigDecimal("10.00")) |
| .baselineValues(new BigDecimal("10.25"), new BigDecimal("10.25")) |
| .go(); |
| } |
| } |
| } |
| |
| /** |
| * Check partition pruning for old and new int_32 and int_64 decimal files |
| * using a Drill metadata file generated by the current Drill version (i.e. after the upgrade to parquet 1.10.0). |
| */ |
| @Test |
| public void testOldNewIntDecimalPruningWithMeta() throws Exception { |
| String oldTable = createTable("old_int_decimal_pruning_with_meta", true); |
| String newTable = "dfs.`tmp`.new_int_decimal_pruning_with_meta"; |
| tablesToDrop.add(newTable); |
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true); |
| queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", newTable, oldTable)).run(); |
| |
| for (String table : Arrays.asList(oldTable, newTable)) { |
| queryBuilder().sql(String.format("refresh table metadata %s", table)).run(); |
| for (String column : Arrays.asList("part_int_32", "part_int_64")) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(2.00 as decimal(5,2))", table, column); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("2.05"), new BigDecimal("2.05")) |
| .baselineValues(new BigDecimal("20.00"), new BigDecimal("20.00")) |
| .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25")) |
| .go(); |
| } |
| } |
| } |
| |
| /** |
| * Check filter push down for old int_32 and int_64 decimal files |
| * without using a Drill metadata file. |
| */ |
| @Test |
| public void testOldIntDecimalPushDownNoMeta() throws Exception { |
| String table = createTable("old_int_decimal_push_down_no_meta", true); |
| for (String column : Arrays.asList("val_int_32", "val_int_64")) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))", |
| table, column); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=2", "usedMetadataFile=false") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for old int_32 and int_64 decimal files |
| * using an old Drill metadata file, i.e. one generated before the upgrade to parquet 1.10.0. |
| */ |
| @Test |
| public void testOldIntDecimalPushDownWithOldMeta() throws Exception { |
| String table = createTable("old_int_decimal_push_down_with_old_meta", false); |
| for (String column : Arrays.asList("val_int_32", "val_int_64")) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(1.05 as decimal(5, 2))", |
| table, column); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("1.05"), new BigDecimal("1.05")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for old int_32 and int_64 decimal files |
| * using a new Drill metadata file, i.e. one generated after the upgrade to parquet 1.10.0. |
| */ |
| @Test |
| public void testOldIntDecimalPushDownWithNewMeta() throws Exception { |
| String table = createTable("old_int_decimal_push_down_with_new_meta", false); |
| queryBuilder().sql(String.format("refresh table metadata %s", table)).run(); |
| |
| for (String column : Arrays.asList("val_int_32", "val_int_64")) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))", |
| table, column); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=2", "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for new int_32 and int_64 decimal files |
| * without using a Drill metadata file. |
| */ |
| @Test |
| public void testNewIntDecimalPushDownNoMeta() throws Exception { |
| String dataTable = createTable("data_table_int_decimal_push_down_no_meta", true); |
| String table = "dfs.`tmp`.new_int_decimal_push_down_no_meta"; |
| tablesToDrop.add(table); |
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true); |
| queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", table, dataTable)).run(); |
| |
| for (String column : Arrays.asList("val_int_32", "val_int_64")) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.25 as decimal(5, 2))", |
| table, column); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("20.25"), new BigDecimal("20.25")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for new int_32 and int_64 decimal files |
| * using a Drill metadata file. |
| */ |
| @Test |
| public void testNewIntDecimalPushDownWithMeta() throws Exception { |
| String dataTable = createTable("data_table_int_decimal_push_down_no_meta", true); |
| String table = "dfs.`tmp`.new_int_decimal_push_down_with_meta"; |
| tablesToDrop.add(table); |
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, true); |
| queryBuilder().sql(String.format("create table %s partition by (part_int_32) as select * from %s", table, dataTable)).run(); |
| queryBuilder().sql(String.format("refresh table metadata %s", table)).run(); |
| |
| for (String column : Arrays.asList("val_int_32", "val_int_64")) { |
| String query = String.format("select val_int_32, val_int_64 from %s where %s = cast(20.0 as decimal(5, 2))", |
| table, column); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("val_int_32", "val_int_64") |
| .baselineValues(new BigDecimal("20.00"), new BigDecimal("20.00")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check partition pruning for old and new fixed decimal files |
| * without using a Drill metadata file. |
| */ |
| @Test |
| public void testOldNewFixedDecimalPruningNoMeta() throws Exception { |
| String oldTable = createTable("old_fixed_decimal_pruning_no_meta", true); |
| String newTable = "dfs.`tmp`.new_fixed_decimal_pruning_no_meta"; |
| tablesToDrop.add(newTable); |
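| // store decimals as fixed_len_byte_array (the new decimal format) in the table created below |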
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); |
| client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "fixed_len_byte_array"); |
| queryBuilder().sql(String.format("create table %s partition by (part_fixed) as select part_fixed, val_fixed from %s", |
| newTable, oldTable)).run(); |
| |
| for (String table : Arrays.asList(oldTable, newTable)) { |
| for (String optionValue : Arrays.asList("true", "false", "")) { |
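| // pruning should work for any value of the strings signed min/max reader option |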
| client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue); |
| String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25")) |
| .go(); |
| } |
| } |
| } |
| |
| /** |
| * Check partition pruning for old fixed decimal files |
| * using an old Drill metadata file, i.e. one generated before the upgrade to parquet 1.10.0. |
| */ |
| @Test |
| public void testOldFixedDecimalPruningWithOldMeta() throws Exception { |
| String table = createTable("old_fixed_decimal_pruning_with_old_meta", false); |
| for (String optionValue : Arrays.asList("true", "false", "")) { |
| client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue); |
| String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(1.00 as decimal(5, 2))", table); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check partition pruning for old fixed decimal files |
| * using a new Drill metadata file, i.e. one generated after the upgrade to parquet 1.10.0. |
| */ |
| @Test |
| public void testOldNewFixedDecimalPruningWithNewMeta() throws Exception { |
| String table = createTable("old_fixed_decimal_pruning_with_new_meta", false); |
| queryBuilder().sql(String.format("refresh table metadata %s", table)).run(); |
| for (String optionValue : Arrays.asList("true", "false", "")) { |
| client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, optionValue); |
| String query = String.format("select part_fixed, val_fixed from %s where part_fixed = cast(2.00 as decimal(5, 2))", table); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("2.05")) |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00")) |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for old fixed decimal files |
| * without using a Drill metadata file. |
| */ |
| @Test |
| public void testOldFixedDecimalPushDownNoMeta() throws Exception { |
| String table = createTable("old_fixed_decimal_push_down_no_meta", true); |
| String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| // statistics for fixed decimals are not available in files generated prior to parquet 1.10.0 |
| .include("numRowGroups=2", "usedMetadataFile=false") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .go(); |
| } |
| |
| /** |
| * Check filter push down for old fixed decimal files |
| * using an old Drill metadata file, i.e. one created prior to the upgrade to parquet 1.10.0. |
| */ |
| @Test |
| public void testOldFixedDecimalPushDownWithOldMeta() throws Exception { |
| String table = createTable("old_fixed_decimal_push_down_with_old_meta", false); |
| String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", table); |
| |
| Map<String, String> properties = new HashMap<>(); |
| properties.put("true", "numRowGroups=1"); |
| properties.put("false", "numRowGroups=2"); |
| |
| for (Map.Entry<String, String> property : properties.entrySet()) { |
| client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, property.getKey()); |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include(property.getValue(), "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for old fixed decimal files |
| * using a new Drill metadata file, i.e. one created after the upgrade to parquet 1.10.0. |
| */ |
| @Test |
| public void testOldFixedDecimalPushDownWithNewMeta() throws Exception { |
| String table = createTable("old_fixed_decimal_push_down_with_new_meta", true); |
| String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(20.25 as decimal(5, 2))", table); |
| |
| queryBuilder().sql(String.format("refresh table metadata %s", table)).run(); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=2", "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25")) |
| .go(); |
| } |
| |
| /** |
| * Check filter push down for fixed decimal generated after upgrade to parquet 1.10.0 |
| * with and without using a Drill metadata file. |
| */ |
| @Test |
| public void testNewFixedDecimalPushDown() throws Exception { |
| String dataTable = createTable("data_table_for_fixed_decimal_push_down_no_meta", true); |
| String newTable = "dfs.`tmp`.new_fixed_decimal_pruning"; |
| tablesToDrop.add(newTable); |
| |
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); |
| client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "fixed_len_byte_array"); |
| queryBuilder().sql(String.format("create table %s partition by (part_fixed) as select part_fixed, val_fixed from %s", |
| newTable, dataTable)).run(); |
| |
| String query = String.format("select part_fixed, val_fixed from %s where val_fixed = cast(1.05 as decimal(5, 2))", newTable); |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .go(); |
| |
| queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run(); |
| |
| // metadata for binary columns is supported only since Drill 1.15.0, |
| // so set the strings signed min/max option to true (this test was written against Drill 1.15.0-SNAPSHOT) |
| client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true"); |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_fixed", "val_fixed") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .go(); |
| } |
| |
| /** |
| * Check partition pruning for binary decimal with and without |
| * using a Drill metadata file. |
| */ |
| @Test |
| public void testBinaryDecimalPruning() throws Exception { |
| String dataTable = createTable("data_table for_binary_decimal_pruning_no_meta", true); |
| String newTable = "dfs.`tmp`.binary_decimal_pruning"; |
| tablesToDrop.add(newTable); |
| |
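| // store decimals as binary (the new decimal format) in the table created below |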
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); |
| client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "binary"); |
| queryBuilder().sql(String.format("create table %s partition by (part_binary) " |
| + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run(); |
| |
| String query = String.format("select part_binary, val_binary from %s where part_binary = cast(1.00 as decimal(5, 2))", newTable); |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_binary", "val_binary") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25")) |
| .go(); |
| |
| queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run(); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_binary", "val_binary") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.00")) |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("10.25")) |
| .go(); |
| } |
| |
| /** |
| * Check filter push down for binary decimal with and without |
| * using a Drill metadata file. |
| */ |
| @Test |
| public void testBinaryDecimalPushDown() throws Exception { |
| String dataTable = createTable("data_table_for_binary_decimal_push_down_no_meta", true); |
| String newTable = "dfs.`tmp`.binary_decimal_push_down"; |
| tablesToDrop.add(newTable); |
| |
| client.alterSession(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS, false); |
| client.alterSession(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS, "binary"); |
| queryBuilder().sql(String.format("create table %s partition by (part_binary) " |
| + "as select part_int_32 as part_binary, val_int_32 as val_binary from %s", newTable, dataTable)).run(); |
| |
| String query = String.format("select part_binary, val_binary from %s where val_binary = cast(1.05 as decimal(5, 2))", newTable); |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_binary", "val_binary") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .go(); |
| |
| queryBuilder().sql(String.format("refresh table metadata %s", newTable)).run(); |
| |
| // metadata for binary columns is supported only since Drill 1.15.0, |
| // so set the strings signed min/max option to true (this test was written against Drill 1.15.0-SNAPSHOT) |
| client.alterSession(ExecConstants.PARQUET_READER_STRINGS_SIGNED_MIN_MAX, "true"); |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=true") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part_binary", "val_binary") |
| .baselineValues(new BigDecimal("1.00"), new BigDecimal("1.05")) |
| .go(); |
| } |
| |
| /** |
| * Check partition pruning for decimals with different scale. |
| */ |
| @Test |
| public void testDecimalPruningDifferentScale() throws Exception { |
| String dataTable = createTable("data_table_for_different_scale_pruning_check", true); |
| String newTable = "dfs.`tmp`.table_for_different_scale_pruning_check"; |
| tablesToDrop.add(newTable); |
| queryBuilder().sql(String.format("create table %s partition by (part) as select part_int_32 as part, val_int_32 as val from %s", |
| newTable, dataTable)).run(); |
| |
| for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) { |
| String query = String.format("select part, val from %s where part = cast(2.0 as %s)", newTable, decimalType); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .exclude("Filter") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part", "val") |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("2.05")) |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00")) |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.25")) |
| .go(); |
| } |
| } |
| |
| /** |
| * Check filter push down for decimals with different scale. |
| */ |
| @Test |
| public void testDecimalPushDownDifferentScale() throws Exception { |
| String dataTable = createTable("data_table_for_different_scale_push_down_check", true); |
| String newTable = "dfs.`tmp`.table_for_different_scale_push_down_check"; |
| tablesToDrop.add(newTable); |
| queryBuilder().sql(String.format("create table %s partition by (part) as select part_int_32 as part, val_int_32 as val from %s", |
| newTable, dataTable)).run(); |
| |
| for (String decimalType : Arrays.asList("decimal(5, 0)", "decimal(10, 5)", "decimal(5, 1)")) { |
| String query = String.format("select part, val from %s where val = cast(20.0 as %s)", newTable, decimalType); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("numRowGroups=1", "usedMetadataFile=false") |
| .match(); |
| |
| client.testBuilder() |
| .sqlQuery(query) |
| .unOrdered() |
| .baselineColumns("part", "val") |
| .baselineValues(new BigDecimal("2.00"), new BigDecimal("20.00")) |
| .go(); |
| } |
| } |
| |
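| /** |
| * Check partition pruning for a nullable decimal partition column stored as |
| * fixed_len_byte_array, int32 and int64, with and without using a Drill metadata file. |
| */ |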
| @Test |
| public void testDecimalPruningWithNullPartition() throws Exception { |
| List<String> ctasQueries = new ArrayList<>(); |
| // decimal stores as fixed_len_byte_array |
| ctasQueries.add("create table %s partition by (manager_id) as " + |
| "select * from cp.`parquet/fixedlenDecimal.parquet`"); |
| // decimal stores as int32 |
| ctasQueries.add("create table %s partition by (manager_id) as " + |
| "select cast(manager_id as decimal(6, 0)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " + |
| "from cp.`parquet/fixedlenDecimal.parquet`"); |
| // decimal stores as int64 |
| ctasQueries.add("create table %s partition by (manager_id) as " + |
| "select cast(manager_id as decimal(18, 6)) as manager_id, EMPLOYEE_ID, FIRST_NAME, LAST_NAME " + |
| "from cp.`parquet/fixedlenDecimal.parquet`"); |
| final String decimalPartitionTable = "dfs.tmp.`decimal_optional_partition`"; |
| for (String ctasQuery : ctasQueries) { |
| try { |
| queryBuilder().sql(String.format(ctasQuery, decimalPartitionTable)).run(); |
| |
| String query = String.format("select * from %s where manager_id = 148", decimalPartitionTable); |
| int expectedRowCount = 6; |
| |
| long actualRowCount = client.queryBuilder().sql(query).run().recordCount(); |
| assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("usedMetadataFile=false") |
| .exclude("Filter") |
| .match(); |
| |
| queryBuilder().sql(String.format("refresh table metadata %s", decimalPartitionTable)).run(); |
| |
| actualRowCount = client.queryBuilder().sql(query).run().recordCount(); |
| assertEquals("Row count does not match the expected value", expectedRowCount, actualRowCount); |
| |
| queryBuilder() |
| .sql(query) |
| .planMatcher() |
| .include("usedMetadataFile=true") |
| .exclude("Filter") |
| .match(); |
| } finally { |
| client.runSqlSilently(String.format("drop table if exists %s", decimalPartitionTable)); |
| } |
| } |
| } |
| |
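| /** |
| * Copies the pre-generated decimal files into a table directory with a unique name. |
| * |
| * @param tableName base name of the table directory (a random UUID suffix is appended) |
| * @param removeMetadata whether to delete the copied .drill.parquet_metadata file, |
| * so that the test runs without a Drill metadata file |
| * @return fully qualified table name, registered for cleanup in reset() |
| */ |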
| private String createTable(String tableName, boolean removeMetadata) throws IOException { |
| File rootDir = dirTestWatcher.getRootDir(); |
| File tablePath = new File(rootDir, String.format("%s_%s", tableName, UUID.randomUUID())); |
| FileUtils.copyDirectory(fileStore, tablePath); |
| File metadata = new File(tablePath, ".drill.parquet_metadata"); |
| if (removeMetadata) { |
| assertTrue(metadata.delete()); |
| } else { |
| // the metadata file's modification time must be later than the directory's modification time, |
| // otherwise Drill will regenerate the metadata file |
| assertTrue(metadata.setLastModified(Instant.now().toEpochMilli())); |
| } |
| String table = String.format("dfs.`root`.`%s`", tablePath.getName()); |
| tablesToDrop.add(table); |
| return table; |
| } |
| } |