GEODE-10401: Configurable .drf recovery HashMap overflow threshold (#7828)

Configurable via the JVM system property:

gemfire.disk.drfHashMapOverflowThreshold

Default value: 805306368

When the configured threshold is reached, the server overflows to a new
hash map during the recovery of .drf files. Warning: if you set the
threshold above 805306368 (0.75 * 2^30, the largest size a fastutil open
hash set can hold at the default load factor), recovery will incur an
unneeded delay due to a bug in the fastutil dependency.
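
For example, the threshold can be lowered when starting a server; the
server name and the value 536870912 below are illustrative placeholders,
not recommendations:

    gfsh> start server --name=server1 --J=-Dgemfire.disk.drfHashMapOverflowThreshold=536870912

or, for a standalone JVM:

    java -Dgemfire.disk.drfHashMapOverflowThreshold=536870912 ...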
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
index 9dee1c1..b544ca0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskStoreImpl.java
@@ -181,12 +181,23 @@
       GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverValuesSync";
 
   /**
+   * When the configured threshold is reached, the server overflows to a new
+   * hash map during the recovery of .drf files.
+   */
+  public static final String DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME =
+      GeodeGlossary.GEMFIRE_PREFIX + "disk.drfHashMapOverflowThreshold";
+
+  /**
    * Allows recovering values for LRU regions. By default values are not recovered for LRU regions
    * during recovery.
    */
   public static final String RECOVER_LRU_VALUES_PROPERTY_NAME =
       GeodeGlossary.GEMFIRE_PREFIX + "disk.recoverLruValues";
 
+  static final long DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT = 805306368;
+  static final long DRF_HASHMAP_OVERFLOW_THRESHOLD =
+      Long.getLong(DRF_HASHMAP_OVERFLOW_THRESHOLD_NAME, DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT);
+
   boolean RECOVER_VALUES = getBoolean(DiskStoreImpl.RECOVER_VALUE_PROPERTY_NAME, true);
 
   boolean RECOVER_VALUES_SYNC = getBoolean(DiskStoreImpl.RECOVER_VALUES_SYNC_PROPERTY_NAME, false);
@@ -3525,31 +3536,49 @@
       }
 
       try {
-        if (id > 0 && id <= 0x00000000FFFFFFFFL) {
-          currentInts.get().add((int) id);
+        if (shouldOverflow(id)) {
+          overflowToNewHashMap(id);
         } else {
-          currentLongs.get().add(id);
+          if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+            this.currentInts.get().add((int) id);
+          } else {
+            this.currentLongs.get().add(id);
+          }
         }
       } catch (IllegalArgumentException illegalArgumentException) {
         // See GEODE-8029.
-        // Too many entries on the accumulated drf files, overflow and continue.
+        // Too many entries in the accumulated drf files; overflow to the next
+        // [Int|Long]OpenHashSet and continue.
+        overflowToNewHashMap(id);
+      }
+    }
+
+    boolean shouldOverflow(final long id) {
+      if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+        return currentInts.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD;
+      } else {
+        return currentLongs.get().size() == DRF_HASHMAP_OVERFLOW_THRESHOLD;
+      }
+    }
+
+    void overflowToNewHashMap(final long id) {
+      if (DRF_HASHMAP_OVERFLOW_THRESHOLD == DRF_HASHMAP_OVERFLOW_THRESHOLD_DEFAULT) {
         logger.warn(
             "There is a large number of deleted entries within the disk-store, please execute an offline compaction.");
+      }
 
-        // Overflow to the next [Int|Long]OpenHashSet and continue.
-        if (id > 0 && id <= 0x00000000FFFFFFFFL) {
-          IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
-          allInts.add(overflownHashSet);
-          currentInts.set(overflownHashSet);
+      if (id > 0 && id <= 0x00000000FFFFFFFFL) {
+        IntOpenHashSet overflownHashSet = new IntOpenHashSet((int) INVALID_ID);
+        allInts.add(overflownHashSet);
+        currentInts.set(overflownHashSet);
 
-          currentInts.get().add((int) id);
-        } else {
-          LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
-          allLongs.add(overflownHashSet);
-          currentLongs.set(overflownHashSet);
+        currentInts.get().add((int) id);
+      } else {
+        LongOpenHashSet overflownHashSet = new LongOpenHashSet((int) INVALID_ID);
+        allLongs.add(overflownHashSet);
+        currentLongs.set(overflownHashSet);
 
-          currentLongs.get().add(id);
-        }
+        currentLongs.get().add(id);
       }
     }
 
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java
new file mode 100644
index 0000000..ff7e43e
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/OplogEntryIdSetDrfHashSetThresholdTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.SetSystemProperty;
+
+import org.apache.geode.internal.cache.DiskStoreImpl.OplogEntryIdSet;
+
+/**
+ * Tests that DiskStoreImpl.OplogEntryIdSet overflows based on drfHashMapOverflowThreshold.
+ */
+public class OplogEntryIdSetDrfHashSetThresholdTest {
+  @Test
+  @SetSystemProperty(key = "gemfire.disk.drfHashMapOverflowThreshold", value = "10")
+  public void addMethodOverflowBasedOnDrfOverflowThresholdParameters() {
+
+    int testEntries = 41;
+    IntOpenHashSet intOpenHashSet = new IntOpenHashSet();
+    LongOpenHashSet longOpenHashSet = new LongOpenHashSet();
+
+    List<IntOpenHashSet> intOpenHashSets =
+        new ArrayList<>(Collections.singletonList(intOpenHashSet));
+    List<LongOpenHashSet> longOpenHashSets =
+        new ArrayList<>(Collections.singletonList(longOpenHashSet));
+
+    OplogEntryIdSet oplogEntryIdSet = new OplogEntryIdSet(intOpenHashSets, longOpenHashSets);
+    IntStream.range(1, testEntries).forEach(oplogEntryIdSet::add);
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+        .forEach(oplogEntryIdSet::add);
+
+    assertThat(intOpenHashSets).hasSize(4);
+    assertThat(longOpenHashSets).hasSize(4);
+
+    IntStream.range(1, testEntries).forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+    LongStream.range(0x00000000FFFFFFFFL + 1, 0x00000000FFFFFFFFL + testEntries)
+        .forEach(i -> assertThat(oplogEntryIdSet.contains(i)).isTrue());
+
+  }
+}
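
For reference, with the threshold set to 10, each stream in the test adds
40 sequential ids (ints 1..40 and longs just above 0x00000000FFFFFFFFL),
so the ids land in four hash sets of ten entries each: the original set
plus three overflow sets. That is what the hasSize(4) assertions verify.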