[CALCITE-1045][CALCITE-5127] Support correlation variables in project

To some extend correlation in project was already supported even before
this change. However, the fact that the correlation variables were not
explicitly present (and returned by the operator) creates problems
cause we cannot safely deduce if a column/field is used and thus we may
wrongly remove those fields when using the RelFieldTrimmer, when
merging projections, etc.; see queries and discussion under the
respective JIRAs.

The addition of correlation variables in project also aligns the code
with Filter, Join; the latter explicitly set correlation variables.

Co-authored-by: korlov42 <korlov@gridgain.com>

Close apache/calcite#2813
Close apache/calcite#2623
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
index b5a77c7..947d6e5 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraProject.java
@@ -28,6 +28,7 @@
 import org.apache.calcite.util.Pair;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -42,7 +43,7 @@
 public class CassandraProject extends Project implements CassandraRel {
   public CassandraProject(RelOptCluster cluster, RelTraitSet traitSet,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() == CassandraRel.CONVENTION;
     assert getConvention() == input.getConvention();
   }
diff --git a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
index ce38ed3..6fc1f47 100644
--- a/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
+++ b/cassandra/src/main/java/org/apache/calcite/adapter/cassandra/CassandraRules.java
@@ -273,8 +273,7 @@
           return false;
         }
       }
-
-      return true;
+      return project.getVariablesSet().isEmpty();
     }
 
     @Override public RelNode convert(RelNode rel) {
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProject.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProject.java
index 4bc516c..8b4c0c5 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProject.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProject.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -54,7 +55,7 @@
       RelNode input,
       List<? extends RexNode> projects,
       RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() instanceof EnumerableConvention;
   }
 
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProjectRule.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProjectRule.java
index b1b6922..3bd8e69 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProjectRule.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableProjectRule.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.adapter.enumerable;
 
 import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
 import org.apache.calcite.rel.core.Project;
@@ -42,6 +43,11 @@
     super(config);
   }
 
+  @Override public boolean matches(RelOptRuleCall call) {
+    Project project = call.rel(0);
+    return project.getVariablesSet().isEmpty();
+  }
+
   @Override public RelNode convert(RelNode rel) {
     final Project project = (Project) rel;
     return EnumerableProject.create(
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelFactories.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelFactories.java
index ad109c4..05c0890 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelFactories.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/EnumerableRelFactories.java
@@ -26,6 +26,8 @@
 import org.apache.calcite.rex.RexUtil;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 
+import com.google.common.base.Preconditions;
+
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
@@ -68,7 +70,10 @@
       implements org.apache.calcite.rel.core.RelFactories.ProjectFactory {
     @Override public RelNode createProject(RelNode input, List<RelHint> hints,
         List<? extends RexNode> childExprs,
-        @Nullable List<? extends @Nullable String> fieldNames) {
+        @Nullable List<? extends @Nullable String> fieldNames,
+        Set<CorrelationId> variablesSet) {
+      Preconditions.checkArgument(variablesSet.isEmpty(),
+          "EnumerableProject does not allow variables");
       final RelDataType rowType =
           RexUtil.createStructType(input.getCluster().getTypeFactory(), childExprs,
               fieldNames, SqlValidatorUtil.F_SUGGESTER);
diff --git a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcRules.java b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcRules.java
index 6c9b545..2a97773 100644
--- a/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcRules.java
+++ b/core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcRules.java
@@ -73,6 +73,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
@@ -96,7 +97,9 @@
   protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();
 
   static final RelFactories.ProjectFactory PROJECT_FACTORY =
-      (input, hints, projects, fieldNames) -> {
+      (input, hints, projects, fieldNames, variablesSet) -> {
+        Preconditions.checkArgument(variablesSet.isEmpty(),
+            "JdbcProject does not allow variables");
         final RelOptCluster cluster = input.getCluster();
         final RelDataType rowType =
             RexUtil.createStructType(cluster.getTypeFactory(), projects,
@@ -510,6 +513,11 @@
       return false;
     }
 
+    @Override public boolean matches(RelOptRuleCall call) {
+      Project project = call.rel(0);
+      return project.getVariablesSet().isEmpty();
+    }
+
     @Override public @Nullable RelNode convert(RelNode rel) {
       final Project project = (Project) rel;
 
@@ -535,7 +543,7 @@
         RelNode input,
         List<? extends RexNode> projects,
         RelDataType rowType) {
-      super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+      super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
       assert getConvention() instanceof JdbcConvention;
     }
 
diff --git a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
index 77b5287..9f879da 100644
--- a/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
+++ b/core/src/main/java/org/apache/calcite/interpreter/Bindables.java
@@ -78,6 +78,7 @@
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.immutables.value.Value;
@@ -386,6 +387,11 @@
       super(config);
     }
 
+    @Override public boolean matches(RelOptRuleCall call) {
+      final LogicalProject project = call.rel(0);
+      return project.getVariablesSet().isEmpty();
+    }
+
     @Override public RelNode convert(RelNode rel) {
       final LogicalProject project = (LogicalProject) rel;
       return new BindableProject(rel.getCluster(),
@@ -403,7 +409,7 @@
   public static class BindableProject extends Project implements BindableRel {
     public BindableProject(RelOptCluster cluster, RelTraitSet traitSet,
         RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-      super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+      super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
       assert getConvention() instanceof BindableConvention;
     }
 
diff --git a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
index 0b5f1dd..97ca0bf 100644
--- a/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
+++ b/core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
@@ -889,6 +889,7 @@
     List<RexNode> castExps;
     RelNode input;
     List<RelHint> hints = ImmutableList.of();
+    Set<CorrelationId> correlationVariables;
     if (rel instanceof Project) {
       // No need to create another project node if the rel
       // is already a project.
@@ -899,21 +900,23 @@
           ((Project) rel).getProjects());
       input = rel.getInput(0);
       hints = project.getHints();
+      correlationVariables = project.getVariablesSet();
     } else {
       castExps = RexUtil.generateCastExpressions(
           rexBuilder,
           castRowType,
           rowType);
       input = rel;
+      correlationVariables = ImmutableSet.of();
     }
     if (rename) {
       // Use names and types from castRowType.
       return projectFactory.createProject(input, hints, castExps,
-          castRowType.getFieldNames());
+          castRowType.getFieldNames(), correlationVariables);
     } else {
       // Use names from rowType, types from castRowType.
       return projectFactory.createProject(input, hints, castExps,
-          rowType.getFieldNames());
+          rowType.getFieldNames(), correlationVariables);
     }
   }
 
@@ -3623,7 +3626,8 @@
               : fieldNames.get(i));
       exprList.add(rexBuilder.makeInputRef(rel, source));
     }
-    return projectFactory.createProject(rel, ImmutableList.of(), exprList, outputNameList);
+    return projectFactory.createProject(rel, ImmutableList.of(), exprList, outputNameList,
+        ImmutableSet.of());
   }
 
   /** Predicate for if a {@link Calc} does not contain windowed aggregates. */
diff --git a/core/src/main/java/org/apache/calcite/prepare/LixToRelTranslator.java b/core/src/main/java/org/apache/calcite/prepare/LixToRelTranslator.java
index f7d02f3..2bd4eb0 100644
--- a/core/src/main/java/org/apache/calcite/prepare/LixToRelTranslator.java
+++ b/core/src/main/java/org/apache/calcite/prepare/LixToRelTranslator.java
@@ -39,6 +39,7 @@
 import org.apache.calcite.util.BuiltInMethod;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import java.lang.reflect.Type;
 import java.util.ArrayList;
@@ -108,7 +109,8 @@
         return LogicalProject.create(input,
             ImmutableList.of(),
             toRex(input, (FunctionExpression) call.expressions.get(0)),
-            (List<String>) null);
+            (List<String>) null,
+            ImmutableSet.of());
 
       case WHERE:
         input = translate(getTargetExpression(call));
diff --git a/core/src/main/java/org/apache/calcite/prepare/QueryableRelBuilder.java b/core/src/main/java/org/apache/calcite/prepare/QueryableRelBuilder.java
index 5385c6d..4731e02 100644
--- a/core/src/main/java/org/apache/calcite/prepare/QueryableRelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/prepare/QueryableRelBuilder.java
@@ -49,6 +49,7 @@
 import org.apache.calcite.schema.impl.AbstractTableQueryable;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.checker.nullness.qual.PolyNull;
@@ -549,7 +550,8 @@
     RelNode child = toRel(source);
     List<RexNode> nodes = translator.toRexList(selector, child);
     setRel(
-        LogicalProject.create(child, ImmutableList.of(), nodes, (List<String>)  null));
+        LogicalProject.create(child, ImmutableList.of(), nodes, (List<String>)  null,
+            ImmutableSet.of()));
     return castNonNull(null);
   }
 
diff --git a/core/src/main/java/org/apache/calcite/rel/RelNode.java b/core/src/main/java/org/apache/calcite/rel/RelNode.java
index 21f6586..ef3b878 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelNode.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelNode.java
@@ -152,9 +152,6 @@
    * expression but also used and therefore not available to parents of this
    * relational expression.
    *
-   * <p>Note: only {@link org.apache.calcite.rel.core.Correlate} should set
-   * variables.
-   *
    * @return Names of variables which are set in this relational
    *   expression
    */
diff --git a/core/src/main/java/org/apache/calcite/rel/RelRoot.java b/core/src/main/java/org/apache/calcite/rel/RelRoot.java
index d72c0ae..7e6ba78 100644
--- a/core/src/main/java/org/apache/calcite/rel/RelRoot.java
+++ b/core/src/main/java/org/apache/calcite/rel/RelRoot.java
@@ -27,6 +27,7 @@
 import org.apache.calcite.util.mapping.Mappings;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -166,7 +167,7 @@
     for (Pair<Integer, String> field : fields) {
       projects.add(rexBuilder.makeInputRef(rel, field.left));
     }
-    return LogicalProject.create(rel, hints, projects, Pair.right(fields));
+    return LogicalProject.create(rel, hints, projects, Pair.right(fields), ImmutableSet.of());
   }
 
   public boolean isNameTrivial() {
diff --git a/core/src/main/java/org/apache/calcite/rel/core/Project.java b/core/src/main/java/org/apache/calcite/rel/core/Project.java
index f4a2e3a..4a1033c 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/Project.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/Project.java
@@ -45,6 +45,7 @@
 import org.apache.calcite.util.mapping.Mappings;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.apiguardian.api.API;
 import org.checkerframework.checker.nullness.qual.EnsuresNonNullIf;
@@ -53,6 +54,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 import static java.util.Objects.requireNonNull;
@@ -70,6 +72,8 @@
 
   protected final ImmutableList<RelHint> hints;
 
+  protected final ImmutableSet<CorrelationId> variablesSet;
+
   //~ Constructors -----------------------------------------------------------
 
   /**
@@ -81,6 +85,8 @@
    * @param input    Input relational expression
    * @param projects List of expressions for the input columns
    * @param rowType  Output row type
+   * @param variableSet Correlation variables set by this relational expression
+   *                    to be used by nested expressions
    */
   @SuppressWarnings("method.invocation.invalid")
   protected Project(
@@ -89,25 +95,38 @@
       List<RelHint> hints,
       RelNode input,
       List<? extends RexNode> projects,
-      RelDataType rowType) {
+      RelDataType rowType,
+      Set<CorrelationId> variableSet) {
     super(cluster, traits, input);
     assert rowType != null;
     this.exps = ImmutableList.copyOf(projects);
     this.hints = ImmutableList.copyOf(hints);
     this.rowType = rowType;
+    this.variablesSet = ImmutableSet.copyOf(variableSet);
     assert isValid(Litmus.THROW, null);
   }
 
   @Deprecated // to be removed before 2.0
+  protected Project(
+      RelOptCluster cluster,
+      RelTraitSet traits,
+      List<RelHint> hints,
+      RelNode input,
+      List<? extends RexNode> projects,
+      RelDataType rowType) {
+    this(cluster, traits, hints, input, projects, rowType, ImmutableSet.of());
+  }
+
+  @Deprecated // to be removed before 2.0
   protected Project(RelOptCluster cluster, RelTraitSet traits,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    this(cluster, traits, ImmutableList.of(), input, projects, rowType);
+    this(cluster, traits, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
   }
 
   @Deprecated // to be removed before 2.0
   protected Project(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
       List<? extends RexNode> projects, RelDataType rowType, int flags) {
-    this(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    this(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     Util.discard(flags);
   }
 
@@ -120,7 +139,14 @@
         ImmutableList.of(),
         input.getInput(),
         requireNonNull(input.getExpressionList("exprs"), "exprs"),
-        input.getRowType("exprs", "fields"));
+        input.getRowType("exprs", "fields"),
+        ImmutableSet.copyOf(
+            Util.transform(
+                Optional.ofNullable(input.getIntegerList("variablesSet"))
+                    .orElse(ImmutableList.of()),
+                id -> new CorrelationId(id)
+            )
+        ));
   }
 
   //~ Methods ----------------------------------------------------------------
@@ -264,8 +290,13 @@
     return refs.size();
   }
 
+  @Override public Set<CorrelationId> getVariablesSet() {
+    return variablesSet;
+  }
+
   @Override public RelWriter explainTerms(RelWriter pw) {
     super.explainTerms(pw);
+    pw.itemIf("variablesSet", variablesSet, !variablesSet.isEmpty());
     // Skip writing field names so the optimizer can reuse the projects that differ in
     // field names only
     if (pw.getDetailLevel() == SqlExplainLevel.DIGEST_ATTRIBUTES) {
diff --git a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
index 6d5e2c0..3a0d543 100644
--- a/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
+++ b/core/src/main/java/org/apache/calcite/rel/core/RelFactories.java
@@ -163,9 +163,29 @@
      * @param childExprs The projection expressions
      * @param fieldNames The projection field names
      * @return a project
+     * @deprecated Use {@link #createProject(RelNode, List, List, List, Set)} instead
+     */
+    @Deprecated // to be removed before 2.0
+    default RelNode createProject(RelNode input, List<RelHint> hints,
+        List<? extends RexNode> childExprs, @Nullable List<? extends @Nullable String> fieldNames) {
+      return createProject(input, hints, childExprs, fieldNames, ImmutableSet.of());
+    }
+
+    /**
+     * Creates a project.
+     *
+     * @param input The input
+     * @param hints The hints
+     * @param childExprs The projection expressions
+     * @param fieldNames The projection field names
+     * @param variablesSet Correlating variables that are set when reading a row
+     *                     from the input, and which may be referenced from the
+     *                     projection expressions
+     * @return a project
      */
     RelNode createProject(RelNode input, List<RelHint> hints,
-        List<? extends RexNode> childExprs, @Nullable List<? extends @Nullable String> fieldNames);
+        List<? extends RexNode> childExprs, @Nullable List<? extends @Nullable String> fieldNames,
+        Set<CorrelationId> variablesSet);
   }
 
   /**
@@ -174,8 +194,9 @@
    */
   private static class ProjectFactoryImpl implements ProjectFactory {
     @Override public RelNode createProject(RelNode input, List<RelHint> hints,
-        List<? extends RexNode> childExprs, @Nullable List<? extends @Nullable String> fieldNames) {
-      return LogicalProject.create(input, hints, childExprs, fieldNames);
+        List<? extends RexNode> childExprs, @Nullable List<? extends @Nullable String> fieldNames,
+        Set<CorrelationId> variablesSet) {
+      return LogicalProject.create(input, hints, childExprs, fieldNames, variablesSet);
     }
   }
 
diff --git a/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java b/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
index d39947d..f3460b4 100644
--- a/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
+++ b/core/src/main/java/org/apache/calcite/rel/externalize/RelJson.java
@@ -73,6 +73,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -396,9 +397,9 @@
       return toJson((RexWindowBound) value);
     } else if (value instanceof CorrelationId) {
       return toJson((CorrelationId) value);
-    } else if (value instanceof List) {
+    } else if (value instanceof List || value instanceof Set) {
       final List<@Nullable Object> list = jsonBuilder().list();
-      for (Object o : (List<?>) value) {
+      for (Object o : (Collection<?>) value) {
         list.add(toJson(o));
       }
       return list;
diff --git a/core/src/main/java/org/apache/calcite/rel/logical/LogicalProject.java b/core/src/main/java/org/apache/calcite/rel/logical/LogicalProject.java
index cb64b37..2cfe9e0 100644
--- a/core/src/main/java/org/apache/calcite/rel/logical/LogicalProject.java
+++ b/core/src/main/java/org/apache/calcite/rel/logical/LogicalProject.java
@@ -24,6 +24,7 @@
 import org.apache.calcite.rel.RelInput;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.RelShuttle;
+import org.apache.calcite.rel.core.CorrelationId;
 import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.hint.RelHint;
 import org.apache.calcite.rel.metadata.RelMdCollation;
@@ -35,10 +36,12 @@
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 import java.util.List;
+import java.util.Set;
 
 /**
  * Sub-class of {@link org.apache.calcite.rel.core.Project} not
@@ -58,6 +61,8 @@
    * @param input    Input relational expression
    * @param projects List of expressions for the input columns
    * @param rowType  Output row type
+   * @param variablesSet Correlation variables set by this relational expression
+   *                     to be used by nested expressions
    */
   public LogicalProject(
       RelOptCluster cluster,
@@ -65,22 +70,34 @@
       List<RelHint> hints,
       RelNode input,
       List<? extends RexNode> projects,
-      RelDataType rowType) {
-    super(cluster, traitSet, hints, input, projects, rowType);
+      RelDataType rowType,
+      Set<CorrelationId> variablesSet) {
+    super(cluster, traitSet, hints, input, projects, rowType, variablesSet);
     assert traitSet.containsIfApplicable(Convention.NONE);
   }
 
   @Deprecated // to be removed before 2.0
+  public LogicalProject(
+      RelOptCluster cluster,
+      RelTraitSet traitSet,
+      List<RelHint> hints,
+      RelNode input,
+      List<? extends RexNode> projects,
+      RelDataType rowType) {
+    this(cluster, traitSet, hints, input, projects, rowType, ImmutableSet.of());
+  }
+
+  @Deprecated // to be removed before 2.0
   public LogicalProject(RelOptCluster cluster, RelTraitSet traitSet,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    this(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    this(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
   }
 
   @Deprecated // to be removed before 2.0
   public LogicalProject(RelOptCluster cluster, RelTraitSet traitSet,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType,
       int flags) {
-    this(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    this(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     Util.discard(flags);
   }
 
@@ -90,7 +107,7 @@
     this(cluster, cluster.traitSetOf(RelCollations.EMPTY),
         ImmutableList.of(), input, projects,
         RexUtil.createStructType(cluster.getTypeFactory(), projects,
-            fieldNames, null));
+            fieldNames, null), ImmutableSet.of());
     Util.discard(flags);
   }
 
@@ -103,32 +120,56 @@
 
   //~ Methods ----------------------------------------------------------------
 
-  /** Creates a LogicalProject. */
+  /**
+   * Creates a LogicalProject.
+   * @deprecated Use {@link #create(RelNode, List, List, List, Set)} instead
+   */
+  @Deprecated // to be removed before 2.0
   public static LogicalProject create(final RelNode input, List<RelHint> hints,
       final List<? extends RexNode> projects,
       @Nullable List<? extends @Nullable String> fieldNames) {
+    return create(input, hints, projects, fieldNames, ImmutableSet.of());
+  }
+
+  /** Creates a LogicalProject. */
+  public static LogicalProject create(final RelNode input, List<RelHint> hints,
+      final List<? extends RexNode> projects,
+      @Nullable List<? extends @Nullable String> fieldNames,
+      final Set<CorrelationId> variablesSet) {
     final RelOptCluster cluster = input.getCluster();
     final RelDataType rowType =
         RexUtil.createStructType(cluster.getTypeFactory(), projects,
             fieldNames, SqlValidatorUtil.F_SUGGESTER);
-    return create(input, hints, projects, rowType);
+    return create(input, hints, projects, rowType, variablesSet);
+  }
+
+  /**
+   * Creates a LogicalProject, specifying row type rather than field names.
+   * @deprecated Use {@link #create(RelNode, List, List, RelDataType, Set)} instead
+   */
+  @Deprecated // to be removed before 2.0
+  public static LogicalProject create(final RelNode input, List<RelHint> hints,
+      final List<? extends RexNode> projects, RelDataType rowType) {
+    return create(input, hints, projects, rowType, ImmutableSet.of());
   }
 
   /** Creates a LogicalProject, specifying row type rather than field names. */
   public static LogicalProject create(final RelNode input, List<RelHint> hints,
-      final List<? extends RexNode> projects, RelDataType rowType) {
+      final List<? extends RexNode> projects, RelDataType rowType,
+      final Set<CorrelationId> variablesSet) {
     final RelOptCluster cluster = input.getCluster();
     final RelMetadataQuery mq = cluster.getMetadataQuery();
     final RelTraitSet traitSet =
         cluster.traitSet().replace(Convention.NONE)
             .replaceIfs(RelCollationTraitDef.INSTANCE,
                 () -> RelMdCollation.project(mq, input, projects));
-    return new LogicalProject(cluster, traitSet, hints, input, projects, rowType);
+    return new LogicalProject(cluster, traitSet, hints, input, projects, rowType, variablesSet);
   }
 
   @Override public LogicalProject copy(RelTraitSet traitSet, RelNode input,
       List<RexNode> projects, RelDataType rowType) {
-    return new LogicalProject(getCluster(), traitSet, hints, input, projects, rowType);
+    return new LogicalProject(getCluster(), traitSet, hints, input, projects, rowType,
+        variablesSet);
   }
 
   @Override public RelNode accept(RelShuttle shuttle) {
@@ -137,7 +178,7 @@
 
   @Override public RelNode withHints(List<RelHint> hintList) {
     return new LogicalProject(getCluster(), traitSet, hintList,
-        input, getProjects(), getRowType());
+        input, getProjects(), getRowType(), variablesSet);
   }
 
   @Override public boolean deepEquals(@Nullable Object obj) {
diff --git a/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java b/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java
index dc0ca12..3d39e9e 100644
--- a/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java
+++ b/core/src/main/java/org/apache/calcite/rel/rel2sql/RelToSqlConverter.java
@@ -901,7 +901,8 @@
                   sort2,
                   ImmutableList.of(),
                   project.getProjects(),
-                  project.getRowType());
+                  project.getRowType(),
+                  project.getVariablesSet());
           return visit(project2);
         }
       }
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/FilterProjectTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/FilterProjectTransposeRule.java
index 13665c4..5be607f 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/FilterProjectTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/FilterProjectTransposeRule.java
@@ -187,9 +187,10 @@
     RelNode newProject =
         config.isCopyProject()
             ? project.copy(project.getTraitSet(), newFilterRel,
-                project.getProjects(), project.getRowType())
+            project.getProjects(), project.getRowType())
             : relBuilder.push(newFilterRel)
-                .project(project.getProjects(), project.getRowType().getFieldNames())
+                .project(project.getProjects(), project.getRowType().getFieldNames(), false,
+                    project.getVariablesSet())
                 .build();
 
     call.transformTo(newProject);
diff --git a/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java b/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
index 1ed0652..770052f 100644
--- a/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
+++ b/core/src/main/java/org/apache/calcite/rel/rules/ProjectWindowTransposeRule.java
@@ -37,6 +37,7 @@
 import org.apache.calcite.util.ImmutableBitSet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.immutables.value.Value;
 
@@ -85,7 +86,7 @@
       return;
     }
 
-    // Put a DrillProjectRel below LogicalWindow
+    // Put a Project below LogicalWindow
     final List<RexNode> exps = new ArrayList<>();
     final RelDataTypeFactory.Builder builder =
         cluster.getTypeFactory().builder();
@@ -99,7 +100,7 @@
 
     final LogicalProject projectBelowWindow =
         new LogicalProject(cluster, window.getTraitSet(), ImmutableList.of(),
-            window.getInput(), exps, builder.build());
+            window.getInput(), exps, builder.build(), ImmutableSet.of());
 
     // Create a new LogicalWindow with necessary inputs only
     final List<Window.Group> groups = new ArrayList<>();
diff --git a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
index 3f3fbd2..2a27d7c 100644
--- a/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
+++ b/core/src/main/java/org/apache/calcite/rel/stream/StreamRules.java
@@ -86,7 +86,8 @@
           LogicalProject.create(newDelta,
               project.getHints(),
               project.getProjects(),
-              project.getRowType().getFieldNames());
+              project.getRowType().getFieldNames(),
+              project.getVariablesSet());
       call.transformTo(newProject);
     }
 
diff --git a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
index 571019c..c32047d 100644
--- a/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
+++ b/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
@@ -836,7 +836,7 @@
       }
       rel =
           LogicalProject.create(rel, ImmutableList.of(),
-              Pair.left(newProjects), Pair.right(newProjects));
+              Pair.left(newProjects), Pair.right(newProjects), project.getVariablesSet());
       bb.root = rel;
       distinctify(bb, false);
       rel = bb.root();
@@ -857,7 +857,7 @@
 
       rel =
           LogicalProject.create(rel, ImmutableList.of(),
-              Pair.left(undoProjects), Pair.right(undoProjects));
+              Pair.left(undoProjects), Pair.right(undoProjects), ImmutableSet.of());
       bb.setRoot(
           rel,
           false);
@@ -936,7 +936,8 @@
           LogicalProject.create(bb.root(),
               ImmutableList.of(),
               exprs,
-              rowType.getFieldNames().subList(0, fieldCount)),
+              rowType.getFieldNames().subList(0, fieldCount),
+              ImmutableSet.of()),
           false);
     }
   }
@@ -4540,7 +4541,13 @@
     final RelNode r;
     final CorrelationUse p = getCorrelationUse(bb, project);
     if (p != null) {
-      r = p.r;
+      assert p.r instanceof Project;
+      // correlation variables have been normalized in p.r, we should use expressions
+      // in p.r instead of the original exprs
+      Project project1 = (Project) p.r;
+      r = relBuilder.push(bb.root())
+          .projectNamed(project1.getProjects(), fieldNames, true, ImmutableSet.of(p.id))
+          .build();
     } else {
       r = project;
     }
@@ -6626,7 +6633,8 @@
           newInput,
           project.getHints(),
           newProjections.build(),
-          project.getRowType().getFieldNames());
+          project.getRowType().getFieldNames(),
+          project.getVariablesSet());
     }
 
     private Set<Integer> requiredJsonOutputFromParent(RelNode relNode) {
diff --git a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
index 6eb3838..0c7c530 100644
--- a/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
+++ b/core/src/main/java/org/apache/calcite/tools/RelBuilder.java
@@ -1817,7 +1817,24 @@
    */
   public RelBuilder project(Iterable<? extends RexNode> nodes,
       Iterable<? extends @Nullable String> fieldNames, boolean force) {
-    return project_(nodes, fieldNames, ImmutableList.of(), force);
+    return project(nodes, fieldNames, force, ImmutableSet.of());
+  }
+
+  /**
+   * The same with {@link #project(Iterable, Iterable, boolean)}, with additional
+   * variablesSet param.
+   *
+   * @param nodes Expressions
+   * @param fieldNames Suggested field names
+   * @param force create project even if it is identity
+   * @param variablesSet Correlating variables that are set when reading a row
+   *                     from the input, and which may be referenced from the
+   *                     projection expressions
+   */
+  public RelBuilder project(Iterable<? extends RexNode> nodes,
+      Iterable<? extends @Nullable String> fieldNames, boolean force,
+      Iterable<CorrelationId> variablesSet) {
+    return project_(nodes, fieldNames, ImmutableList.of(), force, variablesSet);
   }
 
   /** Creates a {@link Project} of all original fields, plus the given
@@ -1891,10 +1908,12 @@
       Iterable<? extends RexNode> nodes,
       Iterable<? extends @Nullable String> fieldNames,
       Iterable<RelHint> hints,
-      boolean force) {
+      boolean force,
+      Iterable<CorrelationId> variablesSet) {
     final Frame frame = requireNonNull(peek_(), "frame stack is empty");
     final RelDataType inputRowType = frame.rel.getRowType();
     final List<RexNode> nodeList = Lists.newArrayList(nodes);
+    final Set<CorrelationId> variables = ImmutableSet.copyOf(variablesSet);
 
     // Perform a quick check for identity. We'll do a deeper check
     // later when we've derived column names.
@@ -1908,9 +1927,11 @@
       fieldNameList.add(null);
     }
 
+    // Do not merge projection when top projection has correlation variables
     bloat:
     if (frame.rel instanceof Project
-        && config.bloat() >= 0) {
+        && config.bloat() >= 0
+        && variables.isEmpty()) {
       final Project project = (Project) frame.rel;
       // Populate field names. If the upper expression is an input ref and does
       // not have a recommended name, use the name of the underlying field.
@@ -1958,7 +1979,9 @@
       final ImmutableSet.Builder<RelHint> mergedHints = ImmutableSet.builder();
       mergedHints.addAll(project.getHints());
       mergedHints.addAll(hints);
-      return project_(newNodes, fieldNameList, mergedHints.build(), force);
+      // Keep bottom projection's variablesSet.
+      return project_(newNodes, fieldNameList, mergedHints.build(), force,
+          ImmutableSet.copyOf(project.getVariablesSet()));
     }
 
     // Simplify expressions.
@@ -2043,7 +2066,8 @@
         struct.projectFactory.createProject(frame.rel,
             ImmutableList.copyOf(hints),
             ImmutableList.copyOf(nodeList),
-            fieldNameList);
+            fieldNameList,
+            variables);
     stack.pop();
     stack.push(new Frame(project, fields.build()));
     return this;
@@ -2068,6 +2092,32 @@
    */
   public RelBuilder projectNamed(Iterable<? extends RexNode> nodes,
       @Nullable Iterable<? extends @Nullable String> fieldNames, boolean force) {
+    return projectNamed(nodes, fieldNames, force, ImmutableSet.of());
+  }
+
+  /** Creates a {@link Project} of the given
+   * expressions and field names, and optionally optimizing.
+   *
+   * <p>If {@code fieldNames} is null, or if a particular entry in
+   * {@code fieldNames} is null, derives field names from the input
+   * expressions.
+   *
+   * <p>If {@code force} is false,
+   * and the input is a {@code Project},
+   * and the expressions  make the trivial projection ($0, $1, ...),
+   * modifies the input.
+   *
+   * @param nodes       Expressions
+   * @param fieldNames  Suggested field names, or null to generate
+   * @param force       Whether to create a renaming Project if the
+   *                    projections are trivial
+   * @param variablesSet Correlating variables that are set when reading a row
+   *                     from the input, and which may be referenced from the
+   *                     projection expressions
+   */
+  public RelBuilder projectNamed(Iterable<? extends RexNode> nodes,
+      @Nullable Iterable<? extends @Nullable String> fieldNames, boolean force,
+      Iterable<CorrelationId> variablesSet) {
     @SuppressWarnings("unchecked") final List<? extends RexNode> nodeList =
         nodes instanceof List ? (List) nodes : ImmutableList.copyOf(nodes);
     final List<@Nullable String> fieldNameList =
@@ -2103,7 +2153,7 @@
         stack.push(new Frame(newValues, frame.fields));
       }
     } else {
-      project(nodeList, rowType.getFieldNames(), force);
+      project(nodeList, rowType.getFieldNames(), force, variablesSet);
     }
     return this;
   }
@@ -2344,8 +2394,7 @@
           newProjects.add(project.getProjects().get(i));
           builder.add(project.getRowType().getFieldList().get(i));
         }
-        r = project.copy(cluster.traitSet(), project.getInput(), newProjects,
-            builder.build());
+        r = project.copy(cluster.traitSet(), project.getInput(), newProjects, builder.build());
       }
     }
 
@@ -3278,7 +3327,8 @@
                 struct.projectFactory.createProject(sort,
                     project.getHints(),
                     project.getProjects(),
-                    Pair.right(project.getNamedProjects())));
+                    Pair.right(project.getNamedProjects()),
+                    project.getVariablesSet()));
             return this;
           }
         }
diff --git a/core/src/test/java/org/apache/calcite/plan/RelOptUtilTest.java b/core/src/test/java/org/apache/calcite/plan/RelOptUtilTest.java
index 2f22e78..54e9c88 100644
--- a/core/src/test/java/org/apache/calcite/plan/RelOptUtilTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/RelOptUtilTest.java
@@ -49,6 +49,7 @@
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -693,7 +694,8 @@
             ImmutableList.of(
                 fieldEmpno.getName(),
                 fieldEname.getName(),
-                "JOB_CNT"));
+                "JOB_CNT"),
+            ImmutableSet.of());
     assertThat(castNode1.explain(), is(expectNode1.explain()));
     // Change the field JOB_CNT field name again.
     // The projection expect to be merged.
@@ -716,7 +718,8 @@
             ImmutableList.of(
                 fieldEmpno.getName(),
                 fieldEname.getName(),
-                "JOB_CNT2"));
+                "JOB_CNT2"),
+            ImmutableSet.of());
     assertThat(castNode2.explain(), is(expectNode2.explain()));
   }
 
diff --git a/core/src/test/java/org/apache/calcite/plan/RelWriterTest.java b/core/src/test/java/org/apache/calcite/plan/RelWriterTest.java
index f2277fd..798be9d 100644
--- a/core/src/test/java/org/apache/calcite/plan/RelWriterTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/RelWriterTest.java
@@ -592,7 +592,8 @@
                           RexWindowBounds.following(
                               rexBuilder.makeExactLiteral(BigDecimal.ONE)),
                           false, true, false, false, false)),
-                  ImmutableList.of("field0", "field1", "field2"));
+                  ImmutableList.of("field0", "field1", "field2"),
+                  ImmutableSet.of());
           final RelJsonWriter writer = new RelJsonWriter();
           project.explain(writer);
           return writer.asString();
@@ -998,6 +999,21 @@
         .assertThatPlan(isLinux(expected));
   }
 
+  @Test void testProjectionWithCorrelationVariables() {
+    final Function<RelBuilder, RelNode> relFn = b -> b.scan("EMP")
+        .project(
+            ImmutableList.of(b.field("ENAME")),
+            ImmutableList.of("ename"),
+            true,
+            ImmutableSet.of(b.getCluster().createCorrel()))
+        .build();
+
+    final String expected = "LogicalProject(variablesSet=[[$cor0]], ename=[$1])\n"
+        + "  LogicalTableScan(table=[[scott, EMP]])\n";
+    relFn(relFn)
+        .assertThatPlan(isLinux(expected));
+  }
+
   @Test void testOverWithoutOrderKey() {
     // Equivalent SQL:
     //   SELECT count(*) OVER (PARTITION BY deptno) FROM emp
diff --git a/core/src/test/java/org/apache/calcite/plan/volcano/TraitPropagationTest.java b/core/src/test/java/org/apache/calcite/plan/volcano/TraitPropagationTest.java
index 825e143..4a24f26 100644
--- a/core/src/test/java/org/apache/calcite/plan/volcano/TraitPropagationTest.java
+++ b/core/src/test/java/org/apache/calcite/plan/volcano/TraitPropagationTest.java
@@ -72,6 +72,7 @@
 import org.apache.calcite.util.ImmutableBitSet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.immutables.value.Value;
@@ -164,7 +165,8 @@
               (RexNode) rexBuilder.makeInputRef(stringType, 0),
               rexBuilder.makeInputRef(integerType, 1)),
           typeFactory.builder().add("s", stringType).add("i", integerType)
-          .build());
+              .build(),
+          ImmutableSet.of());
 
       // aggregate on s, count
       AggregateCall aggCall = AggregateCall.create(SqlStdOperatorTable.COUNT,
@@ -370,7 +372,7 @@
   private static class PhysProj extends Project implements Phys {
     PhysProj(RelOptCluster cluster, RelTraitSet traits, RelNode child,
         List<RexNode> exps, RelDataType rowType) {
-      super(cluster, traits, ImmutableList.of(), child, exps, rowType);
+      super(cluster, traits, ImmutableList.of(), child, exps, rowType, ImmutableSet.of());
     }
 
     public static PhysProj create(final RelNode input,
diff --git a/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java b/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java
index 307c3a9..5f0c8a8 100644
--- a/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelMetadataTest.java
@@ -1620,7 +1620,8 @@
     final LogicalProject project = LogicalProject.create(empSort,
         ImmutableList.of(),
         projects,
-        ImmutableList.of("a", "b", "c", "d"));
+        ImmutableList.of("a", "b", "c", "d"),
+        ImmutableSet.of());
 
     final LogicalTableScan deptScan =
         LogicalTableScan.create(cluster, deptTable, ImmutableList.of());
@@ -1868,7 +1869,8 @@
                     rexBuilder.makeExactLiteral(BigDecimal.ONE)),
                 rexBuilder.makeCall(SqlStdOperatorTable.CHAR_LENGTH,
                     rexBuilder.makeInputRef(filter, 1))),
-            (List<String>) null);
+            (List<String>) null,
+            ImmutableSet.of());
     rowSize = mq.getAverageRowSize(deptProject);
     columnSizes = mq.getAverageColumnSizes(deptProject);
     assertThat(columnSizes.size(), equalTo(4));
diff --git a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
index 606c63c..97de092 100644
--- a/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
+++ b/core/src/test/java/org/apache/calcite/test/RelOptRulesTest.java
@@ -126,6 +126,7 @@
 import org.apache.calcite.util.ImmutableBitSet;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.immutables.value.Value;
 import org.junit.jupiter.api.Disabled;
@@ -6883,7 +6884,7 @@
         RelNode input,
         List<? extends RexNode> projects,
         RelDataType rowType) {
-      super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+      super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     }
 
     public MyProject copy(RelTraitSet traitSet, RelNode input,
diff --git a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
index aecd47e..0ebe200 100644
--- a/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/test/SqlToRelConverterTest.java
@@ -3383,6 +3383,30 @@
     sql(sql).withDecorrelate(true).ok();
   }
 
+  @Test void testCorrelationInProjectionWithScan() {
+    final String sql = "select array(select e.deptno) from emp e";
+    sql(sql).withExpand(false).withDecorrelate(false).ok();
+  }
+
+  @Test void testCorrelationInProjectionWithProjection() {
+    final String sql = "select array(select e.deptno)\n"
+        + "from (select deptno, ename from emp) e";
+    sql(sql).withExpand(false).withDecorrelate(false).ok();
+  }
+
+  @Test void testMultiCorrelationInProjectionWithProjection() {
+    final String sql = "select cardinality(array(select e.deptno)), array(select e.ename)[0]\n"
+        + "from (select deptno, ename from emp) e";
+    sql(sql).withExpand(false).withDecorrelate(false).ok();
+  }
+
+  @Test void testCorrelationInProjectionWithCorrelatedProjection() {
+    final String sql = "select cardinality(arr) from"
+        + "(select array(select e.deptno) arr\n"
+        + "from (select deptno, ename from emp) e)";
+    sql(sql).withExpand(false).withDecorrelate(false).ok();
+  }
+
   @Test void testCustomColumnResolving() {
     final String sql = "select k0 from struct.t";
     sql(sql).ok();
diff --git a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
index b146e73..863a8f1 100644
--- a/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/RelOptRulesTest.xml
@@ -11934,7 +11934,7 @@
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(EXPR$0=[> SOME($0, {
+LogicalProject(variablesSet=[[$cor0]], EXPR$0=[> SOME($0, {
 LogicalProject(DEPTNO=[$0])
   LogicalFilter(condition=[=($cor0.JOB, $1)])
     LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
@@ -11976,7 +11976,7 @@
     </Resource>
     <Resource name="planBefore">
       <![CDATA[
-LogicalProject(SAL=[$5], EXPR$1=[NOT(IN($0, {
+LogicalProject(variablesSet=[[$cor0]], SAL=[$5], EXPR$1=[NOT(IN($0, {
 LogicalProject(DEPTNO=[$0])
   LogicalFilter(condition=[=($cor0.JOB, $1)])
     LogicalTableScan(table=[[CATALOG, SALES, DEPT]])
diff --git a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
index 44f6fae..818f2a6 100644
--- a/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
+++ b/core/src/test/resources/org/apache/calcite/test/SqlToRelConverterTest.xml
@@ -737,7 +737,7 @@
   <TestCase name="testCorrelatedScalarSubQueryInSelectList">
     <Resource name="planNotExpanded">
       <![CDATA[
-LogicalProject(DEPTNO=[$0], I0=[$SCALAR_QUERY({
+LogicalProject(variablesSet=[[$cor0]], DEPTNO=[$0], I0=[$SCALAR_QUERY({
 LogicalAggregate(group=[{}], EXPR$0=[MIN($0)])
   LogicalProject($f0=[1])
     LogicalFilter(condition=[>($0, $cor0.DEPTNO)])
@@ -857,7 +857,7 @@
     </Resource>
     <Resource name="planNotExpanded">
       <![CDATA[
-LogicalProject(DEPTNO=[$7], EXPR$1=[$SCALAR_QUERY({
+LogicalProject(variablesSet=[[$cor0]], DEPTNO=[$7], EXPR$1=[$SCALAR_QUERY({
 LogicalProject(NAME=[$0])
   LogicalTableFunctionScan(invocation=[DEDUP($cor0.DEPTNO, $cor0.DEPTNO)], rowType=[RecordType(VARCHAR(1024) NAME)])
 })])
@@ -969,6 +969,53 @@
   and emp.deptno in (dept.deptno, dept.deptno))]]>
     </Resource>
   </TestCase>
+  <TestCase name="testCorrelationInProjectionWithCorrelatedProjection">
+    <Resource name="sql">
+      <![CDATA[select cardinality(arr) from
+(select array(select e.deptno) arr
+from (select deptno, ename from emp) e)]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], EXPR$0=[CARDINALITY(ARRAY({
+LogicalProject(DEPTNO=[$cor0.DEPTNO])
+  LogicalValues(tuples=[[{ 0 }]])
+}))])
+  LogicalProject(DEPTNO=[$7], ENAME=[$1])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCorrelationInProjectionWithProjection">
+    <Resource name="sql">
+      <![CDATA[select array(select e.deptno)
+from (select deptno, ename from emp) e]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], EXPR$0=[ARRAY({
+LogicalProject(DEPTNO=[$cor0.DEPTNO])
+  LogicalValues(tuples=[[{ 0 }]])
+})])
+  LogicalProject(DEPTNO=[$7], ENAME=[$1])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testCorrelationInProjectionWithScan">
+    <Resource name="sql">
+      <![CDATA[select array(select e.deptno) from emp e]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], EXPR$0=[ARRAY({
+LogicalProject(DEPTNO=[$cor0.DEPTNO])
+  LogicalValues(tuples=[[{ 0 }]])
+})])
+  LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testCorrelationInWithSubQuery">
     <Resource name="plan">
       <![CDATA[
@@ -4382,6 +4429,25 @@
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testMultiCorrelationInProjectionWithProjection">
+    <Resource name="sql">
+      <![CDATA[select cardinality(array(select e.deptno)), array(select e.ename)[0]
+from (select deptno, ename from emp) e]]>
+    </Resource>
+    <Resource name="plan">
+      <![CDATA[
+LogicalProject(variablesSet=[[$cor0]], EXPR$0=[CARDINALITY(ARRAY({
+LogicalProject(DEPTNO=[$cor0.DEPTNO])
+  LogicalValues(tuples=[[{ 0 }]])
+}))], EXPR$1=[ITEM(ARRAY({
+LogicalProject(ENAME=[$cor0.ENAME])
+  LogicalValues(tuples=[[{ 0 }]])
+}), 0)])
+  LogicalProject(DEPTNO=[$7], ENAME=[$1])
+    LogicalTableScan(table=[[CATALOG, SALES, EMP]])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testMultiset">
     <Resource name="plan">
       <![CDATA[
diff --git a/core/src/test/resources/sql/agg.iq b/core/src/test/resources/sql/agg.iq
index ec9b019..4adc4fd 100644
--- a/core/src/test/resources/sql/agg.iq
+++ b/core/src/test/resources/sql/agg.iq
@@ -1334,12 +1334,17 @@
 !ok
 
 # As above, but with correlation
-!if (fixed.calcite1045) {
 SELECT SUM(
   (select char_length(dname) from "scott".dept where dept.deptno = emp.empno)) as s
 FROM "scott".emp;
++---+
+| S |
++---+
+|   |
++---+
+(1 row)
+
 !ok
-!}
 
 # FUSION rolled up using CARDINALITY
 select cardinality(fusion(empnos)) as f_empnos_length
diff --git a/core/src/test/resources/sql/sub-query.iq b/core/src/test/resources/sql/sub-query.iq
index e33e094..a22ee00 100644
--- a/core/src/test/resources/sql/sub-query.iq
+++ b/core/src/test/resources/sql/sub-query.iq
@@ -310,50 +310,97 @@
 
 !ok
 
-!if (fixed.calcite1045) {
 # Correlated IN sub-query in WHERE clause of JOIN
 select empno from "scott".emp as e
 join "scott".dept as d using (deptno)
 where e.job in (
   select e2.job from "scott".emp as e2 where e2.deptno > e.deptno);
- EMPNO
--------
-  7369
-  7566
-  7782
-  7876
-  7934
++-------+
+| EMPNO |
++-------+
+|  7369 |
+|  7566 |
+|  7782 |
+|  7876 |
+|  7934 |
++-------+
 (5 rows)
 
 !ok
-EnumerableCalc(expr#0..5=[{inputs}], EMPNO=[$t0])
-  EnumerableHashJoin(condition=[=($2, $5)], joinType=[inner])
+EnumerableCalc(expr#0..4=[{inputs}], EMPNO=[$t0])
+  EnumerableHashJoin(condition=[=($2, $5)], joinType=[semi])
     EnumerableCalc(expr#0..4=[{inputs}], EMPNO=[$t2], JOB=[$t3], DEPTNO=[$t4], JOB0=[$t0], DEPTNO0=[$t1])
       EnumerableHashJoin(condition=[AND(=($1, $4), =($0, $3))], joinType=[inner])
-        EnumerableCalc(expr#0..1=[{inputs}], JOB=[$t1], DEPTNO=[$t0])
-          EnumerableAggregate(group=[{0, 2}])
-            EnumerableCalc(expr#0..3=[{inputs}], expr#4=[>($t3, $t0)], proj#0..3=[{exprs}], $condition=[$t4])
-              EnumerableNestedLoopJoin(condition=[true], joinType=[inner])
-                EnumerableAggregate(group=[{7}])
+        EnumerableAggregate(group=[{1, 3}])
+          EnumerableNestedLoopJoin(condition=[>($2, $3)], joinType=[inner])
+            EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], JOB=[$t2], DEPTNO=[$t7])
+              EnumerableTableScan(table=[[scott, EMP]])
+            EnumerableAggregate(group=[{1}])
+              EnumerableHashJoin(condition=[=($1, $2)], joinType=[semi])
+                EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], DEPTNO=[$t7])
                   EnumerableTableScan(table=[[scott, EMP]])
-                EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], JOB=[$t2], DEPTNO=[$t7])
-                  EnumerableTableScan(table=[[scott, EMP]])
+                EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
+                  EnumerableTableScan(table=[[scott, DEPT]])
         EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], JOB=[$t2], DEPTNO=[$t7])
           EnumerableTableScan(table=[[scott, EMP]])
     EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
       EnumerableTableScan(table=[[scott, DEPT]])
 !plan
-!}
 
-!if (fixed.calcite1045) {
 # Correlated NOT IN sub-query in WHERE clause of JOIN
 select empno from "scott".emp as e
 join "scott".dept as d using (deptno)
 where e.job not in (
   select e2.job from "scott".emp as e2 where e2.deptno > e.deptno);
++-------+
+| EMPNO |
++-------+
+|  7499 |
+|  7521 |
+|  7654 |
+|  7698 |
+|  7788 |
+|  7839 |
+|  7844 |
+|  7900 |
+|  7902 |
++-------+
+(9 rows)
+
 !ok
+EnumerableCalc(expr#0..9=[{inputs}], expr#10=[0], expr#11=[=($t5, $t10)], expr#12=[IS NULL($t1)], expr#13=[IS NOT NULL($t9)], expr#14=[<($t6, $t5)], expr#15=[OR($t12, $t13, $t14)], expr#16=[IS NOT TRUE($t15)], expr#17=[OR($t11, $t16)], EMPNO=[$t0], $condition=[$t17])
+  EnumerableMergeJoin(condition=[AND(=($1, $7), =($2, $8))], joinType=[left])
+    EnumerableSort(sort0=[$1], sort1=[$2], dir0=[ASC], dir1=[ASC])
+      EnumerableHashJoin(condition=[=($2, $4)], joinType=[left])
+        EnumerableCalc(expr#0..3=[{inputs}], EMPNO=[$t1], JOB=[$t2], DEPTNO=[$t3], DEPTNO0=[$t0])
+          EnumerableHashJoin(condition=[=($0, $3)], joinType=[inner])
+            EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
+              EnumerableTableScan(table=[[scott, DEPT]])
+            EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], JOB=[$t2], DEPTNO=[$t7])
+              EnumerableTableScan(table=[[scott, EMP]])
+        EnumerableAggregate(group=[{3}], c=[COUNT()], ck=[COUNT($1)])
+          EnumerableNestedLoopJoin(condition=[>($2, $3)], joinType=[inner])
+            EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], JOB=[$t2], DEPTNO=[$t7])
+              EnumerableTableScan(table=[[scott, EMP]])
+            EnumerableAggregate(group=[{1}])
+              EnumerableHashJoin(condition=[=($1, $2)], joinType=[semi])
+                EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], DEPTNO=[$t7])
+                  EnumerableTableScan(table=[[scott, EMP]])
+                EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
+                  EnumerableTableScan(table=[[scott, DEPT]])
+    EnumerableSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC])
+      EnumerableCalc(expr#0..1=[{inputs}], expr#2=[true], expr#3=[IS NOT NULL($t0)], proj#0..2=[{exprs}], $condition=[$t3])
+        EnumerableAggregate(group=[{1, 3}])
+          EnumerableNestedLoopJoin(condition=[>($2, $3)], joinType=[inner])
+            EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], JOB=[$t2], DEPTNO=[$t7])
+              EnumerableTableScan(table=[[scott, EMP]])
+            EnumerableAggregate(group=[{1}])
+              EnumerableHashJoin(condition=[=($1, $2)], joinType=[semi])
+                EnumerableCalc(expr#0..7=[{inputs}], EMPNO=[$t0], DEPTNO=[$t7])
+                  EnumerableTableScan(table=[[scott, EMP]])
+                EnumerableCalc(expr#0..2=[{inputs}], DEPTNO=[$t0])
+                  EnumerableTableScan(table=[[scott, DEPT]])
 !plan
-!}
 
 # Condition that returns a NULL key.
 # Tested on Oracle.
@@ -549,23 +596,21 @@
 !ok
 
 # Two scalar sub-queries
-!if (fixed.calcite1045) {
 select deptno,
   (select min(1) from "scott".emp where empno > d.deptno) as i0,
   (select min(0) from "scott".emp where deptno = d.deptno and ename = 'SMITH') as i1
 from "scott".dept as d;
-+--------+----+---+
-| DEPTNO | I0 | I1|
-+--------+----+---+
-|     10 |  1 |   |
-|     20 |  1 | 0 |
-|     30 |  1 |   |
-|     40 |  1 |   |
-+--------+----+---+
++--------+----+----+
+| DEPTNO | I0 | I1 |
++--------+----+----+
+|     10 |  1 |    |
+|     20 |  1 |  0 |
+|     30 |  1 |    |
+|     40 |  1 |    |
++--------+----+----+
 (4 rows)
 
 !ok
-!}
 
 # Correlated scalar sub-query
 SELECT d.dname,
@@ -3513,4 +3558,15 @@
         EnumerableValues(tuples=[[{ 0 }]])
 !plan
 
+# Test case for correlated sub-query
+SELECT ARRAY(SELECT s.x) FROM (SELECT 1 as x) s;
++--------+
+| EXPR$0 |
++--------+
+| [1]    |
++--------+
+(1 row)
+
+!ok
+
 # End sub-query.iq
diff --git a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
index 3c7800f..963ea3c 100644
--- a/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
+++ b/druid/src/main/java/org/apache/calcite/adapter/druid/DruidRules.java
@@ -356,6 +356,11 @@
       super(config);
     }
 
+    @Override public boolean matches(RelOptRuleCall call) {
+      final Project project = call.rel(0);
+      return project.getVariablesSet().isEmpty();
+    }
+
     @Override public void onMatch(RelOptRuleCall call) {
       final Project project = call.rel(0);
       final DruidQuery query = call.rel(1);
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
index 0a341c6..5aadcbc 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchProject.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.util.Pair;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -43,7 +44,7 @@
 public class ElasticsearchProject extends Project implements ElasticsearchRel {
   ElasticsearchProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
       List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() == ElasticsearchRel.CONVENTION;
     assert getConvention() == input.getConvention();
   }
diff --git a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
index 78261b9..00685af 100644
--- a/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
+++ b/elasticsearch/src/main/java/org/apache/calcite/adapter/elasticsearch/ElasticsearchRules.java
@@ -21,6 +21,7 @@
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollations;
@@ -293,6 +294,11 @@
       super(config);
     }
 
+    @Override public boolean matches(RelOptRuleCall call) {
+      final LogicalProject project = call.rel(0);
+      return project.getVariablesSet().isEmpty();
+    }
+
     @Override public RelNode convert(RelNode relNode) {
       final LogicalProject project = (LogicalProject) relNode;
       final RelTraitSet traitSet = project.getTraitSet().replace(out);
diff --git a/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeProject.java b/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeProject.java
index da5cf40..fc7a0f3 100644
--- a/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeProject.java
+++ b/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeProject.java
@@ -29,6 +29,7 @@
 import org.apache.calcite.util.Pair;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -45,7 +46,7 @@
 
   GeodeProject(RelOptCluster cluster, RelTraitSet traitSet,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() == GeodeRel.CONVENTION;
     assert getConvention() == input.getConvention();
   }
diff --git a/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeRules.java b/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeRules.java
index 5fd12fa..4ba331c 100644
--- a/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeRules.java
+++ b/geode/src/main/java/org/apache/calcite/adapter/geode/rel/GeodeRules.java
@@ -40,6 +40,8 @@
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 
+import com.google.common.base.Preconditions;
+
 import org.immutables.value.Value;
 
 import java.util.ArrayList;
@@ -145,12 +147,13 @@
           return false;
         }
       }
-
-      return true;
+      return project.getVariablesSet().isEmpty();
     }
 
     @Override public RelNode convert(RelNode rel) {
       final LogicalProject project = (LogicalProject) rel;
+      Preconditions.checkArgument(project.getVariablesSet().isEmpty(),
+          "GeodeProject does now allow variables");
       final RelTraitSet traitSet =
           project.getTraitSet().replace(getOutConvention());
       return new GeodeProject(
diff --git a/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbProject.java b/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbProject.java
index b7e657a..202068a 100644
--- a/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbProject.java
+++ b/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbProject.java
@@ -28,6 +28,7 @@
 import org.apache.calcite.util.Pair;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -42,7 +43,7 @@
 public class InnodbProject extends Project implements InnodbRel {
   InnodbProject(RelOptCluster cluster, RelTraitSet traitSet,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() == InnodbRel.CONVENTION;
     assert getConvention() == input.getConvention();
   }
diff --git a/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbRules.java b/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbRules.java
index cf381de..5b8a675 100644
--- a/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbRules.java
+++ b/innodb/src/main/java/org/apache/calcite/adapter/innodb/InnodbRules.java
@@ -140,8 +140,7 @@
           return false;
         }
       }
-
-      return true;
+      return project.getVariablesSet().isEmpty();
     }
 
     @Override public RelNode convert(RelNode rel) {
diff --git a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoProject.java b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoProject.java
index 55bce98..ee0721f 100644
--- a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoProject.java
+++ b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoProject.java
@@ -30,6 +30,7 @@
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -43,7 +44,7 @@
 public class MongoProject extends Project implements MongoRel {
   public MongoProject(RelOptCluster cluster, RelTraitSet traitSet,
       RelNode input, List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() == MongoRel.CONVENTION;
     assert getConvention() == input.getConvention();
   }
diff --git a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoRules.java b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoRules.java
index 9e9ac37..fa10035 100644
--- a/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoRules.java
+++ b/mongodb/src/main/java/org/apache/calcite/adapter/mongodb/MongoRules.java
@@ -21,6 +21,7 @@
 import org.apache.calcite.adapter.java.JavaTypeFactory;
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.InvalidRelException;
 import org.apache.calcite.rel.RelCollations;
@@ -310,6 +311,11 @@
       super(config);
     }
 
+    @Override public boolean matches(RelOptRuleCall call) {
+      final LogicalProject project = call.rel(0);
+      return project.getVariablesSet().isEmpty();
+    }
+
     @Override public RelNode convert(RelNode rel) {
       final LogicalProject project = (LogicalProject) rel;
       final RelTraitSet traitSet = project.getTraitSet().replace(out);
diff --git a/pig/src/main/java/org/apache/calcite/adapter/pig/PigProject.java b/pig/src/main/java/org/apache/calcite/adapter/pig/PigProject.java
index 11d2e90..e62c910 100644
--- a/pig/src/main/java/org/apache/calcite/adapter/pig/PigProject.java
+++ b/pig/src/main/java/org/apache/calcite/adapter/pig/PigProject.java
@@ -25,6 +25,7 @@
 import org.apache.calcite.rex.RexNode;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 
 import java.util.List;
 
@@ -35,7 +36,7 @@
   /** Creates a PigProject. */
   public PigProject(RelOptCluster cluster, RelTraitSet traitSet, RelNode input,
       List<? extends RexNode> projects, RelDataType rowType) {
-    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType);
+    super(cluster, traitSet, ImmutableList.of(), input, projects, rowType, ImmutableSet.of());
     assert getConvention() == PigRel.CONVENTION;
   }
 
diff --git a/pig/src/main/java/org/apache/calcite/adapter/pig/PigRules.java b/pig/src/main/java/org/apache/calcite/adapter/pig/PigRules.java
index 3def5e0..d3daa35 100644
--- a/pig/src/main/java/org/apache/calcite/adapter/pig/PigRules.java
+++ b/pig/src/main/java/org/apache/calcite/adapter/pig/PigRules.java
@@ -18,6 +18,7 @@
 
 import org.apache.calcite.plan.Convention;
 import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
 import org.apache.calcite.plan.RelTraitSet;
 import org.apache.calcite.rel.RelNode;
 import org.apache.calcite.rel.convert.ConverterRule;
@@ -108,6 +109,11 @@
       super(config);
     }
 
+    @Override public boolean matches(RelOptRuleCall call) {
+      final LogicalProject project = call.rel(0);
+      return project.getVariablesSet().isEmpty();
+    }
+
     @Override public RelNode convert(RelNode rel) {
       final LogicalProject project = (LogicalProject) rel;
       final RelTraitSet traitSet = project.getTraitSet().replace(PigRel.CONVENTION);
diff --git a/splunk/src/main/java/org/apache/calcite/adapter/splunk/SplunkPushDownRule.java b/splunk/src/main/java/org/apache/calcite/adapter/splunk/SplunkPushDownRule.java
index 0e7c737..5422472 100644
--- a/splunk/src/main/java/org/apache/calcite/adapter/splunk/SplunkPushDownRule.java
+++ b/splunk/src/main/java/org/apache/calcite/adapter/splunk/SplunkPushDownRule.java
@@ -305,7 +305,7 @@
       return rel;
     }
     return LogicalProject.create(rel, proj.getHints(),
-        proj.getProjects(), proj.getRowType());
+        proj.getProjects(), proj.getRowType(), proj.getVariablesSet());
   }
 
   // TODO: use StringBuilder instead of String
diff --git a/testkit/src/main/java/org/apache/calcite/test/RelMetadataFixture.java b/testkit/src/main/java/org/apache/calcite/test/RelMetadataFixture.java
index 090816c..862ca1c 100644
--- a/testkit/src/main/java/org/apache/calcite/test/RelMetadataFixture.java
+++ b/testkit/src/main/java/org/apache/calcite/test/RelMetadataFixture.java
@@ -41,6 +41,7 @@
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.ImmutableBitSet;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSortedSet;
 import com.google.common.collect.Iterables;
@@ -195,6 +196,8 @@
     metadataConfig.applyMetadata(rel.getCluster());
     if (convertAsCalc) {
       Project project = (Project) rel;
+      Preconditions.checkArgument(project.getVariablesSet().isEmpty(),
+          "Calc does not allow variables");
       RexProgram program = RexProgram.create(
           project.getInput().getRowType(),
           project.getProjects(),
diff --git a/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java b/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java
index 8cc3539..ca1cbf9 100644
--- a/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java
+++ b/testkit/src/main/java/org/apache/calcite/test/catalog/MockCatalogReader.java
@@ -86,6 +86,7 @@
 import org.apache.calcite.util.Util;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -903,7 +904,8 @@
       return LogicalProject.create(rel,
           ImmutableList.of(),
           Pair.left(projects),
-          Pair.right(projects));
+          Pair.right(projects),
+          ImmutableSet.of());
     }
 
     @Override public <T> T unwrap(Class<T> clazz) {