// Source blob: 8676c283019f7709ab79d7492d6f6f5d7850d6d5
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec;
import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.common.util.TestTools;
import org.apache.drill.exec.work.foreman.SqlUnsupportedException;
import org.apache.drill.exec.work.foreman.UnsupportedFunctionException;
import org.junit.Test;
/**
 * Tests for SQL window functions (the {@code OVER} clause).
 *
 * <p>The tests fall into three groups:
 * <ul>
 *   <li>queries that only need to plan or run without error,</li>
 *   <li>queries using constructs that are expected to be rejected with an
 *       {@link UnsupportedFunctionException}, and</li>
 *   <li>queries whose results are compared against explicit baseline values.</li>
 * </ul>
 */
public class TestWindowFunctions extends BaseTestQuery {
  /** Working directory reported by {@link TestTools#getWorkingPath()}. */
  static final String WORKING_PATH = TestTools.getWorkingPath();

  /** Root of the test resources, resolved under {@link #WORKING_PATH}. */
  static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";

  /**
   * Hands the exception class name recorded in {@code ex} to
   * {@link SqlUnsupportedException#errorClassNameToException(String)} and, if that call
   * returns normally, rethrows {@code ex} unchanged. This method therefore never returns.
   *
   * <p>NOTE(review): {@code errorClassNameToException} is defined outside this file; the
   * tests below rely on it throwing the matching {@link SqlUnsupportedException} subclass
   * (for example {@link UnsupportedFunctionException}) -- confirm against its source.
   *
   * @param ex the exception raised by the failed query
   * @throws Exception always: either whatever the conversion throws, or {@code ex} itself
   */
  private static void throwAsUnsupportedException(UserException ex) throws Exception {
    SqlUnsupportedException.errorClassNameToException(ex.getOrCreatePBError(false).getException().getExceptionClass());
    throw ex;
  }

  /**
   * Runs {@code query} and routes any {@link UserException} it raises through
   * {@link #throwAsUnsupportedException(UserException)}, so that a test annotated with
   * {@code @Test(expected = UnsupportedFunctionException.class)} can observe the
   * converted exception type.
   *
   * <p>If the query succeeds this method returns normally, which makes such a test fail
   * because the expected exception was never thrown.
   *
   * @param query the SQL text to submit
   * @throws Exception the converted or original failure of the query
   */
  private void runExpectingUnsupported(final String query) throws Exception {
    try {
      test(query);
    } catch (UserException ex) {
      // Always throws, so no further rethrow is needed here.
      throwAsUnsupportedException(ex);
    }
  }

  /** Two window aggregates sharing the same PARTITION BY must plan successfully. */
  @Test // DRILL-3196
  public void testSinglePartition() throws Exception {
    final String query = "explain plan for select sum(a2) over(partition by a2), count(*) over(partition by a2) \n" +
        "from cp.`tpch/nation.parquet`";
    test(query);
  }

  /** A single window declared in the WINDOW clause must plan successfully. */
  @Test // DRILL-3196
  public void testSinglePartitionDefinedInWindowList() throws Exception {
    final String query = "explain plan for select sum(a2) over w \n" +
        "from cp.`tpch/nation.parquet` \n" +
        "window w as (partition by a2 order by a2)";
    test(query);
  }

  /** Window aggregates with different PARTITION BY lists are expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3196
  public void testMultiplePartitions() throws Exception {
    final String query = "explain plan for select sum(a2) over(partition by a2), count(*) over(partition by a2,b2,c2) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** Same PARTITION BY but different ORDER BY lists are expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3196
  public void testSinglePartitionMultipleOrderBy() throws Exception {
    final String query = "explain plan for select sum(a2) over(partition by a2 order by a2), count(*) over(partition by a2 order by b2) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** Differing partitions are expected to be rejected even when one comes from the WINDOW clause. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3196
  public void testMultiplePartitionsDefinedInWindowList() throws Exception {
    final String query = "explain plan for select sum(a2) over(partition by a2), count(*) over w \n" +
        "from cp.`tpch/nation.parquet` \n" +
        "window w as (partition by a2, b2, c2)";
    runExpectingUnsupported(query);
  }

  /** DISTINCT inside a window aggregate is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3182
  public void testWindowFunctionWithDistinct() throws Exception {
    final String query = "explain plan for select a2, count(distinct b2) over(partition by a2) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** NTILE is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
  public void testWindowFunctionNTILE() throws Exception {
    final String query = "explain plan for select NTILE(1) over(partition by n_name order by n_name) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** LAG is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
  public void testWindowFunctionLAG() throws Exception {
    final String query = "explain plan for select LAG(n_nationKey, 1) over(partition by n_name order by n_name) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** LEAD is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
  public void testWindowFunctionLEAD() throws Exception {
    final String query = "explain plan for select LEAD(n_nationKey, 1) over(partition by n_name order by n_name) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** FIRST_VALUE is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
  public void testWindowFunctionFIRST_VALUE() throws Exception {
    final String query = "explain plan for select FIRST_VALUE(n_nationKey) over(partition by n_name order by n_name) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** LAST_VALUE is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3195
  public void testWindowFunctionLAST_VALUE() throws Exception {
    final String query = "explain plan for select LAST_VALUE(n_nationKey) over(partition by n_name order by n_name) \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /**
   * An explicit ROWS BETWEEN frame is expected to be rejected. Unlike most tests here the
   * query is submitted directly rather than wrapped in {@code explain plan for}.
   */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3188
  public void testWindowFrame() throws Exception {
    final String query = "select a2, sum(a2) over(partition by a2 order by a2 rows between 1 preceding and 1 following ) \n" +
        "from cp.`tpch/nation.parquet` t \n" +
        "order by a2";
    runExpectingUnsupported(query);
  }

  /** A ROWS UNBOUNDED PRECEDING frame is expected to be rejected. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3188
  public void testRowsUnboundedPreceding() throws Exception {
    final String query = "explain plan for select sum(n_nationKey) over(partition by n_nationKey order by n_nationKey \n" +
        "rows UNBOUNDED PRECEDING)" +
        "from cp.`tpch/nation.parquet` t \n" +
        "order by n_nationKey";
    runExpectingUnsupported(query);
  }

  /** A frame declared through the WINDOW clause is expected to be rejected as well. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3359
  public void testFramesDefinedInWindowClause() throws Exception {
    final String query = "explain plan for select sum(n_nationKey) over w \n" +
        "from cp.`tpch/nation.parquet` \n" +
        "window w as (partition by n_nationKey order by n_nationKey rows UNBOUNDED PRECEDING)";
    runExpectingUnsupported(query);
  }

  /** Aliasing the window expression must not hide the unsupported frame. */
  @Test(expected = UnsupportedFunctionException.class) // DRILL-3326
  public void testWindowWithAlias() throws Exception {
    final String query = "explain plan for SELECT sum(n_nationkey) OVER (PARTITION BY n_name ORDER BY n_name ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) as col2 \n" +
        "from cp.`tpch/nation.parquet`";
    runExpectingUnsupported(query);
  }

  /** Checks the query through {@code parseErrorHelper}: a window over a column missing from GROUP BY. */
  @Test // DRILL-3344
  public void testWindowGroupBy() throws Exception {
    final String query = "explain plan for SELECT max(n_nationkey) OVER (), n_name as col2 \n" +
        "from cp.`tpch/nation.parquet` \n" +
        "group by n_name";
    parseErrorHelper(query);
  }

  /**
   * Same check as {@link #testWindowGroupBy()}, but issued against a view.
   *
   * <p>The schema switch and view creation happen before the {@code try} block so that the
   * {@code finally} clause only drops a view that was actually created; otherwise a failing
   * setup step would be masked by the failure of the subsequent {@code drop view}.
   */
  @Test // DRILL-3346
  public void testWindowGroupByOnView() throws Exception {
    final String createView = "create view testWindowGroupByOnView(a, b) as \n" +
        "select n_nationkey, n_name from cp.`tpch/nation.parquet`";
    final String query = "explain plan for SELECT max(a) OVER (), b as col2 \n" +
        "from testWindowGroupByOnView \n" +
        "group by b";

    test("use dfs_test.tmp");
    test(createView);
    try {
      parseErrorHelper(query);
    } finally {
      test("drop view testWindowGroupByOnView");
    }
  }

  /**
   * Plans three queries: one with no explicit frame, one with
   * RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, and one with
   * ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING and no ORDER BY.
   * All three must be accepted.
   */
  @Test // DRILL-3188
  public void testWindowFrameEquivalentToDefault() throws Exception {
    final String query1 = "explain plan for select sum(n_nationKey) over(partition by n_nationKey order by n_nationKey) \n" +
        "from cp.`tpch/nation.parquet` t \n" +
        "order by n_nationKey";
    final String query2 = "explain plan for select sum(n_nationKey) over(partition by n_nationKey order by n_nationKey \n" +
        "range between unbounded preceding and current row) \n" +
        "from cp.`tpch/nation.parquet` t \n" +
        "order by n_nationKey";
    final String query3 = "explain plan for select sum(n_nationKey) over(partition by n_nationKey \n" +
        "rows BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)" +
        "from cp.`tpch/nation.parquet` t \n" +
        "order by n_nationKey";

    test(query1);
    test(query2);
    test(query3);
  }

  /** A window aggregate over a joined, grouped input must run without error. */
  @Test // DRILL-3204
  public void testWindowWithJoin() throws Exception {
    final String query = "select sum(t1.r_regionKey) over(partition by t1.r_regionKey) \n" +
        "from cp.`tpch/region.parquet` t1, cp.`tpch/nation.parquet` t2 \n" +
        "where t1.r_regionKey = t2.n_nationKey \n" +
        "group by t1.r_regionKey";
    test(query);
  }

  /**
   * Runs a {@code count(*)} window with ORDER BY but no PARTITION BY while
   * {@code planner.slice_target} is set to 1, and verifies the running counts.
   * The option is restored to its default afterwards, whether or not the check passed.
   */
  @Test // DRILL-3298
  public void testCountEmptyPartitionByWithExchange() throws Exception {
    final String query = String.format("select count(*) over (order by o_orderpriority) as cnt from dfs.`%s/multilevel/parquet` where o_custkey < 100", TEST_RES_PATH);
    try {
      testBuilder()
          .sqlQuery(query)
          .ordered()
          .baselineColumns("cnt")
          .optionSettingQueriesForTestQuery("alter session set `planner.slice_target` = 1")
          .baselineValues(1L)
          .baselineValues(4L)
          .baselineValues(4L)
          .baselineValues(4L)
          .build().run();
    } finally {
      test("alter session set `planner.slice_target` = " + ExecConstants.SLICE_TARGET_DEFAULT);
    }
  }

  /**
   * Verify the output of aggregate functions (which are reduced
   * eg: avg(x) = sum(x)/count(x)) return results of the correct
   * data type (double).
   */
  @Test
  public void testAvgVarianceWindowFunctions() throws Exception {
    final String avgQuery = "select avg(n_nationkey) over (partition by n_nationkey) col1 " +
        "from cp.`tpch/nation.parquet` " +
        "where n_nationkey = 1";
    testBuilder()
        .sqlQuery(avgQuery)
        .unOrdered()
        .baselineColumns("col1")
        .baselineValues(1.0d)
        .go();

    final String varianceQuery = "select var_pop(n_nationkey) over (partition by n_nationkey) col1 " +
        "from cp.`tpch/nation.parquet` " +
        "where n_nationkey = 1";
    testBuilder()
        .sqlQuery(varianceQuery)
        .unOrdered()
        .baselineColumns("col1")
        .baselineValues(0.0d)
        .go();
  }

  /**
   * Checks SUM and AVG over a column explicitly cast to INT: the expected sum
   * (2147483649) is a long larger than {@code Integer.MAX_VALUE}, and the expected
   * average is a double.
   */
  @Test
  public void testWindowFunctionWithKnownType() throws Exception {
    final String query = "select sum(cast(col_int as int)) over (partition by col_varchar) as col1 " +
        "from cp.`jsoninput/large_int.json` limit 1";
    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("col1")
        .baselineValues(2147483649L)
        .go();

    final String avgQuery = "select avg(cast(col_int as int)) over (partition by col_varchar) as col1 " +
        "from cp.`jsoninput/large_int.json` limit 1";
    testBuilder()
        .sqlQuery(avgQuery)
        .unOrdered()
        .baselineColumns("col1")
        .baselineValues(1.0737418245E9d)
        .go();
  }

  /**
   * Uses array-element references ({@code columns[1]}, {@code columns[0]}) inside a named
   * window definition and expects ten rows, each with a count of 1 in both columns.
   */
  @Test
  public void testCompoundIdentifierInWindowDefinition() throws Exception {
    final String root = FileUtils.getResourceAsFile("/multilevel/csv/1994/Q1/orders_94_q1.csv").toURI().toString();
    final String query = String.format("SELECT count(*) OVER w as col1, count(*) OVER w as col2 \n" +
        "FROM dfs_test.`%s` \n" +
        "WINDOW w AS (PARTITION BY columns[1] ORDER BY columns[0] DESC)", root);

    // Validate the result
    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("col1", "col2")
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .baselineValues(1L, 1L)
        .build()
        .run();
  }

  /** DENSE_RANK computed on top of a GROUP BY; the single returned row must be 100. */
  @Test
  public void testRankWithGroupBy() throws Exception {
    final String query = "select dense_rank() over (order by l_suppkey) as rank1 " +
        " from cp.`tpch/lineitem.parquet` group by l_partkey, l_suppkey order by 1 desc limit 1";
    testBuilder()
        .sqlQuery(query)
        .unOrdered()
        .baselineColumns("rank1")
        .baselineValues(100L)
        .go();
  }

  /** Filters on {@code IS NOT NULL} over a windowed SUM and expects 26 remaining rows. */
  @Test // DRILL-3404
  public void testWindowSumAggIsNotNull() throws Exception {
    final String query = String.format("select count(*) cnt from (select sum ( c1 ) over ( partition by c2 order by c1 asc nulls first ) w_sum from dfs.`%s/window/table_with_nulls.parquet` ) sub_query where w_sum is not null", TEST_RES_PATH);
    testBuilder()
        .sqlQuery(query)
        .ordered()
        .baselineColumns("cnt")
        .baselineValues(26L)
        .build().run();
  }
}