/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.sql;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import org.apache.iceberg.Table;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
import org.apache.spark.sql.execution.SparkPlan;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;
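
/**
 * Verifies how query predicates are split between filters pushed into the Iceberg scan and
 * residual Spark post-scan filters, across partition transforms, spec evolution, and special
 * floating-point partition values.
 */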
public class TestFilterPushDown extends SparkTestBaseWithCatalog {
@After
public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
sql("DROP TABLE IF EXISTS tmp_view");
}
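
// A predicate on the identity partition column is pushed down completely, while the decimal
// predicate on a non-partition column is pushed down but also re-applied by Spark after the scan.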
@Test
public void testFilterPushdownWithDecimalValues() {
sql(
"CREATE TABLE %s (id INT, salary DECIMAL(10, 2), dep STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep)",
tableName);
sql("INSERT INTO %s VALUES (1, 100.01, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 100.05, 'd1')", tableName);
checkFilters(
"dep = 'd1' AND salary > 100.03" /* query predicate */,
"isnotnull(salary) AND (salary > 100.03)" /* Spark post scan filter */,
"dep IS NOT NULL, salary IS NOT NULL, dep = 'd1', salary > 100.03" /* Iceberg scan filters */,
ImmutableList.of(row(2, new BigDecimal("100.05"), "d1")));
}
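
// Predicates on an identity-partitioned column select whole partitions, so most comparisons are
// pushed down completely; predicates on non-partition columns, and ends-with (which is not
// pushed here), leave a Spark post-scan filter.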
@Test
public void testFilterPushdownWithIdentityTransform() {
sql(
"CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep)",
tableName);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
sql("INSERT INTO %s VALUES (3, 300, 'd3')", tableName);
sql("INSERT INTO %s VALUES (4, 400, 'd4')", tableName);
sql("INSERT INTO %s VALUES (5, 500, 'd5')", tableName);
sql("INSERT INTO %s VALUES (6, 600, null)", tableName);
checkOnlyIcebergFilters(
"dep IS NULL" /* query predicate */,
"dep IS NULL" /* Iceberg scan filters */,
ImmutableList.of(row(6, 600, null)));
checkOnlyIcebergFilters(
"dep IS NOT NULL" /* query predicate */,
"dep IS NOT NULL" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, "d1"),
row(2, 200, "d2"),
row(3, 300, "d3"),
row(4, 400, "d4"),
row(5, 500, "d5")));
checkOnlyIcebergFilters(
"dep = 'd3'" /* query predicate */,
"dep IS NOT NULL, dep = 'd3'" /* Iceberg scan filters */,
ImmutableList.of(row(3, 300, "d3")));
checkOnlyIcebergFilters(
"dep > 'd3'" /* query predicate */,
"dep IS NOT NULL, dep > 'd3'" /* Iceberg scan filters */,
ImmutableList.of(row(4, 400, "d4"), row(5, 500, "d5")));
checkOnlyIcebergFilters(
"dep >= 'd5'" /* query predicate */,
"dep IS NOT NULL, dep >= 'd5'" /* Iceberg scan filters */,
ImmutableList.of(row(5, 500, "d5")));
checkOnlyIcebergFilters(
"dep < 'd2'" /* query predicate */,
"dep IS NOT NULL, dep < 'd2'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
checkOnlyIcebergFilters(
"dep <= 'd2'" /* query predicate */,
"dep IS NOT NULL, dep <= 'd2'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
checkOnlyIcebergFilters(
"dep <=> 'd3'" /* query predicate */,
"dep = 'd3'" /* Iceberg scan filters */,
ImmutableList.of(row(3, 300, "d3")));
checkOnlyIcebergFilters(
"dep IN (null, 'd1')" /* query predicate */,
"dep IN ('d1')" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
checkOnlyIcebergFilters(
"dep NOT IN ('d2', 'd4')" /* query predicate */,
"(dep IS NOT NULL AND dep NOT IN ('d2', 'd4'))" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1"), row(3, 300, "d3"), row(5, 500, "d5")));
checkOnlyIcebergFilters(
"dep = 'd1' AND dep IS NOT NULL" /* query predicate */,
"dep = 'd1', dep IS NOT NULL" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
checkOnlyIcebergFilters(
"dep = 'd1' OR dep = 'd2' OR dep = 'd3'" /* query predicate */,
"((dep = 'd1' OR dep = 'd2') OR dep = 'd3')" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2"), row(3, 300, "d3")));
checkFilters(
"dep = 'd1' AND id = 1" /* query predicate */,
"isnotnull(id) AND (id = 1)" /* Spark post scan filter */,
"dep IS NOT NULL, id IS NOT NULL, dep = 'd1', id = 1" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
checkFilters(
"dep = 'd2' OR id = 1" /* query predicate */,
"(dep = d2) OR (id = 1)" /* Spark post scan filter */,
"(dep = 'd2' OR id = 1)" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
checkFilters(
"dep LIKE 'd1%' AND id = 1" /* query predicate */,
"isnotnull(id) AND (id = 1)" /* Spark post scan filter */,
"dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
checkFilters(
"dep NOT LIKE 'd5%' AND (id = 1 OR id = 5)" /* query predicate */,
"(id = 1) OR (id = 5)" /* Spark post scan filter */,
"dep IS NOT NULL, NOT (dep LIKE 'd5%'), (id = 1 OR id = 5)" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
checkFilters(
"dep LIKE '%d5' AND id IN (1, 5)" /* query predicate */,
"EndsWith(dep, d5) AND id IN (1,5)" /* Spark post scan filter */,
"dep IS NOT NULL, id IN (1, 5)" /* Iceberg scan filters */,
ImmutableList.of(row(5, 500, "d5")));
}
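
// With hours(t), bounds that land exactly on an hour boundary select whole partitions and are
// pushed down completely; bounds inside an hour keep a Spark post-scan filter.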
@Test
public void testFilterPushdownWithHoursTransform() {
sql(
"CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ "USING iceberg "
+ "PARTITIONED BY (hours(t))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (3, 300, null)", tableName);
withDefaultTimeZone(
"UTC",
() -> {
checkOnlyIcebergFilters(
"t IS NULL" /* query predicate */,
"t IS NULL" /* Iceberg scan filters */,
ImmutableList.of(row(3, 300, null)));
// strict/inclusive projections for t < TIMESTAMP '2021-06-30T02:00:00.000Z' are equal,
// so this filter selects entire partitions and can be pushed down completely
checkOnlyIcebergFilters(
"t < TIMESTAMP '2021-06-30T02:00:00.000Z'" /* query predicate */,
"t IS NOT NULL, t < 1625018400000000" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.000Z"))));
// strict/inclusive projections for t < TIMESTAMP '2021-06-30T01:00:00.001Z' differ,
// so this filter does NOT select entire partitions and can't be pushed down completely
checkFilters(
"t < TIMESTAMP '2021-06-30T01:00:00.001Z'" /* query predicate */,
"t < 2021-06-30 01:00:00.001" /* Spark post scan filter */,
"t IS NOT NULL, t < 1625014800001000" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.000Z"))));
// strict/inclusive projections for t <= TIMESTAMP '2021-06-30T01:00:00.000Z' differ,
// so this filter does NOT select entire partitions and can't be pushed down completely
checkFilters(
"t <= TIMESTAMP '2021-06-30T01:00:00.000Z'" /* query predicate */,
"t <= 2021-06-30 01:00:00" /* Spark post scan filter */,
"t IS NOT NULL, t <= 1625014800000000" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.000Z"))));
});
}
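
// Same pattern for days(t): day-aligned bounds are pushed down completely, while intra-day
// bounds are re-checked by Spark after the scan.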
@Test
public void testFilterPushdownWithDaysTransform() {
sql(
"CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ "USING iceberg "
+ "PARTITIONED BY (days(t))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-15T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (3, 300, TIMESTAMP '2021-07-15T10:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (4, 400, null)", tableName);
withDefaultTimeZone(
"UTC",
() -> {
checkOnlyIcebergFilters(
"t IS NULL" /* query predicate */,
"t IS NULL" /* Iceberg scan filters */,
ImmutableList.of(row(4, 400, null)));
// strict/inclusive projections for t < TIMESTAMP '2021-07-05T00:00:00.000Z' are equal,
// so this filter selects entire partitions and can be pushed down completely
checkOnlyIcebergFilters(
"t < TIMESTAMP '2021-07-05T00:00:00.000Z'" /* query predicate */,
"t IS NOT NULL, t < 1625443200000000" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, timestamp("2021-06-15T01:00:00.000Z")),
row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
// strict/inclusive projections for t < TIMESTAMP '2021-06-30T03:00:00.000Z' differ,
// so this filter does NOT select entire partitions and can't be pushed down completely
checkFilters(
"t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */,
"t < 2021-06-30 03:00:00" /* Spark post scan filter */,
"t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, timestamp("2021-06-15T01:00:00.000Z")),
row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
});
}
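
// Same pattern for months(t): a bound on the first instant of a month is pushed down
// completely, a mid-month bound is not.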
@Test
public void testFilterPushdownWithMonthsTransform() {
sql(
"CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ "USING iceberg "
+ "PARTITIONED BY (months(t))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (3, 300, TIMESTAMP '2021-07-15T10:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (4, 400, null)", tableName);
withDefaultTimeZone(
"UTC",
() -> {
checkOnlyIcebergFilters(
"t IS NULL" /* query predicate */,
"t IS NULL" /* Iceberg scan filters */,
ImmutableList.of(row(4, 400, null)));
// strict/inclusive projections for t < TIMESTAMP '2021-07-01T00:00:00.000Z' are equal,
// so this filter selects entire partitions and can be pushed down completely
checkOnlyIcebergFilters(
"t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */,
"t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
// strict/inclusive projections for t < TIMESTAMP '2021-06-30T03:00:00.000Z' differ,
// so this filter does NOT select entire partitions and can't be pushed down completely
checkFilters(
"t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */,
"t < 2021-06-30 03:00:00" /* Spark post scan filter */,
"t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
});
}
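
// Same pattern for years(t): a bound on the first instant of a year is pushed down
// completely, a mid-year bound is not.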
@Test
public void testFilterPushdownWithYearsTransform() {
sql(
"CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ "USING iceberg "
+ "PARTITIONED BY (years(t))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-06-30T02:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2022-09-25T02:00:00.000Z')", tableName);
sql("INSERT INTO %s VALUES (3, 300, null)", tableName);
withDefaultTimeZone(
"UTC",
() -> {
checkOnlyIcebergFilters(
"t IS NULL" /* query predicate */,
"t IS NULL" /* Iceberg scan filters */,
ImmutableList.of(row(3, 300, null)));
// strict/inclusive projections for t < TIMESTAMP '2022-01-01T00:00:00.000Z' are equal,
// so this filter selects entire partitions and can be pushed down completely
checkOnlyIcebergFilters(
"t < TIMESTAMP '2022-01-01T00:00:00.000Z'" /* query predicate */,
"t IS NOT NULL, t < 1640995200000000" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
// strict/inclusive projections for t < TIMESTAMP '2021-06-30T03:00:00.000Z' differ,
// so this filter does NOT select entire partitions and can't be pushed down completely
checkFilters(
"t < TIMESTAMP '2021-06-30T03:00:00.000Z'" /* query predicate */,
"t < 2021-06-30 03:00:00" /* Spark post scan filter */,
"t IS NOT NULL, t < 1625022000000000" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100, timestamp("2021-06-30T01:00:00.000Z")),
row(2, 200, timestamp("2021-06-30T02:00:00.000Z"))));
});
}
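
// A bucket transform cannot prove that a matching partition contains only matching rows, so
// `id = 1` remains a Spark post-scan filter even though the identity(dep) predicate is pushed
// down completely.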
@Test
public void testFilterPushdownWithBucketTransform() {
sql(
"CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep, bucket(8, id))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
checkFilters(
"dep = 'd1' AND id = 1" /* query predicate */,
"id = 1" /* Spark post scan filter */,
"dep IS NOT NULL, id IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
}
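
// truncate(1, dep) can prove a one-character prefix match, so LIKE 'd%' is pushed down
// completely, while equality on dep must still be re-checked by Spark.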
@Test
public void testFilterPushdownWithTruncateTransform() {
sql(
"CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (truncate(1, dep))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
sql("INSERT INTO %s VALUES (3, 300, 'a3')", tableName);
checkOnlyIcebergFilters(
"dep LIKE 'd%'" /* query predicate */,
"dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
checkFilters(
"dep = 'd1'" /* query predicate */,
"dep = d1" /* Spark post scan filter */,
"dep IS NOT NULL" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
}
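
// Complete pushdown requires every partition spec in the table's history to support the
// predicate; once identity(dep) is removed from the spec, Spark must re-apply the filter.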
@Test
public void testFilterPushdownWithSpecEvolutionAndIdentityTransforms() {
sql(
"CREATE TABLE %s (id INT, salary INT, dep STRING, sub_dep STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep)",
tableName);
sql("INSERT INTO %s VALUES (1, 100, 'd1', 'sd1')", tableName);
// the filter can be pushed completely because all specs include identity(dep)
checkOnlyIcebergFilters(
"dep = 'd1'" /* query predicate */,
"dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1", "sd1")));
Table table = validationCatalog.loadTable(tableIdent);
table.updateSpec().addField("sub_dep").commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2', 'sd2')", tableName);
// the filter can be pushed completely because all specs include identity(dep)
checkOnlyIcebergFilters(
"dep = 'd1'" /* query predicate */,
"dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1", "sd1")));
table.updateSpec().removeField("sub_dep").removeField("dep").commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO %s VALUES (3, 300, 'd3', 'sd3')", tableName);
// the filter can't be pushed completely because not all specs include identity(dep)
checkFilters(
"dep = 'd1'" /* query predicate */,
"isnotnull(dep) AND (dep = d1)" /* Spark post scan filter */,
"dep IS NOT NULL, dep = 'd1'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1", "sd1")));
}
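
// After evolving from truncate(2, dep) to truncate(1, dep), only predicates provable under
// both specs can be pushed down completely.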
@Test
public void testFilterPushdownWithSpecEvolutionAndTruncateTransform() {
sql(
"CREATE TABLE %s (id INT, salary INT, dep STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (truncate(2, dep))",
tableName);
sql("INSERT INTO %s VALUES (1, 100, 'd1')", tableName);
// the filter can be pushed completely because the current spec supports it
checkOnlyIcebergFilters(
"dep LIKE 'd1%'" /* query predicate */,
"dep IS NOT NULL, dep LIKE 'd1%'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
Table table = validationCatalog.loadTable(tableIdent);
table
.updateSpec()
.removeField(Expressions.truncate("dep", 2))
.addField(Expressions.truncate("dep", 1))
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO %s VALUES (2, 200, 'd2')", tableName);
// the filter can be pushed completely because both specs support it
checkOnlyIcebergFilters(
"dep LIKE 'd%'" /* query predicate */,
"dep IS NOT NULL, dep LIKE 'd%'" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1"), row(2, 200, "d2")));
// the filter can't be pushed completely because the second spec is truncate(dep, 1) and
// the predicate literal is d1, which is two chars
checkFilters(
"dep LIKE 'd1%' AND id = 1" /* query predicate */,
"(isnotnull(id) AND StartsWith(dep, d1)) AND (id = 1)" /* Spark post scan filter */,
"dep IS NOT NULL, id IS NOT NULL, dep LIKE 'd1%', id = 1" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, "d1")));
}
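
// A month-boundary bound is provable under both hours(t) and months(t), so the filter stays
// fully pushed down across the spec change.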
@Test
public void testFilterPushdownWithSpecEvolutionAndTimeTransforms() {
sql(
"CREATE TABLE %s (id INT, price INT, t TIMESTAMP)"
+ "USING iceberg "
+ "PARTITIONED BY (hours(t))",
tableName);
withDefaultTimeZone(
"UTC",
() -> {
sql("INSERT INTO %s VALUES (1, 100, TIMESTAMP '2021-06-30T01:00:00.000Z')", tableName);
// the filter can be pushed completely because the current spec supports it
checkOnlyIcebergFilters(
"t < TIMESTAMP '2021-07-01T00:00:00.000Z'" /* query predicate */,
"t IS NOT NULL, t < 1625097600000000" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100, timestamp("2021-06-30T01:00:00.000Z"))));
Table table = validationCatalog.loadTable(tableIdent);
table
.updateSpec()
.removeField(Expressions.hour("t"))
.addField(Expressions.month("t"))
.commit();
sql("REFRESH TABLE %s", tableName);
sql("INSERT INTO %s VALUES (2, 200, TIMESTAMP '2021-05-30T01:00:00.000Z')", tableName);
// the filter can be pushed completely because both specs support it
checkOnlyIcebergFilters(
"t < TIMESTAMP '2021-06-01T00:00:00.000Z'" /* query predicate */,
"t IS NOT NULL, t < 1622505600000000" /* Iceberg scan filters */,
ImmutableList.of(row(2, 200, timestamp("2021-05-30T01:00:00.000Z"))));
});
}
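
// NaN and the infinities are valid partition values; NaN comparisons are rewritten to
// is_nan / NOT is_nan, and all of these predicates are pushed down completely.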
@Test
public void testFilterPushdownWithSpecialFloatingPointPartitionValues() {
sql(
"CREATE TABLE %s (id INT, salary DOUBLE)" + "USING iceberg " + "PARTITIONED BY (salary)",
tableName);
sql("INSERT INTO %s VALUES (1, 100.5)", tableName);
sql("INSERT INTO %s VALUES (2, double('NaN'))", tableName);
sql("INSERT INTO %s VALUES (3, double('infinity'))", tableName);
sql("INSERT INTO %s VALUES (4, double('-infinity'))", tableName);
checkOnlyIcebergFilters(
"salary = 100.5" /* query predicate */,
"salary IS NOT NULL, salary = 100.5" /* Iceberg scan filters */,
ImmutableList.of(row(1, 100.5)));
checkOnlyIcebergFilters(
"salary = double('NaN')" /* query predicate */,
"salary IS NOT NULL, is_nan(salary)" /* Iceberg scan filters */,
ImmutableList.of(row(2, Double.NaN)));
checkOnlyIcebergFilters(
"salary != double('NaN')" /* query predicate */,
"salary IS NOT NULL, NOT (is_nan(salary))" /* Iceberg scan filters */,
ImmutableList.of(
row(1, 100.5), row(3, Double.POSITIVE_INFINITY), row(4, Double.NEGATIVE_INFINITY)));
checkOnlyIcebergFilters(
"salary = double('infinity')" /* query predicate */,
"salary IS NOT NULL, salary = Infinity" /* Iceberg scan filters */,
ImmutableList.of(row(3, Double.POSITIVE_INFINITY)));
checkOnlyIcebergFilters(
"salary = double('-infinity')" /* query predicate */,
"salary IS NOT NULL, salary = -Infinity" /* Iceberg scan filters */,
ImmutableList.of(row(4, Double.NEGATIVE_INFINITY)));
}
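
// Asserts that the predicate is pushed down completely, leaving no Spark post-scan filter.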
private void checkOnlyIcebergFilters(
String predicate, String icebergFilters, List<Object[]> expectedRows) {
checkFilters(predicate, null, icebergFilters, expectedRows);
}
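
// Runs the query, captures the executed plan, and verifies both the Spark post-scan filter
// (if any) and the filters reported as pushed to the Iceberg scan.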
private void checkFilters(
String predicate, String sparkFilter, String icebergFilters, List<Object[]> expectedRows) {
Action check =
() -> {
assertEquals(
"Rows must match",
expectedRows,
sql("SELECT * FROM %s WHERE %s ORDER BY id", tableName, predicate));
};
SparkPlan sparkPlan = executeAndKeepPlan(check);
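// strip expression IDs such as #12 or #42L so the plan string is stable across runs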
String planAsString = sparkPlan.toString().replaceAll("#(\\d+L?)", "");
if (sparkFilter != null) {
Assertions.assertThat(planAsString)
.as("Post scan filter should match")
.contains("Filter (" + sparkFilter + ")");
} else {
Assertions.assertThat(planAsString)
.as("Should be no post scan filter")
.doesNotContain("Filter (");
}
Assertions.assertThat(planAsString)
.as("Pushed filters must match")
.contains("[filters=" + icebergFilters + ",");
}
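
// Parses an ISO-8601 instant string into the java.sql.Timestamp used in expected rows.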
private Timestamp timestamp(String timestampAsString) {
return Timestamp.from(Instant.parse(timestampAsString));
}
}