Add time window filter to Iceberg reader (#18531)
* Add time window filter to Iceberg reader
* Fix checkstyle
* Code scan fix
* Spelling
* Add type info in docs
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index d4698b2..be9a745 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -1119,7 +1119,7 @@
### Iceberg filter object
-This input source provides the following filters: `and`, `equals`, `interval`, and `or`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
+This input source provides the following filters: `and`, `equals`, `interval`, `timeWindow`, `range`, `not`, and `or`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
It is strongly recommended to apply filtering only on Iceberg partition columns. When filtering on non-partition columns, Iceberg filters may return rows that do not fully match the expression. To address this, it may help to define an additional filter in the [`transformSpec`](./ingestion-spec.md#transformspec) to remove residual rows.
`equals` Filter:
@@ -1170,6 +1170,16 @@
|lowerOpen|Boolean indicating if lower bound is open in the interval of values defined by the range (`>` instead of `>=`). |false|no|
|upperOpen|Boolean indicating if upper bound is open on the interval of values defined by range (`<` instead of `<=`). |false|no|
+`timeWindow` Filter:
+
+| Property|Description|Default| Required |
+|---------|-----------|-------|----------|
+|type| Set this value to `timeWindow`.|None|yes|
+|filterColumn|The name of the column in the Iceberg table schema to filter on. The filter column must be defined as TimestampType in Iceberg.|None|yes|
+|baseTime|Determines the reference timestamp from which the lookback and lookahead durations are applied to define the time window.|Current UTC timestamp|no|
+|lookbackDuration|Defines how far back from `baseTime` the filter includes data.|P1D|no|
+|lookaheadDuration|Defines how far ahead of `baseTime` the filter includes data.|Zero|no|
+
## Delta Lake input source
:::info[Required extension]
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
index cff8797..b47f0fc 100644
--- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergFilter.java
@@ -34,7 +34,8 @@
@JsonSubTypes.Type(name = "and", value = IcebergAndFilter.class),
@JsonSubTypes.Type(name = "not", value = IcebergNotFilter.class),
@JsonSubTypes.Type(name = "or", value = IcebergOrFilter.class),
- @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class)
+ @JsonSubTypes.Type(name = "range", value = IcebergRangeFilter.class),
+ @JsonSubTypes.Type(name = "timeWindow", value = IcebergTimeWindowFilter.class)
})
public interface IcebergFilter
{
diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
new file mode 100644
index 0000000..29ca187
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+
+/** Iceberg filter bounding {@code filterColumn} to an inclusive time window around {@code baseTime}. */
+public class IcebergTimeWindowFilter implements IcebergFilter
+{
+  private static final long MICROS_PER_MILLI = 1000L;
+
+  @JsonProperty
+  private final String filterColumn;
+
+  @JsonProperty
+  private final Duration lookbackDuration;
+
+  @JsonProperty
+  private final Duration lookaheadDuration;
+
+  @JsonProperty
+  private final DateTime baseTime;
+
+  /** Null durations default to P1D back and zero ahead; a null baseTime defaults to the current UTC time. */
+  @JsonCreator
+  public IcebergTimeWindowFilter(
+      @JsonProperty("filterColumn") String filterColumn,
+      @JsonProperty("lookbackDuration") Duration lookbackDuration,
+      @JsonProperty("lookaheadDuration") Duration lookaheadDuration,
+      @JsonProperty("baseTime") DateTime baseTime
+  )
+  {
+    this.filterColumn = Preconditions.checkNotNull(
+        filterColumn,
+        "You must specify a filter column on the timeWindow filter"
+    );
+    final Duration oneDay = new Period("P1D").toStandardDuration();
+    this.lookbackDuration = Configs.valueOrDefault(lookbackDuration, oneDay);
+    this.lookaheadDuration = Configs.valueOrDefault(lookaheadDuration, Duration.ZERO);
+    this.baseTime = Configs.valueOrDefault(baseTime, DateTimes.nowUtc());
+  }
+
+  @Override
+  public TableScan filter(TableScan tableScan)
+  {
+    final Expression windowExpression = getFilterExpression();
+    return tableScan.filter(windowExpression);
+  }
+
+  @Override
+  public Expression getFilterExpression()
+  {
+    // Iceberg TimestampType uses microsecond precision, so the millisecond bounds are scaled up.
+    final long baseMillis = baseTime.getMillis();
+    final long windowStartMicros = MICROS_PER_MILLI * (baseMillis - lookbackDuration.getMillis());
+    final long windowEndMicros = MICROS_PER_MILLI * (baseMillis + lookaheadDuration.getMillis());
+    final Types.TimestampType timestampType = Types.TimestampType.withZone();
+    return Expressions.and(
+        Expressions.greaterThanOrEqual(filterColumn, Literal.of(windowStartMicros).to(timestampType).value()),
+        Expressions.lessThanOrEqual(filterColumn, Literal.of(windowEndMicros).to(timestampType).value())
+    );
+  }
+}
diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
new file mode 100644
index 0000000..51df6c9
--- /dev/null
+++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/filter/IcebergTimeWindowFilterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.filter;
+
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.types.Types;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.joda.time.Period;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class IcebergTimeWindowFilterTest
+{
+  @Test
+  public void testFilter()
+  {
+    // Window under test: two days behind and one day ahead of a base time captured once.
+    final String column = "eventTime";
+    final DateTime base = DateTimes.nowUtc();
+    final Duration lookback = new Period("P2D").toStandardDuration();
+    final Duration lookahead = new Period("P1D").toStandardDuration();
+    final IcebergTimeWindowFilter filter = new IcebergTimeWindowFilter(column, lookback, lookahead, base);
+
+    // Expected bounds, converted from milliseconds to microseconds.
+    final long expectedStartMicros = (base.getMillis() - Duration.standardDays(2L).getMillis()) * 1000;
+    final long expectedEndMicros = (base.getMillis() + Duration.standardDays(1L).getMillis()) * 1000;
+    final Expression expected = Expressions.and(
+        Expressions.greaterThanOrEqual(
+            column,
+            Literal.of(expectedStartMicros).to(Types.TimestampType.withZone()).value()
+        ),
+        Expressions.lessThanOrEqual(
+            column,
+            Literal.of(expectedEndMicros).to(Types.TimestampType.withZone()).value()
+        )
+    );
+
+    final String actual = filter.getFilterExpression().toString();
+    Assert.assertEquals(expected.toString(), actual);
+  }
+}
diff --git a/website/.spelling b/website/.spelling
index 610d82d..767ef42 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -286,6 +286,7 @@
backfilled
backpressure
base64
+baseTime
big-endian
bigint
blkio
@@ -409,7 +410,10 @@
log4j
log4j2
log4j2.xml
+lookahead
+lookaheadDuration
lookback
+lookbackDuration
lookups
mapreduce
masse
@@ -1348,6 +1352,7 @@
lowerOpen
timestamp
timestampColumnName
+TimestampType
timestampSpec
upperOpen
urls