vector group by support for string expressions (#11010)

* vector group by support for string expressions

* fix test

* comments, javadoc
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
index 0fbe44b..0cf24ec 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/SqlExpressionBenchmark.java
@@ -174,7 +174,11 @@
       // 24: group by long expr with non-expr agg
       "SELECT (long1 * long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
       // 25: group by non-expr with expr agg
-      "SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2"
+      "SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2",
+      // 26: group by string expr with non-expr agg
+      "SELECT CONCAT(string2, '-', long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
+      // 27: group by string expr with expr agg
+      "SELECT CONCAT(string2, '-', long2), SUM(long1 * double4) FROM foo GROUP BY 1 ORDER BY 2"
   );
 
   @Param({"5000000"})
@@ -211,7 +215,9 @@
       "22",
       "23",
       "24",
-      "25"
+      "25",
+      "26",
+      "27"
   })
   private String query;
 
diff --git a/core/src/main/java/org/apache/druid/math/expr/BinaryMathOperatorExpr.java b/core/src/main/java/org/apache/druid/math/expr/BinaryMathOperatorExpr.java
index d7daac9..384249b 100644
--- a/core/src/main/java/org/apache/druid/math/expr/BinaryMathOperatorExpr.java
+++ b/core/src/main/java/org/apache/druid/math/expr/BinaryMathOperatorExpr.java
@@ -24,6 +24,7 @@
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.math.expr.vector.ExprVectorProcessor;
 import org.apache.druid.math.expr.vector.VectorMathProcessors;
+import org.apache.druid.math.expr.vector.VectorStringProcessors;
 
 import javax.annotation.Nullable;
 
@@ -63,12 +64,19 @@
   @Override
   public boolean canVectorize(InputBindingInspector inspector)
   {
-    return inspector.areNumeric(left, right) && inspector.canVectorize(left, right);
+    return inspector.areScalar(left, right) && inspector.canVectorize(left, right);
   }
 
   @Override
   public <T> ExprVectorProcessor<T> buildVectorized(VectorInputBindingInspector inspector)
   {
+    ExprType type = ExprTypeConversion.operator(
+        left.getOutputType(inspector),
+        right.getOutputType(inspector)
+    );
+    if (ExprType.STRING.equals(type)) {
+      return VectorStringProcessors.concat(inspector, left, right);
+    }
     return VectorMathProcessors.plus(inspector, left, right);
   }
 }
diff --git a/core/src/main/java/org/apache/druid/math/expr/Expr.java b/core/src/main/java/org/apache/druid/math/expr/Expr.java
index 3c0e4c7..4df2bf8 100644
--- a/core/src/main/java/org/apache/druid/math/expr/Expr.java
+++ b/core/src/main/java/org/apache/druid/math/expr/Expr.java
@@ -214,6 +214,36 @@
     }
 
     /**
+     * Check if all provided {@link Expr} can infer the output type as {@link ExprType#isScalar()} (non-array) with a
+     * value of true.
+     *
+     * There must be at least one expression with a computable scalar output type for this method to return true.
+     */
+    default boolean areScalar(List<Expr> args)
+    {
+      boolean scalar = true;
+      for (Expr arg : args) {
+        ExprType argType = arg.getOutputType(this);
+        if (argType == null) {
+          continue;
+        }
+        scalar &= argType.isScalar();
+      }
+      return scalar;
+    }
+
+    /**
+     * Check if all provided {@link Expr} can infer the output type as {@link ExprType#isScalar()} (non-array) with a
+     * value of true.
+     *
+     * There must be at least one expression with a computable scalar output type for this method to return true.
+     */
+    default boolean areScalar(Expr... args)
+    {
+      return areScalar(Arrays.asList(args));
+    }
+
+    /**
      * Check if every provided {@link Expr} computes {@link Expr#canVectorize(InputBindingInspector)} to a value of true
      */
     default boolean canVectorize(List<Expr> args)
diff --git a/core/src/main/java/org/apache/druid/math/expr/ExprType.java b/core/src/main/java/org/apache/druid/math/expr/ExprType.java
index 4c9949d..eaacf56 100644
--- a/core/src/main/java/org/apache/druid/math/expr/ExprType.java
+++ b/core/src/main/java/org/apache/druid/math/expr/ExprType.java
@@ -42,6 +42,11 @@
     return isNumeric(this);
   }
 
+  public boolean isScalar()
+  {
+    return isScalar(this);
+  }
+
   /**
    * The expression system does not distinguish between {@link ValueType#FLOAT} and {@link ValueType#DOUBLE}, and
    * cannot currently handle {@link ValueType#COMPLEX} inputs. This method will convert {@link ValueType#FLOAT} to
@@ -131,6 +136,11 @@
     return LONG.equals(type) || DOUBLE.equals(type);
   }
 
+  public static boolean isScalar(@Nullable ExprType exprType)
+  {
+    return !isArray(exprType);
+  }
+
   public static boolean isArray(@Nullable ExprType type)
   {
     return LONG_ARRAY.equals(type) || DOUBLE_ARRAY.equals(type) || STRING_ARRAY.equals(type);
diff --git a/core/src/main/java/org/apache/druid/math/expr/Function.java b/core/src/main/java/org/apache/druid/math/expr/Function.java
index 5322cd6..874d1ed 100644
--- a/core/src/main/java/org/apache/druid/math/expr/Function.java
+++ b/core/src/main/java/org/apache/druid/math/expr/Function.java
@@ -30,6 +30,7 @@
 import org.apache.druid.math.expr.vector.ExprVectorProcessor;
 import org.apache.druid.math.expr.vector.VectorMathProcessors;
 import org.apache.druid.math.expr.vector.VectorProcessors;
+import org.apache.druid.math.expr.vector.VectorStringProcessors;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.joda.time.format.DateTimeFormat;
@@ -2191,6 +2192,21 @@
     {
       return ExprType.STRING;
     }
+
+    @Override
+    public boolean canVectorize(Expr.InputBindingInspector inspector, List<Expr> args)
+    {
+      return inspector.areScalar(args) && inspector.canVectorize(args);
+    }
+
+    @Override
+    public <T> ExprVectorProcessor<T> asVectorProcessor(
+        Expr.VectorInputBindingInspector inspector,
+        List<Expr> args
+    )
+    {
+      return VectorStringProcessors.concat(inspector, args);
+    }
   }
 
   class StrlenFunc implements Function
diff --git a/core/src/main/java/org/apache/druid/math/expr/vector/BivariateFunctionVectorObjectProcessor.java b/core/src/main/java/org/apache/druid/math/expr/vector/BivariateFunctionVectorObjectProcessor.java
index ff92b9f..2f1279a 100644
--- a/core/src/main/java/org/apache/druid/math/expr/vector/BivariateFunctionVectorObjectProcessor.java
+++ b/core/src/main/java/org/apache/druid/math/expr/vector/BivariateFunctionVectorObjectProcessor.java
@@ -25,7 +25,10 @@
 import java.lang.reflect.Array;
 
 /**
- * Base {@link ExprVectorProcessor} for expressions and functions with 2 'object' typed inputs (strings, arrays)
+ * Base {@link ExprVectorProcessor} for expressions and functions with 2 'object' typed inputs (strings, arrays).
+ * 
+ * In SQL compatible null handling mode, for a row with either left or right input as a null value, it will be handled
+ * by {@link #processNull(int)} instead of {@link #processIndex(Object, Object, int)}.
  */
 public abstract class BivariateFunctionVectorObjectProcessor<TLeftInput, TRightInput, TOutput>
     implements ExprVectorProcessor<TOutput>
diff --git a/core/src/main/java/org/apache/druid/math/expr/vector/StringOutMultiStringInVectorProcessor.java b/core/src/main/java/org/apache/druid/math/expr/vector/StringOutMultiStringInVectorProcessor.java
new file mode 100644
index 0000000..8c68445
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/math/expr/vector/StringOutMultiStringInVectorProcessor.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.math.expr.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprType;
+
+/**
+ * many strings enter, one string leaves...
+ */
+public abstract class StringOutMultiStringInVectorProcessor implements ExprVectorProcessor<String[]>
+{
+  final ExprVectorProcessor<String[]>[] inputs;
+  final int maxVectorSize;
+  final String[] outValues;
+  final boolean sqlCompatible = NullHandling.sqlCompatible();
+
+  protected StringOutMultiStringInVectorProcessor(
+      ExprVectorProcessor<String[]>[] inputs,
+      int maxVectorSize
+  )
+  {
+    this.inputs = inputs;
+    this.maxVectorSize = maxVectorSize;
+    this.outValues = new String[maxVectorSize];
+  }
+
+  @Override
+  public ExprType getOutputType()
+  {
+    return ExprType.STRING;
+  }
+
+  @Override
+  public ExprEvalVector<String[]> evalVector(Expr.VectorInputBinding bindings)
+  {
+    final int currentSize = bindings.getCurrentVectorSize();
+    final String[][] in = new String[inputs.length][];
+    for (int i = 0; i < inputs.length; i++) {
+      in[i] = inputs[i].evalVector(bindings).values();
+    }
+
+    for (int i = 0; i < currentSize; i++) {
+      processIndex(in, i);
+    }
+    return new ExprEvalStringVector(outValues);
+  }
+
+  abstract void processIndex(String[][] in, int i);
+}
diff --git a/core/src/main/java/org/apache/druid/math/expr/vector/StringOutStringsInFunctionVectorProcessor.java b/core/src/main/java/org/apache/druid/math/expr/vector/StringOutStringsInFunctionVectorProcessor.java
index b744b03..35f38bc 100644
--- a/core/src/main/java/org/apache/druid/math/expr/vector/StringOutStringsInFunctionVectorProcessor.java
+++ b/core/src/main/java/org/apache/druid/math/expr/vector/StringOutStringsInFunctionVectorProcessor.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.math.expr.vector;
 
-import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.math.expr.ExprType;
 
 import javax.annotation.Nullable;
@@ -27,8 +26,6 @@
 public abstract class StringOutStringsInFunctionVectorProcessor
     extends BivariateFunctionVectorObjectProcessor<String[], String[], String[]>
 {
-  final boolean sqlCompatible = NullHandling.sqlCompatible();
-
   protected StringOutStringsInFunctionVectorProcessor(
       ExprVectorProcessor<String[]> left,
       ExprVectorProcessor<String[]> right,
@@ -44,7 +41,7 @@
   }
 
   @Nullable
-  abstract String processValue(@Nullable String leftVal, @Nullable String rightVal);
+  protected abstract String processValue(@Nullable String leftVal, @Nullable String rightVal);
 
   @Override
   void processIndex(String[] strings, String[] strings2, int i)
diff --git a/core/src/main/java/org/apache/druid/math/expr/vector/VectorStringProcessors.java b/core/src/main/java/org/apache/druid/math/expr/vector/VectorStringProcessors.java
new file mode 100644
index 0000000..bbfbd68
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/math/expr/vector/VectorStringProcessors.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.math.expr.vector;
+
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.math.expr.Expr;
+import org.apache.druid.math.expr.ExprType;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+public class VectorStringProcessors
+{
+  public static <T> ExprVectorProcessor<T> concat(Expr.VectorInputBindingInspector inspector, Expr left, Expr right)
+  {
+    final ExprVectorProcessor processor;
+    if (NullHandling.sqlCompatible()) {
+      processor = new StringOutStringsInFunctionVectorProcessor(
+          left.buildVectorized(inspector),
+          right.buildVectorized(inspector),
+          inspector.getMaxVectorSize()
+      )
+      {
+        @Nullable
+        @Override
+        protected String processValue(@Nullable String leftVal, @Nullable String rightVal)
+        {
+          // in sql compatible mode, nulls are handled by super class and never make it here...
+          return leftVal + rightVal;
+        }
+      };
+    } else {
+      processor = new StringOutStringsInFunctionVectorProcessor(
+          left.buildVectorized(inspector),
+          right.buildVectorized(inspector),
+          inspector.getMaxVectorSize()
+      )
+      {
+        @Nullable
+        @Override
+        protected String processValue(@Nullable String leftVal, @Nullable String rightVal)
+        {
+          return NullHandling.nullToEmptyIfNeeded(leftVal) + NullHandling.nullToEmptyIfNeeded(rightVal);
+        }
+      };
+    }
+    return processor;
+  }
+
+  public static <T> ExprVectorProcessor<T> concat(Expr.VectorInputBindingInspector inspector, List<Expr> inputs)
+  {
+    final ExprVectorProcessor<String[]>[] inputProcessors = new ExprVectorProcessor[inputs.size()];
+    for (int i = 0; i < inputs.size(); i++) {
+      inputProcessors[i] = CastToTypeVectorProcessor.cast(inputs.get(i).buildVectorized(inspector), ExprType.STRING);
+    }
+    final ExprVectorProcessor processor = new StringOutMultiStringInVectorProcessor(
+        inputProcessors,
+        inspector.getMaxVectorSize()
+    )
+    {
+      @Override
+      void processIndex(String[][] in, int i)
+      {
+        // Result of concatenation is null if any of the Values is null.
+        // e.g. 'select CONCAT(null, "abc") as c;' will return null as per Standard SQL spec.
+        String first = NullHandling.nullToEmptyIfNeeded(in[0][i]);
+        if (first == null) {
+          outValues[i] = null;
+          return;
+        }
+        final StringBuilder builder = new StringBuilder(first);
+        for (int inputNumber = 1; inputNumber < in.length; inputNumber++) {
+          final String s = NullHandling.nullToEmptyIfNeeded(in[inputNumber][i]);
+          if (s == null) {
+            outValues[i] = null;
+            return;
+          } else {
+            builder.append(s);
+          }
+        }
+        outValues[i] = builder.toString();
+      }
+    };
+    return processor;
+  }
+}
diff --git a/core/src/test/java/org/apache/druid/math/expr/VectorExprSanityTest.java b/core/src/test/java/org/apache/druid/math/expr/VectorExprSanityTest.java
index 7fd5504..6d769e0 100644
--- a/core/src/test/java/org/apache/druid/math/expr/VectorExprSanityTest.java
+++ b/core/src/test/java/org/apache/druid/math/expr/VectorExprSanityTest.java
@@ -202,6 +202,15 @@
     testFunctions(types, templates, args);
   }
 
+  @Test
+  public void testStringFns()
+  {
+    testExpression("s1 + s2", types);
+    testExpression("s1 + '-' + s2", types);
+    testExpression("concat(s1, s2)", types);
+    testExpression("concat(s1,'-',s2,'-',l1,'-',d1)", types);
+  }
+
   static void testFunctions(Map<String, ExprType> types, String[] templates, String[] args)
   {
     for (String template : templates) {
diff --git a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
index f29fbc2..2377dc3 100644
--- a/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
+++ b/extensions-core/druid-bloom-filter/src/test/java/org/apache/druid/query/filter/sql/BloomDimFilterSqlTest.java
@@ -177,9 +177,6 @@
   @Test
   public void testBloomFilterVirtualColumn() throws Exception
   {
-    // Cannot vectorize due to expression virtual columns.
-    cannotVectorize();
-
     BloomKFilter filter = new BloomKFilter(1500);
     filter.addString("def-foo");
     byte[] bytes = BloomFilterSerializersModule.bloomKFilterToBytes(filter);
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java
new file mode 100644
index 0000000..d83166f
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/DictionaryBuildingSingleValueStringGroupByVectorColumnSelector.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.memory.WritableMemory;
+import org.apache.druid.common.config.NullHandling;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.segment.vector.VectorObjectSelector;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A {@link GroupByVectorColumnSelector} that builds an internal String<->Integer dictionary, used for grouping
+ * single-valued STRING columns which are not natively dictionary encoded, e.g. expression virtual columns.
+ *
+ * This is effectively the {@link VectorGroupByEngine} analog of
+ * {@link org.apache.druid.query.groupby.epinephelinae.column.DictionaryBuildingStringGroupByColumnSelectorStrategy}
+ */
+public class DictionaryBuildingSingleValueStringGroupByVectorColumnSelector implements GroupByVectorColumnSelector
+{
+  private static final int GROUP_BY_MISSING_VALUE = -1;
+
+  private final VectorObjectSelector selector;
+
+  private int nextId = 0;
+  private final List<String> dictionary = new ArrayList<>();
+  private final Object2IntOpenHashMap<String> reverseDictionary = new Object2IntOpenHashMap<>();
+
+  {
+    reverseDictionary.defaultReturnValue(-1);
+  }
+
+  public DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(VectorObjectSelector selector)
+  {
+    this.selector = selector;
+  }
+
+
+  @Override
+  public int getGroupingKeySize()
+  {
+    return Integer.BYTES;
+  }
+
+  @Override
+  public void writeKeys(
+      final WritableMemory keySpace,
+      final int keySize,
+      final int keyOffset,
+      final int startRow,
+      final int endRow
+  )
+  {
+    final Object[] vector = selector.getObjectVector();
+
+    for (int i = startRow, j = keyOffset; i < endRow; i++, j += keySize) {
+      final String value = (String) vector[i];
+      final int dictId = reverseDictionary.getInt(value);
+      if (dictId < 0) {
+        dictionary.add(value);
+        reverseDictionary.put(value, nextId);
+        keySpace.putInt(j, nextId);
+        nextId++;
+      } else {
+        keySpace.putInt(j, dictId);
+      }
+    }
+  }
+
+  @Override
+  public void writeKeyToResultRow(
+      final Memory keyMemory,
+      final int keyOffset,
+      final ResultRow resultRow,
+      final int resultRowPosition
+  )
+  {
+    final int id = keyMemory.getInt(keyOffset);
+    // GROUP_BY_MISSING_VALUE is used to indicate empty rows, which are omitted from the result map.
+    if (id != GROUP_BY_MISSING_VALUE) {
+      final String value = dictionary.get(id);
+      resultRow.set(resultRowPosition, value);
+    } else {
+      resultRow.set(resultRowPosition, NullHandling.defaultStringValue());
+    }
+  }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
index 6f332de..069751d 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/GroupByVectorColumnProcessorFactory.java
@@ -113,9 +113,7 @@
   )
   {
     if (ValueType.STRING.equals(capabilities.getType())) {
-      throw new UnsupportedOperationException(
-          "Vectorized groupBys on non-dictionary encoded string columns with object selectors are not yet implemented"
-      );
+      return new DictionaryBuildingSingleValueStringGroupByVectorColumnSelector(selector);
     }
     return NilGroupByVectorColumnSelector.INSTANCE;
   }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index 374ba66..848c185 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -46,7 +46,6 @@
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnCapabilities;
-import org.apache.druid.segment.column.ValueType;
 import org.apache.druid.segment.filter.Filters;
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 import org.apache.druid.segment.vector.VectorCursor;
@@ -108,12 +107,7 @@
               if (columnCapabilities == null) {
                 return true;
               }
-              // strings must be single valued, dictionary encoded, and have unique dictionary entries
-              if (ValueType.STRING.equals(columnCapabilities.getType())) {
-                return columnCapabilities.hasMultipleValues().isFalse() &&
-                       columnCapabilities.isDictionaryEncoded().isTrue() &&
-                       columnCapabilities.areDictionaryValuesUnique().isTrue();
-              }
+              // must be single valued
               return columnCapabilities.hasMultipleValues().isFalse();
             });
   }
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index 70818e9..eb1ea62 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -1016,10 +1016,6 @@
   @Test
   public void testGroupByWithStringVirtualColumn()
   {
-    // Cannot vectorize due to virtual columns.
-    // all virtual columns are single input column, so it will work for group by v1, even with multi-value inputs
-    cannotVectorize();
-
     GroupByQuery query = makeQueryBuilder()
         .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
         .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
@@ -1081,6 +1077,69 @@
   }
 
   @Test
+  public void testGroupByWithStringVirtualColumnVectorizable()
+  {
+    GroupByQuery query = makeQueryBuilder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setVirtualColumns(
+            new ExpressionVirtualColumn(
+                "vc",
+                "cast(quality, 'STRING')",
+                ValueType.STRING,
+                TestExprMacroTable.INSTANCE
+            )
+        )
+        .setDimensions(new DefaultDimensionSpec("vc", "alias"))
+        .setAggregatorSpecs(QueryRunnerTestHelper.ROWS_COUNT, new LongSumAggregatorFactory("idx", "index"))
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .build();
+
+    List<ResultRow> expectedResults = Arrays.asList(
+        makeRow(query, "2011-04-01", "alias", "automotive", "rows", 1L, "idx", 135L),
+        makeRow(query, "2011-04-01", "alias", "business", "rows", 1L, "idx", 118L),
+        makeRow(
+            query,
+            "2011-04-01",
+            "alias",
+            "entertainment",
+            "rows",
+            1L,
+            "idx",
+            158L
+        ),
+        makeRow(query, "2011-04-01", "alias", "health", "rows", 1L, "idx", 120L),
+        makeRow(query, "2011-04-01", "alias", "mezzanine", "rows", 3L, "idx", 2870L),
+        makeRow(query, "2011-04-01", "alias", "news", "rows", 1L, "idx", 121L),
+        makeRow(query, "2011-04-01", "alias", "premium", "rows", 3L, "idx", 2900L),
+        makeRow(query, "2011-04-01", "alias", "technology", "rows", 1L, "idx", 78L),
+        makeRow(query, "2011-04-01", "alias", "travel", "rows", 1L, "idx", 119L),
+
+        makeRow(query, "2011-04-02", "alias", "automotive", "rows", 1L, "idx", 147L),
+        makeRow(query, "2011-04-02", "alias", "business", "rows", 1L, "idx", 112L),
+        makeRow(
+            query,
+            "2011-04-02",
+            "alias",
+            "entertainment",
+            "rows",
+            1L,
+            "idx",
+            166L
+        ),
+        makeRow(query, "2011-04-02", "alias", "health", "rows", 1L, "idx", 113L),
+        makeRow(query, "2011-04-02", "alias", "mezzanine", "rows", 3L, "idx", 2447L),
+        makeRow(query, "2011-04-02", "alias", "news", "rows", 1L, "idx", 114L),
+        makeRow(query, "2011-04-02", "alias", "premium", "rows", 3L, "idx", 2505L),
+        makeRow(query, "2011-04-02", "alias", "technology", "rows", 1L, "idx", 97L),
+        makeRow(query, "2011-04-02", "alias", "travel", "rows", 1L, "idx", 126L)
+    );
+
+    Iterable<ResultRow> results = GroupByQueryRunnerTestHelper.runQuery(factory, runner, query);
+    TestHelper.assertExpectedObjects(expectedResults, results, "virtual-column");
+  }
+
+  @Test
   public void testGroupByWithDurationGranularity()
   {
     GroupByQuery query = makeQueryBuilder()
@@ -6336,9 +6395,6 @@
   @Test
   public void testGroupByWithSubtotalsSpecOfDimensionsPrefixes()
   {
-    // Cannot vectorize due to usage of expressions.
-    cannotVectorize();
-
     if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
       return;
     }
@@ -6452,9 +6508,6 @@
   @Test
   public void testGroupByWithSubtotalsSpecGeneral()
   {
-    // Cannot vectorize due to usage of expressions.
-    cannotVectorize();
-
     if (!config.getDefaultStrategy().equals(GroupByStrategySelector.STRATEGY_V2)) {
       return;
     }
diff --git a/processing/src/test/java/org/apache/druid/segment/virtual/VectorizedVirtualColumnTest.java b/processing/src/test/java/org/apache/druid/segment/virtual/VectorizedVirtualColumnTest.java
index 3aa6eef..850c4d5 100644
--- a/processing/src/test/java/org/apache/druid/segment/virtual/VectorizedVirtualColumnTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/virtual/VectorizedVirtualColumnTest.java
@@ -155,8 +155,6 @@
   @Test
   public void testGroupBySingleValueStringNotDictionaryEncoded()
   {
-    // cannot currently group by string columns that are not dictionary encoded
-    cannotVectorize();
     testGroupBy(new ColumnCapabilitiesImpl()
                     .setType(ValueType.STRING)
                     .setDictionaryEncoded(false)
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index af78872..86ef6dc 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -9198,9 +9198,6 @@
   @Test
   public void testRegexpLikeFilter() throws Exception
   {
-    // Cannot vectorize due to usage of regex filter.
-    cannotVectorize();
-
     testQuery(
         "SELECT COUNT(*)\n"
         + "FROM foo\n"
@@ -14718,6 +14715,75 @@
   }
 
   @Test
+  public void testConcatGroup() throws Exception
+  {
+    testQuery(
+        "SELECT CONCAT(dim1, '-', dim1, '_', dim1) as dimX FROM foo GROUP BY 1",
+        ImmutableList.of(
+            new GroupByQuery.Builder()
+                .setDataSource(CalciteTests.DATASOURCE1)
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setVirtualColumns(expressionVirtualColumn(
+                    "v0",
+                    "concat(\"dim1\",'-',\"dim1\",'_',\"dim1\")",
+                    ValueType.STRING
+                ))
+                .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0")))
+                .setGranularity(Granularities.ALL)
+                .setContext(QUERY_CONTEXT_DEFAULT)
+                .build()
+        ),
+        ImmutableList.of(
+            new Object[]{"-_"},
+            new Object[]{"1-1_1"},
+            new Object[]{"10.1-10.1_10.1"},
+            new Object[]{"2-2_2"},
+            new Object[]{"abc-abc_abc"},
+            new Object[]{"def-def_def"}
+        )
+    );
+
+    final List<Object[]> secondResults;
+    if (useDefault) {
+      secondResults = ImmutableList.of(
+          new Object[]{"10.1x2.0999910.1"},
+          new Object[]{"1ax4.099991"},
+          new Object[]{"2x3.099992"},
+          new Object[]{"abcx6.09999abc"},
+          new Object[]{"ax1.09999"},
+          new Object[]{"defabcx5.09999def"}
+      );
+    } else {
+      secondResults = ImmutableList.of(
+          new Object[]{null},
+          new Object[]{"1ax4.099991"},
+          new Object[]{"2x3.099992"},
+          new Object[]{"ax1.09999"},
+          new Object[]{"defabcx5.09999def"}
+      );
+    }
+    testQuery(
+        "SELECT CONCAT(dim1, CONCAT(dim2,'x'), m2, 9999, dim1) as dimX FROM foo GROUP BY 1",
+        ImmutableList.of(
+            new GroupByQuery.Builder()
+                .setDataSource(CalciteTests.DATASOURCE1)
+                .setInterval(querySegmentSpec(Filtration.eternity()))
+                .setVirtualColumns(expressionVirtualColumn(
+                    "v0",
+                    "concat(\"dim1\",concat(\"dim2\",'x'),\"m2\",9999,\"dim1\")",
+                    ValueType.STRING
+                ))
+                .setDimensions(dimensions(new DefaultDimensionSpec("v0", "d0")))
+                .setGranularity(Granularities.ALL)
+                .setContext(QUERY_CONTEXT_DEFAULT)
+                .build()
+
+        ),
+        secondResults
+    );
+  }
+
+  @Test
   public void testTextcat() throws Exception
   {
     testQuery(
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
index b97b7f1..7fb7dde 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/SqlVectorizedExpressionSanityTest.java
@@ -83,7 +83,10 @@
       "SELECT TIME_FLOOR(__time, 'PT1H'), SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2",
       "SELECT TIME_FLOOR(TIMESTAMPADD(DAY, -1, __time), 'PT1H'), SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 1",
       "SELECT (long1 * long2), SUM(double1) FROM foo GROUP BY 1 ORDER BY 2",
-      "SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2"
+      "SELECT string2, SUM(long1 * long4) FROM foo GROUP BY 1 ORDER BY 2",
+      "SELECT string1 + string2, COUNT(*) FROM foo GROUP BY 1 ORDER BY 2",
+      "SELECT CONCAT(string1, '-', string2), string3, COUNT(*) FROM foo GROUP BY 1,2 ORDER BY 3",
+      "SELECT CONCAT(string1, '-', string2, '-', long1, '-', double1, '-', float1) FROM foo GROUP BY 1"
   );
 
   private static final int ROWS_PER_SEGMENT = 100_000;