add single input string expression dimension vector selector and better expression planning (#11213)

* add single input string expression dimension vector selector and better expression planning

* better

* fixes

* oops

* rework how vector processor factories choose string processors, fix to be less aggressive about vectorizing

* oops

* javadocs, renaming

* more javadocs

* benchmarks

* use string expression vector processor with vector size 1 instead of expr.eval

* better logging

* javadocs, surprising number of the the

* more

* simplify
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 0cf24ec..cb5ce5f 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -178,13 +178,20 @@
       // 26: group by string expr with non-expr agg
       "SELECT CONCAT(string2, '-', long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
       // 27: group by string expr with expr agg
-      "SELECT CONCAT(string2, '-', long2), SUM(long1 * double4) FROM foo GROUP BY 1 ORDER BY 2"
+      "SELECT CONCAT(string2, '-', long2), SUM(long1 * double4) FROM foo GROUP BY 1 ORDER BY 2",
+      // 28: group by single input string low cardinality expr with expr agg
+      "SELECT CONCAT(string2, '-', 'foo'), SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2",
+      // 28: group by single input string high cardinality expr with expr agg
+      "SELECT CONCAT(string3, '-', 'foo'), SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2"
   );
 
   @Param({"5000000"})
   private int rowsPerSegment;
 
-  @Param({"false", "force"})
+  @Param({
+      "false",
+      "force"
+  })
   private String vectorize;
 
   @Param({
@@ -217,7 +224,9 @@
       "24",
       "25",
       "26",
-      "27"
+      "27",
+      "28",
+      "29"
   })
   private String query;
 
diff --git a/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java b/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
index a5b8bad..8d59e6b 100644
--- a/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
+++ b/core/src/main/java/org/apache/druid/concurrent/LifecycleLock.java
@@ -257,7 +257,7 @@
   }
 
   /**
-   * Finalizes stopping the the LifecycleLock. This method must be called before exit from stop() on this object,
+   * Finalizes stopping the LifecycleLock. This method must be called before exit from stop() on this object,
    * usually in a finally block. If you're using a restartable object, use {@link #exitStopAndReset()} instead.
    *
    * @throws IllegalMonitorStateException if {@link #canStop()} is not yet called on this LifecycleLock
diff --git a/core/src/main/java/org/apache/druid/math/expr/Function.java b/core/src/main/java/org/apache/druid/math/expr/Function.java
index a0b8e59..ed7082e 100644
--- a/core/src/main/java/org/apache/druid/math/expr/Function.java
+++ b/core/src/main/java/org/apache/druid/math/expr/Function.java
@@ -647,7 +647,7 @@
       if (evals.isEmpty()) {
         // The GREATEST/LEAST functions are not in the SQL standard. Emulate the behavior of postgres (return null if
         // all expressions are null, otherwise skip null values) since it is used as a base for a wide number of
-        // databases. This also matches the behavior the the long/double greatest/least post aggregators. Some other
+        // databases. This also matches the behavior the long/double greatest/least post aggregators. Some other
         // databases (e.g., MySQL) return null if any expression is null.
         // https://www.postgresql.org/docs/9.5/functions-conditional.html
         // https://dev.mysql.com/doc/refman/8.0/en/comparison-operators.html#function_least
diff --git a/core/src/main/java/org/apache/druid/math/expr/FunctionalExpr.java b/core/src/main/java/org/apache/druid/math/expr/FunctionalExpr.java
index 5e164e1..eb040df 100644
--- a/core/src/main/java/org/apache/druid/math/expr/FunctionalExpr.java
+++ b/core/src/main/java/org/apache/druid/math/expr/FunctionalExpr.java
@@ -68,6 +68,11 @@
     return args.stream().map(IdentifierExpr::toString).collect(Collectors.toList());
   }
 
+  public List<String> stringifyIdentifiers()
+  {
+    return args.stream().map(IdentifierExpr::stringify).collect(Collectors.toList());
+  }
+
   ImmutableList<IdentifierExpr> getIdentifierExprs()
   {
     return args;
@@ -99,7 +104,7 @@
   @Override
   public String stringify()
   {
-    return StringUtils.format("(%s) -> %s", ARG_JOINER.join(getIdentifiers()), expr.stringify());
+    return StringUtils.format("(%s) -> %s", ARG_JOINER.join(stringifyIdentifiers()), expr.stringify());
   }
 
   @Override
diff --git a/docs/operations/security-overview.md b/docs/operations/security-overview.md
index c51e7d2..e1a1dd7 100644
--- a/docs/operations/security-overview.md
+++ b/docs/operations/security-overview.md
@@ -122,7 +122,7 @@
 
 ## Authentication and authorization
 
-You can configure authentication and authorization to control access to the the Druid APIs. Then configure users, roles, and permissions, as described in the following sections. Make the configuration changes in the `common.runtime.properties` file on all Druid servers in the cluster.
+You can configure authentication and authorization to control access to the Druid APIs. Then configure users, roles, and permissions, as described in the following sections. Make the configuration changes in the `common.runtime.properties` file on all Druid servers in the cluster.
 
 Within Druid's operating context, authenticators control the way user identities are verified. Authorizers employ user roles to relate authenticated users to the datasources they are permitted to access. You can set the finest-grained permissions on a per-datasource basis.
 
diff --git a/docs/querying/caching.md b/docs/querying/caching.md
index 3ee0191..5d6affe 100644
--- a/docs/querying/caching.md
+++ b/docs/querying/caching.md
@@ -63,7 +63,7 @@
 
 - On Historicals, the default. Enable segment-level cache population on Historicals for larger production clusters to prevent Brokers from having to merge all query results. When you enable cache population on Historicals instead of Brokers, the Historicals merge their own local results and put less strain on the Brokers.
 
-- On ingestion tasks in the Peon or Indexer service. Larger production clusters should enable segment-level cache population on task services only to prevent Brokers from having to merge all query results. When you enable cache population on task execution services instead of Brokers, the the task execution services to merge their own local results and put less strain on the Brokers.
+- On ingestion tasks in the Peon or Indexer service. Larger production clusters should enable segment-level cache population on task services only to prevent Brokers from having to merge all query results. When you enable cache population on task execution services instead of Brokers, the task execution services to merge their own local results and put less strain on the Brokers.
 
      Task executor services only support caches that store data locally. For example the `caffeine` cache. This restriction exists because the cache stores results at the level of intermediate partial segments generated by the ingestion tasks. These intermediate partial segments may not be identical across task replicas. Therefore task executor services ignore remote cache types such as `memcached`.
 
diff --git a/docs/querying/datasource.md b/docs/querying/datasource.md
index 8fd121d..a9dbaab 100644
--- a/docs/querying/datasource.md
+++ b/docs/querying/datasource.md
@@ -54,7 +54,7 @@
 [data ingestion](../ingestion/index.md). They are split up into segments, distributed around the cluster,
 and queried in parallel.
 
-In [Druid SQL](sql.md#from), table datasources reside in the the `druid` schema. This is the default schema, so table
+In [Druid SQL](sql.md#from), table datasources reside in the `druid` schema. This is the default schema, so table
 datasources can be referenced as either `druid.dataSourceName` or simply `dataSourceName`.
 
 In native queries, table datasources can be referenced using their names as strings (as in the example above), or by
@@ -92,7 +92,7 @@
 <!--END_DOCUSAURUS_CODE_TABS-->
 
 Lookup datasources correspond to Druid's key-value [lookup](lookups.md) objects. In [Druid SQL](sql.md#from),
-they reside in the the `lookup` schema. They are preloaded in memory on all servers, so they can be accessed rapidly.
+they reside in the `lookup` schema. They are preloaded in memory on all servers, so they can be accessed rapidly.
 They can be joined onto regular tables using the [join operator](#join).
 
 Lookup datasources are key-value oriented and always have exactly two columns: `k` (the key) and `v` (the value), and
diff --git a/docs/querying/sorting-orders.md b/docs/querying/sorting-orders.md
index 34f8057..9609d12 100644
--- a/docs/querying/sorting-orders.md
+++ b/docs/querying/sorting-orders.md
@@ -49,7 +49,7 @@
 When comparing two unparseable values (e.g., "hello" and "world"), this ordering will sort by comparing the unparsed strings lexicographically.
 
 ## Strlen
-Sorts values by the their string lengths. When there is a tie, this comparator falls back to using the String compareTo method.
+Sorts values by their string lengths. When there is a tie, this comparator falls back to using the String compareTo method.
 
 ## Version
 Sorts values as versions, e.g.: "10.0 sorts after 9.0", "1.0.0-SNAPSHOT sorts after 1.0.0".
diff --git a/extendedset/src/main/java/org/apache/druid/extendedset/intset/IntSet.java b/extendedset/src/main/java/org/apache/druid/extendedset/intset/IntSet.java
index df90c1c..8342978 100755
--- a/extendedset/src/main/java/org/apache/druid/extendedset/intset/IntSet.java
+++ b/extendedset/src/main/java/org/apache/druid/extendedset/intset/IntSet.java
@@ -77,7 +77,7 @@
     int next();

 

     /**

-     * Skips all the elements before the the specified element, so that

+     * Skips all the elements before the specified element, so that

      * {@link #next()} gives the given element or, if it does not exist, the

      * element immediately after according to the sorting provided by this

      * set.

diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
index a576771..610d611 100644
--- a/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
+++ b/extensions-core/hdfs-storage/src/main/java/org/apache/hadoop/fs/HadoopFsWrapper.java
@@ -26,7 +26,7 @@
 import java.lang.reflect.Method;
 
 /**
- * This wrapper class is created to be able to access some of the the "protected" methods inside Hadoop's
+ * This wrapper class is created to be able to access some of the "protected" methods inside Hadoop's
  * FileSystem class. Those are supposed to become public eventually or more appropriate alternatives would be
  * provided.
  * This is a hack and should be removed when no longer necessary.
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogram.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogram.java
index ed67b8d..fa37725 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogram.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/ApproximateHistogram.java
@@ -1520,7 +1520,7 @@
    *
    * @param probabilities array of probabilities
    *
-   * @return an array of length probabilities.length representing the the approximate sample quantiles
+   * @return an array of length probabilities.length representing the approximate sample quantiles
    * corresponding to the given probabilities
    */
 
diff --git a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
index e10203c..754e0df 100644
--- a/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
+++ b/extensions-core/histogram/src/main/java/org/apache/druid/query/aggregation/histogram/FixedBucketsHistogram.java
@@ -94,7 +94,7 @@
   public static final byte SPARSE_ENCODING_MODE = 0x02;
 
   /**
-   * Determines how the the histogram handles outliers.
+   * Determines how the histogram handles outliers.
    *
    * Ignore:   do not track outliers at all
    * Overflow: track outlier counts in upperOutlierCount and lowerOutlierCount.
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
index 948d63d..35341c2 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/FixedBucketsHistogramQuantileSqlAggregatorTest.java
@@ -123,8 +123,6 @@
   @Test
   public void testQuantileOnFloatAndLongs() throws Exception
   {
-    cannotVectorize();
-
     final List<Object[]> expectedResults = ImmutableList.of(
         new Object[]{
             1.0299999713897705,
@@ -238,8 +236,6 @@
   @Test
   public void testQuantileOnCastedString() throws Exception
   {
-    cannotVectorize();
-
     testQuery(
         "SELECT\n"
         + "APPROX_QUANTILE_FIXED_BUCKETS(CAST(dim1 AS DOUBLE), 0.01, 20, 0.0, 10.0),\n"
diff --git a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
index 48993ca..720d2fa 100644
--- a/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
+++ b/extensions-core/histogram/src/test/java/org/apache/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java
@@ -121,7 +121,6 @@
   @Test
   public void testQuantileOnFloatAndLongs() throws Exception
   {
-    cannotVectorize();
     testQuery(
         "SELECT\n"
         + "APPROX_QUANTILE(m1, 0.01),\n"
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
index 069751d..a141113 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
@@ -117,4 +117,25 @@
     }
     return NilGroupByVectorColumnSelector.INSTANCE;
   }
+
+  /**
+   * The group by engine vector processor has a more relaxed approach to choosing to use a dictionary encoded string
+   * selector over an object selector than some of the other {@link VectorColumnProcessorFactory} implementations.
+   *
+   * Basically, if a valid dictionary exists, we will use it to group on dictionary ids (so that we can use
+   * {@link SingleValueStringGroupByVectorColumnSelector} whenever possible instead of
+   * {@link DictionaryBuildingSingleValueStringGroupByVectorColumnSelector}).
+   *
+   * We do this even for things like virtual columns that have a single string input, because it allows deferring
+   * accessing any of the actual string values, which involves at minimum reading utf8 byte values and converting
+   * them to string form (if not already cached), and in the case of expressions, computing the expression output for
+   * the string input.
+   */
+  @Override
+  public boolean useDictionaryEncodedSelector(ColumnCapabilities capabilities)
+  {
+    Preconditions.checkArgument(capabilities != null, "Capabilities must not be null");
+    Preconditions.checkArgument(capabilities.getType() == ValueType.STRING, "Must only be called on a STRING column");
+    return capabilities.isDictionaryEncoded().isTrue();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index 848c185..f66d51a 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -42,6 +42,7 @@
 import org.apache.druid.query.groupby.epinephelinae.HashVectorGrouper;
 import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
 import org.apache.druid.query.vector.VectorCursorGranularizer;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.ColumnProcessors;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
@@ -59,7 +60,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class VectorGroupByEngine
@@ -75,18 +75,18 @@
       @Nullable final Filter filter
   )
   {
-    Function<String, ColumnCapabilities> capabilitiesFunction = name ->
-        query.getVirtualColumns().getColumnCapabilitiesWithFallback(adapter, name);
+    final ColumnInspector inspector = query.getVirtualColumns().wrapInspector(adapter);
 
-    return canVectorizeDimensions(capabilitiesFunction, query.getDimensions())
-           && query.getDimensions().stream().allMatch(DimensionSpec::canVectorize)
-           && query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
+    return adapter.canVectorize(filter, query.getVirtualColumns(), false)
+           && canVectorizeDimensions(inspector, query.getDimensions())
            && VirtualColumns.shouldVectorize(query, query.getVirtualColumns(), adapter)
-           && adapter.canVectorize(filter, query.getVirtualColumns(), false);
+           && query.getAggregatorSpecs()
+                   .stream()
+                   .allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(inspector));
   }
 
   public static boolean canVectorizeDimensions(
-      final Function<String, ColumnCapabilities> capabilitiesFunction,
+      final ColumnInspector inspector,
       final List<DimensionSpec> dimensions
   )
   {
@@ -94,6 +94,10 @@
         .stream()
         .allMatch(
             dimension -> {
+              if (!dimension.canVectorize()) {
+                return false;
+              }
+
               if (dimension.mustDecorate()) {
                 // group by on multi value dimensions are not currently supported
                 // DimensionSpecs that decorate may turn singly-valued columns into multi-valued selectors.
@@ -102,7 +106,7 @@
               }
 
               // Now check column capabilities.
-              final ColumnCapabilities columnCapabilities = capabilitiesFunction.apply(dimension.getDimension());
+              final ColumnCapabilities columnCapabilities = inspector.getColumnCapabilities(dimension.getDimension());
               // null here currently means the column does not exist, nil columns can be vectorized
               if (columnCapabilities == null) {
                 return true;
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
index 75cb498..e93cc97 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -39,6 +39,7 @@
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.filter.Filter;
 import org.apache.druid.query.vector.VectorCursorGranularizer;
+import org.apache.druid.segment.ColumnInspector;
 import org.apache.druid.segment.SegmentMissingException;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
@@ -66,7 +67,7 @@
   @VisibleForTesting
   public TimeseriesQueryEngine()
   {
-    this.bufferPool = new StupidPool<>("dummy", () -> ByteBuffer.allocate(1000000));
+    this.bufferPool = new StupidPool<>("dummy", () -> ByteBuffer.allocate(10000000));
   }
 
   @Inject
@@ -94,10 +95,12 @@
     final Granularity gran = query.getGranularity();
     final boolean descending = query.isDescending();
 
+    final ColumnInspector inspector = query.getVirtualColumns().wrapInspector(adapter);
+
     final boolean doVectorize = QueryContexts.getVectorize(query).shouldVectorize(
-        query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(adapter))
+        adapter.canVectorize(filter, query.getVirtualColumns(), descending)
         && VirtualColumns.shouldVectorize(query, query.getVirtualColumns(), adapter)
-        && adapter.canVectorize(filter, query.getVirtualColumns(), descending)
+        && query.getAggregatorSpecs().stream().allMatch(aggregatorFactory -> aggregatorFactory.canVectorize(inspector))
     );
 
     final Sequence<Result<TimeseriesResultValue>> result;
diff --git a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
index ac5b21f..d7c2fcd 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/types/TopNColumnAggregatesProcessor.java
@@ -115,7 +115,7 @@
   void initAggregateStore();
 
   /**
-   * Closes all on heap {@link Aggregator} associated withe the aggregates processor
+   * Closes all on heap {@link Aggregator} associated with the aggregates processor
    */
   void closeAggregators();
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java
index 1830712..ffb06ed 100644
--- a/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java
+++ b/processing/src/main/java/org/apache/druid/segment/ColumnProcessors.java
@@ -219,13 +219,27 @@
     } else if (dimensionSpec.getExtractionFn() != null) {
       // DimensionSpec is applying an extractionFn but *not* decorating. We have some insight into how the
       // extractionFn will behave, so let's use it.
+      final boolean dictionaryEncoded;
+      final boolean unique;
+      final boolean sorted;
+      if (columnCapabilities != null) {
+        dictionaryEncoded = columnCapabilities.isDictionaryEncoded().isTrue();
+        unique = columnCapabilities.areDictionaryValuesUnique().isTrue();
+        sorted = columnCapabilities.areDictionaryValuesSorted().isTrue();
+      } else {
+        dictionaryEncoded = false;
+        unique = false;
+        sorted = false;
+      }
 
       return new ColumnCapabilitiesImpl()
           .setType(ValueType.STRING)
-          .setDictionaryValuesSorted(dimensionSpec.getExtractionFn().preservesOrdering())
-          .setDictionaryValuesUnique(dimensionSpec.getExtractionFn().getExtractionType()
-                                     == ExtractionFn.ExtractionType.ONE_TO_ONE)
-          .setHasMultipleValues(dimensionSpec.mustDecorate() || mayBeMultiValue(columnCapabilities));
+          .setDictionaryEncoded(dictionaryEncoded)
+          .setDictionaryValuesSorted(sorted && dimensionSpec.getExtractionFn().preservesOrdering())
+          .setDictionaryValuesUnique(
+              unique && dimensionSpec.getExtractionFn().getExtractionType() == ExtractionFn.ExtractionType.ONE_TO_ONE
+          )
+          .setHasMultipleValues(mayBeMultiValue(columnCapabilities));
     } else {
       // No transformation. Pass through underlying types.
       return columnCapabilities;
@@ -318,8 +332,8 @@
 
     switch (capabilities.getType()) {
       case STRING:
-        // if column is not uniquely dictionary encoded, use an object selector
-        if (capabilities.isDictionaryEncoded().isFalse() || capabilities.areDictionaryValuesUnique().isFalse()) {
+        // let the processor factory decide if it prefers to use an object selector or dictionary encoded selector
+        if (!processorFactory.useDictionaryEncodedSelector(capabilities)) {
           return processorFactory.makeObjectProcessor(
               capabilities,
               objectSelectorFn.apply(selectorFactory)
diff --git a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java
index b2df1f9..d3d5697 100644
--- a/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/VectorColumnProcessorFactory.java
@@ -19,7 +19,9 @@
 
 package org.apache.druid.segment;
 
+import com.google.common.base.Preconditions;
 import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.vector.MultiValueDimensionVectorSelector;
 import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
 import org.apache.druid.segment.vector.VectorObjectSelector;
@@ -86,4 +88,27 @@
    * cases where the dictionary does not exist or is not expected to be useful.
    */
   T makeObjectProcessor(@SuppressWarnings("unused") ColumnCapabilities capabilities, VectorObjectSelector selector);
+
+  /**
+   * The processor factory can influence the decision on whether or not to prefer a dictionary encoded column value
+   * selector over a an object selector by examining the {@link ColumnCapabilities}.
+   *
+   * By default, all processor factories prefer to use a dictionary encoded selector if the column has a dictionary
+   * available ({@link ColumnCapabilities#isDictionaryEncoded()} is true), and there is a unique mapping of dictionary
+   * id to value ({@link ColumnCapabilities#areDictionaryValuesUnique()} is true), but this can be overridden
+   * if there is more appropriate behavior for a given processor.
+   *
+   * For processors, this means by default only actual dictionary encoded string columns (likely from real segments)
+   * will use {@link SingleValueDimensionVectorSelector} and {@link MultiValueDimensionVectorSelector}, while
+   * processors on things like string expression virtual columns will prefer to use {@link VectorObjectSelector}. In
+   * other words, it is geared towards use cases where there is a clear opportunity to benefit to deferring having to
+   * deal with the actual string value in exchange for the increased complexity of dealing with dictionary encoded
+   * selectors.
+   */
+  default boolean useDictionaryEncodedSelector(ColumnCapabilities capabilities)
+  {
+    Preconditions.checkArgument(capabilities != null, "Capabilities must not be null");
+    Preconditions.checkArgument(capabilities.getType() == ValueType.STRING, "Must only be called on a STRING column");
+    return capabilities.isDictionaryEncoded().and(capabilities.areDictionaryValuesUnique()).isTrue();
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
index 3b141d4..effcf33 100644
--- a/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
+++ b/processing/src/main/java/org/apache/druid/segment/VirtualColumns.java
@@ -43,6 +43,7 @@
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorObjectSelector;
 import org.apache.druid.segment.vector.VectorValueSelector;
+import org.apache.druid.segment.virtual.VirtualizedColumnInspector;
 import org.apache.druid.segment.virtual.VirtualizedColumnSelectorFactory;
 
 import javax.annotation.Nullable;
@@ -414,6 +415,10 @@
     return virtualColumns.toArray(new VirtualColumn[0]);
   }
 
+  /**
+   * Creates a {@link VirtualizedColumnSelectorFactory} which can create column selectors for {@link #virtualColumns}
+   * in addition to selectors for all physical columns in the underlying factory.
+   */
   public ColumnSelectorFactory wrap(final ColumnSelectorFactory baseFactory)
   {
     if (virtualColumns.isEmpty()) {
@@ -423,6 +428,19 @@
     }
   }
 
+  /**
+   * Creates a {@link VirtualizedColumnInspector} that provides {@link ColumnCapabilities} information for all
+   * {@link #virtualColumns} in addition to the capabilities of all physical columns in the underlying inspector.
+   */
+  public ColumnInspector wrapInspector(ColumnInspector inspector)
+  {
+    if (virtualColumns.isEmpty()) {
+      return inspector;
+    } else {
+      return new VirtualizedColumnInspector(inspector, this);
+    }
+  }
+
   @Override
   public byte[] getCacheKey()
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java
index 1d85d2b..e6d0920 100644
--- a/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java
+++ b/processing/src/main/java/org/apache/druid/segment/column/ColumnCapabilities.java
@@ -41,6 +41,7 @@
    *
    * If ValueType is COMPLEX, then the typeName associated with it.
    */
+  @Nullable
   String getComplexTypeName();
 
   /**
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
index b1ab3a9..59ef034 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlan.java
@@ -19,10 +19,14 @@
 
 package org.apache.druid.segment.virtual;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Iterables;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.Parser;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
 import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
@@ -40,16 +44,18 @@
       */
     CONSTANT,
     /**
-     * expression has a single, single valued input, and is dictionary encoded if the value is a string
+     * expression has a single, single valued input, and is dictionary encoded if the value is a string, and does
+     * not produce non-scalar output
      */
     SINGLE_INPUT_SCALAR,
     /**
      * expression has a single input, which may produce single or multi-valued output, but if so, it must be implicitly
-     * mappable  (i.e. the expression is not treating its input as an array and not wanting to output an array)
+     * mappable  (i.e. the expression is not treating its input as an array and does not produce non-scalar output)
      */
     SINGLE_INPUT_MAPPABLE,
     /**
-     * expression must be implicitly mapped across the multiple values per row of known multi-value inputs
+     * expression must be implicitly mapped across the multiple values per row of known multi-value inputs, the final
+     * output will be multi-valued
      */
     NEEDS_APPLIED,
     /**
@@ -57,15 +63,16 @@
      */
     UNKNOWN_INPUTS,
     /**
-     * expression has inputs whose type was incomplete, such as unknown multi-valuedness
+     * expression has inputs whose type was incomplete, such as unknown multi-valuedness, which are not explicitly
+     * used as possibly multi-valued/array inputs
      */
     INCOMPLETE_INPUTS,
     /**
-     * expression explicitly using multi-valued inputs as array inputs
+     * expression explicitly using multi-valued inputs as array inputs or has array inputs
      */
     NON_SCALAR_INPUTS,
     /**
-     * expression produces explict multi-valued output, or implicit multi-valued output via mapping
+     * expression produces explict multi-valued output
      */
     NON_SCALAR_OUTPUT,
     /**
@@ -74,6 +81,7 @@
     VECTORIZABLE
   }
 
+  private final ColumnInspector baseInputInspector;
   private final Expr expression;
   private final Expr.BindingAnalysis analysis;
   private final EnumSet<Trait> traits;
@@ -86,6 +94,7 @@
   private final List<String> unappliedInputs;
 
   ExpressionPlan(
+      ColumnInspector baseInputInspector,
       Expr expression,
       Expr.BindingAnalysis analysis,
       EnumSet<Trait> traits,
@@ -95,6 +104,7 @@
       List<String> unappliedInputs
   )
   {
+    this.baseInputInspector = baseInputInspector;
     this.expression = expression;
     this.analysis = analysis;
     this.traits = traits;
@@ -104,16 +114,28 @@
     this.unappliedInputs = unappliedInputs;
   }
 
+  /**
+   * An expression with no inputs is a constant
+   */
   public boolean isConstant()
   {
     return analysis.getRequiredBindings().isEmpty();
   }
 
+  /**
+   * Gets the original expression that was planned
+   */
   public Expr getExpression()
   {
     return expression;
   }
 
+  /**
+   * If an expression uses a multi-valued input in a scalar manner, the expression can be automatically transformed
+   * to map these values across the expression, applying the original expression to every value.
+   *
+   * @see Parser#applyUnappliedBindings(Expr, Expr.BindingAnalysis, List)
+   */
   public Expr getAppliedExpression()
   {
     if (is(Trait.NEEDS_APPLIED)) {
@@ -122,61 +144,184 @@
     return expression;
   }
 
+  /**
+   * If an expression uses a multi-valued input in a scalar manner, and the expression contains an accumulator such as
+   * for use as part of an aggregator, the expression can be automatically transformed to fold the accumulator across
+   * the values of the original expression.
+   *
+   * @see Parser#foldUnappliedBindings(Expr, Expr.BindingAnalysis, List, String)
+   */
   public Expr getAppliedFoldExpression(String accumulatorId)
   {
     if (is(Trait.NEEDS_APPLIED)) {
+      Preconditions.checkState(
+          !unappliedInputs.contains(accumulatorId),
+          "Accumulator cannot be implicitly transformed, if it is an ARRAY or multi-valued type it must"
+          + " be used explicitly as such"
+      );
       return Parser.foldUnappliedBindings(expression, analysis, unappliedInputs, accumulatorId);
     }
     return expression;
   }
 
-  public Expr.BindingAnalysis getAnalysis()
-  {
-    return analysis;
-  }
-
-  public boolean is(Trait... flags)
-  {
-    return is(traits, flags);
-  }
-
-  public boolean any(Trait... flags)
-  {
-    return any(traits, flags);
-  }
-
+  /**
+   * The output type of the original expression.
+   *
+   * Note that this might not be the true for the expressions provided by {@link #getAppliedExpression()}
+   * or {@link #getAppliedFoldExpression(String)}, should the expression have any unapplied inputs
+   */
   @Nullable
   public ExprType getOutputType()
   {
     return outputType;
   }
 
+  /**
+   * If and only if the column has a single input, get the {@link ValueType} of that input
+   */
   @Nullable
   public ValueType getSingleInputType()
   {
     return singleInputType;
   }
 
+  /**
+   * If and only if the expression has a single input, get the name of that input
+   */
   public String getSingleInputName()
   {
     return Iterables.getOnlyElement(analysis.getRequiredBindings());
   }
 
+  /**
+   * Get set of inputs which were completely missing information, possibly a non-existent column or from a column
+   * selector factory with incomplete information
+   */
   public Set<String> getUnknownInputs()
   {
     return unknownInputs;
   }
 
+  /**
+   * Returns basic analysis of the inputs to an {@link Expr} and how they are used
+   *
+   * @see Expr.BindingAnalysis
+   */
+  public Expr.BindingAnalysis getAnalysis()
+  {
+    return analysis;
+  }
+
+  /**
+   * Tries to construct the most appropriate {@link ColumnCapabilities} for this plan given the {@link #outputType} and
+   * {@link #traits} inferred by the {@link ExpressionPlanner}, optionally with the help of hint {@link ValueType}.
+   *
+   * If no output type was able to be inferred during planning, returns null
+   */
+  @Nullable
+  public ColumnCapabilities inferColumnCapabilities(@Nullable ValueType outputTypeHint)
+  {
+    if (outputType != null) {
+      final ValueType inferredValueType = ExprType.toValueType(outputType);
+
+      if (inferredValueType.isNumeric()) {
+        // if float was explicitly specified preserve it, because it will currently never be the computed output type
+        // since there is no float expression type
+        if (ValueType.FLOAT == outputTypeHint) {
+          return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT);
+        }
+        return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(inferredValueType);
+      }
+
+      // null constants can sometimes trip up the type inference to report STRING, so check if explicitly supplied
+      // output type is numeric and stick with that if so
+      if (outputTypeHint != null && outputTypeHint.isNumeric()) {
+        return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(outputTypeHint);
+      }
+
+      // fancy string stuffs
+      if (ValueType.STRING == inferredValueType) {
+        // constant strings are supported as dimension selectors, set them as dictionary encoded and unique for all the
+        // bells and whistles the engines have to offer
+        if (isConstant()) {
+          return ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                                       .setDictionaryEncoded(true)
+                                       .setDictionaryValuesUnique(true)
+                                       .setDictionaryValuesSorted(true)
+                                       .setHasNulls(expression.isNullLiteral());
+        }
+
+        // single input strings also have an optimization which allow defering evaluation time until dictionary encoded
+        // column lookup, so if the underlying column is a dictionary encoded string then we can report as such
+        if (any(Trait.SINGLE_INPUT_SCALAR, Trait.SINGLE_INPUT_MAPPABLE)) {
+          ColumnCapabilities underlyingCapabilities = baseInputInspector.getColumnCapabilities(getSingleInputName());
+          if (underlyingCapabilities != null) {
+            // since we don't know if the expression is 1:1 or if it retains ordering we can only piggy back only
+            // report as dictionary encoded, but it still allows us to use algorithms which work with dictionaryIds
+            // to create a dictionary encoded selector instead of an object selector to defer expression evaluation
+            // until query time
+            return ColumnCapabilitiesImpl.copyOf(underlyingCapabilities)
+                                         .setType(ValueType.STRING)
+                                         .setDictionaryValuesSorted(false)
+                                         .setDictionaryValuesUnique(false)
+                                         .setHasNulls(true);
+          }
+        }
+      }
+
+      // we don't have to check for unknown input here because output type is unable to be inferred if we don't know
+      // the complete set of input types
+      if (any(Trait.NON_SCALAR_OUTPUT, Trait.NEEDS_APPLIED)) {
+        // if the hint requested a string, use a string
+        if (ValueType.STRING == outputTypeHint) {
+          return ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.STRING);
+        }
+        // maybe something is looking for a little fun and wants arrays? let whatever it is through
+        return ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ExprType.toValueType(outputType));
+      }
+
+      // if we got here, lets call it single value string output, non-dictionary encoded
+      return ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities();
+    }
+    // we don't know what we don't know
+    return null;
+  }
+
+  /**
+   * Returns true if all of the supplied traits are true in this plan
+   */
+  public boolean is(Trait... flags)
+  {
+    return is(traits, flags);
+  }
+
+  /**
+   * Returns true if any of the supplied traits are true in this plan
+   */
+  public boolean any(Trait... flags)
+  {
+    return any(traits, flags);
+  }
+
+  /**
+   * Returns true if all of the supplied traits are true in the supplied set
+   */
   static boolean is(EnumSet<Trait> traits, Trait... args)
   {
     return Arrays.stream(args).allMatch(traits::contains);
   }
 
+  /**
+   * Returns true if any of the supplied traits are true in the supplied set
+   */
   static boolean any(EnumSet<Trait> traits, Trait... args)
   {
     return Arrays.stream(args).anyMatch(traits::contains);
   }
 
+  /**
+   * Returns true if none of the supplied traits are true in the supplied set
+   */
   static boolean none(EnumSet<Trait> traits, Trait... args)
   {
     return Arrays.stream(args).noneMatch(traits::contains);
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
index 71fb9fa..4eb91eb 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionPlanner.java
@@ -80,17 +80,13 @@
       //    SINGLE_INPUT_MAPPABLE
       // is set when a single input string column, which can be multi-valued, but if so, it must be implicitly mappable
       // (i.e. the expression is not treating its input as an array and not wanting to output an array)
-      if (capabilities != null) {
+      if (capabilities != null && !analysis.hasInputArrays() && !analysis.isOutputArray()) {
         boolean isSingleInputMappable = false;
-        boolean isSingleInputScalar = capabilities.hasMultipleValues().isFalse() &&
-                                      !analysis.hasInputArrays() &&
-                                      !analysis.isOutputArray();
+        boolean isSingleInputScalar = capabilities.hasMultipleValues().isFalse();
         if (capabilities.getType() == ValueType.STRING) {
           isSingleInputScalar &= capabilities.isDictionaryEncoded().isTrue();
           isSingleInputMappable = capabilities.isDictionaryEncoded().isTrue() &&
-                                  !capabilities.hasMultipleValues().isUnknown() &&
-                                  !analysis.hasInputArrays() &&
-                                  !analysis.isOutputArray();
+                                  !capabilities.hasMultipleValues().isUnknown();
         }
 
         // if satisfied, set single input output type and flags
@@ -155,8 +151,7 @@
     final boolean shouldComputeOutput = ExpressionPlan.none(
         traits,
         ExpressionPlan.Trait.UNKNOWN_INPUTS,
-        ExpressionPlan.Trait.INCOMPLETE_INPUTS,
-        ExpressionPlan.Trait.NEEDS_APPLIED
+        ExpressionPlan.Trait.INCOMPLETE_INPUTS
     );
 
     if (shouldComputeOutput) {
@@ -168,16 +163,12 @@
       traits.add(ExpressionPlan.Trait.NON_SCALAR_OUTPUT);
 
       // single input mappable may not produce array output explicitly, only through implicit mapping
+      traits.remove(ExpressionPlan.Trait.SINGLE_INPUT_SCALAR);
       traits.remove(ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE);
     }
 
-    // if implicit mapping is in play, output will be multi-valued but may still use SINGLE_INPUT_MAPPABLE optimization
-    if (ExpressionPlan.is(traits, ExpressionPlan.Trait.NEEDS_APPLIED)) {
-      traits.add(ExpressionPlan.Trait.NON_SCALAR_OUTPUT);
-    }
-
     // vectorized expressions do not support incomplete, multi-valued inputs or outputs, or implicit mapping
-    // they also do support unknown inputs, but they also do not currently have to deal with them, as missing
+    // they also do not support unknown inputs, but they also do not currently have to deal with them, as missing
     // capabilites is indicative of a non-existent column instead of an unknown schema. If this ever changes,
     // this check should also change
     boolean supportsVector = ExpressionPlan.none(
@@ -194,7 +185,9 @@
       outputType = expression.getOutputType(inspector);
       traits.add(ExpressionPlan.Trait.VECTORIZABLE);
     }
+
     return new ExpressionPlan(
+        inspector,
         expression,
         analysis,
         traits,
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
index fc6ddff..d32d689 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionSelectors.java
@@ -184,10 +184,10 @@
   {
     final ExpressionPlan plan = ExpressionPlanner.plan(columnSelectorFactory, expression);
 
-    if (plan.is(ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE)) {
+    if (plan.any(ExpressionPlan.Trait.SINGLE_INPUT_SCALAR, ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE)) {
       final String column = plan.getSingleInputName();
       if (plan.getSingleInputType() == ValueType.STRING) {
-        return new SingleStringInputDimensionSelector(
+        return new SingleStringInputDeferredEvaluationExpressionDimensionSelector(
             columnSelectorFactory.makeDimensionSelector(DefaultDimensionSpec.of(column)),
             expression
         );
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
index b6ef0e8..95e1f6c 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVectorSelectors.java
@@ -23,6 +23,7 @@
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.vector.ExprVectorProcessor;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
 import org.apache.druid.query.expression.ExprUtils;
 import org.apache.druid.segment.column.ColumnCapabilities;
 import org.apache.druid.segment.column.ValueType;
@@ -54,7 +55,13 @@
       String constant = plan.getExpression().eval(ExprUtils.nilBindings()).asString();
       return ConstantVectorSelectors.singleValueDimensionVectorSelector(factory.getReadableVectorInspector(), constant);
     }
-    throw new IllegalStateException("Only constant expressions currently support dimension selectors");
+    if (plan.is(ExpressionPlan.Trait.SINGLE_INPUT_SCALAR) && ExprType.STRING == plan.getOutputType()) {
+      return new SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector(
+          factory.makeSingleValueDimensionSelector(DefaultDimensionSpec.of(plan.getSingleInputName())),
+          plan.getExpression()
+      );
+    }
+    throw new IllegalStateException("Only constant and single input string expressions currently support dictionary encoded selectors");
   }
 
   public static VectorValueSelector makeVectorValueSelector(
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
index c0de087..c0a5d41 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/ExpressionVirtualColumn.java
@@ -30,7 +30,6 @@
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.math.expr.Expr;
 import org.apache.druid.math.expr.ExprMacroTable;
-import org.apache.druid.math.expr.ExprType;
 import org.apache.druid.math.expr.Parser;
 import org.apache.druid.query.cache.CacheKeyBuilder;
 import org.apache.druid.query.dimension.DimensionSpec;
@@ -191,59 +190,33 @@
   public ColumnCapabilities capabilities(ColumnInspector inspector, String columnName)
   {
     final ExpressionPlan plan = ExpressionPlanner.plan(inspector, parsedExpression.get());
-
-    if (plan.getOutputType() != null) {
-
-      final ExprType inferredOutputType = plan.getOutputType();
-      if (outputType != null && ExprType.fromValueType(outputType) != inferredOutputType) {
-        log.warn(
-            "Projected output type %s of expression %s does not match provided type %s",
-            plan.getOutputType(),
-            expression,
-            outputType
-        );
-      }
-      final ValueType valueType = ExprType.toValueType(inferredOutputType);
-
-      if (valueType.isNumeric()) {
-        // if float was explicitly specified preserve it, because it will currently never be the computed output type
-        if (ValueType.FLOAT == outputType) {
-          return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT);
+    final ColumnCapabilities inferred = plan.inferColumnCapabilities(outputType);
+    // if we can infer the column capabilities from the expression plan, then use that
+    if (inferred != null) {
+      // explicit outputType is used as a hint, how did it compare to the planners inferred output type?
+      if (inferred.getType() != outputType && outputType != null) {
+        // if both sides are numeric, let it slide and log at debug level
+        // but mismatches involving strings and arrays might be worth knowing about so warn
+        if (!inferred.getType().isNumeric() && !outputType.isNumeric()) {
+          log.warn(
+              "Projected output type %s of expression %s does not match provided type %s",
+              inferred.getType(),
+              expression,
+              outputType
+          );
+        } else {
+          log.debug(
+              "Projected output type %s of expression %s does not match provided type %s",
+              inferred.getType(),
+              expression,
+              outputType
+          );
         }
-        return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(valueType);
       }
-
-      // null constants can sometimes trip up the type inference to report STRING, so check if explicitly supplied
-      // output type is numeric and stick with that if so
-      if (outputType != null && outputType.isNumeric()) {
-        return ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(outputType);
-      }
-
-      // array types shouldn't escape the expression system currently, so coerce anything past this point into some
-      // style of string
-
-      // we don't have to check for unknown input here because output type is unable to be inferred if we don't know
-      // the complete set of input types
-      if (plan.any(ExpressionPlan.Trait.NON_SCALAR_OUTPUT, ExpressionPlan.Trait.NEEDS_APPLIED)) {
-        // always a multi-value string since wider engine does not yet support array types
-        return new ColumnCapabilitiesImpl().setType(ValueType.STRING).setHasMultipleValues(true);
-      }
-
-      // constant strings are supported as dimension selectors, set them as dictionary encoded and unique
-      if (plan.isConstant()) {
-        return new ColumnCapabilitiesImpl().setType(ValueType.STRING)
-                                           .setDictionaryEncoded(true)
-                                           .setDictionaryValuesUnique(true)
-                                           .setDictionaryValuesSorted(true)
-                                           .setHasMultipleValues(false);
-      }
-
-      // if we got here, lets call it single value string output, non-dictionary encoded
-      return new ColumnCapabilitiesImpl().setType(ValueType.STRING)
-                                         .setHasMultipleValues(false)
-                                         .setDictionaryEncoded(false);
+      return inferred;
     }
-    // fallback to
+
+    // fallback to default capabilities
     return capabilities(columnName);
   }
 
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDimensionSelector.java b/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDeferredEvaluationExpressionDimensionSelector.java
similarity index 81%
rename from processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDimensionSelector.java
rename to processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDeferredEvaluationExpressionDimensionSelector.java
index 0a6cee8..f4f5838 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDimensionSelector.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDeferredEvaluationExpressionDimensionSelector.java
@@ -34,16 +34,23 @@
 import javax.annotation.Nullable;
 
 /**
- * A DimensionSelector decorator that computes an expression on top of it. See {@link ExpressionSelectors} for details
- * on how expression selectors are constructed.
+ * A {@link DimensionSelector} decorator that directly exposes the underlying dictionary id in {@link #getRow},
+ * saving expression computation until {@link #lookupName} is called. This allows for performing operations like
+ * grouping on the native dictionary ids, and deferring expression evaluation until after which can dramatically
+ * reduce the total number of evaluations.
+ *
+ * @see ExpressionSelectors for details on how expression selectors are constructed.
+ *
+ * @see SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector for the vectorized version of
+ * this selector.
  */
-public class SingleStringInputDimensionSelector implements DimensionSelector
+public class SingleStringInputDeferredEvaluationExpressionDimensionSelector implements DimensionSelector
 {
   private final DimensionSelector selector;
   private final Expr expression;
   private final SingleInputBindings bindings = new SingleInputBindings();
 
-  public SingleStringInputDimensionSelector(
+  public SingleStringInputDeferredEvaluationExpressionDimensionSelector(
       final DimensionSelector selector,
       final Expr expression
   )
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector.java b/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector.java
new file mode 100644
index 0000000..56813a4
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.vector.ExprVectorProcessor;
+import org.apache.druid.segment.DimensionDictionarySelector;
+import org.apache.druid.segment.IdLookup;
+import org.apache.druid.segment.vector.SingleValueDimensionVectorSelector;
+
+import javax.annotation.Nullable;
+
+/**
+ * A {@link SingleValueDimensionVectorSelector} decorator that directly exposes the underlying dictionary ids in
+ * {@link #getRowVector}, saving expression computation until {@link #lookupName} is called. This allows for
+ * performing operations like grouping on the native dictionary ids, and deferring expression evaluation until
+ * after, which can dramatically reduce the total number of evaluations.
+ *
+ * @see ExpressionVectorSelectors for details on how expression vector selectors are constructed.
+ *
+ * @see SingleStringInputDeferredEvaluationExpressionDimensionSelector for the non-vectorized version of this selector.
+ */
+public class SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector
+    implements SingleValueDimensionVectorSelector
+{
+  private final SingleValueDimensionVectorSelector selector;
+  private final ExprVectorProcessor<String[]> stringProcessor;
+  private final StringLookupVectorInputBindings inputBinding;
+
+  public SingleStringInputDeferredEvaluationExpressionDimensionVectorSelector(
+      SingleValueDimensionVectorSelector selector,
+      Expr expression
+  )
+  {
+    // Verify selector has a working dictionary.
+    if (selector.getValueCardinality() == DimensionDictionarySelector.CARDINALITY_UNKNOWN
+        || !selector.nameLookupPossibleInAdvance()) {
+      throw new ISE(
+          "Selector of class[%s] does not have a dictionary, cannot use it.",
+          selector.getClass().getName()
+      );
+    }
+    this.selector = selector;
+    this.inputBinding = new StringLookupVectorInputBindings();
+    this.stringProcessor = expression.buildVectorized(inputBinding);
+  }
+
+  @Override
+  public int getValueCardinality()
+  {
+    return CARDINALITY_UNKNOWN;
+  }
+
+  @Nullable
+  @Override
+  public String lookupName(int id)
+  {
+    inputBinding.currentValue[0] = selector.lookupName(id);
+    return stringProcessor.evalVector(inputBinding).values()[0];
+  }
+
+  @Override
+  public boolean nameLookupPossibleInAdvance()
+  {
+    return true;
+  }
+
+  @Nullable
+  @Override
+  public IdLookup idLookup()
+  {
+    return null;
+  }
+
+  @Override
+  public int[] getRowVector()
+  {
+    return selector.getRowVector();
+  }
+
+  @Override
+  public int getMaxVectorSize()
+  {
+    return selector.getMaxVectorSize();
+  }
+
+  @Override
+  public int getCurrentVectorSize()
+  {
+    return selector.getCurrentVectorSize();
+  }
+
+  /**
+   * Special single element vector input bindings used for processing the string value for {@link #lookupName(int)}
+   *
+   * Vector size is fixed to 1 because {@link #lookupName} operates on a single dictionary value at a time. If a
+   * bulk lookup method is ever added, these vector bindings should be modified to process the results with actual
+   * vectors.
+   */
+  private static final class StringLookupVectorInputBindings implements Expr.VectorInputBinding
+  {
+    private final String[] currentValue = new String[1];
+
+    @Nullable
+    @Override
+    public ExprType getType(String name)
+    {
+      return ExprType.STRING;
+    }
+
+    @Override
+    public int getMaxVectorSize()
+    {
+      return 1;
+    }
+
+    @Override
+    public int getCurrentVectorSize()
+    {
+      return 1;
+    }
+
+    @Override
+    public int getCurrentVectorId()
+    {
+      return -1;
+    }
+
+    @Override
+    public <T> T[] getObjectVector(String name)
+    {
+      return (T[]) currentValue;
+    }
+
+    @Override
+    public long[] getLongVector(String name)
+    {
+      throw new UnsupportedOperationException("attempt to get long[] from string[] only scalar binding");
+    }
+
+    @Override
+    public double[] getDoubleVector(String name)
+    {
+      throw new UnsupportedOperationException("attempt to get double[] from string[] only scalar binding");
+    }
+
+    @Nullable
+    @Override
+    public boolean[] getNullVector(String name)
+    {
+      throw new UnsupportedOperationException("attempt to get boolean[] null vector from string[] only scalar binding");
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnInspector.java b/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnInspector.java
new file mode 100644
index 0000000..bba7f76
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnInspector.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.VirtualColumns;
+import org.apache.druid.segment.column.ColumnCapabilities;
+
+import javax.annotation.Nullable;
+
+/**
+ * Provides {@link ColumnCapabilities} for both virtual and non-virtual columns by building on top of another base
+ * {@link ColumnInspector}.
+ *
+ * {@link VirtualColumns} are provided with the base inspector so that they may potentially infer output types to
+ * construct the appropriate capabilities for virtual columns, while the base inspector directly supplies the
+ * capabilities for non-virtual columns.
+ */
+public class VirtualizedColumnInspector implements ColumnInspector
+{
+  protected final VirtualColumns virtualColumns;
+  protected final ColumnInspector baseInspector;
+
+  public VirtualizedColumnInspector(
+      ColumnInspector baseInspector,
+      VirtualColumns virtualColumns
+  )
+  {
+    this.virtualColumns = virtualColumns;
+    this.baseInspector = baseInspector;
+  }
+
+  @Nullable
+  @Override
+  public ColumnCapabilities getColumnCapabilities(String columnName)
+  {
+    if (virtualColumns.exists(columnName)) {
+      return virtualColumns.getColumnCapabilities(baseInspector, columnName);
+    } else {
+      return baseInspector.getColumnCapabilities(columnName);
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnSelectorFactory.java b/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnSelectorFactory.java
index d37de2e..15fce04 100644
--- a/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnSelectorFactory.java
+++ b/processing/src/main/java/org/apache/druid/segment/virtual/VirtualizedColumnSelectorFactory.java
@@ -25,22 +25,21 @@
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.DimensionSelector;
 import org.apache.druid.segment.VirtualColumns;
-import org.apache.druid.segment.column.ColumnCapabilities;
 
-import javax.annotation.Nullable;
-
-public class VirtualizedColumnSelectorFactory implements ColumnSelectorFactory
+/**
+ * {@link ColumnSelectorFactory} which can create selectors for both virtual and non-virtual columns
+ */
+public class VirtualizedColumnSelectorFactory extends VirtualizedColumnInspector implements ColumnSelectorFactory
 {
   private final ColumnSelectorFactory baseFactory;
-  private final VirtualColumns virtualColumns;
 
   public VirtualizedColumnSelectorFactory(
       ColumnSelectorFactory baseFactory,
       VirtualColumns virtualColumns
   )
   {
+    super(baseFactory, virtualColumns);
     this.baseFactory = Preconditions.checkNotNull(baseFactory, "baseFactory");
-    this.virtualColumns = Preconditions.checkNotNull(virtualColumns, "virtualColumns");
   }
 
   @Override
@@ -62,15 +61,4 @@
       return baseFactory.makeColumnValueSelector(columnName);
     }
   }
-
-  @Nullable
-  @Override
-  public ColumnCapabilities getColumnCapabilities(String columnName)
-  {
-    if (virtualColumns.exists(columnName)) {
-      return virtualColumns.getColumnCapabilities(baseFactory, columnName);
-    } else {
-      return baseFactory.getColumnCapabilities(columnName);
-    }
-  }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
new file mode 100644
index 0000000..c6d273e
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionPlannerTest.java
@@ -0,0 +1,863 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.virtual;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.math.expr.ExprType;
+import org.apache.druid.math.expr.Parser;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.ColumnInspector;
+import org.apache.druid.segment.column.ColumnCapabilities;
+import org.apache.druid.segment.column.ColumnCapabilitiesImpl;
+import org.apache.druid.segment.column.ValueType;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class ExpressionPlannerTest extends InitializedNullHandlingTest
+{
+  public static final ColumnInspector SYNTHETIC_INSPECTOR = new ColumnInspector()
+  {
+    private final Map<String, ColumnCapabilities> capabilitiesMap =
+        ImmutableMap.<String, ColumnCapabilities>builder()
+                    .put(
+                        "long1",
+                        ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.LONG)
+                    )
+                    .put(
+                        "long2",
+                        ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.LONG)
+                    )
+                    .put(
+                        "float1",
+                        ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT)
+                    )
+                    .put(
+                        "float2",
+                        ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.FLOAT)
+                    )
+                    .put(
+                        "double1",
+                        ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE)
+                    )
+                    .put(
+                        "double2",
+                        ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(ValueType.DOUBLE)
+                    )
+                    .put(
+                        "scalar_string",
+                        ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
+                    )
+                    .put(
+                        // segment style single value dictionary encoded with unique sorted dictionary
+                        "scalar_dictionary_string",
+                        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+                                                    .setDictionaryEncoded(true)
+                                                    .setHasBitmapIndexes(true)
+                                                    .setDictionaryValuesSorted(true)
+                                                    .setDictionaryValuesUnique(true)
+                                                    .setHasMultipleValues(false)
+                    )
+                    .put(
+                        // dictionary encoded but not unique or sorted, maybe an indexed table from a join result
+                        "scalar_dictionary_string_nonunique",
+                        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+                                                    .setDictionaryEncoded(true)
+                                                    .setHasBitmapIndexes(false)
+                                                    .setDictionaryValuesSorted(false)
+                                                    .setDictionaryValuesUnique(false)
+                                                    .setHasMultipleValues(false)
+                    )
+                    .put(
+                        // string with unknown multi-valuedness
+                        "string_unknown",
+                        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+                    )
+                    .put(
+                        // dictionary encoded multi valued string dimension
+                        "multi_dictionary_string",
+                        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+                                                    .setDictionaryEncoded(true)
+                                                    .setHasBitmapIndexes(true)
+                                                    .setDictionaryValuesUnique(true)
+                                                    .setDictionaryValuesSorted(true)
+                                                    .setHasMultipleValues(true)
+                    )
+                    .put(
+                        // simple multi valued string dimension unsorted
+                        "multi_dictionary_string_nonunique",
+                        new ColumnCapabilitiesImpl().setType(ValueType.STRING)
+                                                    .setDictionaryEncoded(false)
+                                                    .setHasBitmapIndexes(false)
+                                                    .setDictionaryValuesUnique(false)
+                                                    .setDictionaryValuesSorted(false)
+                                                    .setHasMultipleValues(true)
+                    )
+                    .put(
+                        "string_array_1",
+                        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.STRING_ARRAY)
+                    )
+                    .put(
+                        "string_array_2",
+                        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.STRING_ARRAY)
+                    )
+                    .put(
+                        "long_array_1",
+                        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.LONG_ARRAY)
+                    )
+                    .put(
+                        "long_array_2",
+                        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.LONG_ARRAY)
+                    )
+                    .put(
+                        "double_array_1",
+                        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.DOUBLE_ARRAY)
+                    )
+                    .put(
+                        "double_array_2",
+                        ColumnCapabilitiesImpl.createSimpleArrayColumnCapabilities(ValueType.DOUBLE_ARRAY)
+                    )
+                    .build();
+
+    @Nullable
+    @Override
+    public ColumnCapabilities getColumnCapabilities(String column)
+    {
+      return capabilitiesMap.get(column);
+    }
+  };
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testUnknown()
+  {
+    // column has no capabilities
+    // the vectorize query engine contracts is such that the lack of column capabilities is indicative of a nil column
+    // so this is vectorizable
+    // for non-vectorized expression processing, this will probably end up using a selector that examines inputs on a
+    // row by row basis to determine if the expression needs applied to multi-valued inputs
+
+    ExpressionPlan thePlan = plan("concat(x, 'x')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.CONSTANT
+        )
+    );
+    // this expression has no "unapplied bindings", nothing to apply
+    Assert.assertEquals("concat(\"x\", 'x')", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("concat(\"x\", 'x')", thePlan.getAppliedFoldExpression("__acc").stringify());
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isTrue());
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    // what if both inputs are unknown, can we know things?
+    thePlan = plan("x * y");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.UNKNOWN_INPUTS
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.VECTORIZABLE,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.CONSTANT
+        )
+    );
+
+    Assert.assertEquals("(\"x\" * \"y\")", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("(\"x\" * \"y\")", thePlan.getAppliedFoldExpression("__acc").stringify());
+    Assert.assertNull(thePlan.getOutputType());
+    Assert.assertNull(thePlan.inferColumnCapabilities(null));
+    // no we cannot
+  }
+
+  @Test
+  public void testScalarStringNondictionaryEncoded()
+  {
+    ExpressionPlan thePlan = plan("concat(scalar_string, 'x')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertEquals("concat(\"scalar_string\", 'x')", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("concat(\"scalar_string\", 'x')", thePlan.getAppliedFoldExpression("__acc").stringify());
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isTrue());
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+  }
+
+  @Test
+  public void testScalarNumeric()
+  {
+    ExpressionPlan thePlan = plan("long1 + 5");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertEquals("(\"long1\" + 5)", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("(\"long1\" + 5)", thePlan.getAppliedFoldExpression("__acc").stringify());
+    Assert.assertEquals("(\"long1\" + 5)", thePlan.getAppliedFoldExpression("long1").stringify());
+    Assert.assertEquals(ExprType.LONG, thePlan.getOutputType());
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.LONG, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    if (NullHandling.sqlCompatible()) {
+      Assert.assertTrue(inferred.hasNulls().isMaybeTrue());
+    } else {
+      Assert.assertFalse(inferred.hasNulls().isMaybeTrue());
+    }
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    thePlan = plan("long1 + 5.0");
+    Assert.assertEquals(ExprType.DOUBLE, thePlan.getOutputType());
+
+    thePlan = plan("double1 * double2");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertEquals("(\"double1\" * \"double2\")", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("(\"double1\" * \"double2\")", thePlan.getAppliedFoldExpression("__acc").stringify());
+    Assert.assertEquals("(\"double1\" * \"double2\")", thePlan.getAppliedFoldExpression("double1").stringify());
+    Assert.assertEquals(ExprType.DOUBLE, thePlan.getOutputType());
+    inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.DOUBLE, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    if (NullHandling.sqlCompatible()) {
+      Assert.assertTrue(inferred.hasNulls().isMaybeTrue());
+    } else {
+      Assert.assertFalse(inferred.hasNulls().isMaybeTrue());
+    }
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+  }
+
+  @Test
+  public void testScalarStringDictionaryEncoded()
+  {
+    ExpressionPlan thePlan = plan("concat(scalar_dictionary_string, 'x')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertEquals("concat(\"scalar_dictionary_string\", 'x')", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals(
+        "concat(\"scalar_dictionary_string\", 'x')",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isTrue());
+    Assert.assertTrue(inferred.isDictionaryEncoded().isTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertTrue(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    // multiple input columns
+    thePlan = plan("concat(scalar_dictionary_string, scalar_dictionary_string_nonunique)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertEquals(
+        "concat(\"scalar_dictionary_string\", \"scalar_dictionary_string_nonunique\")",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "concat(\"scalar_dictionary_string\", \"scalar_dictionary_string_nonunique\")",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    // what if scalar_dictionary_string_nonunique is an accumulator instead? nope, still no NEEDS_APPLIED so nothing to do
+    Assert.assertEquals(
+        "concat(\"scalar_dictionary_string\", \"scalar_dictionary_string_nonunique\")",
+        thePlan.getAppliedFoldExpression("scalar_dictionary_string_nonunique").stringify()
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isTrue());
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    // array output of dictionary encoded string are not considered single scalar/mappable, nor vectorizable
+    thePlan = plan("array(scalar_dictionary_string)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+  }
+
+  @Test
+  public void testMultiValueStringDictionaryEncoded()
+  {
+    ExpressionPlan thePlan = plan("concat(multi_dictionary_string, 'x')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isMaybeTrue());
+    Assert.assertTrue(inferred.isDictionaryEncoded().isTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertTrue(inferred.hasMultipleValues().isTrue());
+    Assert.assertTrue(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    thePlan = plan("concat(scalar_string, multi_dictionary_string_nonunique)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertEquals(
+        "map((\"multi_dictionary_string_nonunique\") -> concat(\"scalar_string\", \"multi_dictionary_string_nonunique\"), \"multi_dictionary_string_nonunique\")",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "fold((\"multi_dictionary_string_nonunique\", \"scalar_string\") -> concat(\"scalar_string\", \"multi_dictionary_string_nonunique\"), \"multi_dictionary_string_nonunique\", \"scalar_string\")",
+        thePlan.getAppliedFoldExpression("scalar_string").stringify()
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertTrue(inferred.hasMultipleValues().isTrue());
+
+    thePlan = plan("concat(multi_dictionary_string, multi_dictionary_string_nonunique)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+    // whoa
+    Assert.assertEquals(
+        "cartesian_map((\"multi_dictionary_string\", \"multi_dictionary_string_nonunique\") -> concat(\"multi_dictionary_string\", \"multi_dictionary_string_nonunique\"), \"multi_dictionary_string\", \"multi_dictionary_string_nonunique\")",
+        thePlan.getAppliedExpression().stringify()
+    );
+    // sort of funny, but technically correct
+    Assert.assertEquals(
+        "cartesian_fold((\"multi_dictionary_string\", \"multi_dictionary_string_nonunique\", \"__acc\") -> concat(\"multi_dictionary_string\", \"multi_dictionary_string_nonunique\"), \"multi_dictionary_string\", \"multi_dictionary_string_nonunique\", \"__acc\")",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    inferred = thePlan.inferColumnCapabilities(null);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertTrue(inferred.hasMultipleValues().isTrue());
+
+    thePlan = plan("array_append(multi_dictionary_string, 'foo')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+  }
+
+  @Test
+  public void testMultiValueStringDictionaryEncodedIllegalAccumulator()
+  {
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "Accumulator cannot be implicitly transformed, if it is an ARRAY or multi-valued type it must be used explicitly as such"
+    );
+    ExpressionPlan thePlan = plan("concat(multi_dictionary_string, 'x')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+
+    thePlan = plan("concat(multi_dictionary_string, multi_dictionary_string_nonunique)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NEEDS_APPLIED
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    // what happens if we try to use a multi-valued input that was not explicitly used as multi-valued as the
+    // accumulator?
+    thePlan.getAppliedFoldExpression("multi_dictionary_string");
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+  }
+
+  @Test
+  public void testIncompleteString()
+  {
+    ExpressionPlan thePlan = plan("concat(string_unknown, 'x')");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    // incomplete inputs are not transformed either, rather this will need to be detected and handled on a row-by-row
+    // basis
+    Assert.assertEquals("concat(\"string_unknown\", 'x')", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("concat(\"string_unknown\", 'x')", thePlan.getAppliedFoldExpression("__acc").stringify());
+    // incomplete and unknown skip output type since we don't reliably know
+    Assert.assertNull(thePlan.getOutputType());
+    Assert.assertNull(thePlan.inferColumnCapabilities(null));
+  }
+
+  @Test
+  public void testArrayOutput()
+  {
+    // its ok to use scalar inputs to array expressions, string columns cant help it if sometimes they are single
+    // valued and sometimes they are multi-valued
+    ExpressionPlan thePlan = plan("array_append(scalar_string, 'x')");
+    assertArrayInAndOut(thePlan);
+    // with a string hint, it should look like a multi-valued string
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(ValueType.STRING);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isMaybeTrue());
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertTrue(inferred.hasMultipleValues().isTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+    // with no hint though, let the array free
+    inferred = thePlan.inferColumnCapabilities(ValueType.STRING_ARRAY);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING_ARRAY, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isMaybeTrue());
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertTrue(inferred.hasMultipleValues().isTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    Assert.assertEquals("array_append(\"scalar_string\", 'x')", thePlan.getAppliedExpression().stringify());
+    Assert.assertEquals("array_append(\"scalar_string\", 'x')", thePlan.getAppliedFoldExpression("__acc").stringify());
+    Assert.assertEquals(ExprType.STRING_ARRAY, thePlan.getOutputType());
+
+    // multi-valued are cool too
+    thePlan = plan("array_append(multi_dictionary_string, 'x')");
+    assertArrayInAndOut(thePlan);
+
+    // what about incomplete inputs with arrays? they are not reported as incomplete because they are treated as arrays
+    thePlan = plan("array_append(string_unknown, 'x')");
+    assertArrayInAndOut(thePlan);
+    Assert.assertEquals(ExprType.STRING_ARRAY, thePlan.getOutputType());
+
+    // what about if it is the scalar argument? there it is
+    thePlan = plan("array_append(multi_dictionary_string, string_unknown)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    // incomplete and unknown skip output type since we don't reliably know
+    Assert.assertNull(thePlan.getOutputType());
+
+    // array types are cool too
+    thePlan = plan("array_append(string_array_1, 'x')");
+    assertArrayInAndOut(thePlan);
+
+    thePlan = plan("array_append(string_array_1, 'x')");
+    assertArrayInAndOut(thePlan);
+  }
+
+
+  @Test
+  public void testScalarOutputMultiValueInput()
+  {
+    ExpressionPlan thePlan = plan("array_to_string(array_append(scalar_string, 'x'), ',')");
+    assertArrayInput(thePlan);
+    ColumnCapabilities inferred = thePlan.inferColumnCapabilities(ValueType.STRING);
+    Assert.assertNotNull(inferred);
+    Assert.assertEquals(ValueType.STRING, inferred.getType());
+    Assert.assertNull(inferred.getComplexTypeName());
+    Assert.assertTrue(inferred.hasNulls().isTrue());
+    Assert.assertFalse(inferred.isDictionaryEncoded().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesSorted().isMaybeTrue());
+    Assert.assertFalse(inferred.areDictionaryValuesUnique().isMaybeTrue());
+    Assert.assertFalse(inferred.hasMultipleValues().isMaybeTrue());
+    Assert.assertFalse(inferred.hasBitmapIndexes());
+    Assert.assertFalse(inferred.hasSpatialIndexes());
+
+    Assert.assertEquals(
+        "array_to_string(array_append(\"scalar_string\", 'x'), ',')",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "array_to_string(array_append(\"scalar_string\", 'x'), ',')",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+
+    // what about a multi-valued input
+    thePlan = plan("array_to_string(array_append(scalar_string, multi_dictionary_string), ',')");
+    assertArrayInput(thePlan);
+
+    Assert.assertEquals(
+        "array_to_string(map((\"multi_dictionary_string\") -> array_append(\"scalar_string\", \"multi_dictionary_string\"), \"multi_dictionary_string\"), ',')",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "array_to_string(fold((\"multi_dictionary_string\", \"scalar_string\") -> array_append(\"scalar_string\", \"multi_dictionary_string\"), \"multi_dictionary_string\", \"scalar_string\"), ',')",
+        thePlan.getAppliedFoldExpression("scalar_string").stringify()
+    );
+    // why is this null
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+  }
+
+  @Test
+  public void testScalarOutputArrayInput()
+  {
+    ExpressionPlan thePlan = plan("array_to_string(array_append(string_array_1, 'x'), ',')");
+    assertArrayInput(thePlan);
+
+    Assert.assertEquals(
+        "array_to_string(array_append(\"string_array_1\", 'x'), ',')",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "array_to_string(array_append(\"string_array_1\", 'x'), ',')",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+
+
+    thePlan = plan("array_to_string(array_concat(string_array_1, string_array_2), ',')");
+    assertArrayInput(thePlan);
+    Assert.assertEquals(ExprType.STRING, thePlan.getOutputType());
+
+    thePlan = plan("fold((x, acc) -> acc + x, array_concat(long_array_1, long_array_2), 0)");
+    assertArrayInput(thePlan);
+    Assert.assertEquals(
+        "fold((\"x\", \"acc\") -> (\"acc\" + \"x\"), array_concat(\"long_array_1\", \"long_array_2\"), 0)",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "fold((\"x\", \"acc\") -> (\"acc\" + \"x\"), array_concat(\"long_array_1\", \"long_array_2\"), 0)",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    Assert.assertEquals(ExprType.LONG, thePlan.getOutputType());
+
+    thePlan = plan("fold((x, acc) -> acc * x, array_concat(double_array_1, double_array_2), 0.0)");
+    assertArrayInput(thePlan);
+    Assert.assertEquals(
+        "fold((\"x\", \"acc\") -> (\"acc\" * \"x\"), array_concat(\"double_array_1\", \"double_array_2\"), 0.0)",
+        thePlan.getAppliedExpression().stringify()
+    );
+    Assert.assertEquals(
+        "fold((\"x\", \"acc\") -> (\"acc\" * \"x\"), array_concat(\"double_array_1\", \"double_array_2\"), 0.0)",
+        thePlan.getAppliedFoldExpression("__acc").stringify()
+    );
+    Assert.assertEquals(ExprType.DOUBLE, thePlan.getOutputType());
+  }
+
+  @Test
+  public void testArrayConstruction()
+  {
+    ExpressionPlan thePlan = plan("array(long1, long2)");
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+    Assert.assertEquals(ExprType.LONG_ARRAY, thePlan.getOutputType());
+
+    thePlan = plan("array(long1, double1)");
+    Assert.assertEquals(ExprType.DOUBLE_ARRAY, thePlan.getOutputType());
+    thePlan = plan("array(long1, double1, scalar_string)");
+    Assert.assertEquals(ExprType.STRING_ARRAY, thePlan.getOutputType());
+  }
+
+  private static ExpressionPlan plan(String expression)
+  {
+    return ExpressionPlanner.plan(SYNTHETIC_INSPECTOR, Parser.parse(expression, TestExprMacroTable.INSTANCE));
+  }
+
+  private static void assertArrayInput(ExpressionPlan thePlan)
+  {
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+  }
+
+  private static void assertArrayInAndOut(ExpressionPlan thePlan)
+  {
+    Assert.assertTrue(
+        thePlan.is(
+            ExpressionPlan.Trait.NON_SCALAR_INPUTS,
+            ExpressionPlan.Trait.NON_SCALAR_OUTPUT
+        )
+    );
+    Assert.assertFalse(
+        thePlan.is(
+            ExpressionPlan.Trait.SINGLE_INPUT_SCALAR,
+            ExpressionPlan.Trait.SINGLE_INPUT_MAPPABLE,
+            ExpressionPlan.Trait.INCOMPLETE_INPUTS,
+            ExpressionPlan.Trait.UNKNOWN_INPUTS,
+            ExpressionPlan.Trait.NEEDS_APPLIED,
+            ExpressionPlan.Trait.VECTORIZABLE
+        )
+    );
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java
index c53676b..40955f6 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVectorSelectorsTest.java
@@ -88,7 +88,11 @@
       "long2",
       "float2",
       "double2",
-      "string3"
+      "string3",
+      "string1 + string3",
+      "concat(string1, string2, string3)",
+      "concat(string1, 'x')",
+      "concat(string1, nonexistent)"
   );
 
   private static final int ROWS_PER_SEGMENT = 100_000;
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
index aefb993..93c86fd 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/ExpressionVirtualColumnTest.java
@@ -268,7 +268,7 @@
   {
     DimensionSpec spec = new DefaultDimensionSpec("expr", "expr");
 
-    // do some ugly faking to test if SingleStringInputDimensionSelector is created for multi-value expressions when possible
+    // do some ugly faking to test if SingleStringInputDeferredEvaluationExpressionDimensionSelector is created for multi-value expressions when possible
     ColumnSelectorFactory factory = new ColumnSelectorFactory()
     {
       @Override
@@ -331,7 +331,7 @@
           @Override
           public boolean nameLookupPossibleInAdvance()
           {
-            // fake this so when SingleStringInputDimensionSelector it doesn't explode
+            // fake this so when SingleStringInputDeferredEvaluationExpressionDimensionSelector it doesn't explode
             return true;
           }
 
@@ -365,7 +365,7 @@
     final BaseObjectColumnValueSelector selectorExplicit =
         SCALE_LIST_SELF_EXPLICIT.makeDimensionSelector(spec, factory);
 
-    Assert.assertTrue(selectorImplicit instanceof SingleStringInputDimensionSelector);
+    Assert.assertTrue(selectorImplicit instanceof SingleStringInputDeferredEvaluationExpressionDimensionSelector);
     Assert.assertTrue(selectorExplicit instanceof ExpressionMultiValueDimensionSelector);
   }
 
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
index db45169..3d0ec0f 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/BalancerStrategy.java
@@ -37,7 +37,7 @@
 public interface BalancerStrategy
 {
   /**
-   * Find the best server to move a {@link DataSegment} to according the the balancing strategy.
+   * Find the best server to move a {@link DataSegment} to according the balancing strategy.
    * @param proposalSegment segment to move
    * @param serverHolders servers to consider as move destinations
    * @return The server to move to, or null if no move should be made or no server is suitable
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index adc32f6..1c3ab9c 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -224,12 +224,15 @@
 
   // Add additional context to the given context map for when the
   // timeseries query has timestamp_floor expression on the timestamp dimension
-  public static Map<String, Object> getTimeseriesContextWithFloorTime(Map<String, Object> context,
-                                                                      String timestampResultField)
+  public static Map<String, Object> getTimeseriesContextWithFloorTime(
+      Map<String, Object> context,
+      String timestampResultField
+  )
   {
-    return ImmutableMap.<String, Object>builder().putAll(context)
-                                                .put(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField)
-                                                .build();
+    return ImmutableMap.<String, Object>builder()
+                       .putAll(context)
+                       .put(TimeseriesQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField)
+                       .build();
   }
 
   // Matches QUERY_CONTEXT_LOS_ANGELES
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java
index c23cd7d..f5512b5 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteParameterQueryTest.java
@@ -650,9 +650,9 @@
   @Test
   public void testNullParameter() throws Exception
   {
+    cannotVectorize();
     // contrived example of using null as an sql parameter to at least test the codepath because lots of things dont
     // actually work as null and things like 'IS NULL' fail to parse in calcite if expressed as 'IS ?'
-    cannotVectorize();
 
     // this will optimize out the 3rd argument because 2nd argument will be constant and not null
     testQuery(
@@ -704,7 +704,7 @@
                                 ValueType.STRING
                             )
                         )
-                        .setDimensions(dimensions(new DefaultDimensionSpec("v0", "v0", ValueType.STRING)))
+                        .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0", ValueType.STRING)))
                         .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
                         .setContext(QUERY_CONTEXT_DEFAULT)
                         .build()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
index 7fb7dde..de788fa 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
@@ -85,6 +85,7 @@
       "SELECT (long1 * long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
       "SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2",
       "SELECT string1 + string2, COUNT(*) FROM foo GROUP BY 1 ORDER BY 2",
+      "SELECT CONCAT(string1, '-', 'foo'), COUNT(*) FROM foo GROUP BY 1 ORDER BY 2",
       "SELECT CONCAT(string1, '-', string2), string3, COUNT(*) FROM foo GROUP BY 1,2 ORDER BY 3",
       "SELECT CONCAT(string1, '-', string2, '-', long1, '-', double1, '-', float1) FROM foo GROUP BY 1"
   );