Close aggregators in HashVectorGrouper.close() (#10452)

* Close aggregators in HashVectorGrouper.close()

* reuse grouper

* Add missing dependency
diff --git a/processing/pom.xml b/processing/pom.xml
index 2095187..e4b737d 100644
--- a/processing/pom.xml
+++ b/processing/pom.xml
@@ -244,6 +244,11 @@
             <artifactId>log4j-api</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
index b05ccb5..dae1661 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
@@ -265,7 +265,7 @@
   @Override
   public void close()
   {
-    // Nothing to do.
+    aggregators.close();
   }
 
 
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
index 794f44c..f516ea5 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngine.java
@@ -19,12 +19,14 @@
 
 package org.apache.druid.query.groupby.epinephelinae.vector;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Suppliers;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.memory.WritableMemory;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.guava.BaseSequence;
 import org.apache.druid.java.util.common.guava.Sequence;
+import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.aggregation.AggregatorAdapters;
@@ -216,7 +218,8 @@
     );
   }
 
-  private static class VectorGroupByEngineIterator implements CloseableIterator<ResultRow>
+  @VisibleForTesting
+  static class VectorGroupByEngineIterator implements CloseableIterator<ResultRow>
   {
     private final GroupByQuery query;
     private final GroupByQueryConfig querySpecificConfig;
@@ -278,7 +281,7 @@
     @Override
     public ResultRow next()
     {
-      if (delegate == null || !delegate.hasNext()) {
+      if (!hasNext()) {
         throw new NoSuchElementException();
       }
 
@@ -310,22 +313,19 @@
     }
 
     @Override
-    public void remove()
+    public void close() throws IOException
     {
-      throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close()
-    {
-      cursor.close();
-
+      Closer closer = Closer.create();
+      closer.register(vectorGrouper);
       if (delegate != null) {
-        delegate.close();
+        closer.register(delegate);
       }
+      closer.register(cursor);
+      closer.close();
     }
 
-    private VectorGrouper makeGrouper()
+    @VisibleForTesting
+    VectorGrouper makeGrouper()
     {
       final VectorGrouper grouper;
 
@@ -463,7 +463,7 @@
 
             return resultRow;
           },
-          vectorGrouper
+          () -> {} // Grouper will be closed when VectorGroupByEngineIterator is closed.
       );
     }
   }
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
new file mode 100644
index 0000000..46cd043
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae;
+
+import com.google.common.base.Suppliers;
+import org.apache.druid.query.aggregation.AggregatorAdapters;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class HashVectorGrouperTest
+{
+  @Test
+  public void testCloseAggregatorAdaptorsShouldBeClosed()
+  {
+    final ByteBuffer buffer = ByteBuffer.wrap(new byte[4096]);
+    final AggregatorAdapters aggregatorAdapters = Mockito.mock(AggregatorAdapters.class);
+    final HashVectorGrouper grouper = new HashVectorGrouper(
+        Suppliers.ofInstance(buffer),
+        1024,
+        aggregatorAdapters,
+        Integer.MAX_VALUE,
+        0.f,
+        0
+    );
+    grouper.initVectorized(512);
+    grouper.close();
+    Mockito.verify(aggregatorAdapters, Mockito.times(1)).close();
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
new file mode 100644
index 0000000..627c2bd
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/vector/VectorGroupByEngineIteratorTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.groupby.epinephelinae.vector;
+
+import org.apache.commons.lang3.mutable.MutableObject;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.query.dimension.DefaultDimensionSpec;
+import org.apache.druid.query.groupby.GroupByQuery;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.epinephelinae.VectorGrouper;
+import org.apache.druid.query.groupby.epinephelinae.vector.VectorGroupByEngine.VectorGroupByEngineIterator;
+import org.apache.druid.segment.DimensionHandlerUtils;
+import org.apache.druid.segment.QueryableIndexStorageAdapter;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.TestIndex;
+import org.apache.druid.segment.filter.Filters;
+import org.apache.druid.segment.vector.VectorCursor;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.joda.time.Interval;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class VectorGroupByEngineIteratorTest extends InitializedNullHandlingTest
+{
+  @Test
+  public void testCreateOneGrouperAndCloseItWhenClose() throws IOException
+  {
+    final Interval interval = TestIndex.DATA_INTERVAL;
+    final AggregatorFactory factory = new DoubleSumAggregatorFactory("index", "index");
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setGranularity(QueryRunnerTestHelper.DAY_GRAN)
+        .setInterval(interval)
+        .setDimensions(new DefaultDimensionSpec("market", null, null))
+        .setAggregatorSpecs(factory)
+        .build();
+    final StorageAdapter storageAdapter = new QueryableIndexStorageAdapter(TestIndex.getMMappedTestIndex());
+    final ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[4096]);
+    final VectorCursor cursor = storageAdapter.makeVectorCursor(
+        Filters.toFilter(query.getDimFilter()),
+        interval,
+        query.getVirtualColumns(),
+        false,
+        QueryContexts.getVectorSize(query),
+        null
+    );
+    final List<GroupByVectorColumnSelector> dimensions = query.getDimensions().stream().map(
+        dimensionSpec ->
+            DimensionHandlerUtils.makeVectorProcessor(
+                dimensionSpec,
+                GroupByVectorColumnProcessorFactory.instance(),
+                cursor.getColumnSelectorFactory()
+            )
+    ).collect(Collectors.toList());
+    final MutableObject<VectorGrouper> grouperCaptor = new MutableObject<>();
+    final VectorGroupByEngineIterator iterator = new VectorGroupByEngineIterator(
+        query,
+        new GroupByQueryConfig(),
+        storageAdapter,
+        cursor,
+        interval,
+        dimensions,
+        byteBuffer,
+        null
+    )
+    {
+      @Override
+      VectorGrouper makeGrouper()
+      {
+        grouperCaptor.setValue(Mockito.spy(super.makeGrouper()));
+        return grouperCaptor.getValue();
+      }
+    };
+    iterator.close();
+    Mockito.verify(grouperCaptor.getValue()).close();
+  }
+}
diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
index f7baecc..f53f5d9 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
@@ -134,8 +134,8 @@
 
   public static final String[] DOUBLE_METRICS = new String[]{"index", "indexMin", "indexMaxPlusTen"};
   public static final String[] FLOAT_METRICS = new String[]{"indexFloat", "indexMinFloat", "indexMaxFloat"};
+  public static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
   private static final Logger log = new Logger(TestIndex.class);
-  private static final Interval DATA_INTERVAL = Intervals.of("2011-01-12T00:00:00.000Z/2011-05-01T00:00:00.000Z");
   private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create(
       Collections.singletonList(
           new ExpressionVirtualColumn("expr", "index + 10", ValueType.FLOAT, TestExprMacroTable.INSTANCE)