PARQUET-2054: fix TCP leaking when calling ParquetFileWriter.appendFile (#913)

* use try-with-resource statement for ParquetFileReader to call close explicitly
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
index fbeebdf..d7aa82d 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/CheckParquet251Command.java
@@ -108,10 +108,8 @@
           }));
 
       // now check to see if the data is actually corrupt
-      ParquetFileReader reader = new ParquetFileReader(getConf(),
-          fakeMeta, path, footer.getBlocks(), columns);
-
-      try {
+      try (ParquetFileReader reader = new ParquetFileReader(getConf(),
+        fakeMeta, path, footer.getBlocks(), columns)) {
         PageStatsValidator validator = new PageStatsValidator();
         for (PageReadStore pages = reader.readNextRowGroup(); pages != null;
              pages = reader.readNextRowGroup()) {
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
index ca29dd0..988ab0f 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/SchemaCommand.java
@@ -119,9 +119,10 @@
 
       switch (format) {
         case PARQUET:
-          return new ParquetFileReader(
-              getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)
-              .getFileMetaData().getSchema().toString();
+          try (ParquetFileReader reader = new ParquetFileReader(
+            getConf(), qualifiedPath(source), ParquetMetadataConverter.NO_FILTER)) {
+            return reader.getFileMetaData().getSchema().toString();
+          }
         default:
           throw new IllegalArgumentException(String.format(
               "Could not get a Parquet schema for format %s: %s", format, source));
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
index 20a694f..7a167ed 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowDictionaryCommand.java
@@ -64,56 +64,57 @@
 
     String source = targets.get(0);
 
-    ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
-    MessageType schema = reader.getFileMetaData().getSchema();
-    ColumnDescriptor descriptor = Util.descriptor(column, schema);
-    PrimitiveType type = Util.primitive(column, schema);
-    Preconditions.checkNotNull(type);
+    try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+      MessageType schema = reader.getFileMetaData().getSchema();
+      ColumnDescriptor descriptor = Util.descriptor(column, schema);
+      PrimitiveType type = Util.primitive(column, schema);
+      Preconditions.checkNotNull(type);
 
-    DictionaryPageReadStore dictionaryReader;
-    int rowGroup = 0;
-    while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
-      DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);
+      DictionaryPageReadStore dictionaryReader;
+      int rowGroup = 0;
+      while ((dictionaryReader = reader.getNextDictionaryReader()) != null) {
+        DictionaryPage page = dictionaryReader.readDictionaryPage(descriptor);
 
-      Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
+        Dictionary dict = page.getEncoding().initDictionary(descriptor, page);
 
-      console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
-      for (int i = 0; i <= dict.getMaxId(); i += 1) {
-        switch(type.getPrimitiveTypeName()) {
-          case BINARY:
-            if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+        console.info("\nRow group {} dictionary for \"{}\":", rowGroup, column, page.getCompressedSize());
+        for (int i = 0; i <= dict.getMaxId(); i += 1) {
+          switch(type.getPrimitiveTypeName()) {
+            case BINARY:
+              if (type.getLogicalTypeAnnotation() instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+                console.info("{}: {}", String.format("%6d", i),
+                    Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
+              } else {
+                console.info("{}: {}", String.format("%6d", i),
+                    Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
+              }
+              break;
+            case INT32:
               console.info("{}: {}", String.format("%6d", i),
-                  Util.humanReadable(dict.decodeToBinary(i).toStringUsingUTF8(), 70));
-            } else {
+                dict.decodeToInt(i));
+              break;
+            case INT64:
               console.info("{}: {}", String.format("%6d", i),
-                  Util.humanReadable(dict.decodeToBinary(i).getBytesUnsafe(), 70));
-            }
-            break;
-          case INT32:
-            console.info("{}: {}", String.format("%6d", i),
-              dict.decodeToInt(i));
-            break;
-          case INT64:
-            console.info("{}: {}", String.format("%6d", i),
-                dict.decodeToLong(i));
-            break;
-          case FLOAT:
-            console.info("{}: {}", String.format("%6d", i),
-                dict.decodeToFloat(i));
-            break;
-          case DOUBLE:
-            console.info("{}: {}", String.format("%6d", i),
-                dict.decodeToDouble(i));
-            break;
-          default:
-            throw new IllegalArgumentException(
-                "Unknown dictionary type: " + type.getPrimitiveTypeName());
+                  dict.decodeToLong(i));
+              break;
+            case FLOAT:
+              console.info("{}: {}", String.format("%6d", i),
+                  dict.decodeToFloat(i));
+              break;
+            case DOUBLE:
+              console.info("{}: {}", String.format("%6d", i),
+                  dict.decodeToDouble(i));
+              break;
+            default:
+              throw new IllegalArgumentException(
+                  "Unknown dictionary type: " + type.getPrimitiveTypeName());
+          }
         }
+
+        reader.skipNextRowGroup();
+
+        rowGroup += 1;
       }
-
-      reader.skipNextRowGroup();
-
-      rowGroup += 1;
     }
 
     console.info("");
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
index 5832106..bf030ac 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/commands/ShowPagesCommand.java
@@ -75,57 +75,57 @@
         "Cannot process multiple Parquet files.");
 
     String source = targets.get(0);
-    ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source));
-
-    MessageType schema = reader.getFileMetaData().getSchema();
-    Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
-    if (this.columns == null || this.columns.isEmpty()) {
-      for (ColumnDescriptor descriptor : schema.getColumns()) {
-        columns.put(descriptor, primitive(schema, descriptor.getPath()));
-      }
-    } else {
-      for (String column : this.columns) {
-        columns.put(descriptor(column, schema), primitive(column, schema));
-      }
-    }
-
-    CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
-    // accumulate formatted lines to print by column
-    Map<String, List<String>> formatted = Maps.newLinkedHashMap();
-    PageFormatter formatter = new PageFormatter();
-    PageReadStore pageStore;
-    int rowGroupNum = 0;
-    while ((pageStore = reader.readNextRowGroup()) != null) {
-      for (ColumnDescriptor descriptor : columns.keySet()) {
-        List<String> lines = formatted.get(columnName(descriptor));
-        if (lines == null) {
-          lines = Lists.newArrayList();
-          formatted.put(columnName(descriptor), lines);
+    try (ParquetFileReader reader = ParquetFileReader.open(getConf(), qualifiedPath(source))) {
+      MessageType schema = reader.getFileMetaData().getSchema();
+      Map<ColumnDescriptor, PrimitiveType> columns = Maps.newLinkedHashMap();
+      if (this.columns == null || this.columns.isEmpty()) {
+        for (ColumnDescriptor descriptor : schema.getColumns()) {
+          columns.put(descriptor, primitive(schema, descriptor.getPath()));
         }
-
-        formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
-        PageReader pages = pageStore.getPageReader(descriptor);
-
-        DictionaryPage dict = pages.readDictionaryPage();
-        if (dict != null) {
-          lines.add(formatter.format(dict));
-        }
-        DataPage page;
-        while ((page = pages.readPage()) != null) {
-          lines.add(formatter.format(page));
+      } else {
+        for (String column : this.columns) {
+          columns.put(descriptor(column, schema), primitive(column, schema));
         }
       }
-      rowGroupNum += 1;
-    }
 
-    // TODO: Show total column size and overall size per value in the column summary line
-    for (String columnName : formatted.keySet()) {
-      console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-')));
-      console.info(formatter.getHeader());
-      for (String line : formatted.get(columnName)) {
-        console.info(line);
+      CompressionCodecName codec = reader.getRowGroups().get(0).getColumns().get(0).getCodec();
+      // accumulate formatted lines to print by column
+      Map<String, List<String>> formatted = Maps.newLinkedHashMap();
+      PageFormatter formatter = new PageFormatter();
+      PageReadStore pageStore;
+      int rowGroupNum = 0;
+      while ((pageStore = reader.readNextRowGroup()) != null) {
+        for (ColumnDescriptor descriptor : columns.keySet()) {
+          List<String> lines = formatted.get(columnName(descriptor));
+          if (lines == null) {
+            lines = Lists.newArrayList();
+            formatted.put(columnName(descriptor), lines);
+          }
+
+          formatter.setContext(rowGroupNum, columns.get(descriptor), codec);
+          PageReader pages = pageStore.getPageReader(descriptor);
+
+          DictionaryPage dict = pages.readDictionaryPage();
+          if (dict != null) {
+            lines.add(formatter.format(dict));
+          }
+          DataPage page;
+          while ((page = pages.readPage()) != null) {
+            lines.add(formatter.format(page));
+          }
+        }
+        rowGroupNum += 1;
       }
-      console.info("");
+
+      // TODO: Show total column size and overall size per value in the column summary line
+      for (String columnName : formatted.keySet()) {
+        console.info(String.format("\nColumn: %s\n%s", columnName, new TextStringBuilder(80).appendPadding(80, '-')));
+        console.info(formatter.getHeader());
+        for (String line : formatted.get(columnName)) {
+          console.info(line);
+        }
+        console.info("");
+      }
     }
 
     return 0;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index a246a52..afbdf76 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -887,7 +887,9 @@
    */
   @Deprecated
   public void appendFile(Configuration conf, Path file) throws IOException {
-    ParquetFileReader.open(conf, file).appendTo(this);
+    try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
+      reader.appendTo(this);
+    }
   }
 
   public void appendFile(InputFile file) throws IOException {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 73ef70e..5b8c5ed 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -204,35 +204,37 @@
     assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings());
 
     { // read first block of col #1
-      ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
-          Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
-      PageReadStore pages = r.readNextRowGroup();
-      assertEquals(3, pages.getRowCount());
-      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
-      assertNull(r.readNextRowGroup());
+      try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+          Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) {
+        PageReadStore pages = r.readNextRowGroup();
+        assertEquals(3, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+        assertNull(r.readNextRowGroup());
+      }
     }
 
     { // read all blocks of col #1 and #2
 
-      ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
-          readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+      try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+          readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
 
-      PageReadStore pages = r.readNextRowGroup();
-      assertEquals(3, pages.getRowCount());
-      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
-      validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
-      validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+        PageReadStore pages = r.readNextRowGroup();
+        assertEquals(3, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+        validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+        validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
 
-      pages = r.readNextRowGroup();
-      assertEquals(4, pages.getRowCount());
+        pages = r.readNextRowGroup();
+        assertEquals(4, pages.getRowCount());
 
-      validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
-      validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
+        validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+        validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
 
-      assertNull(r.readNextRowGroup());
+        assertNull(r.readNextRowGroup());
+      }
     }
     PrintFooter.main(new String[] {path.toString()});
   }
@@ -281,12 +283,14 @@
     w.endBlock();
     w.end(new HashMap<>());
     ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
-    ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
-      Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)));
-    BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
-    BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
-    assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
-    assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
+
+    try (ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+      Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath)))) {
+      BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0));
+      BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0));
+      assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))));
+      assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world"))));
+    }
   }
 
   @Test
@@ -340,16 +344,16 @@
     expectedEncoding.add(PLAIN);
     assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
 
-    ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
-      readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
-
-    PageReadStore pages = reader.readNextRowGroup();
-    assertEquals(14, pages.getRowCount());
-    validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
-    validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(),data.toByteArray(), 12);
-    validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
-    validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
-    assertNull(reader.readNextRowGroup());
+    try (ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
+      readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
+      PageReadStore pages = reader.readNextRowGroup();
+      assertEquals(14, pages.getRowCount());
+      validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+      validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
+      validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+      validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12);
+      assertNull(reader.readNextRowGroup());
+    }
   }
 
   @Test
@@ -426,35 +430,37 @@
         120, readFooter.getBlocks().get(1).getStartingPos());
 
     { // read first block of col #1
-      ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
-          Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
-      PageReadStore pages = r.readNextRowGroup();
-      assertEquals(3, pages.getRowCount());
-      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
-      assertNull(r.readNextRowGroup());
+      try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+          Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) {
+        PageReadStore pages = r.readNextRowGroup();
+        assertEquals(3, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+        assertNull(r.readNextRowGroup());
+      }
     }
 
     { // read all blocks of col #1 and #2
 
-      ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
-          readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+      try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+          readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
 
-      PageReadStore pages = r.readNextRowGroup();
-      assertEquals(3, pages.getRowCount());
-      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
-      validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
-      validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+        PageReadStore pages = r.readNextRowGroup();
+        assertEquals(3, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+        validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+        validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
 
-      pages = r.readNextRowGroup();
-      assertEquals(4, pages.getRowCount());
+        pages = r.readNextRowGroup();
+        assertEquals(4, pages.getRowCount());
 
-      validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
-      validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
+        validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+        validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
 
-      assertNull(r.readNextRowGroup());
+        assertNull(r.readNextRowGroup());
+      }
     }
     PrintFooter.main(new String[] {path.toString()});
   }
@@ -533,35 +539,36 @@
         109, readFooter.getBlocks().get(1).getStartingPos());
 
     { // read first block of col #1
-      ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
-          Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
-      PageReadStore pages = r.readNextRowGroup();
-      assertEquals(3, pages.getRowCount());
-      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
-      assertNull(r.readNextRowGroup());
+      try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+          Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)))) {
+        PageReadStore pages = r.readNextRowGroup();
+        assertEquals(3, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+        assertNull(r.readNextRowGroup());
+      }
     }
 
     { // read all blocks of col #1 and #2
 
-      ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
-          readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)));
+      try (ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path,
+          readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2)))) {
+        PageReadStore pages = r.readNextRowGroup();
+        assertEquals(3, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
+        validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
+        validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
+        validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
 
-      PageReadStore pages = r.readNextRowGroup();
-      assertEquals(3, pages.getRowCount());
-      validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1));
-      validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2));
-      validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2));
-      validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2));
+        pages = r.readNextRowGroup();
+        assertEquals(4, pages.getRowCount());
 
-      pages = r.readNextRowGroup();
-      assertEquals(4, pages.getRowCount());
+        validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
+        validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
 
-      validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3));
-      validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4));
-
-      assertNull(r.readNextRowGroup());
+        assertNull(r.readNextRowGroup());
+      }
     }
     PrintFooter.main(new String[] {path.toString()});
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 9e9b735..b2ae72a 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -270,14 +270,15 @@
       }
     }
 
-    ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()));
-    BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
-    BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
-      .readBloomFilter(blockMetaData.getColumns().get(0));
+    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
+      BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0);
+      BloomFilter bloomFilter = reader.getBloomFilterDataReader(blockMetaData)
+        .readBloomFilter(blockMetaData.getColumns().get(0));
 
-    for (String name: testNames) {
-      assertTrue(bloomFilter.findHash(
-        LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+      for (String name : testNames) {
+        assertTrue(bloomFilter.findHash(
+          LongHashFunction.xx(0).hashBytes(Binary.fromString(name).toByteBuffer())));
+      }
     }
   }
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java
index 69e11c1..fdb7c86 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestReadWriteEncodingStats.java
@@ -93,29 +93,30 @@
     writeData(writer);
     writer.close();
 
-    ParquetFileReader reader = ParquetFileReader.open(CONF, path);
-    assertEquals("Should have one row group", 1, reader.getRowGroups().size());
-    BlockMetaData rowGroup = reader.getRowGroups().get(0);
+    try (ParquetFileReader reader = ParquetFileReader.open(CONF, path)) {
+      assertEquals("Should have one row group", 1, reader.getRowGroups().size());
+      BlockMetaData rowGroup = reader.getRowGroups().get(0);
 
-    ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0);
-    EncodingStats dictStats = dictColumn.getEncodingStats();
-    assertNotNull("Dict column should have non-null encoding stats", dictStats);
-    assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages());
-    assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages());
-    assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages());
+      ColumnChunkMetaData dictColumn = rowGroup.getColumns().get(0);
+      EncodingStats dictStats = dictColumn.getEncodingStats();
+      assertNotNull("Dict column should have non-null encoding stats", dictStats);
+      assertTrue("Dict column should have a dict page", dictStats.hasDictionaryPages());
+      assertTrue("Dict column should have dict-encoded pages", dictStats.hasDictionaryEncodedPages());
+      assertFalse("Dict column should not have non-dict pages", dictStats.hasNonDictionaryEncodedPages());
 
-    ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1);
-    EncodingStats plainStats = plainColumn.getEncodingStats();
-    assertNotNull("Plain column should have non-null encoding stats", plainStats);
-    assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages());
-    assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages());
-    assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages());
+      ColumnChunkMetaData plainColumn = rowGroup.getColumns().get(1);
+      EncodingStats plainStats = plainColumn.getEncodingStats();
+      assertNotNull("Plain column should have non-null encoding stats", plainStats);
+      assertFalse("Plain column should not have a dict page", plainStats.hasDictionaryPages());
+      assertFalse("Plain column should not have dict-encoded pages", plainStats.hasDictionaryEncodedPages());
+      assertTrue("Plain column should have non-dict pages", plainStats.hasNonDictionaryEncodedPages());
 
-    ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2);
-    EncodingStats fallbackStats = fallbackColumn.getEncodingStats();
-    assertNotNull("Fallback column should have non-null encoding stats", fallbackStats);
-    assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages());
-    assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages());
-    assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages());
+      ColumnChunkMetaData fallbackColumn = rowGroup.getColumns().get(2);
+      EncodingStats fallbackStats = fallbackColumn.getEncodingStats();
+      assertNotNull("Fallback column should have non-null encoding stats", fallbackStats);
+      assertTrue("Fallback column should have a dict page", fallbackStats.hasDictionaryPages());
+      assertTrue("Fallback column should have dict-encoded pages", fallbackStats.hasDictionaryEncodedPages());
+      assertTrue("Fallback column should have non-dict pages", fallbackStats.hasNonDictionaryEncodedPages());
+    }
   }
 }