API: Detect whether required fields nested within optionals can produce nulls (#14270)

* API: Detect whether required fields nested within optionals can produce nulls

This partially reverts some changes around the `Accessor` API that were introduced by https://github.com/apache/iceberg/pull/13804 and uses a Schema visitor
to detect whether any of the parent fields of a nested required field are optional.
This info is then used when IS_NULL / NOT_NULL is evaluated

* only check parent fields on IS_NULL/NOT_NULL
diff --git a/api/src/main/java/org/apache/iceberg/Accessor.java b/api/src/main/java/org/apache/iceberg/Accessor.java
index 20b09bf..2a20a04 100644
--- a/api/src/main/java/org/apache/iceberg/Accessor.java
+++ b/api/src/main/java/org/apache/iceberg/Accessor.java
@@ -25,9 +25,4 @@
   Object get(T container);
 
   Type type();
-
-  /** Returns true if the current field or any ancestor in the access path is optional. */
-  default boolean hasOptionalFieldInPath() {
-    return false;
-  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/Accessors.java b/api/src/main/java/org/apache/iceberg/Accessors.java
index 06ee0a9..0b36730 100644
--- a/api/src/main/java/org/apache/iceberg/Accessors.java
+++ b/api/src/main/java/org/apache/iceberg/Accessors.java
@@ -59,13 +59,11 @@
     private final int position;
     private final Type type;
     private final Class<?> javaClass;
-    private final boolean hasOptionalFieldInPath;
 
-    PositionAccessor(int pos, Type type, boolean isOptional) {
+    PositionAccessor(int pos, Type type) {
       this.position = pos;
       this.type = type;
       this.javaClass = type.typeId().javaClass();
-      this.hasOptionalFieldInPath = isOptional;
     }
 
     @Override
@@ -87,11 +85,6 @@
     }
 
     @Override
-    public boolean hasOptionalFieldInPath() {
-      return hasOptionalFieldInPath;
-    }
-
-    @Override
     public String toString() {
       return "Accessor(positions=[" + position + "], type=" + type + ")";
     }
@@ -102,14 +95,12 @@
     private final int p1;
     private final Type type;
     private final Class<?> javaClass;
-    private final boolean hasOptionalFieldInPath;
 
-    Position2Accessor(int pos, PositionAccessor wrapped, boolean isOptional) {
+    Position2Accessor(int pos, PositionAccessor wrapped) {
       this.p0 = pos;
       this.p1 = wrapped.position();
       this.type = wrapped.type();
       this.javaClass = wrapped.javaClass();
-      this.hasOptionalFieldInPath = isOptional || wrapped.hasOptionalFieldInPath();
     }
 
     @Override
@@ -127,11 +118,6 @@
     }
 
     @Override
-    public boolean hasOptionalFieldInPath() {
-      return hasOptionalFieldInPath;
-    }
-
-    @Override
     public String toString() {
       return "Accessor(positions=[" + p0 + ", " + p1 + "], type=" + type + ")";
     }
@@ -143,15 +129,13 @@
     private final int p2;
     private final Type type;
     private final Class<?> javaClass;
-    private final boolean hasOptionalFieldInPath;
 
-    Position3Accessor(int pos, Position2Accessor wrapped, boolean isOptional) {
+    Position3Accessor(int pos, Position2Accessor wrapped) {
       this.p0 = pos;
       this.p1 = wrapped.p0;
       this.p2 = wrapped.p1;
       this.type = wrapped.type();
       this.javaClass = wrapped.javaClass();
-      this.hasOptionalFieldInPath = isOptional || wrapped.hasOptionalFieldInPath();
     }
 
     @Override
@@ -165,11 +149,6 @@
     }
 
     @Override
-    public boolean hasOptionalFieldInPath() {
-      return hasOptionalFieldInPath;
-    }
-
-    @Override
     public String toString() {
       return "Accessor(positions=[" + p0 + ", " + p1 + ", " + p2 + "], type=" + type + ")";
     }
@@ -178,12 +157,10 @@
   private static class WrappedPositionAccessor implements Accessor<StructLike> {
     private final int position;
     private final Accessor<StructLike> accessor;
-    private final boolean hasOptionalFieldInPath;
 
-    WrappedPositionAccessor(int pos, Accessor<StructLike> accessor, boolean isOptional) {
+    WrappedPositionAccessor(int pos, Accessor<StructLike> accessor) {
       this.position = pos;
       this.accessor = accessor;
-      this.hasOptionalFieldInPath = isOptional || accessor.hasOptionalFieldInPath();
     }
 
     @Override
@@ -201,31 +178,26 @@
     }
 
     @Override
-    public boolean hasOptionalFieldInPath() {
-      return hasOptionalFieldInPath;
-    }
-
-    @Override
     public String toString() {
       return "WrappedAccessor(position=" + position + ", wrapped=" + accessor + ")";
     }
   }
 
-  private static Accessor<StructLike> newAccessor(int pos, boolean isOptional, Type type) {
-    return new PositionAccessor(pos, type, isOptional);
+  private static Accessor<StructLike> newAccessor(int pos, Type type) {
+    return new PositionAccessor(pos, type);
   }
 
   private static Accessor<StructLike> newAccessor(
       int pos, boolean isOptional, Accessor<StructLike> accessor) {
     if (isOptional) {
       // the wrapped position handles null layers
-      return new WrappedPositionAccessor(pos, accessor, isOptional);
+      return new WrappedPositionAccessor(pos, accessor);
     } else if (accessor.getClass() == PositionAccessor.class) {
-      return new Position2Accessor(pos, (PositionAccessor) accessor, isOptional);
+      return new Position2Accessor(pos, (PositionAccessor) accessor);
     } else if (accessor instanceof Position2Accessor) {
-      return new Position3Accessor(pos, (Position2Accessor) accessor, isOptional);
+      return new Position3Accessor(pos, (Position2Accessor) accessor);
     } else {
-      return new WrappedPositionAccessor(pos, accessor, isOptional);
+      return new WrappedPositionAccessor(pos, accessor);
     }
   }
 
@@ -254,7 +226,7 @@
         }
 
         // Add an accessor for this field as an Object (may or may not be primitive).
-        accessors.put(field.fieldId(), newAccessor(i, field.isOptional(), field.type()));
+        accessors.put(field.fieldId(), newAccessor(i, field.type()));
       }
 
       return accessors;
diff --git a/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java b/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
index decda85..0295fe5 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/BoundReference.java
@@ -57,9 +57,7 @@
 
   @Override
   public boolean producesNull() {
-    // A leaf required field can evaluate to null if it is optional itself or any
-    // ancestor on the path is optional.
-    return accessor.hasOptionalFieldInPath();
+    return field.isOptional();
   }
 
   @Override
diff --git a/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java b/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java
index 4736ca4..75ca9d5 100644
--- a/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java
+++ b/api/src/main/java/org/apache/iceberg/expressions/UnboundPredicate.java
@@ -27,6 +27,7 @@
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.CharSequenceSet;
@@ -112,7 +113,7 @@
     BoundTerm<T> bound = term().bind(struct, caseSensitive);
 
     if (literals == null) {
-      return bindUnaryOperation(bound);
+      return bindUnaryOperation(struct, bound);
     }
 
     if (op() == Operation.IN || op() == Operation.NOT_IN) {
@@ -122,17 +123,19 @@
     return bindLiteralOperation(bound);
   }
 
-  private Expression bindUnaryOperation(BoundTerm<T> boundTerm) {
+  private Expression bindUnaryOperation(StructType struct, BoundTerm<T> boundTerm) {
     switch (op()) {
       case IS_NULL:
-        if (!boundTerm.producesNull()) {
+        if (!boundTerm.producesNull()
+            && allAncestorFieldsAreRequired(struct, boundTerm.ref().fieldId())) {
           return Expressions.alwaysFalse();
         } else if (boundTerm.type().equals(Types.UnknownType.get())) {
           return Expressions.alwaysTrue();
         }
         return new BoundUnaryPredicate<>(Operation.IS_NULL, boundTerm);
       case NOT_NULL:
-        if (!boundTerm.producesNull()) {
+        if (!boundTerm.producesNull()
+            && allAncestorFieldsAreRequired(struct, boundTerm.ref().fieldId())) {
           return Expressions.alwaysTrue();
         } else if (boundTerm.type().equals(Types.UnknownType.get())) {
           return Expressions.alwaysFalse();
@@ -155,6 +158,11 @@
     }
   }
 
+  private boolean allAncestorFieldsAreRequired(StructType struct, int fieldId) {
+    return TypeUtil.ancestorFields(struct.asSchema(), fieldId).stream()
+        .allMatch(Types.NestedField::isRequired);
+  }
+
   private boolean floatingType(Type.TypeID typeID) {
     return Type.TypeID.DOUBLE.equals(typeID) || Type.TypeID.FLOAT.equals(typeID);
   }
diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
index 486b1d6..b1c556b 100644
--- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
+++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java
@@ -223,6 +223,30 @@
   }
 
   /**
+   * Searches in the given schema for all ancestor fields of the given field ID. If the field ID is
+   * defined in a nested type, then all of its ancestor fields are returned. If the field ID is not
+   * nested, an empty list is returned.
+   *
+   * @param schema The schema to search for the field ID
+   * @param fieldId The field ID to find the parents of
+   * @return A list of all ancestor fields of the given field ID if the field ID points to a nested
+   *     field. If the field ID is not a nested field, then an empty list is returned.
+   */
+  public static List<Types.NestedField> ancestorFields(Schema schema, int fieldId) {
+    Map<Integer, Integer> idToParent = TypeUtil.indexParents(schema.asStruct());
+    List<Types.NestedField> parents = Lists.newArrayList();
+    if (idToParent.containsKey(fieldId)) {
+      Integer parentId = idToParent.get(fieldId);
+      while (parentId != null) {
+        parents.add(schema.findField(parentId));
+        parentId = idToParent.get(parentId);
+      }
+    }
+
+    return parents;
+  }
+
+  /**
    * Assigns fresh ids from the {@link NextID nextId function} for all fields in a type.
    *
    * @param type a type
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestBoundReference.java b/api/src/test/java/org/apache/iceberg/expressions/TestBoundReference.java
deleted file mode 100644
index ed921b2..0000000
--- a/api/src/test/java/org/apache/iceberg/expressions/TestBoundReference.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.expressions;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.stream.Stream;
-import org.apache.iceberg.Accessor;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.Types;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-public class TestBoundReference {
-  // Build a schema with a single nested struct with optionalList.size() levels with the following
-  // structure:
-  // s1: struct(s2: struct(s3: struct(..., sn: struct(leaf: int))))
-  // where each s{i} is an optional struct if optionalList.get(i) is true and a required struct if
-  // false
-  private static Schema buildSchemaFromOptionalList(List<Boolean> optionalList, String leafName) {
-    Preconditions.checkArgument(
-        optionalList != null && !optionalList.isEmpty(), "optionalList must not be null or empty");
-    Types.NestedField leaf =
-        optionalList.get(optionalList.size() - 1)
-            ? optional(optionalList.size(), leafName, Types.IntegerType.get())
-            : required(optionalList.size(), leafName, Types.IntegerType.get());
-
-    Types.StructType current = Types.StructType.of(leaf);
-
-    for (int i = optionalList.size() - 2; i >= 0; i--) {
-      int id = i + 1;
-      String name = "s" + (i + 1);
-      current =
-          Types.StructType.of(
-              optionalList.get(i) ? optional(id, name, current) : required(id, name, current));
-    }
-
-    return new Schema(current.fields());
-  }
-
-  private static Stream<Arguments> producesNullCases() {
-    // the test cases specify two arguments:
-    // - the first is a list of booleans that indicate whether fields in the nested sequence of
-    //   structs are optional or required. For example, [false, true, false] will construct a
-    //   struct like s1.s2.s3 with s1 being required, s2 being optional, and s3 being required.
-    // - the second is a boolean that indicates whether calling producesNull() on the BoundReference
-    //   of the leaf field should return true or false.
-    return Stream.of(
-        // basic fields, no struct levels
-        Arguments.of(Arrays.asList(false), false),
-        Arguments.of(Arrays.asList(true), true),
-        // one level
-        Arguments.of(Arrays.asList(false, false), false),
-        Arguments.of(Arrays.asList(false, true), true),
-        Arguments.of(Arrays.asList(true, false), true),
-        // two levels
-        Arguments.of(Arrays.asList(false, false, false), false),
-        Arguments.of(Arrays.asList(false, false, true), true),
-        Arguments.of(Arrays.asList(true, false, false), true),
-        Arguments.of(Arrays.asList(false, true, false), true),
-        // three levels
-        Arguments.of(Arrays.asList(false, false, false, false), false),
-        Arguments.of(Arrays.asList(false, false, false, true), true),
-        Arguments.of(Arrays.asList(true, false, false, false), true),
-        Arguments.of(Arrays.asList(false, true, false, false), true),
-        // four levels
-        Arguments.of(Arrays.asList(false, false, false, false, false), false),
-        Arguments.of(Arrays.asList(false, false, false, false, true), true),
-        Arguments.of(Arrays.asList(true, false, false, false, false), true),
-        Arguments.of(Arrays.asList(false, true, true, true, false), true));
-  }
-
-  @ParameterizedTest
-  @MethodSource("producesNullCases")
-  public void testProducesNull(List<Boolean> optionalList, boolean expectedProducesNull) {
-    String leafName = "leaf";
-    Schema schema = buildSchemaFromOptionalList(optionalList, leafName);
-    int leafId = optionalList.size();
-    Types.NestedField leafField = schema.findField(leafId);
-    Accessor<StructLike> accessor = schema.accessorForField(leafId);
-
-    BoundReference<Integer> ref = new BoundReference<>(leafField, accessor, leafName);
-    assertThat(ref.producesNull()).isEqualTo(expectedProducesNull);
-  }
-}
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java
index 5293681..24e58ad 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestExpressionBinding.java
@@ -38,13 +38,20 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.StructType;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.FieldSource;
+import org.junit.jupiter.params.provider.MethodSource;
 
 public class TestExpressionBinding {
   private static final StructType STRUCT =
@@ -421,4 +428,85 @@
     assertThat(pred.term()).as("Should use a BoundExtract").isInstanceOf(BoundExtract.class);
     assertThat(pred.term().type()).isEqualTo(Types.fromPrimitiveString(typeName));
   }
+
+  private static Stream<Arguments> nullCasesWithNestedStructs() {
+    // the test cases specify two arguments:
+    // - the first is a list of booleans that indicate whether fields in the nested sequence of
+    //   structs are optional or required. For example, [true, false, true] will construct a
+    //   struct like s1.s2.s3 with s1 being required, s2 being optional, and s3 being required.
+    // - the second is an expression that indicates what is expected.
+    return Stream.of(
+        // basic fields, no struct levels
+        Arguments.of(Arrays.asList(true), Expressions.alwaysFalse()),
+        Arguments.of(Arrays.asList(false), Expressions.isNull("leaf")),
+        // one level
+        Arguments.of(Arrays.asList(true, true), Expressions.alwaysFalse()),
+        Arguments.of(Arrays.asList(true, false), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(false, true), Expressions.isNull("leaf")),
+        // two levels
+        Arguments.of(Arrays.asList(true, true, true), Expressions.alwaysFalse()),
+        Arguments.of(Arrays.asList(true, true, false), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(false, true, true), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(true, false, true), Expressions.isNull("leaf")),
+        // three levels
+        Arguments.of(Arrays.asList(true, true, true, true), Expressions.alwaysFalse()),
+        Arguments.of(Arrays.asList(true, true, true, false), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(false, true, true, true), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(true, false, true, true), Expressions.isNull("leaf")),
+        // four levels
+        Arguments.of(Arrays.asList(true, true, true, true, true), Expressions.alwaysFalse()),
+        Arguments.of(Arrays.asList(true, true, true, true, false), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(false, true, true, true, true), Expressions.isNull("leaf")),
+        Arguments.of(Arrays.asList(true, false, false, false, true), Expressions.isNull("leaf")));
+  }
+
+  private Schema buildNestedSchema(List<Boolean> requiredFields, String leafName) {
+    // Build a schema with a single nested struct with requiredFields.size() levels with the
+    // following structure:
+    // s1: struct(s2: struct(s3: struct(..., sn: struct(leaf: int))))
+    // where each s{i} is a required struct if requiredFields.get(i) is true and an optional struct
+    // if false
+    Preconditions.checkArgument(
+        requiredFields != null && !requiredFields.isEmpty(),
+        "Invalid required fields: null or empty");
+    Types.NestedField leaf =
+        requiredFields.get(requiredFields.size() - 1)
+            ? required(requiredFields.size(), leafName, Types.IntegerType.get())
+            : optional(requiredFields.size(), leafName, Types.IntegerType.get());
+
+    Types.StructType current = Types.StructType.of(leaf);
+
+    for (int i = requiredFields.size() - 2; i >= 0; i--) {
+      int id = i + 1;
+      String name = "s" + (i + 1);
+      current =
+          Types.StructType.of(
+              requiredFields.get(i) ? required(id, name, current) : optional(id, name, current));
+    }
+
+    return new Schema(current.fields());
+  }
+
+  @ParameterizedTest
+  @MethodSource("nullCasesWithNestedStructs")
+  public void testIsNullWithNestedStructs(List<Boolean> requiredFields, Expression expression) {
+    String leafName = "leaf";
+    Schema schema = buildNestedSchema(requiredFields, leafName);
+    int leafId = requiredFields.size();
+    int level = 1;
+    StringBuilder pathBuilder = new StringBuilder();
+    while (level < leafId) {
+      pathBuilder.append("s").append(level).append(".");
+      level++;
+    }
+
+    String path = pathBuilder.append(leafName).toString();
+    Expression bound = Binder.bind(schema.asStruct(), isNull(path));
+    TestHelpers.assertAllReferencesBound("IsNull", bound);
+    assertThat(bound.op()).isEqualTo(expression.op());
+
+    bound = Binder.bind(schema.asStruct(), Expressions.notNull(path));
+    TestHelpers.assertAllReferencesBound("NotNull", bound);
+    assertThat(bound.op()).isEqualTo(expression.negate().op());
+  }
 }
diff --git a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
index 7069d89..2f4fbf3 100644
--- a/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
+++ b/api/src/test/java/org/apache/iceberg/expressions/TestInclusiveMetricsEvaluator.java
@@ -73,6 +73,21 @@
           optional(13, "no_nan_stats", Types.DoubleType.get()),
           optional(14, "some_empty", Types.StringType.get()));
 
+  private static final Schema NESTED_SCHEMA =
+      new Schema(
+          required(
+              100,
+              "required_address",
+              Types.StructType.of(
+                  required(102, "required_street1", Types.StringType.get()),
+                  optional(103, "optional_street1", Types.StringType.get()))),
+          optional(
+              101,
+              "optional_address",
+              Types.StructType.of(
+                  required(104, "required_street2", Types.StringType.get()),
+                  optional(105, "optional_street2", Types.StringType.get()))));
+
   private static final int INT_MIN_VALUE = 30;
   private static final int INT_MAX_VALUE = 79;
 
@@ -173,7 +188,7 @@
 
   private static final DataFile FILE_5 =
       new TestDataFile(
-          "file_4.avro",
+          "file_5.avro",
           Row.of(),
           50,
           // any value counts, including nulls
@@ -187,6 +202,22 @@
           // upper bounds
           ImmutableMap.of(3, toByteBuffer(StringType.get(), "abcdefghi")));
 
+  private static final DataFile FILE_6 =
+      new TestDataFile(
+          "file_6.avro",
+          Row.of(),
+          10,
+          // any value counts, including nulls
+          ImmutableMap.of(100, 5L, 101, 5L, 102, 5L, 103, 5L, 104, 5L, 105, 5L),
+          // null value counts
+          ImmutableMap.of(100, 0L, 101, 5L, 103, 5L, 104, 5L, 105, 5L),
+          // nan value counts
+          null,
+          // lower bounds
+          null,
+          // upper bounds
+          null);
+
   @Test
   public void testAllNulls() {
     boolean shouldRead = new InclusiveMetricsEvaluator(SCHEMA, notNull("all_nulls")).eval(FILE);
@@ -863,4 +894,80 @@
     shouldRead = new InclusiveMetricsEvaluator(SCHEMA, notIn("no_nulls", "abc", "def")).eval(FILE);
     assertThat(shouldRead).as("Should read: notIn on no nulls column").isTrue();
   }
+
+  @Test
+  public void testIsNullInNestedStruct() {
+    // read required_address and its nested fields
+    boolean shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, isNull("required_address")).eval(FILE_6);
+    assertThat(shouldRead).as("Should not read: required_address is required").isFalse();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, isNull("required_address.required_street1"))
+            .eval(FILE_6);
+    assertThat(shouldRead)
+        .as("Should not read: required_address.required_street1 is required")
+        .isFalse();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, isNull("required_address.optional_street1"))
+            .eval(FILE_6);
+    assertThat(shouldRead)
+        .as("Should read: required_address.optional_street1 is optional")
+        .isTrue();
+
+    // read optional_address and its nested fields
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, isNull("optional_address")).eval(FILE_6);
+    assertThat(shouldRead).as("Should read: optional_address is optional").isTrue();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, isNull("optional_address.required_street2"))
+            .eval(FILE_6);
+    assertThat(shouldRead).as("Should read: optional_address is optional").isTrue();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, isNull("optional_address.optional_street2"))
+            .eval(FILE_6);
+    assertThat(shouldRead).as("Should read: optional_address is optional").isTrue();
+  }
+
+  @Test
+  public void testNotNullInNestedStruct() {
+    // read required_address and its nested fields
+    boolean shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, notNull("required_address")).eval(FILE_6);
+    assertThat(shouldRead).as("Should read: required_address is required").isTrue();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, notNull("required_address.required_street1"))
+            .eval(FILE_6);
+    assertThat(shouldRead)
+        .as("Should read: required_address.required_street1 is required")
+        .isTrue();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, notNull("required_address.optional_street1"))
+            .eval(FILE_6);
+    assertThat(shouldRead)
+        .as("Should not read: required_address.optional_street1 is optional")
+        .isFalse();
+
+    // read optional_address and its nested fields
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, notNull("optional_address")).eval(FILE_6);
+    assertThat(shouldRead).as("Should not read: optional_address is optional").isFalse();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, notNull("optional_address.required_street2"))
+            .eval(FILE_6);
+    assertThat(shouldRead).as("Should not read: optional_address is optional").isFalse();
+
+    shouldRead =
+        new InclusiveMetricsEvaluator(NESTED_SCHEMA, notNull("optional_address.optional_street2"))
+            .eval(FILE_6);
+    assertThat(shouldRead)
+        .as("Should not read: optional_address.optional_street2 is optional")
+        .isFalse();
+  }
 }
diff --git a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
index 63f2027..d4742f5 100644
--- a/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
+++ b/api/src/test/java/org/apache/iceberg/types/TestTypeUtil.java
@@ -849,4 +849,100 @@
     Schema reassignedSchema = TypeUtil.reassignDoc(schema, docSourceSchema);
     assertThat(reassignedSchema.asStruct()).isEqualTo(docSourceSchema.asStruct());
   }
+
+  @Test
+  public void ancestorFieldsInEmptySchema() {
+    assertThat(TypeUtil.ancestorFields(new Schema(), -1)).isEmpty();
+    assertThat(TypeUtil.ancestorFields(new Schema(), 1)).isEmpty();
+  }
+
+  @Test
+  public void ancestorFieldsInNonNestedSchema() {
+    Schema schema =
+        new Schema(
+            required(0, "a", Types.IntegerType.get()), required(1, "A", Types.IntegerType.get()));
+
+    assertThat(TypeUtil.ancestorFields(schema, 0)).isEmpty();
+    assertThat(TypeUtil.ancestorFields(schema, 1)).isEmpty();
+  }
+
+  @Test
+  public void ancestorFieldsInNestedSchema() {
+    Types.NestedField innerPreferences =
+        optional(
+            8,
+            "inner_preferences",
+            Types.StructType.of(
+                required(12, "feature3", Types.BooleanType.get()),
+                optional(13, "feature4", Types.BooleanType.get())));
+    Types.NestedField preferences =
+        optional(
+            3,
+            "preferences",
+            Types.StructType.of(
+                required(6, "feature1", Types.BooleanType.get()),
+                optional(7, "feature2", Types.BooleanType.get()),
+                innerPreferences));
+
+    // define locations map with all nested fields
+    Types.StructType locationsKeyStruct =
+        Types.StructType.of(
+            required(20, "address", Types.StringType.get()),
+            required(21, "city", Types.StringType.get()),
+            required(22, "state", Types.StringType.get()),
+            required(23, "zip", IntegerType.get()));
+    Types.StructType locationsValueStruct =
+        Types.StructType.of(
+            required(14, "lat", Types.FloatType.get()),
+            required(15, "long", Types.FloatType.get()));
+    Types.NestedField locationsKey = required(9, "key", locationsKeyStruct);
+    Types.NestedField locationsValue = required(10, "value", locationsValueStruct);
+    Types.NestedField locations =
+        required(
+            4,
+            "locations",
+            Types.MapType.ofRequired(9, 10, locationsKeyStruct, locationsValueStruct));
+
+    // define points list with all nested fields
+    Types.StructType pointsStruct =
+        Types.StructType.of(
+            required(16, "x", Types.LongType.get()), required(17, "y", Types.LongType.get()));
+    Types.NestedField pointsElement = optional(11, "element", pointsStruct);
+    Types.NestedField points = optional(5, "points", Types.ListType.ofOptional(11, pointsStruct));
+
+    Types.NestedField id = required(1, "id", IntegerType.get());
+    Types.NestedField data = optional(2, "data", Types.StringType.get());
+    Schema schema = new Schema(id, data, preferences, locations, points);
+
+    // non-nested fields don't have parents
+    assertThat(TypeUtil.ancestorFields(schema, id.fieldId())).isEmpty();
+    assertThat(TypeUtil.ancestorFields(schema, data.fieldId())).isEmpty();
+
+    // verify preferences struct and all of its nested fields (6-8, 12+13)
+    assertThat(TypeUtil.ancestorFields(schema, preferences.fieldId())).isEmpty();
+    assertThat(TypeUtil.ancestorFields(schema, 6)).containsExactly(preferences);
+    assertThat(TypeUtil.ancestorFields(schema, 7)).containsExactly(preferences);
+    assertThat(TypeUtil.ancestorFields(schema, innerPreferences.fieldId()))
+        .containsExactly(preferences);
+    assertThat(TypeUtil.ancestorFields(schema, 12)).containsExactly(innerPreferences, preferences);
+    assertThat(TypeUtil.ancestorFields(schema, 13)).containsExactly(innerPreferences, preferences);
+
+    // verify locations map and all of its nested fields (IDs 9+10, 20-23 and 14+15)
+    assertThat(TypeUtil.ancestorFields(schema, locations.fieldId())).isEmpty();
+    assertThat(TypeUtil.ancestorFields(schema, locationsKey.fieldId())).containsExactly(locations);
+    assertThat(TypeUtil.ancestorFields(schema, 20)).containsExactly(locationsKey, locations);
+    assertThat(TypeUtil.ancestorFields(schema, 21)).containsExactly(locationsKey, locations);
+    assertThat(TypeUtil.ancestorFields(schema, 22)).containsExactly(locationsKey, locations);
+    assertThat(TypeUtil.ancestorFields(schema, 23)).containsExactly(locationsKey, locations);
+    assertThat(TypeUtil.ancestorFields(schema, locationsValue.fieldId()))
+        .containsExactly(locations);
+    assertThat(TypeUtil.ancestorFields(schema, 14)).containsExactly(locationsValue, locations);
+    assertThat(TypeUtil.ancestorFields(schema, 15)).containsExactly(locationsValue, locations);
+
+    // verify points list and all of its nested fields (IDs 11 and 16+17)
+    assertThat(TypeUtil.ancestorFields(schema, points.fieldId())).isEmpty();
+    assertThat(TypeUtil.ancestorFields(schema, pointsElement.fieldId())).containsExactly(points);
+    assertThat(TypeUtil.ancestorFields(schema, 16)).containsExactly(pointsElement, points);
+    assertThat(TypeUtil.ancestorFields(schema, 17)).containsExactly(pointsElement, points);
+  }
 }