/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.logical;

import org.apache.drill.categories.PlannerTest;
import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.test.ClusterFixture;
import org.apache.drill.test.ClusterFixtureBuilder;
import org.apache.drill.test.ClusterTest;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.nio.file.Paths;

import static org.junit.Assert.assertEquals;
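
/**
 * Tests for the planner optimization that converts {@code count(...)} aggregations over Parquet
 * tables into a direct scan of precomputed row counts (visible in plans as
 * {@code DynamicPojoRecordReader}), along with the cases where the conversion must not apply.
 */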
@Category({PlannerTest.class, UnlikelyTest.class})
public class TestConvertCountToDirectScan extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher);
dirTestWatcher.copyResourceToRoot(Paths.get("directcount.parquet"));
startCluster(builder);
}
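
  // A count over a CASE expression is not a plain column count, so the conversion must not fire:
  // the CASE should still appear in the plan.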
@Test
public void testCaseDoesNotConvertToDirectScan() throws Exception {
queryBuilder()
.sql("select " +
"count(case when n_name = 'ALGERIA' and n_regionkey = 2 then n_nationkey else null end) as cnt " +
"from dfs.`directcount.parquet`")
.planMatcher()
.include("CASE")
.match();
}
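
  // A plain count(*) should be answered from Parquet metadata via DynamicPojoRecordReader.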
@Test
public void testConvertSimpleCountToDirectScan() throws Exception {
String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
queryBuilder()
.sql(sql)
.planMatcher()
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues(25L)
.go();
}
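
  // count of a non-null constant is equivalent to count(*) and should also convert to a direct scan.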
@Test
public void testConvertSimpleCountConstToDirectScan() throws Exception {
String sql = "select count(100) as cnt from cp.`tpch/nation.parquet`";
queryBuilder()
.sql(sql)
.planMatcher()
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues(25L)
.go();
}
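
  // count of a constant expression is likewise equivalent to count(*) and should convert.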
@Test
public void testConvertSimpleCountConstExprToDirectScan() throws Exception {
String sql = "select count(1 + 2) as cnt from cp.`tpch/nation.parquet`";
queryBuilder()
.sql(sql)
.planMatcher()
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues(25L)
.go();
}
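
  // Counts over partition directory columns (dir0) cannot be taken from file metadata,
  // so the regular ParquetGroupScan must remain in the plan.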
@Test
public void testDoesNotConvertForDirectoryColumns() throws Exception {
String sql = "select count(dir0) as cnt from cp.`tpch/nation.parquet`";
queryBuilder()
.sql(sql)
.planMatcher()
.include("ParquetGroupScan")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues(0L)
.go();
}
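
  // Implicit columns such as fqn are defined for every row, so their counts should convert to a direct scan.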
@Test
public void testConvertForImplicitColumns() throws Exception {
String sql = "select count(fqn) as cnt from cp.`tpch/nation.parquet`";
queryBuilder()
.sql(sql)
.planMatcher()
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues(25L)
.go();
}
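
  // Multiple counts (renamed implicit fqn column, star, and nullable columns) in one query
  // should all be served from the refreshed table metadata.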
@Test
public void ensureConvertForSeveralColumns() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
String newFqnColumnName = "new_fqn";
client.alterSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL, newFqnColumnName);
run("create table %s as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
"count(%s) as implicit_count,\n" +
"count(*) as star_count,\n" +
"count(col_int) as int_column_count,\n" +
"count(col_vrchr) as vrchr_column_count\n" +
"from %s", newFqnColumnName, tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("implicit_count", "star_count", "int_column_count", "vrchr_column_count")
.baselineValues(6L, 6L, 2L, 3L)
.go();
} finally {
client.resetSession(ExecConstants.IMPLICIT_FQN_COLUMN_LABEL);
run("drop table if exists %s", tableName);
}
}
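
  // Without column statistics the conversion must be skipped and the counts computed by a regular scan.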
@Test
public void testCorrectCountWithMissingStatistics() throws Exception {
run("use dfs.tmp");
String tableName = "wide_str_table";
try {
// the table will contain two partitions: one with a null value and one with a non-null value
run("create table %s partition by (col_str) as select * from cp.`parquet/wide_string.parquet`", tableName);
String sql = String.format("select count(col_str) as cnt_str, count(*) as cnt_total from %s", tableName);
// direct scan should not be applied since we don't have statistics
queryBuilder()
.sql(sql)
.planMatcher()
.exclude("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt_str", "cnt_total")
.baselineValues(1L, 2L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
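
  // With a metadata cache refresh, counts over the whole table should come from the metadata summary file.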
@Test
public void testCountsWithMetadataCacheSummary() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
run("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
"count(*) as star_count,\n" +
"count(col_int) as int_column_count,\n" +
"count(col_vrchr) as vrchr_column_count\n" +
"from %s", tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("numFiles = 1")
.include("usedMetadataSummaryFile = true")
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("star_count", "int_column_count", "vrchr_column_count")
.baselineValues(24L, 8L, 12L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
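
  // As above, but with a partition filter: after directory pruning the counts should still
  // be taken from the metadata summary file.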
@Test
public void testCountsWithMetadataCacheSummaryAndDirPruning() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
run("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
"count(*) as star_count,\n" +
"count(col_int) as int_column_count,\n" +
"count(col_vrchr) as vrchr_column_count\n" +
"from %s where dir0 = 1 ", tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("numFiles = 1")
.include("usedMetadataSummaryFile = true")
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("star_count", "int_column_count", "vrchr_column_count")
.baselineValues(6L, 2L, 3L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
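
  // Counting a column that is absent from the file should still convert and return zero.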
@Test
public void testConvertAbsentColumn() throws Exception {
String sql = "select count(abc) as cnt from cp.`tpch/nation.parquet`";
queryBuilder()
.sql(sql)
.planMatcher()
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("cnt")
.baselineValues(0L)
.go();
}
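
  // A wildcard in the table path bypasses the metadata summary file,
  // but the counts should still convert to a direct scan.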
@Test
public void testCountsWithWildCard() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
for (int i = 0; i < 10; i++) {
run("create table `%s/12/%s` as select * from cp.`tpch/nation.parquet`", tableName, i);
}
run("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table `%s/2/11` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table `%s/2/12` as select * from cp.`tpch/nation.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
"count(*) as star_count\n" +
"from `%s/1*`", tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("usedMetadataSummaryFile = false")
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("star_count")
.baselineValues(250L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
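
  // Querying a single leaf directory should still be able to use the metadata summary file.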
@Test
public void testCountsForLeafDirectories() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
run("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table `%s/2` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table `%s/3` as select * from cp.`tpch/nation.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
"count(*) as star_count\n" +
"from `%s/1`", tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("numFiles = 1")
.include("usedMetadataSummaryFile = true")
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("star_count")
.baselineValues(25L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
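
  // A directory containing both files and subdirectories should report counts covering all of its contents.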
@Test
public void testCountsForDirWithFilesAndDir() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts";
try {
run("create table `%s/1` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table `%s/1/2` as select * from cp.`tpch/nation.parquet`", tableName);
run("create table `%s/1/3` as select * from cp.`tpch/nation.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select count(*) as star_count from `%s/1`", tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("numFiles = 1")
.include("usedMetadataSummaryFile = true")
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("star_count")
.baselineValues(75L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
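
  // A non-existent column should count as zero while the other counts come from the metadata summary file.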
@Test
public void testCountsWithNonExistingColumn() throws Exception {
run("use dfs.tmp");
String tableName = "parquet_table_counts_nonex";
try {
run("create table `%s/1` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/2` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/3` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("create table `%s/4` as select * from cp.`parquet/alltypes_optional.parquet`", tableName);
run("refresh table metadata %s", tableName);
String sql = String.format("select\n" +
"count(*) as star_count,\n" +
"count(col_int) as int_column_count,\n" +
"count(col_vrchr) as vrchr_column_count,\n" +
"count(non_existent) as non_existent\n" +
"from %s", tableName);
queryBuilder()
.sql(sql)
.planMatcher()
.include("numFiles = 1")
.include("usedMetadataSummaryFile = true")
.include("DynamicPojoRecordReader")
.match();
testBuilder()
.sqlQuery(sql)
.unOrdered()
.baselineColumns("star_count", "int_column_count", "vrchr_column_count", "non_existent" )
.baselineValues(24L, 8L, 12L, 0L)
.go();
} finally {
run("drop table if exists %s", tableName);
}
}
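
  // The converted plan should survive a round trip through JSON serialization and still return the correct count.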
@Test
public void testSerDe() throws Exception {
String sql = "select count(*) as cnt from cp.`tpch/nation.parquet`";
String plan = queryBuilder().sql(sql).explainJson();
long cnt = queryBuilder().physical(plan).singletonLong();
assertEquals("Counts should match", 25L, cnt);
}
}