/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.physical.impl.window;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.Properties;

import org.apache.drill.categories.UnlikelyTest;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserRemoteException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.test.DrillTestWrapper;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
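
/**
 * Tests for the window function operator. Test tables live under the
 * "window" resource directory and follow a naming convention: "bN.pM"
 * contains N record batches and M partitions of the position_id column.
 * Baseline files carry a ".pby" suffix when the query uses PARTITION BY
 * and an ".oby" suffix when it uses ORDER BY.
 */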
public class TestWindowFrame extends BaseTestQuery {
@BeforeClass
public static void setupMSortBatchSize() throws IOException {
// force the in-memory merge sorter to output at most 20 rows per batch so the batch counts asserted below are deterministic
final Properties props = cloneDefaultTestConfigProperties();
props.put(ExecConstants.EXTERNAL_SORT_MSORT_MAX_BATCHSIZE, Integer.toString(20));
updateTestCluster(1, DrillConfig.create(props));
dirTestWatcher.copyResourceToRoot(Paths.get("window"));
}
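/**
 * Builds a test for the query in window/q1.sql (COUNT and SUM over a window),
 * run against the given table with or without a PARTITION BY clause. The
 * results are checked against the matching .tsv baseline file, and the
 * number of returned batches is verified.
 */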
private DrillTestWrapper buildWindowQuery(final String tableName, final boolean withPartitionBy, final int numBatches)
throws Exception {
return testBuilder()
.sqlQuery(getFile("window/q1.sql"), tableName, withPartitionBy ? "(partition by position_id)":"()")
.ordered()
.csvBaselineFile("window/" + tableName + (withPartitionBy ? ".pby" : "") + ".tsv")
.baselineColumns("count", "sum")
.expectsNumBatches(numBatches)
.build();
}
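/**
 * Same as {@link #buildWindowQuery} but uses window/q2.sql, which adds an
 * ORDER BY clause to the window and checks the ranking functions
 * (ROW_NUMBER, RANK, DENSE_RANK, CUME_DIST, PERCENT_RANK) on top of COUNT
 * and SUM.
 */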
private DrillTestWrapper buildWindowWithOrderByQuery(final String tableName, final boolean withPartitionBy,
final int numBatches) throws Exception {
return testBuilder()
.sqlQuery(getFile("window/q2.sql"), tableName, withPartitionBy ? "(partition by position_id order by sub)" : "(order by sub)")
.ordered()
.csvBaselineFile("window/" + tableName + (withPartitionBy ? ".pby" : "") + ".oby.tsv")
.baselineColumns("count", "sum", "row_number", "rank", "dense_rank", "cume_dist", "percent_rank")
.expectsNumBatches(numBatches)
.build();
}
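/**
 * Runs the q1 (no ORDER BY) or q2 (with ORDER BY) variant of the window
 * query against the given table and verifies both results and batch count.
 */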
private void runTest(final String tableName, final boolean withPartitionBy, final boolean withOrderBy, final int numBatches) throws Exception {
DrillTestWrapper testWrapper = withOrderBy ?
buildWindowWithOrderByQuery(tableName, withPartitionBy, numBatches) : buildWindowQuery(tableName, withPartitionBy, numBatches);
testWrapper.run();
}
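/**
 * Runs the window query against the given table in all four combinations of
 * PARTITION BY and ORDER BY, expecting numBatches data batches each time.
 */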
private void runTest(final String tableName, final int numBatches) throws Exception {
// in addition to the data batches we expect an "extra" empty (fast schema) batch
runTest(tableName, true, true, numBatches + 1);
runTest(tableName, true, false, numBatches + 1);
runTest(tableName, false, true, numBatches + 1);
runTest(tableName, false, false, numBatches + 1);
}
/**
* Single batch with a single partition (position_id column)
*/
@Test
public void testB1P1() throws Exception {
runTest("b1.p1", 1);
}
/**
* Single batch with 2 partitions (position_id column)
*/
@Test
public void testB1P2() throws Exception {
runTest("b1.p2", 1);
}
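/**
 * Runs a query that computes aggregate (COUNT, SUM) and ranking (ROW_NUMBER,
 * RANK) functions over the same window, which presumably requires the
 * operator to generate more than one framer. Only checks that the query
 * completes without error.
 */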
@Test
public void testMultipleFramers() throws Exception {
final String window = " OVER(PARTITION BY position_id ORDER BY sub)";
test("SELECT COUNT(*)"+window+", SUM(salary)"+window+", ROW_NUMBER()"+window+", RANK()"+window+" " +
"FROM dfs.`window/b1.p1`"
);
}
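/**
 * Verifies frames with UNBOUNDED FOLLOWING: the query in window/q3.sql must
 * return the same results as the baseline query in window/q4.sql.
 */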
@Test
public void testUnboundedFollowing() throws Exception {
testBuilder()
.sqlQuery(getFile("window/q3.sql"))
.ordered()
.sqlBaselineQuery(getFile("window/q4.sql"))
.build()
.run();
}
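/**
 * Aggregate window functions over a "ROWS BETWEEN UNBOUNDED PRECEDING AND
 * CURRENT ROW" frame, verified against an equivalent baseline query.
 */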
@Test
public void testAggregateRowsUnboundedAndCurrentRow() throws Exception {
final String table = "dfs.`window/b4.p4`";
testBuilder()
.sqlQuery(getFile("window/aggregate_rows_unbounded_current.sql"), table)
.ordered()
.sqlBaselineQuery(getFile("window/aggregate_rows_unbounded_current_baseline.sql"), table)
.build()
.run();
}
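/**
 * LAST_VALUE over a "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" frame.
 */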
@Test
public void testLastValueRowsUnboundedAndCurrentRow() throws Exception {
final String table = "dfs.`window/b4.p4`";
testBuilder()
.sqlQuery(getFile("window/last_value_rows_unbounded_current.sql"), table)
.unOrdered()
.sqlBaselineQuery(getFile("window/last_value_rows_unbounded_current_baseline.sql"), table)
.build()
.run();
}
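/**
 * Aggregate window functions over a "RANGE BETWEEN CURRENT ROW AND CURRENT
 * ROW" frame.
 */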
@Test
public void testAggregateRangeCurrentAndCurrent() throws Exception {
final String table = "dfs.`window/b4.p4`";
testBuilder()
.sqlQuery(getFile("window/aggregate_range_current_current.sql"), table)
.unOrdered()
.sqlBaselineQuery(getFile("window/aggregate_range_current_current_baseline.sql"), table)
.build()
.run();
}
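/**
 * FIRST_VALUE over a "RANGE BETWEEN CURRENT ROW AND CURRENT ROW" frame.
 */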
@Test
public void testFirstValueRangeCurrentAndCurrent() throws Exception {
final String table = "dfs.`window/b4.p4`";
testBuilder()
.sqlQuery(getFile("window/first_value_range_current_current.sql"), table)
.unOrdered()
.sqlBaselineQuery(getFile("window/first_value_range_current_current_baseline.sql"), table)
.build()
.run();
}
/**
* 2 batches with 2 partitions (position_id column), each batch contains a different partition
*/
@Test
public void testB2P2() throws Exception {
runTest("b2.p2", 2);
}
/**
* 2 batches with 4 partitions, one partition has rows in both batches
*/
@Test
public void testB2P4() throws Exception {
runTest("b2.p4", 2);
}
/**
* 3 batches with 2 partitions, one partition has rows in all 3 batches
*/
@Test
public void testB3P2() throws Exception {
runTest("b3.p2", 3);
}
/**
* 4 batches with 4 partitions. After processing 1st batch, when innerNext() is called again, framer can process
* current batch without the need to call next(incoming).
*/
@Test
public void testB4P4() throws Exception {
runTest("b4.p4", 4);
}
@Test // DRILL-1862
@Category(UnlikelyTest.class)
public void testEmptyPartitionBy() throws Exception {
test("SELECT employee_id, position_id, salary, SUM(salary) OVER(ORDER BY position_id) FROM cp.`employee.json` LIMIT 10");
}
@Test // DRILL-3172
@Category(UnlikelyTest.class)
public void testEmptyOverClause() throws Exception {
test("SELECT employee_id, position_id, salary, SUM(salary) OVER() FROM cp.`employee.json` LIMIT 10");
}
@Test // DRILL-3218
@Category(UnlikelyTest.class)
public void testMaxVarChar() throws Exception {
test(getFile("window/q3218.sql"));
}
@Test // DRILL-3220
@Category(UnlikelyTest.class)
public void testCountConst() throws Exception {
test(getFile("window/q3220.sql"));
}
@Test // DRILL-3604
@Category(UnlikelyTest.class)
public void testFix3604() throws Exception {
// make sure the query doesn't fail
test(getFile("window/3604.sql"));
}
@Test // DRILL-3605
@Category(UnlikelyTest.class)
public void testFix3605() throws Exception {
testBuilder()
.sqlQuery(getFile("window/3605.sql"))
.ordered()
.csvBaselineFile("window/3605.tsv")
.baselineColumns("col2", "lead_col2")
.build()
.run();
}
@Test // DRILL-3606
@Category(UnlikelyTest.class)
public void testFix3606() throws Exception {
testBuilder()
.sqlQuery(getFile("window/3606.sql"))
.ordered()
.csvBaselineFile("window/3606.tsv")
.baselineColumns("col2", "lead_col2")
.build()
.run();
}
@Test
public void testLead() throws Exception {
testBuilder()
.sqlQuery(getFile("window/lead.oby.sql"))
.ordered()
.csvBaselineFile("window/b4.p4.lead.oby.tsv")
.baselineColumns("lead")
.build()
.run();
}
@Test
public void testLagWithPby() throws Exception {
testBuilder()
.sqlQuery(getFile("window/lag.pby.oby.sql"))
.ordered()
.csvBaselineFile("window/b4.p4.lag.pby.oby.tsv")
.baselineColumns("lag")
.build()
.run();
}
@Test
public void testLag() throws Exception {
testBuilder()
.sqlQuery(getFile("window/lag.oby.sql"))
.ordered()
.csvBaselineFile("window/b4.p4.lag.oby.tsv")
.baselineColumns("lag")
.build()
.run();
}
@Test
@Category(UnlikelyTest.class)
public void testLeadWithPby() throws Exception {
testBuilder()
.sqlQuery(getFile("window/lead.pby.oby.sql"))
.ordered()
.csvBaselineFile("window/b4.p4.lead.pby.oby.tsv")
.baselineColumns("lead")
.build()
.run();
}
@Test
public void testFirstValue() throws Exception {
testBuilder()
.sqlQuery(getFile("window/fval.pby.sql"))
.ordered()
.csvBaselineFile("window/b4.p4.fval.pby.tsv")
.baselineColumns("first_value")
.build()
.run();
}
@Test
public void testLastValue() throws Exception {
testBuilder()
.sqlQuery(getFile("window/lval.pby.oby.sql"))
.ordered()
.csvBaselineFile("window/b4.p4.lval.pby.oby.tsv")
.baselineColumns("last_value")
.build()
.run();
}
@Test
public void testFirstValueAllTypes() throws Exception {
// make sure all types are handled properly
test(getFile("window/fval.alltypes.sql"));
}
@Test
public void testLastValueAllTypes() throws Exception {
// make sure all types are handled properly
// note: this reuses the FIRST_VALUE script (fval.alltypes.sql), same as testFirstValueAllTypes
test(getFile("window/fval.alltypes.sql"));
}
@Test
public void testNtile() throws Exception {
testBuilder()
.sqlQuery(getFile("window/ntile.sql"))
.ordered()
.csvBaselineFile("window/b2.p4.ntile.tsv")
.baselineColumns("ntile")
.build()
.run();
}
@Test // DRILL-3648
public void test3648Fix() throws Exception {
testBuilder()
.sqlQuery(getFile("window/3648.sql"))
.ordered()
.csvBaselineFile("window/3648.tsv")
.baselineColumns("ntile")
.build()
.run();
}
@Test // DRILL-3654
public void test3654Fix() throws Exception {
test("SELECT FIRST_VALUE(col8) OVER(PARTITION BY col7 ORDER BY col8) FROM dfs.`window/3648.parquet`");
}
@Test // DRILL-3643
public void test3643Fix() throws Exception {
try {
test("SELECT NTILE(0) OVER(PARTITION BY col7 ORDER BY col8) FROM dfs.`window/3648.parquet`");
fail("Query should have failed");
} catch (UserRemoteException e) {
assertEquals(ErrorType.FUNCTION, e.getErrorType());
}
}
@Test // DRILL-3668
public void test3668Fix() throws Exception {
testBuilder()
.sqlQuery(getFile("window/3668.sql"))
.ordered()
.baselineColumns("cnt").baselineValues(2L)
.build()
.run();
}
@Test
public void testLeadParams() throws Exception {
// only the default offset (1) is supported for LEAD/LAG; any other offset should fail
final String query = "SELECT %s OVER(PARTITION BY col7 ORDER BY col8) FROM dfs.`window/3648.parquet`";
test(query, "LEAD(col8, 1)");
test(query, "LAG(col8, 1)");
try {
test(query, "LEAD(col8, 2)");
fail("query should fail");
} catch (UserRemoteException e) {
assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType());
}
try {
test(query, "LAG(col8, 2)");
fail("query should fail");
} catch (UserRemoteException e) {
assertEquals(ErrorType.UNSUPPORTED_OPERATION, e.getErrorType());
}
}
@Test
public void testPartitionNtile() {
  Partition partition = new Partition();
  partition.updateLength(12, false);
  // NTILE(5) over 12 rows: 12 % 5 = 2, so the first 2 tiles get 3 rows each
  // and the remaining 3 tiles get 2 rows each
  final int[] expectedTiles = {1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 5, 5};
  for (int i = 0; i < expectedTiles.length; i++) {
    if (i > 0) {
      partition.rowAggregated();
    }
    assertEquals(expectedTiles[i], partition.ntile(5));
  }
}
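/**
 * FIRST_VALUE over a "RANGE BETWEEN CURRENT ROW AND CURRENT ROW" frame must
 * return NULL when the current row's value is NULL; COALESCE maps the
 * expected NULL to 'EMPTY' so it can be compared as a baseline value.
 */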
@Test // DRILL-4457
public void test4457() throws Exception {
runSQL("CREATE TABLE dfs.tmp.`4457` AS " +
"SELECT columns[0] AS c0, NULLIF(columns[1], 'null') AS c1 " +
"FROM dfs.`window/4457.csv`");
testBuilder()
.sqlQuery("SELECT COALESCE(FIRST_VALUE(c1) OVER(ORDER BY c0 RANGE BETWEEN CURRENT ROW AND CURRENT ROW), 'EMPTY') AS fv FROM dfs.tmp.`4457`")
.ordered()
.baselineColumns("fv")
.baselineValues("a")
.baselineValues("b")
.baselineValues("EMPTY")
.go();
}
// Note: This test is unstable. It works when forcing the merge/sort batch
// size to 20, but not for other sizes. The problem is either that the results
// are not ordered (and so subject to sort instability), or there is some bug
// somewhere in the window functions.
@Test
@Category(UnlikelyTest.class)
public void test4657() throws Exception {
testBuilder()
.sqlQuery("select row_number() over(order by position_id) rn, rank() over(order by position_id) rnk from dfs.`window/b3.p2`")
.ordered()
.csvBaselineFile("window/4657.tsv")
.baselineColumns("rn", "rnk")
.expectsNumBatches(4) // we expect 3 data batches and the fast schema
.go();
}
}