PHOENIX-5291 Ensure that Phoenix coprocessor close all scanners.
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index f0ce5b2..72ee4a3 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -1158,7 +1158,7 @@
long rowCount = 0; // in case of async, we report 0 as number of rows updated
StatisticsCollectionRunTracker statsRunTracker =
StatisticsCollectionRunTracker.getInstance(config);
- boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
+ final boolean runUpdateStats = statsRunTracker.addUpdateStatsCommandRegion(region.getRegionInfo(),scan.getFamilyMap().keySet());
if (runUpdateStats) {
if (!async) {
rowCount = callable.call();
@@ -1187,8 +1187,11 @@
@Override
public void close() throws IOException {
- // No-op because we want to manage closing of the inner scanner ourselves.
- // This happens inside StatsCollectionCallable.
+ // If we ran/scheduled StatsCollectionCallable the delegate
+ // scanner is closed there. Otherwise close it here.
+ if (!runUpdateStats) {
+ super.close();
+ }
}
@Override
@@ -1425,6 +1428,14 @@
+ fullTableName);
Scan scan = new Scan();
scan.setMaxVersions();
+
+ // close the passed scanner since we are returning a brand-new one
+ try {
+ if (s != null) {
+ s.close();
+ }
+ } catch (IOException ignore) {}
+
return new StoreScanner(store, store.getScanInfo(), scan, scanners,
ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);