SAMZA-2479: Add configurable default for min compaction lag ms (#1305)

diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 52047f9..b588e64 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1586,6 +1586,26 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="store-default-changelog-min-compaction-lag-ms">stores.default.changelog.min.compaction.lag.ms</td>
+                    <td class="default">14400000</td>
+                    <td class="description">
+                        This property defines the default minimum period that must pass before a changelog message can be compacted.
+                        Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be.
+                        This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.
+                    </td>
+                </tr>
+
+                <tr>
+                    <td class="property" id="store-changelog-min-compaction-lag-ms">stores.<span class="store">store-name</span>.changelog.min.compaction.lag.ms</td>
+                    <td class="default">stores.default.changelog.min.compaction.lag.ms</td>
+                    <td class="description">
+                        This property defines the minimum period that must pass before a message in the store's changelog can be compacted.
+                        Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be.
+                        This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="regex-rewriter">
                         Consuming all Kafka topics matching a regular expression<br>
                         <span class="subtitle">
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index 1a2e04b..baf1ea8 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -264,6 +264,8 @@
 ##### <a name="advanced-storage-configurations"></a>[4.1 Advanced Storage Configurations](#advanced-storage-configurations)
 |Name|Default|Description|
 |--- |--- |--- |
+|stores.default.changelog.<br>min.compaction.lag.ms|14400000|This property defines the default minimum period that must pass before a changelog message can be compacted. Be mindful that the larger this value, the larger in size changelog topics will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.|
+|stores.**_store-name_**.changelog.<br>min.compaction.lag.ms|stores.default.changelog.<br>min.compaction.lag.ms|This property defines the minimum period that must pass before a message in the store's changelog can be compacted. Be mindful that the larger this value, the larger in size the changelog topic will be in Kafka due to un-compacted data and the longer your application's time to restore from changelog will be. This may impact your application's startup time if host affinity is not enabled. For changelog topics which have "compact,delete" cleanup policy, this value should be < retention.ms.|
 |stores.default.changelog.<br>replication.factor|2|This property defines the default number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>replication.factor|stores.default.changelog.<br>replication.factor|The property defines the number of replicas to use for the change log stream.|
 |stores.**_store-name_**.changelog.<br>kafka.topic-level-property| |The property allows you to specify topic level settings for the changelog topic to be created. For e.g., you can specify the clean up policy as "stores.mystore.changelog.cleanup.policy=delete". Please refer to the [Kafka documentation](http://kafka.apache.org/documentation.html#configuration) for more topic level configurations.|
diff --git a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index b5687c8..a8b8702 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -166,6 +166,17 @@
   }
 
   /**
+   * Gets the configured default for stores' changelog min.compaction.lag.ms, or if not defined uses the default
+   * value defined in this class.
+   *
+   * @return the default changelog min.compaction.lag.ms
+   */
+  private long getDefaultChangelogMinCompactionLagMs() {
+    String defaultMinCompactLagConfigName = STORE_PREFIX + "default.changelog." + MIN_COMPACTION_LAG_MS;
+    return getLong(defaultMinCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+  }
+
+  /**
    * Gets the side inputs for the store. A store can have multiple side input streams which can be
    * provided as a comma separated list.
    *
@@ -226,7 +237,7 @@
     checkArgument(get("stores." + storeName + ".changelog.kafka." + MIN_COMPACTION_LAG_MS) == null,
         "Use " + minCompactLagConfigName + " to set kafka min.compaction.lag.ms property.");
 
-    return getLong(minCompactLagConfigName, DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS);
+    return getLong(minCompactLagConfigName, getDefaultChangelogMinCompactionLagMs());
   }
 
   /**
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index baecf99..88fbbe0 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -20,7 +20,9 @@
 package org.apache.samza.config;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -313,13 +315,18 @@
   @Test
   public void testGetChangelogMinCompactionLagMs() {
     // empty config, return default lag ms
+    Map<String, String> configMap = new HashMap<>();
     assertEquals(DEFAULT_CHANGELOG_MIN_COMPACTION_LAG_MS,
-        new StorageConfig(new MapConfig()).getChangelogMinCompactionLagMs(STORE_NAME0));
+        new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
 
-    long lagOverride = TimeUnit.HOURS.toMillis(6);
-    StorageConfig storageConfig = new StorageConfig(
-        new MapConfig(ImmutableMap.of(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0),
-            String.valueOf(lagOverride))));
-    assertEquals(lagOverride, storageConfig.getChangelogMinCompactionLagMs(STORE_NAME0));
+    // override with configured default
+    long defaultLagOverride = TimeUnit.HOURS.toMillis(8);
+    configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, "default"), String.valueOf(defaultLagOverride));
+    assertEquals(defaultLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
+
+    // override for specific store
+    long storeSpecificLagOverride = TimeUnit.HOURS.toMillis(6);
+    configMap.put(String.format(CHANGELOG_MIN_COMPACTION_LAG_MS, STORE_NAME0), String.valueOf(storeSpecificLagOverride));
+    assertEquals(storeSpecificLagOverride, new StorageConfig(new MapConfig(configMap)).getChangelogMinCompactionLagMs(STORE_NAME0));
   }
 }