DRILL-8478. HashPartition memory leak when OutOfMemoryException is encountered (#2874) (#2875)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5c84557..0be1ea4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -147,6 +147,11 @@
try {
this.hashTable = baseHashTable.createAndSetupHashTable(null);
+ this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
+ tmpBatchesList = new ArrayList<>();
+ if (numPartitions > 1) {
+ allocateNewCurrentBatchAndHV();
+ }
} catch (ClassTransformationException e) {
throw UserException.unsupportedError(e)
.message("Code generation error - likely an error in the code.")
@@ -157,11 +162,11 @@
.build(logger);
} catch (SchemaChangeException sce) {
throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
- }
- this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
- tmpBatchesList = new ArrayList<>();
- if (numPartitions > 1) {
- allocateNewCurrentBatchAndHV();
+ } catch (OutOfMemoryException oom) {
+ close();
+ throw UserException.memoryError(oom)
+ .message("Failed to allocate hash partition.")
+ .build(logger);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index 41217ca..42dbc1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -1312,7 +1312,9 @@
}
// clean (and deallocate) each partition, and delete its spill file
for (HashPartition partn : partitions) {
- partn.close();
+ if (partn != null) {
+ partn.close();
+ }
}
// delete any spill file left in unread spilled partitions