DRILL-8416: Memory leak when the async Parquet reader skips empty pages (#2784)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
index d882504..a38c34d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
@@ -70,7 +71,7 @@
* invariant here is that there is space for at least one more page in the queue before the Future read task
* is submitted to the pool). This sequence is important. Not doing so can lead to deadlocks - producer
* threads may block on putting data into the queue which is full while the consumer threads might be
- * blocked trying to read from a queue that has no data.
+ * blocked trying to read from a queue that has no data.
* The first request to the page reader can be either to load a dictionary page or a data page; this leads
* to the rather odd looking code in the constructor since the parent PageReader calls
* loadDictionaryIfExists in the constructor.
@@ -305,6 +306,7 @@
pageHeader.compressed_page_size
);
skip(pageHeader.compressed_page_size);
+ Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
return;
}
@@ -325,6 +327,7 @@
default:
logger.warn("skipping page of type {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size);
skip(pageHeader.compressed_page_size);
+ Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
index 780aa6e..80602ab 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestEmptyParquet.java
@@ -416,10 +416,9 @@
}
/**
- * Test a Parquet file containing a zero-byte dictionary page, c.f.
- * DRILL-8023.
+ * Test a Parquet file containing a zero-byte dictionary page.
*/
- @Test
+ @Test // DRILL-8023
public void testEmptyDictPage() throws Exception {
try {
client.alterSession(ExecConstants.PARQUET_NEW_RECORD_READER, false);
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
index b5c1924..a4c518a 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet2/TestDrillParquetReader.java
@@ -1358,4 +1358,27 @@
testRunAndPrint(UserBitShared.QueryType.SQL, "select * from cp.`parquet2/allTypes.parquet`");
}
+ @Test // DRILL-8416
+ public void testEmptyDictPages() throws Exception {
+ String query = "select " +
+ "`name`, `type`, `begin`, `end` " +
+ "from cp.`parquet/empty_dict_pages.parquet` t";
+ String[] columns = {"`name`", "`type`", "`begin`", "`end`"};
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns(columns)
+ .baselineValues("TP_001", "TP", null, null)
+ .baselineValues("TP_002", "TP", null, null)
+ .baselineValues("TP_003", "TP", null, null)
+ .baselineValues("TP_004", "TP", null, null)
+ .baselineValues("TP_005", "TP", null, null)
+ .baselineValues("TP_006", "TP", null, null)
+ .baselineValues("TP_007", "TP", null, null)
+ .baselineValues("TP_008", "TP", null, null)
+ .baselineValues("TP_009", "TP", null, null)
+ .baselineValues("TP_010", "TP", null, null)
+ .build()
+ .run();
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet
new file mode 100644
index 0000000..9a94f9a
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/empty_dict_pages.parquet
Binary files differ