[SAMZA-2655] Expose RocksDB maxOpenFiles setting as Samza parameter (#1500)

Expose RocksDB maxOpenFiles setting as Samza parameter
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index b588e64..18a782f 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1912,6 +1912,22 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="stores-rocksdb-max-open-files">stores.<span class="store">store-name</span>.<br>rocksdb.max.open.files</td>
+                    <td class="default">-1</td>
+                    <td class="description">
+                        Limits the number of open files that RocksDB can have open at one time.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="stores-rocksdb-max-file-opening-threads">stores.<span class="store">store-name</span>.<br>rocksdb.max.file.opening.threads</td>
+                    <td class="default">16</td>
+                    <td class="description">
+                        Sets the number of threads used to open RocksDB files.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="stores-rocksdb-metrics">stores.<span class="store">store-name</span>.<br>rocksdb.metrics.list</td>
                     <td class="default"></td>
                     <td class="description">
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
index b7c47e1..5af2ef8 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbOptionsHelper.java
@@ -44,6 +44,8 @@
   private static final String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
   private static final String ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS = "rocksdb.delete.obsolete.files.period.micros";
   private static final String ROCKSDB_MAX_MANIFEST_FILE_SIZE = "rocksdb.max.manifest.file.size";
+  private static final String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+  private static final String ROCKSDB_MAX_FILE_OPENING_THREADS = "rocksdb.max.file.opening.threads";
 
   public static Options options(Config storeConfig, int numTasksForContainer, File storeDir, StorageEngineFactory.StoreMode storeMode) {
     Options options = new Options();
@@ -109,6 +111,8 @@
     options.setMaxLogFileSize(storeConfig.getLong(ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, 64 * 1024 * 1024L));
     options.setKeepLogFileNum(storeConfig.getLong(ROCKSDB_KEEP_LOG_FILE_NUM, 2));
     options.setDeleteObsoleteFilesPeriodMicros(storeConfig.getLong(ROCKSDB_DELETE_OBSOLETE_FILES_PERIOD_MICROS, 21600000000L));
+    options.setMaxOpenFiles(storeConfig.getInt(ROCKSDB_MAX_OPEN_FILES, -1));
+    options.setMaxFileOpeningThreads(storeConfig.getInt(ROCKSDB_MAX_FILE_OPENING_THREADS, 16));
     // The default for rocksdb is 18446744073709551615, which is larger than java Long.MAX_VALUE. Hence setting it only if it's passed.
     if (storeConfig.containsKey(ROCKSDB_MAX_MANIFEST_FILE_SIZE)) {
       options.setMaxManifestFileSize(storeConfig.getLong(ROCKSDB_MAX_MANIFEST_FILE_SIZE));
diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
index 0044904..f9a1f24 100644
--- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/descriptors/RocksDbTableDescriptor.java
@@ -49,6 +49,9 @@
   static final public String ROCKSDB_NUM_WRITE_BUFFERS = "rocksdb.num.write.buffers";
   static final public String ROCKSDB_MAX_LOG_FILE_SIZE_BYTES = "rocksdb.max.log.file.size.bytes";
   static final public String ROCKSDB_KEEP_LOG_FILE_NUM = "rocksdb.keep.log.file.num";
+  static final public String ROCKSDB_MAX_OPEN_FILES = "rocksdb.max.open.files";
+  static final public String ROCKSDB_MAX_FILE_OPENING_THREADS = "rocksdb.max.file.opening.threads";
+
 
   private Integer writeBatchSize;
   private Integer objectCacheSize;
@@ -61,6 +64,8 @@
   private Integer numLogFilesToKeep;
   private String compressionType;
   private String compactionStyle;
+  private Integer maxOpenFiles;
+  private Integer maxFileOpeningThreads;
 
   /**
    * Constructs a table descriptor instance
@@ -273,6 +278,35 @@
     return this;
   }
 
+  /**
+   * Limits the number of open files that can be used by the DB.  You may need to increase this if your database
+   * has a large working set. Value -1 means files opened are always kept open.
+   * <p>
+   *  Default value is -1.
+   * <p>
+   * @param maxOpenFiles the number of open files that can be used by the DB.
+   * @return this table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxOpenFiles(int maxOpenFiles) {
+    this.maxOpenFiles = maxOpenFiles;
+    return this;
+  }
+
+  /**
+   * Sets the number of threads used to open DB files.
+   * If max_open_files is -1, DB will open all files on DB::Open(). You can use this option to increase the number of
+   * threads used to open files.
+   * <p>
+   * Default is 16.
+   * <p>
+   * @param maxFileOpeningThreads The number of threads to use when opening DB files.
+   * @return the table descriptor instance
+   */
+  public RocksDbTableDescriptor<K, V> withMaxFileOpeningThreads(int maxFileOpeningThreads) {
+    this.maxFileOpeningThreads = maxFileOpeningThreads;
+    return this;
+  }
+
   @Override
   public String getProviderFactoryClassName() {
     return LocalTableProviderFactory.class.getName();
@@ -320,6 +354,12 @@
     if (numLogFilesToKeep != null) {
       addStoreConfig(ROCKSDB_KEEP_LOG_FILE_NUM, numLogFilesToKeep.toString(), tableConfig);
     }
+    if (maxOpenFiles != null) {
+      addStoreConfig(ROCKSDB_MAX_OPEN_FILES, maxOpenFiles.toString(), tableConfig);
+    }
+    if (maxFileOpeningThreads != null) {
+      addStoreConfig(ROCKSDB_MAX_FILE_OPENING_THREADS, maxFileOpeningThreads.toString(), tableConfig);
+    }
 
     return Collections.unmodifiableMap(tableConfig);
   }
diff --git a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
index c58d123..9fc9938 100644
--- a/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
+++ b/samza-kv-rocksdb/src/test/java/org/apache/samza/storage/kv/descriptors/TestRocksDbTableDescriptor.java
@@ -73,10 +73,12 @@
         .withTtl(7)
         .withWriteBatchSize(8)
         .withWriteBufferSize(9)
+        .withMaxOpenFiles(10)
+        .withMaxFileOpeningThreads(11)
         .withConfig("abc", "xyz")
         .toConfig(createJobConfig());
 
-    Assert.assertEquals(14, tableConfig.size());
+    Assert.assertEquals(16, tableConfig.size());
     assertEquals("1", RocksDbTableDescriptor.ROCKSDB_BLOCK_SIZE_BYTES, tableConfig);
     assertEquals("2", RocksDbTableDescriptor.CONTAINER_CACHE_SIZE_BYTES, tableConfig);
     assertEquals("3", RocksDbTableDescriptor.ROCKSDB_MAX_LOG_FILE_SIZE_BYTES, tableConfig);
@@ -86,6 +88,8 @@
     assertEquals("7", RocksDbTableDescriptor.ROCKSDB_TTL_MS, tableConfig);
     assertEquals("8", RocksDbTableDescriptor.WRITE_BATCH_SIZE, tableConfig);
     assertEquals("9", RocksDbTableDescriptor.CONTAINER_WRITE_BUFFER_SIZE_BYTES, tableConfig);
+    assertEquals("10", RocksDbTableDescriptor.ROCKSDB_MAX_OPEN_FILES, tableConfig);
+    assertEquals("11", RocksDbTableDescriptor.ROCKSDB_MAX_FILE_OPENING_THREADS, tableConfig);
     assertEquals("snappy", RocksDbTableDescriptor.ROCKSDB_COMPRESSION, tableConfig);
     assertEquals("fifo", RocksDbTableDescriptor.ROCKSDB_COMPACTION_STYLE, tableConfig);
     Assert.assertFalse(tableConfig.containsKey(String.format(StorageConfig.CHANGELOG_STREAM, TABLE_ID)));