[CALCITE-6219] 'Must-filter' columns

A table can declare that some of its columns must be filtered
by implementing `interface SemanticTable`. If such columns
are not filtered in a WHERE or HAVING clause, the validator
throws.

There are several purposes of these columns, one of which is
to prevent expensive full-table scans (for example, reading
all Orders without restricting on orderDate).

Implementation is via the method
SqlValidatorNamespace.getMustFilterFields(). For a table
namespace, that method returns the declared must-filter
fields. For a query namespce, that method returns any
must-filter fields that have not been filtered in that query;
such fields become the responsibility of the enclosing query.

If a field is must-filter and is not in the SELECT clause of a
sub-query, that is also an error, because of course it is now
impossible for the enclosing query to filter it.

Close apache/calcite#3688

Co-authored-by: Julian Hyde <jhyde@apache.org>
diff --git a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
index 5a3c6e4..86e6779 100644
--- a/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
+++ b/core/src/main/java/org/apache/calcite/runtime/CalciteResource.java
@@ -1085,6 +1085,9 @@
   @BaseMessage("A table function at most has one input table with row semantics. Table function ''{0}'' has multiple input tables with row semantics")
   ExInst<SqlValidatorException> multipleRowSemanticsTables(String funcName);
 
+  @BaseMessage("SQL statement did not contain filters on the following fields: {0}")
+  ExInst<SqlValidatorException> mustFilterFieldsMissing(String mustFilterFields);
+
   @BaseMessage("BIT_GET/GETBIT error: negative position {0,number} not allowed")
   ExInst<CalciteException> illegalNegativeBitGetPosition(int position);
 
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
index 3f16078..4ce24bd 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/AbstractNamespace.java
@@ -20,6 +20,7 @@
 import org.apache.calcite.rel.type.RelDataTypeFactory;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 import org.apache.calcite.util.Util;
 
@@ -57,6 +58,10 @@
   /** As {@link #rowType}, but not necessarily a struct. */
   protected @Nullable RelDataType type;
 
+  /** Ordinals of fields that must be filtered. Initially the empty set, but
+   * should typically be re-assigned on validate. */
+  protected ImmutableBitSet mustFilterFields = ImmutableBitSet.of();
+
   protected final @Nullable SqlNode enclosingNode;
 
   //~ Constructors -----------------------------------------------------------
@@ -159,6 +164,11 @@
     return ImmutableList.of();
   }
 
+  @Override public ImmutableBitSet getMustFilterFields() {
+    return requireNonNull(mustFilterFields,
+        "mustFilterFields (maybe validation is not complete?)");
+  }
+
   @Override public SqlMonotonicity getMonotonicity(String columnName) {
     return SqlMonotonicity.NOT_MONOTONIC;
   }
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
index c3fab93..770f8b9 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/IdentifierNamespace.java
@@ -210,6 +210,7 @@
       }
     }
 
+    this.mustFilterFields = resolvedNamespace.getMustFilterFields();
     RelDataType rowType = resolvedNamespace.getRowType();
 
     if (extendList != null) {
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SemanticTable.java b/core/src/main/java/org/apache/calcite/sql/validate/SemanticTable.java
new file mode 100644
index 0000000..b115dbb
--- /dev/null
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SemanticTable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.sql.validate;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Extension to {@link SqlValidatorTable} with extra, optional metadata.
+ *
+ * <p>Used to flag individual columns as 'must-filter'.
+ */
+public interface SemanticTable {
+  /** Returns the filter expression for {@code column}
+   * if it is a {@link #mustFilter(int) must-filter} column,
+   * or null if it is not a must-filter column.
+   *
+   * @param column Column ordinal (0-based)
+   *
+   * @throws IndexOutOfBoundsException if column ordinal is out of range */
+  default @Nullable String getFilter(int column) {
+    return null;
+  }
+
+  /** Returns whether {@code column} must be filtered in any query
+   * that references this table.
+   *
+   * @param column Column ordinal (0-based)
+   *
+   * @throws IndexOutOfBoundsException if column ordinal is out of range */
+  default boolean mustFilter(int column) {
+    return getFilter(column) != null;
+  }
+}
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlQualified.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlQualified.java
index 48bb492..2ec1099 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlQualified.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlQualified.java
@@ -23,6 +23,8 @@
 
 import java.util.List;
 
+import static java.util.Objects.hash;
+
 /**
  * Fully-qualified identifier.
  *
@@ -46,6 +48,29 @@
     this.identifier = identifier;
   }
 
+  @Override public int hashCode() {
+    return hash(identifier.names, prefixLength);
+  }
+
+  @Override public boolean equals(@Nullable Object obj) {
+    // Two SqlQualified instances are equivalent if they are of the same
+    // identifier and same prefix length. Thus, in
+    //
+    //  SELECT e.address, e.address.zipcode
+    //  FROM employees AS e
+    //
+    // "e.address" is {identifier=[e, address], prefixLength=1}
+    // and is distinct from "e.address.zipcode".
+    //
+    // We assume that all SqlQualified instances being compared are resolved
+    // from the same SqlValidatorScope, and therefore we do not need to look
+    // at namespace to distinguish them.
+    return this == obj
+        || obj instanceof SqlQualified
+        && prefixLength == ((SqlQualified) obj).prefixLength
+        && identifier.names.equals(((SqlQualified) obj).identifier.names);
+  }
+
   @Override public String toString() {
     return "{id: " + identifier + ", prefix: " + prefixLength + "}";
   }
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
index 0338981..8612da7 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
@@ -135,6 +135,7 @@
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Calendar;
 import java.util.Collection;
 import java.util.Collections;
@@ -143,15 +144,19 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.TreeSet;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 import java.util.function.UnaryOperator;
 import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
@@ -1160,6 +1165,23 @@
     SqlNode node = namespace.getNode();
     if (node != null) {
       setValidatedNodeType(node, namespace.getType());
+
+      if (node == top) {
+        // A top-level namespace must not return any must-filter fields.
+        // A non-top-level namespace (e.g. a subquery) may return must-filter
+        // fields; these are neutralized if the consuming query filters on them.
+        final ImmutableBitSet mustFilterFields =
+            namespace.getMustFilterFields();
+        if (!mustFilterFields.isEmpty()) {
+          // Set of field names, sorted alphabetically for determinism.
+          Set<String> fieldNameSet =
+              StreamSupport.stream(mustFilterFields.spliterator(), false)
+                  .map(namespace.getRowType().getFieldNames()::get)
+                  .collect(Collectors.toCollection(TreeSet::new));
+          throw newValidationError(node,
+              RESOURCE.mustFilterFieldsMissing(fieldNameSet.toString()));
+        }
+      }
     }
   }
 
@@ -2740,7 +2762,7 @@
    * @param alias       Name of this query within its parent. Must be specified
    *                    if usingScope != null
    */
-  private void registerQuery(
+  protected void registerQuery(
       SqlValidatorScope parentScope,
       @Nullable SqlValidatorScope usingScope,
       SqlNode node,
@@ -3870,6 +3892,65 @@
         validateSelectList(selectItems, select, targetRowType);
     ns.setType(rowType);
 
+    // Deduce which columns must be filtered.
+    ns.mustFilterFields = ImmutableBitSet.of();
+    if (from != null) {
+      final Set<SqlQualified> qualifieds = new LinkedHashSet<>();
+      for (ScopeChild child : fromScope.children) {
+        final List<String> fieldNames =
+            child.namespace.getRowType().getFieldNames();
+        child.namespace.getMustFilterFields()
+            .forEachInt(i ->
+                qualifieds.add(
+                    SqlQualified.create(fromScope, 1, child.namespace,
+                        new SqlIdentifier(
+                            ImmutableList.of(child.name, fieldNames.get(i)),
+                            SqlParserPos.ZERO))));
+      }
+      if (!qualifieds.isEmpty()) {
+        if (select.getWhere() != null) {
+          forEachQualified(select.getWhere(), getWhereScope(select),
+              qualifieds::remove);
+        }
+        if (select.getHaving() != null) {
+          forEachQualified(select.getHaving(), getHavingScope(select),
+              qualifieds::remove);
+        }
+
+        // Each of the must-filter fields identified must be returned as a
+        // SELECT item, which is then flagged as must-filter.
+        final BitSet mustFilterFields = new BitSet();
+        final List<SqlNode> expandedSelectItems =
+            requireNonNull(fromScope.getExpandedSelectList(),
+                "expandedSelectList");
+        forEach(expandedSelectItems, (selectItem, i) -> {
+          selectItem = stripAs(selectItem);
+          if (selectItem instanceof SqlIdentifier) {
+            SqlQualified qualified =
+                fromScope.fullyQualify((SqlIdentifier) selectItem);
+            if (qualifieds.remove(qualified)) {
+              // SELECT item #i referenced a must-filter column that was not
+              // filtered in the WHERE or HAVING. It becomes a must-filter
+              // column for our consumer.
+              mustFilterFields.set(i);
+            }
+          }
+        });
+
+        // If there are must-filter fields that are not in the SELECT clause,
+        // this is an error.
+        if (!qualifieds.isEmpty()) {
+          throw newValidationError(select,
+              RESOURCE.mustFilterFieldsMissing(
+                  qualifieds.stream()
+                      .map(q -> q.suffix().get(0))
+                      .collect(Collectors.toCollection(TreeSet::new))
+                      .toString()));
+        }
+        ns.mustFilterFields = ImmutableBitSet.fromBitSet(mustFilterFields);
+      }
+    }
+
     // Validate ORDER BY after we have set ns.rowType because in some
     // dialects you can refer to columns of the select list, e.g.
     // "SELECT empno AS x FROM emp ORDER BY x"
@@ -3885,6 +3966,19 @@
     }
   }
 
+  /** For each identifier in an expression, resolves it to a qualified name
+   * and calls the provided action. */
+  private static void forEachQualified(SqlNode node, SqlValidatorScope scope,
+      Consumer<SqlQualified> consumer) {
+    node.accept(new SqlBasicVisitor<Void>() {
+      @Override public Void visit(SqlIdentifier id) {
+        final SqlQualified qualified = scope.fullyQualify(id);
+        consumer.accept(qualified);
+        return null;
+      }
+    });
+  }
+
   private void checkRollUpInSelectList(SqlSelect select) {
     SqlValidatorScope scope = getSelectScope(select);
     for (SqlNode item : SqlNonNullableAccessors.getSelectList(select)) {
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
index ddcacee..77a8dc0 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorNamespace.java
@@ -19,6 +19,7 @@
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Pair;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -212,4 +213,10 @@
    * @param modality Modality
    */
   boolean supportsModality(SqlModality modality);
+
+  /** Returns the ordinals (in the row type) of the "must-filter" fields,
+   * fields that that must be filtered in a query. */
+  default ImmutableBitSet getMustFilterFields() {
+    return ImmutableBitSet.of();
+  }
 }
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java
index 3c4565b..fd18c2d 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/SqlValidatorUtil.java
@@ -381,8 +381,7 @@
       SqlValidatorCatalogReader catalogReader,
       RelDataTypeFactory typeFactory,
       SqlValidator.Config config) {
-    return new SqlValidatorImpl(opTab, catalogReader, typeFactory,
-        config);
+    return new SqlValidatorImpl(opTab, catalogReader, typeFactory, config);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
index 06da097..8e8ab27 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/TableNamespace.java
@@ -26,6 +26,7 @@
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
@@ -36,6 +37,7 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.calcite.util.ImmutableBitSet.toImmutableBitSet;
 import static org.apache.calcite.util.Static.RESOURCE;
 
 import static java.util.Objects.requireNonNull;
@@ -58,6 +60,15 @@
   }
 
   @Override protected RelDataType validateImpl(RelDataType targetRowType) {
+    this.mustFilterFields = ImmutableBitSet.of();
+    table.maybeUnwrap(SemanticTable.class)
+        .ifPresent(semanticTable ->
+            this.mustFilterFields =
+                table.getRowType().getFieldList().stream()
+                    .map(RelDataTypeField::getIndex)
+                    .filter(semanticTable::mustFilter)
+                    .collect(toImmutableBitSet()));
+
     if (extendedFields.isEmpty()) {
       return table.getRowType();
     }
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/WithItemNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/WithItemNamespace.java
index 92c7e8f..5c0a857 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/WithItemNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/WithItemNamespace.java
@@ -41,6 +41,7 @@
     final SqlValidatorNamespace childNs =
         validator.getNamespaceOrThrow(getQuery());
     final RelDataType rowType = childNs.getRowTypeSansSystemColumns();
+    mustFilterFields = childNs.getMustFilterFields();
     SqlNodeList columnList = withItem.columnList;
     if (columnList == null) {
       return rowType;
diff --git a/core/src/main/java/org/apache/calcite/sql/validate/WithNamespace.java b/core/src/main/java/org/apache/calcite/sql/validate/WithNamespace.java
index 7ae775b..bec57b7 100644
--- a/core/src/main/java/org/apache/calcite/sql/validate/WithNamespace.java
+++ b/core/src/main/java/org/apache/calcite/sql/validate/WithNamespace.java
@@ -24,6 +24,8 @@
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * Namespace for <code>WITH</code> clause.
  */
@@ -56,9 +58,13 @@
     }
     final SqlValidatorScope scope2 =
         validator.getWithScope(Util.last(with.withList));
+    final SqlValidatorNamespace bodyNamespace =
+        requireNonNull(validator.getNamespace(with.body), "namespace");
+
     validator.validateQuery(with.body, scope2, targetRowType);
     final RelDataType rowType = validator.getValidatedNodeType(with.body);
     validator.setValidatedNodeType(with, rowType);
+    mustFilterFields = bodyNamespace.getMustFilterFields();
     return rowType;
   }
 
diff --git a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
index d429440..684e6c7 100644
--- a/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
+++ b/core/src/main/resources/org/apache/calcite/runtime/CalciteResource.properties
@@ -354,6 +354,7 @@
 InvalidOrderBy=Only tables with set semantics may be ordered. Invalid ORDER BY clause in the {0,number,#}-th operand of table function ''{1}''
 MultipleRowSemanticsTables=A table function at most has one input table with row semantics. Table function ''{0}'' has multiple input tables with row semantics
 NoOperator=No operator for ''{0}'' with kind: ''{1}'', syntax: ''{2}'' during JSON deserialization
+MustFilterFieldsMissing=SQL statement did not contain filters on the following fields: {0}
 IllegalNegativeBitGetPosition=BIT_GET/GETBIT error: negative position {0,number} not allowed
 IllegalBitGetPositionExceedsLimit=BIT_GET/GETBIT error: position {0,number} exceeds the bit upper limit {1,number}
 # End CalciteResource.properties
diff --git a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
index f888d4e..1308aa9 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlValidatorTest.java
@@ -51,6 +51,7 @@
 import org.apache.calcite.sql.validate.SqlValidatorImpl;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.test.catalog.CountingFactory;
+import org.apache.calcite.test.catalog.MustFilterMockCatalogReader;
 import org.apache.calcite.testlib.annotations.LocaleEnUs;
 import org.apache.calcite.tools.ValidationException;
 import org.apache.calcite.util.Bug;
@@ -69,12 +70,14 @@
 import java.io.StringReader;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.TreeSet;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
@@ -11765,6 +11768,226 @@
     assertThat(resultType, hasToString("INTEGER"));
   }
 
+  /**
+   * Tests validation of must-filter columns.
+   *
+   * <p>If a table that implements
+   * {@link org.apache.calcite.sql.validate.SemanticTable} tags fields as
+   * 'must-filter', and the SQL query does not contain a WHERE or HAVING clause
+   * on each of the tagged columns, the validator should throw an error.
+   */
+  @Test void testMustFilterColumns() {
+    final SqlValidatorFixture fixture = fixture()
+        .withParserConfig(c -> c.withQuoting(Quoting.BACK_TICK))
+        .withOperatorTable(operatorTableFor(SqlLibrary.BIG_QUERY))
+        .withCatalogReader(MustFilterMockCatalogReader::create);
+    // Basic query
+    fixture.withSql("select empno\n"
+            + "from emp\n"
+            + "where job = 'doctor'\n"
+            + "and empno = 1")
+        .ok();
+    fixture.withSql("^select *\n"
+            + "from emp\n"
+            + "where concat(emp.empno, ' ') = 'abc'^")
+        .fails(missingFilters("JOB"));
+
+    // SUBQUERIES
+    fixture.withSql("select * from (\n"
+            + "  select * from emp where empno = 1)\n"
+            + "where job = 'doctor'")
+        .ok();
+    // Deceitful alias #1. Filter on 'j' is a filter on the underlying 'job'.
+    fixture.withSql("select * from (\n"
+            + "  select job as j, ename as job\n"
+            + "  from emp\n"
+            + "  where empno = 1)\n"
+            + "where j = 'doctor'")
+        .ok();
+      // Deceitful alias #2. Filter on 'job' is a filter on the underlying
+      // 'ename', so the underlying 'job' is missing a filter.
+    fixture.withSql("^select * from (\n"
+            + "  select job as j, ename as job\n"
+            + "  from emp\n"
+            + "  where empno = 1)\n"
+            + "where job = 'doctor'^")
+        .fails(missingFilters("J"));
+    fixture.withSql("select * from (\n"
+            + "  select * from emp where job = 'doctor')\n"
+            + "where empno = 1")
+        .ok();
+    fixture.withSql("select * from (\n"
+            + "  select empno from emp where job = 'doctor')\n"
+            + "where empno = 1")
+        .ok();
+    fixture.withSql("^select * from (\n"
+            + "  select * from emp where empno = 1)^")
+        .fails(missingFilters("JOB"));
+    fixture.withSql("^select * from (select * from `SALES`.`EMP`) as a1^ ")
+        .fails(missingFilters("EMPNO", "JOB"));
+
+    // JOINs
+    fixture.withSql("^select *\n"
+            + "from emp\n"
+            + "join dept on emp.deptno = dept.deptno^")
+        .fails(missingFilters("EMPNO", "JOB", "NAME"));
+    fixture.withSql("^select *\n"
+            + "from emp\n"
+            + "join dept on emp.deptno = dept.deptno\n"
+            + "where emp.empno = 1^")
+        .fails(missingFilters("JOB", "NAME"));
+    fixture.withSql("select *\n"
+            + "from emp\n"
+            + "join dept on emp.deptno = dept.deptno\n"
+            + "where emp.empno = 1\n"
+            + "and emp.job = 'doctor'\n"
+            + "and dept.name = 'ACCOUNTING'")
+        .ok();
+    fixture.withSql("select *\n"
+            + "from emp\n"
+            + "join dept on emp.deptno = dept.deptno\n"
+            + "where empno = 1\n"
+            + "and job = 'doctor'\n"
+            + "and dept.name = 'ACCOUNTING'")
+        .ok();
+
+    // Self-join
+    fixture.withSql("^select *\n"
+            + "from `SALES`.emp a1\n"
+            + "join `SALES`.emp a2 on a1.empno = a2.empno^")
+        .fails(missingFilters("EMPNO", "EMPNO0", "JOB", "JOB0"));
+    fixture.withSql("^select *\n"
+            + "from emp a1\n"
+            + "join emp a2 on a1.empno = a2.empno\n"
+            + "where a2.empno = 1\n"
+            + "and a1.empno = 1\n"
+            + "and a2.job = 'doctor'^")
+        // There are two JOB columns but only one is filtered
+        .fails(missingFilters("JOB"));
+    fixture.withSql("select *\n"
+            + "from emp a1\n"
+            + "join emp a2 on a1.empno = a2.empno\n"
+            + "where a1.empno = 1\n"
+            + "and a1.job = 'doctor'\n"
+            + "and a2.empno = 2\n"
+            + "and a2.job = 'undertaker'\n")
+        .ok();
+    fixture.withSql("^select *\n"
+            + " from (select * from `SALES`.`EMP`) as a1\n"
+            + "join (select * from `SALES`.`EMP`) as a2\n"
+            + "  on a1.`EMPNO` = a2.`EMPNO`^")
+        .fails(missingFilters("EMPNO", "EMPNO0", "JOB", "JOB0"));
+
+
+    // USING
+    fixture.withSql("^select *\n"
+            + "from emp\n"
+            + "join dept using(deptno)\n"
+            + "where emp.empno = 1^")
+        .fails(missingFilters("JOB", "NAME"));
+    fixture.withSql("select *\n"
+            + "from emp\n"
+            + "join dept using(deptno)\n"
+            + "where emp.empno = 1\n"
+            + "and emp.job = 'doctor'\n"
+            + "and dept.name = 'ACCOUNTING'")
+        .ok();
+
+    // GROUP BY (HAVING)
+    fixture.withSql("select *\n"
+            + "from dept\n"
+            + "group by deptno, name\n"
+            + "having name = 'accounting_dept'")
+        .ok();
+    fixture.withSql("^select *\n"
+            + "from dept\n"
+            + "group by deptno, name^")
+        .fails(missingFilters("NAME"));
+    fixture.withSql("select name\n"
+            + "from dept\n"
+            + "group by name\n"
+            + "having name = 'accounting'")
+        .ok();
+    fixture.withSql("^select name\n"
+            + "from dept\n"
+            + "group by name^ ")
+        .fails(missingFilters("NAME"));
+    fixture.withSql("select sum(sal)\n"
+            + "from emp\n"
+            + "where empno > 10\n"
+            + "and job = 'doctor'\n"
+            + "group by empno\n"
+            + "having sum(sal) > 100")
+        .ok();
+    fixture.withSql("^select sum(sal)\n"
+            + "from emp\n"
+            + "where empno > 10\n"
+            + "group by empno\n"
+            + "having sum(sal) > 100^")
+        .fails(missingFilters("JOB"));
+
+    // CTE
+    fixture.withSql("^WITH cte AS (\n"
+            + "  select * from emp order by empno)^\n"
+            + "SELECT * from cte")
+        .fails(missingFilters("EMPNO", "JOB"));
+    fixture.withSql("^WITH cte AS (\n"
+            + "  select * from emp where empno = 1)^\n"
+            + "SELECT * from cte")
+        .fails(missingFilters("JOB"));
+    fixture.withSql("WITH cte AS (\n"
+            + "  select *\n"
+            + "  from emp\n"
+            + "  where empno = 1\n"
+            + "  and job = 'doctor')\n"
+            + "SELECT * from cte")
+        .ok();
+    fixture.withSql("^WITH cte AS (\n"
+            + "  select * from emp)^\n"
+            + "SELECT *\n"
+            + "from cte\n"
+            + "where empno = 1")
+        .fails(missingFilters("JOB"));
+    fixture.withSql("WITH cte AS (\n"
+            + "  select * from emp)\n"
+            + "SELECT *\n"
+            + "from cte\n"
+            + "where empno = 1\n"
+            + "and job = 'doctor'")
+        .ok();
+    fixture.withSql("WITH cte AS (\n"
+        + "  select * from emp where empno = 1)\n"
+        + "SELECT *\n"
+        + "from cte\n"
+        + "where job = 'doctor'")
+        .ok();
+    fixture.withSql("WITH cte AS (\n"
+            + "  select empno, job from emp)\n"
+            + "SELECT *\n"
+            + "from cte\n"
+            + "where empno = 1\n"
+            + "and job = 'doctor'")
+        .ok();
+
+    // Filters are missing on EMPNO and JOB, but the error message only
+    // complains about JOB because EMPNO is in the SELECT clause, and could
+    // theoretically be filtered by an enclosing query.
+    fixture.withSql("^select empno\n"
+            + "from emp^")
+        .fails(missingFilters("JOB"));
+    fixture.withSql("^select empno,\n"
+            + "  sum(sal) over (order by mgr)\n"
+            + "from emp^")
+        .fails(missingFilters("JOB"));
+  }
+
+  /** Returns a message that the particular columns are not filtered. */
+  private static String missingFilters(String... args) {
+    return "SQL statement did not contain filters on the following fields: \\["
+        + String.join(", ", new TreeSet<>(Arrays.asList(args)))
+        + "\\]";
+  }
+
   @Test void testAccessingNestedFieldsOfNullableRecord() {
     sql("select ROW_COLUMN_ARRAY[0].NOT_NULL_FIELD from NULLABLEROWS.NR_T1")
         .withExtendedCatalog()
diff --git a/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java b/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java
index 8b489f2..a467ab2 100644
--- a/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java
+++ b/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java
@@ -74,6 +74,7 @@
 import org.apache.calcite.sql.SqlCall;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.validate.SemanticTable;
 import org.apache.calcite.sql.validate.SqlModality;
 import org.apache.calcite.sql.validate.SqlMonotonicity;
 import org.apache.calcite.sql.validate.SqlNameMatcher;
@@ -90,6 +91,7 @@
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
@@ -1005,6 +1007,49 @@
     }
   }
 
+  /** Mock implementation of {@link MockTable} that supports must-filter fields.
+   *
+   * <p>Must-filter fields are declared via methods in the {@link SemanticTable}
+   * interface. */
+  public static class MustFilterMockTable
+      extends MockTable implements SemanticTable {
+    private final Map<String, String> fieldFilters;
+
+    MustFilterMockTable(MockCatalogReader catalogReader, String catalogName,
+        String schemaName, String name, boolean stream, boolean temporal,
+        double rowCount, @Nullable ColumnResolver resolver,
+        InitializerExpressionFactory initializerExpressionFactory,
+        Map<String, String> fieldFilters) {
+      super(catalogReader, catalogName, schemaName, name, stream, temporal,
+          rowCount, resolver, initializerExpressionFactory);
+      this.fieldFilters = ImmutableMap.copyOf(fieldFilters);
+    }
+
+    /** Creates a MustFilterMockTable. */
+    public static MustFilterMockTable create(MockCatalogReader catalogReader,
+        MockSchema schema, String name, boolean stream, double rowCount,
+        @Nullable ColumnResolver resolver,
+        InitializerExpressionFactory initializerExpressionFactory,
+        boolean temporal, Map<String, String> fieldFilters) {
+      MustFilterMockTable table =
+          new MustFilterMockTable(catalogReader, schema.getCatalogName(),
+              schema.name, name, stream, temporal, rowCount, resolver,
+              initializerExpressionFactory, fieldFilters);
+      schema.addTable(name);
+      return table;
+    }
+
+    @Override public @Nullable String getFilter(int column) {
+      String columnName = columnList.get(column).getKey();
+      return fieldFilters.get(columnName);
+    }
+
+    @Override public boolean mustFilter(int column) {
+      String columnName = columnList.get(column).getKey();
+      return fieldFilters.containsKey(columnName);
+    }
+  }
+
   /** Wrapper around a {@link MockTable}, giving it a {@link Table} interface.
    * You can get the {@code MockTable} by calling {@link #unwrap(Class)}. */
   private static class WrapperTable implements Table, Wrapper {
diff --git a/testkit/src/main/java/org/apache/calcite/test/catalog/MustFilterMockCatalogReader.java b/testkit/src/main/java/org/apache/calcite/test/catalog/MustFilterMockCatalogReader.java
new file mode 100644
index 0000000..69cc3ef
--- /dev/null
+++ b/testkit/src/main/java/org/apache/calcite/test/catalog/MustFilterMockCatalogReader.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.test.catalog;
+
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.validate.SqlValidatorCatalogReader;
+import org.apache.calcite.sql2rel.NullInitializerExpressionFactory;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Mock catalog reader that tags a few columns in the tables as must-filter.
+ *
+ * <p>Used for testing must-filter validation.
+ * See {@code org.apache.calcite.test.SqlValidatorTest#testMustFilterColumns()}.
+ */
+public class MustFilterMockCatalogReader extends MockCatalogReader {
+
+  MustFilterMockCatalogReader(RelDataTypeFactory typeFactory,
+      boolean caseSensitive) {
+    super(typeFactory, caseSensitive);
+  }
+
+  public static SqlValidatorCatalogReader create(RelDataTypeFactory typeFactory,
+      boolean caseSensitive) {
+    return new MustFilterMockCatalogReader(typeFactory, caseSensitive).init();
+  }
+
+  @Override public MockCatalogReader init() {
+    MockSchema salesSchema = new MockSchema("SALES");
+    registerSchema(salesSchema);
+
+    // Register "EMP" table. Must-filter fields are "EMPNO", "JOB".
+    MustFilterMockTable empTable =
+        MustFilterMockTable.create(this, salesSchema, "EMP",
+            false, 14, null, NullInitializerExpressionFactory.INSTANCE,
+            false, ImmutableMap.of("EMPNO", "10", "JOB", "JOB_1"));
+
+    final RelDataType integerType =
+        typeFactory.createSqlType(SqlTypeName.INTEGER);
+    final RelDataType timestampType =
+        typeFactory.createSqlType(SqlTypeName.TIMESTAMP);
+    final RelDataType varcharType =
+        typeFactory.createSqlType(SqlTypeName.VARCHAR);
+    final RelDataType booleanType =
+        typeFactory.createSqlType(SqlTypeName.BOOLEAN);
+    empTable.addColumn("EMPNO", integerType, true);
+    empTable.addColumn("ENAME", varcharType);
+    empTable.addColumn("JOB", varcharType);
+    empTable.addColumn("MGR", integerType);
+    empTable.addColumn("HIREDATE", timestampType);
+    empTable.addColumn("SAL", integerType);
+    empTable.addColumn("COMM", integerType);
+    empTable.addColumn("DEPTNO", integerType);
+    empTable.addColumn("SLACKER", booleanType);
+    registerTable(empTable);
+
+    // Register "DEPT" table. "NAME" is a must-filter field.
+    MustFilterMockTable deptTable =
+        MustFilterMockTable.create(this, salesSchema, "DEPT",
+            false, 14, null, NullInitializerExpressionFactory.INSTANCE,
+            false, ImmutableMap.of("NAME", "ACCOUNTING_DEPT"));
+    deptTable.addColumn("DEPTNO", integerType, true);
+    deptTable.addColumn("NAME", varcharType);
+    registerTable(deptTable);
+    return this;
+  }
+}