fix projections filtered on virtual columns (#18532)

diff --git a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
index 2015c3c..9503844 100644
--- a/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
+++ b/processing/src/main/java/org/apache/druid/segment/incremental/OnHeapAggregateProjection.java
@@ -36,8 +36,6 @@
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.EncodedKeyComponent;
-import org.apache.druid.segment.RowAdapters;
-import org.apache.druid.segment.RowBasedColumnSelectorFactory;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.CapabilitiesBasedFormat;
@@ -46,7 +44,6 @@
 import org.apache.druid.segment.column.ColumnFormat;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
-import org.apache.druid.segment.column.RowSignature;
 import org.apache.druid.segment.column.ValueType;
 
 import javax.annotation.Nullable;
@@ -128,28 +125,7 @@
     initializeAndValidateAggregators(projectionSpec, getBaseTableDimensionDesc, getBaseTableAggregatorFactory);
 
     if (projectionSpec.getFilter() != null) {
-      RowSignature.Builder bob = RowSignature.builder();
-      if (projectionSchema.getTimeColumnPosition() < 0) {
-        bob.addTimeColumn();
-      }
-      for (String groupingColumn : projectionSchema.getGroupingColumns()) {
-        if (projectionSchema.getTimeColumnName().equals(groupingColumn)) {
-          bob.addTimeColumn();
-        } else {
-          bob.add(groupingColumn, dimensionsMap.get(groupingColumn).getCapabilities().toColumnType());
-        }
-      }
-      valueMatcher = projectionSchema.getFilter()
-                                     .toFilter()
-                                     .makeMatcher(
-                                         RowBasedColumnSelectorFactory.create(
-                                             RowAdapters.standardRow(),
-                                             inputRowHolder::getRow,
-                                             bob.build(),
-                                             false,
-                                             false
-                                         )
-                                     );
+      valueMatcher = projectionSchema.getFilter().toFilter().makeMatcher(virtualSelectorFactory);
     } else {
       valueMatcher = null;
     }
diff --git a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
index 0eb1c70..ba8506a 100644
--- a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java
@@ -62,6 +62,7 @@
 import org.apache.druid.query.expression.TestExprMacroTable;
 import org.apache.druid.query.expression.TimestampFloorExprMacro;
 import org.apache.druid.query.filter.EqualityFilter;
+import org.apache.druid.query.filter.TypedInFilter;
 import org.apache.druid.query.groupby.GroupByQuery;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByQueryMetrics;
@@ -333,6 +334,20 @@
                                  new LongDimensionSchema(ColumnHolder.TIME_COLUMN_NAME),
                                  new StringDimensionSchema("a")
                              )
+                             .build(),
+      AggregateProjectionSpec.builder("filtered_c_plus_d")
+                             .virtualColumns(
+                                 Granularities.toVirtualColumn(Granularities.HOUR, "__gran"),
+                                 new ExpressionVirtualColumn(
+                                     "__c_plus_d",
+                                     "c + d",
+                                     ColumnType.DOUBLE,
+                                     TestExprMacroTable.INSTANCE
+                                 )
+                             )
+                             .filter(new TypedInFilter("__c_plus_d", ColumnType.DOUBLE, List.of(2.1, 4.2), null, null))
+                             .groupingColumns(new LongDimensionSchema("__gran"))
+                             .aggregators(new LongSumAggregatorFactory("sum_c", "c"))
                              .build()
   );
 
@@ -394,7 +409,7 @@
                         )
                         .collect(Collectors.toList());
 
-  @Parameterized.Parameters(name = "name: {0}, segmentTimeOrdered: {5}, autoSchema: {6}")
+  @Parameterized.Parameters(name = "name: {0}, segmentTimeOrdered: {5}, autoSchema: {6}, writeNullColumns: {7}")
   public static Collection<?> constructorFeeder()
   {
     final List<Object[]> constructors = new ArrayList<>();
@@ -484,7 +499,8 @@
                   new IncrementalIndexCursorFactory(rollupIndex),
                   new IncrementalIndexTimeBoundaryInspector(rollupIndex),
                   !sortByDim,
-                  autoSchema
+                  autoSchema,
+                  writeNullColumns
               });
             } else {
               QueryableIndex index = CLOSER.register(makeBuilder(
@@ -502,7 +518,8 @@
                   new QueryableIndexCursorFactory(rollupIndex),
                   QueryableIndexTimeBoundaryInspector.create(rollupIndex),
                   !sortByDim,
-                  autoSchema
+                  autoSchema,
+                  writeNullColumns
               });
             }
           }
@@ -541,7 +558,8 @@
       CursorFactory rollupProjectionsCursorFactory,
       TimeBoundaryInspector rollupProjectionsTimeBoundaryInspector,
       boolean segmentSortedByTime,
-      boolean autoSchema
+      boolean autoSchema,
+      boolean writeNullColumns
   )
   {
     this.projectionsCursorFactory = projectionsCursorFactory;
@@ -1831,6 +1849,42 @@
     );
   }
 
+  @Test
+  public void testProjectionGroupFilteredOnVirtualColumn()
+  {
+    final GroupByQuery query =
+        GroupByQuery.builder()
+                    .setDataSource("test")
+                    .setGranularity(Granularities.ALL)
+                    .setInterval(Intervals.ETERNITY)
+                    .setVirtualColumns(
+                        new ExpressionVirtualColumn(
+                            "v0",
+                            "c + d",
+                            ColumnType.DOUBLE,
+                            TestExprMacroTable.INSTANCE
+                        )
+                    )
+                    .setDimFilter(
+                        new TypedInFilter("v0", ColumnType.DOUBLE, List.of(2.1, 4.2), null, null)
+                    )
+                    .setAggregatorSpecs(new LongSumAggregatorFactory("c", "c"))
+                    .build();
+
+    final ExpectedProjectionGroupBy queryMetrics =
+        new ExpectedProjectionGroupBy("filtered_c_plus_d");
+    final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, queryMetrics);
+
+    assertCursorProjection(buildSpec, queryMetrics, 2);
+
+    testGroupBy(
+        query,
+        queryMetrics,
+        makeArrayResultSet(new Object[]{6L})
+    );
+  }
+
+
   private void testGroupBy(GroupByQuery query, ExpectedProjectionGroupBy queryMetrics, List<Object[]> expectedResults)
   {
     testGroupBy(projectionsCursorFactory, projectionsTimeBoundaryInspector, query, queryMetrics, expectedResults);