enrich expression cache key information to support expressions which depend on external state (#11358)

* enrich expression cache key information to support expressions which depend on external state such as lookups

* cache rules everything around me

* low carb

* rename
diff --git a/core/src/main/java/org/apache/druid/math/expr/Expr.java b/core/src/main/java/org/apache/druid/math/expr/Expr.java
index 4df2bf8..a61a6d5 100644
--- a/core/src/main/java/org/apache/druid/math/expr/Expr.java
+++ b/core/src/main/java/org/apache/druid/math/expr/Expr.java
@@ -23,8 +23,10 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.druid.annotations.SubclassesMustOverrideEqualsAndHashCode;
+import org.apache.druid.java.util.common.Cacheable;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.math.expr.vector.ExprVectorProcessor;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -38,8 +40,9 @@
  * immutable.
  */
 @SubclassesMustOverrideEqualsAndHashCode
-public interface Expr
+public interface Expr extends Cacheable
 {
+
   String NULL_LITERAL = "null";
   Joiner ARG_JOINER = Joiner.on(", ");
 
@@ -171,6 +174,12 @@
     throw Exprs.cannotVectorize(this);
   }
 
+  @Override
+  default byte[] getCacheKey()
+  {
+    return new CacheKeyBuilder(Exprs.EXPR_CACHE_KEY).appendString(stringify()).build();
+  }
+
   /**
    * Mechanism to supply input types for the bindings which will back {@link IdentifierExpr}, to use in the aid of
    * inferring the output type of an expression with {@link #getOutputType}. A null value means that either the binding
diff --git a/core/src/main/java/org/apache/druid/math/expr/Exprs.java b/core/src/main/java/org/apache/druid/math/expr/Exprs.java
index cdff172..618afa5 100644
--- a/core/src/main/java/org/apache/druid/math/expr/Exprs.java
+++ b/core/src/main/java/org/apache/druid/math/expr/Exprs.java
@@ -29,6 +29,9 @@
 
 public class Exprs
 {
+  public static final byte EXPR_CACHE_KEY = 0x00;
+  public static final byte LOOKUP_EXPR_CACHE_KEY = 0x01;
+
   public static UnsupportedOperationException cannotVectorize(Expr expr)
   {
     return new UOE("Unable to vectorize expression:[%s]", expr.stringify());
diff --git a/processing/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java b/core/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java
similarity index 100%
rename from processing/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java
rename to core/src/main/java/org/apache/druid/query/cache/CacheKeyBuilder.java
diff --git a/core/src/test/java/org/apache/druid/math/expr/ApplyFunctionTest.java b/core/src/test/java/org/apache/druid/math/expr/ApplyFunctionTest.java
index d352bfd..a63f0ec 100644
--- a/core/src/test/java/org/apache/druid/math/expr/ApplyFunctionTest.java
+++ b/core/src/test/java/org/apache/druid/math/expr/ApplyFunctionTest.java
@@ -171,6 +171,8 @@
 
     Assert.assertEquals(expr.stringify(), roundTrip.stringify());
     Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
   }
 
   private void assertExpr(final String expression, final Object[] expectedResult)
@@ -196,6 +198,8 @@
 
     Assert.assertEquals(expr.stringify(), roundTrip.stringify());
     Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
   }
 
   private void assertExpr(final String expression, final Double[] expectedResult)
@@ -224,5 +228,7 @@
 
     Assert.assertEquals(expr.stringify(), roundTrip.stringify());
     Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
   }
 }
diff --git a/core/src/test/java/org/apache/druid/math/expr/FunctionTest.java b/core/src/test/java/org/apache/druid/math/expr/FunctionTest.java
index 1557749..5ded90f 100644
--- a/core/src/test/java/org/apache/druid/math/expr/FunctionTest.java
+++ b/core/src/test/java/org/apache/druid/math/expr/FunctionTest.java
@@ -605,11 +605,14 @@
     final Expr roundTrip = Parser.parse(exprNoFlatten.stringify(), ExprMacroTable.nil());
     Assert.assertEquals(expr.stringify(), expectedResult, roundTrip.eval(bindings).value());
 
+
     final Expr roundTripFlatten = Parser.parse(expr.stringify(), ExprMacroTable.nil());
     Assert.assertEquals(expr.stringify(), expectedResult, roundTripFlatten.eval(bindings).value());
 
     Assert.assertEquals(expr.stringify(), roundTrip.stringify());
     Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
   }
 
   private void assertArrayExpr(final String expression, @Nullable final Object[] expectedResult)
@@ -626,5 +629,7 @@
 
     Assert.assertEquals(expr.stringify(), roundTrip.stringify());
     Assert.assertEquals(expr.stringify(), roundTripFlatten.stringify());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTrip.getCacheKey());
+    Assert.assertArrayEquals(expr.getCacheKey(), roundTripFlatten.getCacheKey());
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/cache/CacheKeyBuilderTest.java b/core/src/test/java/org/apache/druid/query/cache/CacheKeyBuilderTest.java
similarity index 100%
rename from processing/src/test/java/org/apache/druid/query/cache/CacheKeyBuilderTest.java
rename to core/src/test/java/org/apache/druid/query/cache/CacheKeyBuilderTest.java
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
index 34a6ead..cb91b5b 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorUtil.java
@@ -20,9 +20,11 @@
 package org.apache.druid.query.aggregation;
 
 import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.Lists;
 import org.apache.druid.guice.annotations.PublicApi;
 import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.query.monomorphicprocessing.RuntimeShapeInspector;
@@ -40,10 +42,12 @@
 import org.apache.druid.segment.virtual.ExpressionVectorSelectors;
 
 import javax.annotation.Nullable;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 @PublicApi
@@ -360,4 +364,25 @@
     }
     return columnSelectorFactory.makeValueSelector(fieldName);
   }
+
+  public static Supplier<byte[]> getSimpleAggregatorCacheKeySupplier(
+      byte aggregatorType,
+      String fieldName,
+      Supplier<Expr> fieldExpression
+  )
+  {
+    return Suppliers.memoize(() -> {
+      byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
+      byte[] expressionBytes = Optional.ofNullable(fieldExpression.get())
+                                       .map(Expr::getCacheKey)
+                                       .orElse(StringUtils.EMPTY_BYTES);
+
+      return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
+                       .put(aggregatorType)
+                       .put(fieldNameBytes)
+                       .put(AggregatorUtil.STRING_SEPARATOR)
+                       .put(expressionBytes)
+                       .array();
+    });
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java
index ce22009..79bd6c6 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMaxAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class DoubleMaxAggregatorFactory extends SimpleDoubleAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public DoubleMaxAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.DOUBLE_MAX_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public DoubleMaxAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.DOUBLE_MAX_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java
index 9c068ce..124200c 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleMinAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class DoubleMinAggregatorFactory extends SimpleDoubleAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public DoubleMinAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.DOUBLE_MIN_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public DoubleMinAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.DOUBLE_MIN_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
index 1753d18..1958c0d 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/DoubleSumAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseDoubleColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class DoubleSumAggregatorFactory extends SimpleDoubleAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public DoubleSumAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.DOUBLE_SUM_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public DoubleSumAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.DOUBLE_SUM_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java
index e40000d..fd918fe 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/ExpressionLambdaAggregatorFactory.java
@@ -234,10 +234,10 @@
         .appendStrings(fields)
         .appendString(initialValueExpressionString)
         .appendString(initialCombineValueExpressionString)
-        .appendString(foldExpressionString)
-        .appendString(combineExpressionString)
-        .appendString(compareExpressionString)
-        .appendString(finalizeExpressionString)
+        .appendCacheable(foldExpression.get())
+        .appendCacheable(combineExpression.get())
+        .appendCacheable(combineExpression.get())
+        .appendCacheable(finalizeExpression.get())
         .appendInt(maxSizeBytes.getBytesInInt())
         .build();
   }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/FloatMaxAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/FloatMaxAggregatorFactory.java
index 9398f62..8b6dccf 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/FloatMaxAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/FloatMaxAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class FloatMaxAggregatorFactory extends SimpleFloatAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public FloatMaxAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.FLOAT_MAX_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public FloatMaxAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.FLOAT_MAX_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/FloatMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/FloatMinAggregatorFactory.java
index 465aa38..cf35efa 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/FloatMinAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/FloatMinAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class FloatMinAggregatorFactory extends SimpleFloatAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public FloatMinAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.FLOAT_MIN_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public FloatMinAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.FLOAT_MIN_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java
index 3175b40..7d47a36 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/FloatSumAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseFloatColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class FloatSumAggregatorFactory extends SimpleFloatAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public FloatSumAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.FLOAT_SUM_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public FloatSumAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.FLOAT_SUM_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongMaxAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongMaxAggregatorFactory.java
index a9bc7c5..9260b6d4 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/LongMaxAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongMaxAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class LongMaxAggregatorFactory extends SimpleLongAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public LongMaxAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.LONG_MAX_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public LongMaxAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.LONG_MAX_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
index 18a3ee6..5509014 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongMinAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class LongMinAggregatorFactory extends SimpleLongAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public LongMinAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.LONG_MIN_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public LongMinAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.LONG_MIN_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java
index 8c9364f..3fd8cf0 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/LongSumAggregatorFactory.java
@@ -22,14 +22,13 @@
 import com.fasterxml.jackson.annotation.JacksonInject;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.java.util.common.StringUtils;
+import com.google.common.base.Supplier;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.segment.BaseLongColumnValueSelector;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorValueSelector;
 
 import javax.annotation.Nullable;
-import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
 
@@ -37,6 +36,8 @@
  */
 public class LongSumAggregatorFactory extends SimpleLongAggregatorFactory
 {
+  private final Supplier<byte[]> cacheKey;
+
   @JsonCreator
   public LongSumAggregatorFactory(
       @JsonProperty("name") String name,
@@ -46,6 +47,11 @@
   )
   {
     super(macroTable, name, fieldName, expression);
+    this.cacheKey = AggregatorUtil.getSimpleAggregatorCacheKeySupplier(
+        AggregatorUtil.LONG_SUM_CACHE_TYPE_ID,
+        fieldName,
+        fieldExpression
+    );
   }
 
   public LongSumAggregatorFactory(String name, String fieldName)
@@ -114,15 +120,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    byte[] fieldNameBytes = StringUtils.toUtf8WithNullToEmpty(fieldName);
-    byte[] expressionBytes = StringUtils.toUtf8WithNullToEmpty(expression);
-
-    return ByteBuffer.allocate(2 + fieldNameBytes.length + expressionBytes.length)
-                     .put(AggregatorUtil.LONG_SUM_CACHE_TYPE_ID)
-                     .put(fieldNameBytes)
-                     .put(AggregatorUtil.STRING_SEPARATOR)
-                     .put(expressionBytes)
-                     .array();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
index c540018..bd44361 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/SimpleDoubleAggregatorFactory.java
@@ -59,6 +59,7 @@
   protected final boolean storeDoubleAsFloat;
   protected final Supplier<Expr> fieldExpression;
 
+
   public SimpleDoubleAggregatorFactory(
       ExprMacroTable macroTable,
       String name,
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
index b05ecbe..cab72bd 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/post/ExpressionPostAggregator.java
@@ -75,6 +75,7 @@
 
   private final Supplier<Expr> parsed;
   private final Supplier<Set<String>> dependentFields;
+  private final Supplier<byte[]> cacheKey;
 
   /**
    * Constructor for serialization.
@@ -144,6 +145,12 @@
 
     this.parsed = parsed;
     this.dependentFields = dependentFields;
+    this.cacheKey = Suppliers.memoize(() -> {
+      return new CacheKeyBuilder(PostAggregatorIds.EXPRESSION)
+          .appendCacheable(parsed.get())
+          .appendString(ordering)
+          .build();
+    });
   }
 
 
@@ -229,10 +236,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    return new CacheKeyBuilder(PostAggregatorIds.EXPRESSION)
-        .appendString(expression)
-        .appendString(ordering)
-        .build();
+    return cacheKey.get();
   }
 
   public enum Ordering implements Comparator<Comparable>
diff --git a/processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java b/processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
index 9b2f96e..9b25e5d 100644
--- a/processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
+++ b/processing/src/main/java/org/apache/druid/query/expression/LookupExprMacro.java
@@ -27,6 +27,8 @@
 import org.apache.druid.math.expr.ExprEval;
 import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.Exprs;
+import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.lookup.LookupExtractorFactoryContainerProvider;
 import org.apache.druid.query.lookup.RegisteredLookupExtractionFn;
 
@@ -108,6 +110,14 @@
       {
         return StringUtils.format("%s(%s, %s)", FN_NAME, arg.stringify(), lookupExpr.stringify());
       }
+
+      @Override
+      public byte[] getCacheKey()
+      {
+        return new CacheKeyBuilder(Exprs.LOOKUP_EXPR_CACHE_KEY).appendString(stringify())
+                                                               .appendCacheable(extractionFn)
+                                                               .build();
+      }
     }
 
     return new LookupExpr(arg);
diff --git a/processing/src/main/java/org/apache/druid/query/filter/ExpressionDimFilter.java b/processing/src/main/java/org/apache/druid/query/filter/ExpressionDimFilter.java
index 6692733..32fc2c3 100644
--- a/processing/src/main/java/org/apache/druid/query/filter/ExpressionDimFilter.java
+++ b/processing/src/main/java/org/apache/druid/query/filter/ExpressionDimFilter.java
@@ -25,6 +25,7 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.collect.RangeSet;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -40,6 +41,7 @@
 {
   private final String expression;
   private final Supplier<Expr> parsed;
+  private final Supplier<byte[]> cacheKey;
   @Nullable
   private final FilterTuning filterTuning;
 
@@ -53,6 +55,11 @@
     this.expression = expression;
     this.filterTuning = filterTuning;
     this.parsed = Parser.lazyParse(expression, macroTable);
+    this.cacheKey = Suppliers.memoize(() -> {
+      return new CacheKeyBuilder(DimFilterUtils.EXPRESSION_CACHE_ID)
+          .appendCacheable(parsed.get())
+          .build();
+    });
   }
 
   @VisibleForTesting
@@ -102,9 +109,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    return new CacheKeyBuilder(DimFilterUtils.EXPRESSION_CACHE_ID)
-        .appendString(expression)
-        .build();
+    return cacheKey.get();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
index 343cd8a..c0de087 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
@@ -60,6 +60,7 @@
   @Nullable
   private final ValueType outputType;
   private final Supplier<Expr> parsedExpression;
+  private final Supplier<byte[]> cacheKey;
 
   @JsonCreator
   public ExpressionVirtualColumn(
@@ -73,6 +74,7 @@
     this.expression = Preconditions.checkNotNull(expression, "expression");
     this.outputType = outputType;
     this.parsedExpression = Parser.lazyParse(expression, macroTable);
+    this.cacheKey = makeCacheKeySupplier();
   }
 
   /**
@@ -90,6 +92,7 @@
     this.expression = parsedExpression.toString();
     this.outputType = outputType;
     this.parsedExpression = Suppliers.ofInstance(parsedExpression);
+    this.cacheKey = makeCacheKeySupplier();
   }
 
   @JsonProperty("name")
@@ -259,14 +262,7 @@
   @Override
   public byte[] getCacheKey()
   {
-    CacheKeyBuilder builder = new CacheKeyBuilder(VirtualColumnCacheHelper.CACHE_TYPE_ID_EXPRESSION)
-        .appendString(name)
-        .appendString(expression);
-
-    if (outputType != null) {
-      builder.appendString(outputType.toString());
-    }
-    return builder.build();
+    return cacheKey.get();
   }
 
   @Override
@@ -299,4 +295,18 @@
            ", outputType=" + outputType +
            '}';
   }
+
+  private Supplier<byte[]> makeCacheKeySupplier()
+  {
+    return Suppliers.memoize(() -> {
+      CacheKeyBuilder builder = new CacheKeyBuilder(VirtualColumnCacheHelper.CACHE_TYPE_ID_EXPRESSION)
+          .appendString(name)
+          .appendCacheable(parsedExpression.get());
+
+      if (outputType != null) {
+        builder.appendString(outputType.toString());
+      }
+      return builder.build();
+    });
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
index 503fa1c..07f225d 100644
--- a/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
+++ b/server/src/test/java/org/apache/druid/query/expression/LookupEnabledTestExprMacroTable.java
@@ -33,6 +33,7 @@
 
 import javax.annotation.Nullable;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -48,13 +49,16 @@
 
   private LookupEnabledTestExprMacroTable()
   {
-    super(
-        Lists.newArrayList(
-            Iterables.concat(
-                TestExprMacroTable.INSTANCE.getMacros(),
-                Collections.singletonList(
-                    new LookupExprMacro(createTestLookupProvider(ImmutableMap.of("foo", "xfoo")))
-                )
+    super(makeTestMacros(ImmutableMap.of("foo", "xfoo")));
+  }
+
+  public static List<ExprMacro> makeTestMacros(final Map<String, String> theLookup)
+  {
+    return Lists.newArrayList(
+        Iterables.concat(
+            TestExprMacroTable.INSTANCE.getMacros(),
+            Collections.singletonList(
+                new LookupExprMacro(createTestLookupProvider(theLookup))
             )
         )
     );
diff --git a/server/src/test/java/org/apache/druid/query/expression/LookupExprMacroTest.java b/server/src/test/java/org/apache/druid/query/expression/LookupExprMacroTest.java
index ce8c9e3..b330bda 100644
--- a/server/src/test/java/org/apache/druid/query/expression/LookupExprMacroTest.java
+++ b/server/src/test/java/org/apache/druid/query/expression/LookupExprMacroTest.java
@@ -21,6 +21,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprMacroTable;
 import org.apache.druid.math.expr.InputBindings;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.testing.InitializedNullHandlingTest;
@@ -54,6 +55,32 @@
     assertExpr("lookup(x, 'lookylook')", null);
   }
 
+  @Test
+  public void testCacheKeyChangesWhenLookupChanges()
+  {
+    final String expression = "lookup(x, 'lookyloo')";
+    final Expr expr = Parser.parse(expression, LookupEnabledTestExprMacroTable.INSTANCE);
+    final Expr exprSameLookup = Parser.parse(expression, LookupEnabledTestExprMacroTable.INSTANCE);
+    final Expr exprChangedLookup = Parser.parse(
+        expression,
+        new ExprMacroTable(LookupEnabledTestExprMacroTable.makeTestMacros(ImmutableMap.of("x", "y", "a", "b")))
+    );
+    // same should have same cache key
+    Assert.assertArrayEquals(expr.getCacheKey(), exprSameLookup.getCacheKey());
+    // different should not have same key
+    final byte[] exprBytes = expr.getCacheKey();
+    final byte[] expr2Bytes = exprChangedLookup.getCacheKey();
+    if (exprBytes.length == expr2Bytes.length) {
+      // only check for equality if lengths are equal
+      boolean allEqual = true;
+      for (int i = 0; i < exprBytes.length; i++) {
+        allEqual = allEqual && (exprBytes[i] == expr2Bytes[i]);
+      }
+      Assert.assertFalse(allEqual);
+    }
+  }
+
+
   private void assertExpr(final String expression, final Object expectedResult)
   {
     final Expr expr = Parser.parse(expression, LookupEnabledTestExprMacroTable.INSTANCE);