PARQUET-1724: Use ConcurrentHashMap for Cache in DictionaryPageReader (#712)

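The core change replaces an externally synchronized HashMap with a ConcurrentHashMap keyed by the column dot path. Because ConcurrentHashMap rejects null values, a column with no dictionary is cached as Optional.empty(), and computeIfAbsent guarantees the dictionary is read at most once even under concurrent access. The following is only a rough, standalone sketch of that pattern; the class and method names are made up for illustration and are not part of parquet-mr:

    import java.util.Map;
    import java.util.Optional;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only: caches possibly-absent values in a ConcurrentHashMap by
    // wrapping them in Optional, so misses are remembered without storing null.
    public class NullableCacheSketch {
      private final Map<String, Optional<String>> cache = new ConcurrentHashMap<>();

      // Returns the cached value, loading it (and caching the miss) on first access.
      String get(String key) {
        return cache.computeIfAbsent(key, k -> Optional.ofNullable(loadExpensive(k)))
            .orElse(null);
      }

      // Stand-in for an expensive load that may find nothing (returns null).
      private String loadExpensive(String key) {
        return key.startsWith("has-") ? "value-for-" + key : null;
      }

      public static void main(String[] args) {
        NullableCacheSketch sketch = new NullableCacheSketch();
        System.out.println(sketch.get("has-dictionary")); // value-for-has-dictionary
        System.out.println(sketch.get("no-dictionary"));  // null, and the miss is cached too
      }
    }

computeIfAbsent also removes the need for the double-checked containsKey/put sequence in the old code, since the mapping function runs atomically for a given key.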
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 2be7ffe..ebc9c84 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -31,7 +31,10 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
 import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
@@ -48,12 +51,23 @@
 
   private final ParquetFileReader reader;
   private final Map<String, ColumnChunkMetaData> columns;
-  private final Map<String, DictionaryPage> cache = new HashMap<String, DictionaryPage>();
+  private final Map<String, Optional<DictionaryPage>> dictionaryPageCache;
   private ColumnChunkPageReadStore rowGroup = null;
 
+  /**
+   * Instantiate a new DictionaryPageReader.
+   *
+   * @param reader The target ParquetFileReader
+   * @param block The target BlockMetaData
+   *
+   * @throws NullPointerException if {@code reader} or {@code block} is
+   *           {@code null}
+   */
   DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) {
-    this.reader = reader;
-    this.columns = new HashMap<String, ColumnChunkMetaData>();
+    this.reader = Objects.requireNonNull(reader);
+    this.columns = new HashMap<>();
+    this.dictionaryPageCache = new ConcurrentHashMap<>();
+
     for (ColumnChunkMetaData column : block.getColumns()) {
       columns.put(column.getPath().toDotString(), column);
     }
@@ -82,43 +96,28 @@
     ColumnChunkMetaData column = columns.get(dotPath);
     if (column == null) {
       throw new ParquetDecodingException(
-          "Cannot load dictionary, unknown column: " + dotPath);
+          "Failed to load dictionary, unknown column: " + dotPath);
     }
 
-    if (cache.containsKey(dotPath)) {
-      return cache.get(dotPath);
-    }
+    return dictionaryPageCache.computeIfAbsent(dotPath, key -> {
+      try {
+        final DictionaryPage dict =
+            hasDictionaryPage(column) ? reader.readDictionary(column) : null;
 
-    try {
-      synchronized (cache) {
-        // check the cache again in case this thread waited on another reading the same page
-        if (!cache.containsKey(dotPath)) {
-          DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
-          // copy the dictionary to ensure it can be reused if it is returned
-          // more than once. this can happen when a DictionaryFilter has two or
-          // more predicates for the same column.
-          cache.put(dotPath, reusableCopy(dict));
-        }
+        // Copy the dictionary to ensure it can be reused if it is returned
+        // more than once. This can happen when a DictionaryFilter has two or
+        // more predicates for the same column. Cache misses (no dictionary) as well.
+        return (dict != null) ? Optional.of(reusableCopy(dict)) : Optional.empty();
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read dictionary", e);
       }
-
-      return cache.get(dotPath);
-    } catch (IOException e) {
-      throw new ParquetDecodingException(
-          "Failed to read dictionary", e);
-    }
+    }).orElse(null);
   }
 
-  private static DictionaryPage reusableCopy(DictionaryPage dict) {
-    if (dict == null) {
-      return null;
-    }
-    try {
-      return new DictionaryPage(
-          BytesInput.from(dict.getBytes().toByteArray()),
-          dict.getDictionarySize(), dict.getEncoding());
-    } catch (IOException e) {
-      throw new ParquetDecodingException("Cannot read dictionary", e);
-    }
+  private static DictionaryPage reusableCopy(DictionaryPage dict)
+      throws IOException {
+    return new DictionaryPage(BytesInput.from(dict.getBytes().toByteArray()),
+        dict.getDictionarySize(), dict.getEncoding());
   }
 
   private boolean hasDictionaryPage(ColumnChunkMetaData column) {