DRILL-8478. HashPartition memory leak when OutOfMemoryException is encountered (#2874) (#2875)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
index 5c84557..0be1ea4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashPartition.java
@@ -147,6 +147,11 @@
 
     try {
       this.hashTable = baseHashTable.createAndSetupHashTable(null);
+      this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
+      tmpBatchesList = new ArrayList<>();
+      if (numPartitions > 1) {
+        allocateNewCurrentBatchAndHV();
+      }
     } catch (ClassTransformationException e) {
       throw UserException.unsupportedError(e)
         .message("Code generation error - likely an error in the code.")
@@ -157,11 +162,11 @@
         .build(logger);
     } catch (SchemaChangeException sce) {
       throw new IllegalStateException("Unexpected Schema Change while creating a hash table",sce);
-    }
-    this.hjHelper = semiJoin ? null : new HashJoinHelper(context, allocator);
-    tmpBatchesList = new ArrayList<>();
-    if (numPartitions > 1) {
-      allocateNewCurrentBatchAndHV();
+    } catch (OutOfMemoryException oom) {
+      close();
+      throw UserException.memoryError(oom)
+          .message("Failed to allocate hash partition.")
+          .build(logger);
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
index 41217ca..42dbc1c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/AbstractHashBinaryRecordBatch.java
@@ -1312,7 +1312,9 @@
     }
     // clean (and deallocate) each partition, and delete its spill file
     for (HashPartition partn : partitions) {
-      partn.close();
+      if (partn != null) {
+        partn.close();
+      }
     }
 
     // delete any spill file left in unread spilled partitions