| ==== |
| ---- QUERY |
| #################################################### |
| # Test case 1: broadcast join. |
| # Basic filtering use case: p is filtered through the implicit relationship between |
| # the join column and the build-side predicates on b. |
| # Without filtering, expect 7300 / 3 = 2433 rows. |
| # With filtering, expect 618 / 3 = 206 rows. |
| #################################################### |
| |
| # Baseline with runtime filters switched OFF: the profile is expected to report |
| # RowsRead of 2.43K. |
| SET RUNTIME_FILTER_MODE=OFF; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes p |
| JOIN [BROADCAST] alltypestiny b |
| ON p.id = b.id AND b.month = 1 AND b.string_col = "1" |
| ---- RESULTS |
| 1 |
| ---- RUNTIME_PROFILE |
| row_regex: .*RowsRead: 2.43K .* |
| ==== |
| ---- QUERY |
| # Same join as the OFF baseline, now with LOCAL filtering: a reduction in scan volume |
| # is expected, checked here through the 'Rows rejected' counter in the profile. |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes p |
| JOIN [BROADCAST] alltypestiny b |
| ON p.id = b.id AND b.month = 1 AND b.string_col = "1" |
| ---- RESULTS |
| 1 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 2: shuffle join - test for filter propagation (or lack thereof in LOCAL |
| # mode). |
| # Without filtering, expect 7300 / 3 = 2433 rows. |
| # With filtering, expect 618 / 3 = 206 rows. |
| #################################################### |
| |
| # LOCAL mode: filters are not propagated to the scan, so the scans read all rows. |
| # The wait time is still set so that filters would have time to arrive (even though |
| # they won't). |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes p |
| JOIN [SHUFFLE] alltypestiny b |
| ON p.id = b.int_col AND b.month = 1 AND b.string_col = "1" |
| ---- RESULTS |
| 1 |
| ---- RUNTIME_PROFILE |
| row_regex: .*RowsRead: 2.43K .* |
| ==== |
| ---- QUERY |
| # Same shuffle join in GLOBAL mode: filters are expected to be propagated, which the |
| # profile shows as rejected rows. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes p |
| JOIN [SHUFFLE] alltypestiny b |
| ON p.id = b.int_col AND b.month = 1 AND b.string_col = "1" |
| ---- RESULTS |
| 1 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 3: two-hop filter chain with BROADCAST joins. |
| # Without filtering in left-most scan, expect 7300 / 3 = 2433 rows. |
| # With filtering, expect 0 rows. |
| #################################################### |
| |
| # LOCAL mode: only the left-most scan receives its filter. The scan of 'b' does not |
| # receive one, and because 'b' has no predicates of its own the filter has no effect. |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypes b |
| JOIN [BROADCAST] alltypestiny c |
| WHERE c.month = 13 AND b.id = c.id AND a.year = b.year |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Files rejected: 0 .* |
| ==== |
| ---- QUERY |
| # GLOBAL mode: the scan of 'b' receives a highly effective filter and propagates it |
| # on to the left-most scan, so files are expected to be rejected. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypes b |
| JOIN [BROADCAST] alltypestiny c |
| WHERE c.month = 13 AND b.id = c.id AND a.year = b.year |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Files rejected: 8 .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 4: complex filter expressions. The join predicate matches nothing, but |
| # isn't simplified by the planner before execution. |
| # Without filtering in left-most scan, expect 7300 / 3 = 2433 rows. |
| # With filtering, expect 0 rows. |
| #################################################### |
| |
| # Both sides of the join condition are substr() expressions rather than plain columns. |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypestiny b |
| ON SUBSTR(a.date_string_col, 1, 2) = SUBSTR(b.date_string_col, 1, 0) |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 5: filters with local target don't get broadcast |
| #################################################### |
| |
| # LOCAL mode: the coordinator should report 0 filter updates received, while the scan |
| # still rejects rows. |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypestiny b |
| ON a.id = b.id + 10000; |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*FiltersReceived: 0 .* |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| ---- QUERY |
| # GLOBAL mode: the coordinator should still report 0 filter updates received for the |
| # same broadcast join, and the scan still rejects rows. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypestiny b |
| ON a.id = b.id + 10000; |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*FiltersReceived: 0 .* |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 6: filters with non-local target get broadcast in GLOBAL mode only. |
| #################################################### |
| |
| # LOCAL mode: the coordinator should report 0 filter updates received. |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [SHUFFLE] alltypestiny b |
| ON a.id = b.id + 10000; |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*FiltersReceived: 0 .* |
| ==== |
| ---- QUERY |
| # GLOBAL mode: the coordinator should report one filter update per backend (the |
| # profile check below expects 3 in total). |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [SHUFFLE] alltypestiny b |
| ON a.id = b.id + 10000; |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*FiltersReceived: 3 .* |
| ==== |
| |
| ---- QUERY |
| #################################################### |
| # Test case 7: filters with target exprs bound by > 1 slotref |
| #################################################### |
| |
| # The probe-side expression (a.id + a.int_col) references two columns of 'a'. |
| SET RUNTIME_FILTER_MODE=LOCAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypestiny b |
| ON a.id + a.int_col = b.id; |
| ---- RESULTS |
| 4 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 8: filters do not pass through LOJ. |
| #################################################### |
| |
| # The profile is checked for RowsReturned of 2.43K rather than for rejected rows. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| LEFT OUTER JOIN alltypestiny b |
| ON a.id + a.int_col = b.id; |
| ---- RESULTS |
| 7300 |
| ---- RUNTIME_PROFILE |
| row_regex: .*RowsReturned: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 9: filters do pass through ROJ. |
| #################################################### |
| |
| # Same join condition as the LOJ case; here the profile must show rejected rows. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| RIGHT OUTER JOIN alltypestiny b |
| ON a.id + a.int_col = b.id; |
| ---- RESULTS |
| 8 |
| ---- RUNTIME_PROFILE |
| row_regex: .*Rows rejected: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 10: filters do not pass through FOJ. |
| #################################################### |
| |
| # The profile is checked for RowsReturned of 2.43K rather than for rejected rows. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| FULL OUTER JOIN alltypestiny b |
| ON a.id + a.int_col = b.id; |
| ---- RESULTS |
| 7304 |
| ---- RUNTIME_PROFILE |
| row_regex: .*RowsReturned: 2.43K .* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 11: Large build triggers maximum filter size. |
| # Run only on Parquet because query can take ~20s. |
| #################################################### |
| |
| # The build side 'b' is the CTE 'l' (four UNION ALL copies of lineitem) unioned with |
| # itself, i.e. eight copies of lineitem; the probe side 'a' is a single row. |
| # Joining on the negated key yields no matches, and the profile must show the one |
| # published filter with a size of 16.00 MB. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| with l as (select l_orderkey from tpch_parquet.lineitem UNION ALL |
| select l_orderkey from tpch_parquet.lineitem UNION ALL |
| select l_orderkey from tpch_parquet.lineitem UNION ALL |
| select l_orderkey from tpch_parquet.lineitem) |
| select STRAIGHT_JOIN count(*) from |
| (select l_orderkey from tpch_parquet.lineitem a LIMIT 1) a |
| join (select l_orderkey from l UNION ALL select l_orderkey from l) b |
| on a.l_orderkey = -b.l_orderkey |
| ---- RESULTS |
| 0 |
| ---- RUNTIME_PROFILE |
| row_regex: .*1 of 1 Runtime Filter Published.* |
| row_regex: .*Filter 0 \(16.00 MB\).* |
| ==== |
| |
| |
| ---- QUERY |
| ################################################### |
| # Test case 12: filter with both remote and local targets |
| ################################################### |
| |
| # Two chained broadcast joins, both keyed on a.id. Only the row count is checked; |
| # this section has no RUNTIME_PROFILE expectations. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [BROADCAST] alltypessmall c ON a.id = c.id |
| JOIN [BROADCAST] alltypesagg b ON a.id = b.id |
| WHERE b.int_col < 100; |
| ---- RESULTS |
| 108 |
| ==== |
| |
| |
| ---- QUERY |
| ################################################### |
| # Test case 13: filter with multiple remote targets |
| ################################################### |
| |
| # Two chained shuffle joins, both keyed on a.id. Only the row count is checked; |
| # this section has no RUNTIME_PROFILE expectations. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM alltypes a |
| JOIN [SHUFFLE] alltypessmall c ON a.id = c.id |
| JOIN [SHUFFLE] alltypesagg b ON a.id = b.id |
| WHERE b.int_col < 100; |
| ---- RESULTS |
| 108 |
| ==== |
| |
| |
| ---- QUERY |
| ################################################### |
| # Test case 14: filter with expression that uses local allocations. |
| # IMPALA-5885: the parquet scanner should free local allocations from upper()/lower(). |
| # mem_limit is calibrated so that the query fails if allocations are not freed. |
| ################################################### |
| |
| # The probe-side join expression nests lower()/upper() calls five deep; the build |
| # side appends 'foo' to the comment, and the expected count is 0. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SET MEM_LIMIT=250MB; |
| SELECT STRAIGHT_JOIN COUNT(*) |
| FROM tpch_parquet.lineitem l1 |
| JOIN tpch_parquet.lineitem l2 |
| ON LOWER(UPPER(LOWER(UPPER(LOWER(l1.l_comment))))) = CONCAT(l2.l_comment, 'foo') |
| ---- RESULTS |
| 0 |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 15: filter with a predicate that has different decimal precision between |
| # lhs expr and rhs expr. |
| # IMPALA-5597: Runtime filter should be generated and assigned successfully when the |
| # source expr and target expr have different decimal types. |
| #################################################### |
| |
| # No query options are set in this section, and only the row count is checked. |
| SELECT COUNT(*) |
| FROM tpch_parquet.lineitem |
| LEFT JOIN tpch_parquet.part |
| ON IF(l_orderkey % 2 = 0, NULL, l_partkey) = p_partkey |
| WHERE l_orderkey = 965 AND l_extendedprice * l_tax = p_retailprice; |
| ---- RESULTS |
| 1 |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 16: PHJ nodes that spill should still produce filters. |
| # Run this for Parquet only to avoid variable memory |
| # consumption / spilling behaviour. |
| #################################################### |
| |
| # Self-join of lineitem on l_comment under an 80m buffer pool limit. The profile must |
| # show at least one spilled partition and the single runtime filter being published. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SET RUNTIME_BLOOM_FILTER_SIZE=16M; |
| SET BUFFER_POOL_LIMIT=80m; |
| SELECT STRAIGHT_JOIN COUNT(a.l_comment) |
| FROM tpch_parquet.lineitem a |
| JOIN tpch_parquet.lineitem b |
| ON a.l_comment = b.l_comment; |
| ---- RESULTS |
| 51495713 |
| ---- TYPES |
| BIGINT |
| ---- RUNTIME_PROFILE |
| row_regex: .*SpilledPartitions: [1-9]\d* .* |
| row_regex: .*Rows processed: 16.38K.* |
| row_regex: .*Rows rejected: 0 .* |
| row_regex: .*1 of 1 Runtime Filter Published.* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 17: Filters are still effective inside subplans |
| # (in certain cases). The query has a HJ-scan pair inside a |
| # subplan (on the LHS). |
| #################################################### |
| # The profile must show the single runtime filter being published and 2432 rows |
| # rejected. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| select straight_join count(1) |
| from alltypes a join complextypestbl b |
| # b.id + 10 confuses the planner, so it doesn't think it |
| # can transitively push a.id < 20 to the scan of a. |
| on a.id = b.id + 10 join b.int_array where b.id < 10 |
| ---- RESULTS |
| 10 |
| ---- RUNTIME_PROFILE |
| row_regex: .*1 of 1 Runtime Filter Published.* |
| row_regex: .*Rows rejected: 2.43K \(2432\).* |
| ==== |
| |
| |
| ---- QUERY |
| #################################################### |
| # Test case 18: Query is not admitted if it exceeds the |
| # mem requirement after accounting for the memory |
| # required by runtime filters. |
| #################################################### |
| # Query would have been admitted if memory for runtime filters was not accounted for. |
| # NOTE(review): this section has no CATCH block, so the statement is expected to run |
| # and return no rows; the rejection is only asserted by a later section that also |
| # sets DISABLE_HDFS_NUM_ROWS_ESTIMATE - confirm the comment above still applies here. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SET RUNTIME_FILTER_MAX_SIZE=500MB; |
| SET RUNTIME_FILTER_MIN_SIZE=128MB; |
| SET BUFFER_POOL_LIMIT=290MB; |
| SELECT STRAIGHT_JOIN * |
| FROM alltypes a |
| JOIN [SHUFFLE] alltypes b |
| ON a.month = b.id AND b.int_col = -3 |
| ---- RESULTS |
| ==== |
| ---- QUERY |
| # Shuffle join with a 128MB minimum filter size and a 290MB buffer pool limit; the |
| # CATCH section expects the minimum-memory-reservation error asking for 290.17 MB. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SET RUNTIME_FILTER_MIN_SIZE=128MB; |
| SET RUNTIME_FILTER_MAX_SIZE=500MB; |
| # Disable the estimation of cardinality for an hdfs table without stats. |
| SET DISABLE_HDFS_NUM_ROWS_ESTIMATE=1; |
| # Query would have been admitted if memory for runtime filters was not accounted for. |
| SET BUFFER_POOL_LIMIT=290MB; |
| select STRAIGHT_JOIN * from alltypes a join [SHUFFLE] alltypes b |
| on a.month = b.id and b.int_col = -3 |
| ---- RESULTS |
| ---- CATCH |
| row_regex:.*minimum memory reservation on backend '.*' is greater than memory available to |
| the query for buffer reservations\. Increase the buffer_pool_limit to 290.17 MB\. See |
| the query profile for more information about the per-node memory requirements\. |
| ==== |
| ---- QUERY |
| # Confirm that the memory limit is not hit once the buffer pool provides just enough |
| # memory (295MB here): the query runs and the profile shows the 128.00 MB filter. |
| # NOTE(review): this comment previously said "with broadcast join", but the statement |
| # below uses a [SHUFFLE] hint - confirm the intent. |
| SET RUNTIME_FILTER_MODE=GLOBAL; |
| SET RUNTIME_FILTER_WAIT_TIME_MS=$RUNTIME_FILTER_WAIT_TIME_MS; |
| SET RUNTIME_FILTER_MAX_SIZE=500MB; |
| SET RUNTIME_FILTER_MIN_SIZE=128MB; |
| SET BUFFER_POOL_LIMIT=295MB; |
| SELECT STRAIGHT_JOIN * |
| FROM alltypes a |
| JOIN [SHUFFLE] alltypes b |
| ON a.month = b.id AND b.int_col = -3 |
| ---- RESULTS |
| ---- RUNTIME_PROFILE |
| row_regex: .*Filter 0 \(128.00 MB\).* |
| row_regex: .*Files processed: 8.* |
| row_regex: .*Files rejected: 8.* |
| ==== |