PARQUET-1724: Use ConcurrentHashMap for Cache in DictionaryPageReader (#712)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
index 2be7ffe..ebc9c84 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DictionaryPageReader.java
@@ -31,7 +31,10 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
@@ -48,12 +51,23 @@
private final ParquetFileReader reader;
private final Map<String, ColumnChunkMetaData> columns;
- private final Map<String, DictionaryPage> cache = new HashMap<String, DictionaryPage>();
+ private final Map<String, Optional<DictionaryPage>> dictionaryPageCache;
private ColumnChunkPageReadStore rowGroup = null;
+ /**
+ * Instantiate a new DictionaryPageReader.
+ *
+ * @param reader The target ParquetFileReader
+ * @param block The target BlockMetaData
+ *
+ * @throws NullPointerException if {@code reader} or {@code block} is
+ * {@code null}
+ */
DictionaryPageReader(ParquetFileReader reader, BlockMetaData block) {
- this.reader = reader;
- this.columns = new HashMap<String, ColumnChunkMetaData>();
+ this.reader = Objects.requireNonNull(reader);
+ this.columns = new HashMap<>();
+ this.dictionaryPageCache = new ConcurrentHashMap<>();
+
for (ColumnChunkMetaData column : block.getColumns()) {
columns.put(column.getPath().toDotString(), column);
}
@@ -82,43 +96,28 @@
ColumnChunkMetaData column = columns.get(dotPath);
if (column == null) {
throw new ParquetDecodingException(
- "Cannot load dictionary, unknown column: " + dotPath);
+ "Failed to load dictionary, unknown column: " + dotPath);
}
- if (cache.containsKey(dotPath)) {
- return cache.get(dotPath);
- }
+ return dictionaryPageCache.computeIfAbsent(dotPath, key -> {
+ try {
+ final DictionaryPage dict =
+ hasDictionaryPage(column) ? reader.readDictionary(column) : null;
- try {
- synchronized (cache) {
- // check the cache again in case this thread waited on another reading the same page
- if (!cache.containsKey(dotPath)) {
- DictionaryPage dict = hasDictionaryPage(column) ? reader.readDictionary(column) : null;
- // copy the dictionary to ensure it can be reused if it is returned
- // more than once. this can happen when a DictionaryFilter has two or
- // more predicates for the same column.
- cache.put(dotPath, reusableCopy(dict));
- }
+ // Copy the dictionary to ensure it can be reused if it is returned
+ // more than once. This can happen when a DictionaryFilter has two or
+ // more predicates for the same column. Absent dictionaries are cached too, as Optional.empty().
+ return (dict != null) ? Optional.of(reusableCopy(dict)) : Optional.empty();
+ } catch (IOException e) {
+ throw new ParquetDecodingException("Failed to read dictionary", e);
}
-
- return cache.get(dotPath);
- } catch (IOException e) {
- throw new ParquetDecodingException(
- "Failed to read dictionary", e);
- }
+ }).orElse(null);
}
- private static DictionaryPage reusableCopy(DictionaryPage dict) {
- if (dict == null) {
- return null;
- }
- try {
- return new DictionaryPage(
- BytesInput.from(dict.getBytes().toByteArray()),
- dict.getDictionarySize(), dict.getEncoding());
- } catch (IOException e) {
- throw new ParquetDecodingException("Cannot read dictionary", e);
- }
+ private static DictionaryPage reusableCopy(DictionaryPage dict)
+ throws IOException {
+ return new DictionaryPage(BytesInput.from(dict.getBytes().toByteArray()),
+ dict.getDictionarySize(), dict.getEncoding());
}
private boolean hasDictionaryPage(ColumnChunkMetaData column) {