Reset buffer aggregators when resetting Groupers. (#16296)

Buffer aggregators can contain some cached objects within them, such as
Memory references or HLL Unions. Prior to this patch, various Grouper
implementations were not releasing this state when resetting their own
internal state, which could lead to excessive memory use.

This patch renames AggregatorAdapater#close to "reset", and updates
Grouper implementations to call this reset method whenever they reset
their internal state.

The base method on BufferAggregator and VectorAggregator remains named
"close", for compatibility with existing extensions, but the contract
is adjusted to say that the aggregator may be reused after the method
is called. All existing implementations in core already adhere to this
new contract, except for the ArrayOfDoubles build flavors, which are
updated in this patch to adhere.

Additionally, this patch harmonizes buffer sketch helpers to call their
clear method "clear" rather than a mix of "clear" and "close". (Others
were already using "clear".)
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
index 8e3bfff..0458b50 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregator.java
@@ -75,7 +75,7 @@
   @Override
   public void close()
   {
-    helper.close();
+    helper.clear();
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
index 2265301..1fa9ee4 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeBufferAggregatorHelper.java
@@ -142,7 +142,7 @@
     }
   }
 
-  public void close()
+  public void clear()
   {
     unions.clear();
     memCache.clear();
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
index 5fec9b9..31ad26c 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchMergeVectorAggregator.java
@@ -102,7 +102,7 @@
   @Override
   public void close()
   {
-    helper.close();
+    helper.clear();
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
index 34aae3f..60d83f4 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregator.java
@@ -85,7 +85,7 @@
   @Override
   public void close()
   {
-    helper.close();
+    helper.clear();
   }
 
   @Override
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
index 49856c9..e2f6990 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchBufferAggregatorHelper.java
@@ -95,7 +95,7 @@
   /**
    * Returns a {@link Union} associated with a particular buffer location.
    *
-   * The Union object will be cached in this helper until {@link #close()} is called.
+   * The Union object will be cached in this helper until {@link #clear()} is called.
    */
   public Union getOrCreateUnion(ByteBuffer buf, int position)
   {
@@ -122,7 +122,7 @@
     return union;
   }
 
-  public void close()
+  public void clear()
   {
     unions.clear();
     memCache.clear();
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
index a862265..7d10bc3 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchVectorAggregator.java
@@ -107,6 +107,6 @@
   @Override
   public void close()
   {
-    helper.close();
+    helper.clear();
   }
 }
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
index 7ca1061..b093e73 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildAggregator.java
@@ -27,7 +27,6 @@
 import org.apache.druid.segment.data.IndexedInts;
 
 import javax.annotation.Nullable;
-
 import java.nio.ByteBuffer;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -48,6 +47,7 @@
   @Nullable
   private ArrayOfDoublesUpdatableSketch sketch;
 
+  private final int nominalEntries;
   private final boolean canLookupUtf8;
   private final boolean canCacheById;
   private final LinkedHashMap<Integer, Object> stringCache = new LinkedHashMap<Integer, Object>()
@@ -67,10 +67,7 @@
   {
     this.keySelector = keySelector;
     this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]);
-    values = new double[valueSelectors.size()];
-    sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
-                                                       .setNumberOfValues(valueSelectors.size()).build();
-
+    this.nominalEntries = nominalEntries;
     this.canCacheById = this.keySelector.nameLookupPossibleInAdvance();
     this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8();
   }
@@ -83,6 +80,15 @@
   @Override
   public void aggregate()
   {
+    if (values == null) {
+      values = new double[valueSelectors.length];
+    }
+
+    if (sketch == null) {
+      sketch = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(nominalEntries)
+                                                         .setNumberOfValues(valueSelectors.length).build();
+    }
+
     final IndexedInts keys = keySelector.getRow();
     for (int i = 0; i < valueSelectors.length; i++) {
       if (valueSelectors[i].isNull()) {
diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
index 18906d1..b925220 100644
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
+++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchBuildBufferAggregator.java
@@ -73,8 +73,6 @@
     this.valueSelectors = valueSelectors.toArray(new BaseDoubleColumnValueSelector[0]);
     this.nominalEntries = nominalEntries;
     this.maxIntermediateSize = maxIntermediateSize;
-    values = new double[valueSelectors.size()];
-
     this.canCacheById = this.keySelector.nameLookupPossibleInAdvance();
     this.canLookupUtf8 = this.keySelector.supportsLookupNameUtf8();
   }
@@ -92,6 +90,10 @@
   @Override
   public void aggregate(final ByteBuffer buf, final int position)
   {
+    if (values == null) {
+      values = new double[valueSelectors.length];
+    }
+
     for (int i = 0; i < valueSelectors.length; i++) {
       if (valueSelectors[i].isNull()) {
         return;
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
index 8ae7a33..25c9102 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/AggregatorAdapters.java
@@ -26,7 +26,6 @@
 import org.apache.druid.segment.vector.VectorColumnSelectorFactory;
 
 import javax.annotation.Nullable;
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.List;
@@ -42,7 +41,7 @@
  * (2) Query engines are freed from the need to manage how much space each individual aggregator needs. They only
  * need to allocate a block of size "spaceNeeded".
  */
-public class AggregatorAdapters implements Closeable
+public class AggregatorAdapters
 {
   private static final Logger log = new Logger(AggregatorAdapters.class);
 
@@ -230,14 +229,14 @@
   }
 
   /**
-   * Close all of our aggregators.
+   * Reset all of our aggregators, releasing resources held by them. After this, this instance may be reused or
+   * it may be discarded.
    */
-  @Override
-  public void close()
+  public void reset()
   {
     for (Adapter adapter : adapters) {
       try {
-        adapter.close();
+        adapter.reset();
       }
       catch (Exception e) {
         log.warn(e, "Could not close aggregator [%s], skipping.", adapter.getFactory().getName());
@@ -250,7 +249,7 @@
    * BufferAggregator and VectorAggregator. Private, since it doesn't escape this class and the
    * only two implementations are private static classes below.
    */
-  private interface Adapter extends Closeable
+  private interface Adapter
   {
     void init(ByteBuffer buf, int position);
 
@@ -259,8 +258,7 @@
 
     void relocate(int oldPosition, int newPosition, ByteBuffer oldBuffer, ByteBuffer newBuffer);
 
-    @Override
-    void close();
+    void reset();
 
     AggregatorFactory getFactory();
 
@@ -293,7 +291,7 @@
     }
 
     @Override
-    public void close()
+    public void reset()
     {
       aggregator.close();
     }
@@ -352,7 +350,7 @@
     }
 
     @Override
-    public void close()
+    public void reset()
     {
       aggregator.close();
     }
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
index e9fdbea..20d1349 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/BufferAggregator.java
@@ -158,7 +158,11 @@
   }
 
   /**
-   * Release any resources used by the aggregator
+   * Release any resources used by the aggregator. The aggregator may be reused after this call, by calling
+   * {@link #init(ByteBuffer, int)} followed by other methods as normal.
+   *
+   * This call would be more properly named "reset", but we use the name "close" to improve compatibility with
+   * existing aggregator implementations in extensions.
    */
   void close();
 
diff --git a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
index befff12..a3e506e 100644
--- a/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
+++ b/processing/src/main/java/org/apache/druid/query/aggregation/VectorAggregator.java
@@ -83,7 +83,11 @@
   }
 
   /**
-   * Release any resources used by the aggregator.
+   * Release any resources used by the aggregator. The aggregator may be reused after this call, by calling
+   * {@link #init(ByteBuffer, int)} followed by other methods as normal.
+   *
+   * This call would be more properly named "reset", but we use the name "close" to improve compatibility with
+   * existing aggregator implementations in extensions.
    */
   void close();
 }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
index f3bc195..70cf583 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/AbstractBufferHashGrouper.java
@@ -170,7 +170,7 @@
   public void close()
   {
     keySerde.reset();
-    aggregators.close();
+    aggregators.reset();
   }
 
   /**
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
index 0fcb4dd..616ac19 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferArrayGrouper.java
@@ -269,6 +269,7 @@
   {
     // Clear the entire usedFlagBuffer
     usedFlagMemory.clear();
+    aggregators.reset();
   }
 
   @Override
@@ -280,7 +281,7 @@
   @Override
   public void close()
   {
-    aggregators.close();
+    aggregators.reset();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
index 167b322..c4d0469 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/BufferHashGrouper.java
@@ -158,6 +158,7 @@
     offsetList.reset();
     hashTable.reset();
     keySerde.reset();
+    aggregators.reset();
   }
 
   @Override
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
index ad12d05..e5c2801 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouper.java
@@ -205,6 +205,7 @@
     }
 
     this.hashTable = createTable(buffer, tableStart, numBuckets);
+    this.aggregators.reset();
   }
 
   @Override
@@ -256,7 +257,7 @@
   @Override
   public void close()
   {
-    aggregators.close();
+    aggregators.reset();
   }
 
   @VisibleForTesting
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
index 756a822..90a0e1e 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/LimitedBufferHashGrouper.java
@@ -185,6 +185,7 @@
     hashTable.reset();
     keySerde.reset();
     offsetHeap.reset();
+    aggregators.reset();
     heapIndexUpdater.setHashTableBuffer(hashTable.getTableBuffer());
     hasIterated = false;
     offsetHeapIterableSize = 0;
diff --git a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
index 7ae290d..c5e83b8 100644
--- a/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/timeseries/TimeseriesQueryEngine.java
@@ -164,9 +164,9 @@
       }
 
       final VectorColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
-      final AggregatorAdapters aggregators = closer.register(
-          AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs())
-      );
+      final AggregatorAdapters aggregators =
+          AggregatorAdapters.factorizeVector(columnSelectorFactory, query.getAggregatorSpecs());
+      closer.register(aggregators::reset);
 
       final ResourceHolder<ByteBuffer> bufferHolder = closer.register(bufferPool.take());
 
diff --git a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
index 843d248..f34464a 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
@@ -120,7 +120,7 @@
 
       updateResults(params, theDimValSelector, aggregatesStore, resultBuilder);
 
-      closeAggregators(aggregatesStore);
+      resetAggregators(aggregatesStore);
 
       numProcessed += numToProcess;
       params.getCursor().reset();
@@ -151,7 +151,7 @@
     }
     long processedRows = scanAndAggregate(params, null, aggregatesStore);
     updateResults(params, null, aggregatesStore, resultBuilder);
-    closeAggregators(aggregatesStore);
+    resetAggregators(aggregatesStore);
     params.getCursor().reset();
     if (queryMetrics != null) {
       queryMetrics.addProcessedRows(processedRows);
@@ -199,7 +199,7 @@
       TopNResultBuilder resultBuilder
   );
 
-  protected abstract void closeAggregators(
+  protected abstract void resetAggregators(
       DimValAggregateStore dimValAggregateStore
   );
 
diff --git a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
index 14f3b72..ba5fbf2 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/HeapBasedTopNAlgorithm.java
@@ -112,7 +112,7 @@
   }
 
   @Override
-  protected void closeAggregators(TopNColumnAggregatesProcessor processor)
+  protected void resetAggregators(TopNColumnAggregatesProcessor processor)
   {
     processor.closeAggregators();
   }
diff --git a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
index 6ddda5e..d0c0fb0 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
@@ -768,7 +768,7 @@
   }
 
   @Override
-  protected void closeAggregators(BufferAggregator[] bufferAggregators)
+  protected void resetAggregators(BufferAggregator[] bufferAggregators)
   {
     for (BufferAggregator agg : bufferAggregators) {
       agg.close();
diff --git a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
index 70e01e4..3b60bb6 100644
--- a/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
+++ b/processing/src/main/java/org/apache/druid/query/topn/TimeExtractionTopNAlgorithm.java
@@ -135,7 +135,7 @@
   }
 
   @Override
-  protected void closeAggregators(Map<Object, Aggregator[]> stringMap)
+  protected void resetAggregators(Map<Object, Aggregator[]> stringMap)
   {
     for (Aggregator[] aggregators : stringMap.values()) {
       for (Aggregator agg : aggregators) {
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
index d5a863a..fd0314a 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/HashVectorGrouperTest.java
@@ -45,7 +45,7 @@
     );
     grouper.initVectorized(512);
     grouper.close();
-    Mockito.verify(aggregatorAdapters, Mockito.times(1)).close();
+    Mockito.verify(aggregatorAdapters, Mockito.times(2)).reset();
   }
 
   @Test