Update default maxSegmentsInNodeLoadingQueue (#11540)

* Update default maxSegmentsInNodeLoadingQueue

Update the default maxSegmentsInNodeLoadingQueue from 0 (unbounded) to 100.

An unbounded maxSegmentsInNodeLoadingQueue can cause cluster instability.
Because unbounded is the default, Druid operators tend to run into this
instability first and only then find in the docs that the recommended value
for a large cluster is 1000. With this change, the default itself keeps
clusters from falling over as they grow over time (see the sketch below
these notes).

* update tests

* codestyle
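
The core of the change is a defaulting pattern in the deserialization
constructor: the JSON property is now accepted as a nullable boxed Integer, so
stored configs that predate this release (where the property is absent) pick
up the new default of 100, while an explicit 0 still means unbounded. Below is
a minimal, self-contained sketch of that pattern; the class and names are
illustrative, not the real CoordinatorDynamicConfig.

```java
/**
 * Sketch of the nullable-with-default pattern this patch applies. A primitive
 * int parameter would make Jackson deserialize a missing JSON property as 0,
 * which is indistinguishable from an explicit 0 (unbounded); a boxed Integer
 * lets the constructor detect absence and substitute the new default.
 */
public class LoadingQueueDefaultSketch
{
  private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;

  private final int maxSegmentsInNodeLoadingQueue;

  public LoadingQueueDefaultSketch(Integer maxSegmentsInNodeLoadingQueue)
  {
    // null means the property was absent from the stored config: fall back to
    // the new default. An explicit value, including 0, is honored as-is.
    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null
                                         ? DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
                                         : maxSegmentsInNodeLoadingQueue;
  }

  public static void main(String[] args)
  {
    // Absent from an older stored config -> new bounded default.
    System.out.println(new LoadingQueueDefaultSketch(null).maxSegmentsInNodeLoadingQueue); // 100
    // Explicit 0 -> the old unbounded behavior remains available.
    System.out.println(new LoadingQueueDefaultSketch(0).maxSegmentsInNodeLoadingQueue);    // 0
  }
}
```

Operators who depend on an unbounded queue can keep it by explicitly setting
maxSegmentsInNodeLoadingQueue to 0 in the coordinator dynamic config.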
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 8caaf1c..09b336d 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -840,7 +840,7 @@
 |`killDataSourceWhitelist`|List of specific data sources for which kill tasks are sent if property `druid.coordinator.kill.on` is true. This can be a list of comma-separated data source names or a JSON array.|none|
 |`killAllDataSources`|Send kill tasks for ALL dataSources if property `druid.coordinator.kill.on` is true. If this is set to true then `killDataSourceWhitelist` must not be specified or be empty list.|false|
 |`killPendingSegmentsSkipList`|List of data sources for which pendingSegments are _NOT_ cleaned up if property `druid.coordinator.kill.pendingSegments.on` is true. This can be a list of comma-separated data sources or a JSON array.|none|
-|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that could be queued for loading to any given server. This parameter could be used to speed up segments loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too much segments scheduled to be replicated to some particular node (faster loading could be preferred to better segments distribution). Desired value depends on segments loading speed, acceptable replication time and number of nodes. Value 1000 could be a start point for a rather big cluster. Default value is 0 (loading queue is unbounded) |0|
+|`maxSegmentsInNodeLoadingQueue`|The maximum number of segments that can be queued for loading to any given server. This parameter can be used to speed up the segment loading process, especially if there are "slow" nodes in the cluster (with low loading speed) or if too many segments are scheduled to be replicated to some particular node (faster loading can be preferred over better segment distribution). The desired value depends on the segment loading speed, acceptable replication time, and the number of nodes. A value of 1000 can be a starting point for a rather big cluster. Default value is 100. |100|
 |`decommissioningNodes`| List of historical servers to 'decommission'. Coordinator will not assign new segments to 'decommissioning' servers,  and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `decommissioningMaxPercentOfMaxSegmentsToMove`.|none|
 |`decommissioningMaxPercentOfMaxSegmentsToMove`|  The maximum number of segments that may be moved away from 'decommissioning' servers to non-decommissioning (that is, active) servers during one Coordinator run. This value is relative to the total maximum segment movements allowed during one run which is determined by `maxSegmentsToMove`. If `decommissioningMaxPercentOfMaxSegmentsToMove` is 0, segments will neither be moved from _or to_ 'decommissioning' servers, effectively putting them in a sort of "maintenance" mode that will not participate in balancing or assignment by load rules. Decommissioning can also become stalled if there are no available active servers to place the segments. By leveraging the maximum percent of decommissioning segment movements, an operator can prevent active servers from overload by prioritizing balancing, or decrease decommissioning time instead. The value should be between 0 and 100.|70|
 |`pauseCoordination`| Boolean flag for whether or not the coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` Interface. Such duties include: Segment balancing, Segment compaction, Emission of metrics controlled by the dynamic coordinator config `emitBalancingStats`, Submitting kill tasks for unused segments (if enabled), Logging of used segments in the cluster, Marking of newly unused or overshadowed segments, Matching and execution of load/drop rules for used segments, Unloading segments that are no longer marked as used from Historical servers. An example of when an admin may want to pause coordination would be if they are doing deep storage maintenance on HDFS Name Nodes with downtime and don't want the coordinator to be directing Historical Nodes to hit the Name Node with API requests until maintenance is done and the deep store is declared healthy for use again. |false|
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
index d4bf3e8..29d8052 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/CoordinatorDynamicConfig.java
@@ -61,7 +61,9 @@
   private final int balancerComputeThreads;
   private final boolean emitBalancingStats;
 
-  /** If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources. */
+  /**
+   * If true {@link KillUnusedSegments} sends kill tasks for unused segments in all data sources.
+   */
   private final boolean killUnusedSegmentsInAllDataSources;
 
   /**
@@ -74,7 +76,7 @@
   /**
    * Stale pending segments belonging to the data sources in this list are not killed by {@link
    * KillStalePendingSegments}. In other words, segments in these data sources are "protected".
-   *
+   * <p>
    * Pending segments are considered "stale" when their created_time is older than {@link
    * KillStalePendingSegments#KEEP_PENDING_SEGMENTS_OFFSET} from now.
    */
@@ -134,7 +136,7 @@
       // Keeping the legacy 'killPendingSegmentsSkipList' property name for backward compatibility. When the project is
       // updated to Jackson 2.9 it could be changed, see https://github.com/apache/druid/issues/7152
       @JsonProperty("killPendingSegmentsSkipList") Object dataSourcesToNotKillStalePendingSegmentsIn,
-      @JsonProperty("maxSegmentsInNodeLoadingQueue") int maxSegmentsInNodeLoadingQueue,
+      @JsonProperty("maxSegmentsInNodeLoadingQueue") @Nullable Integer maxSegmentsInNodeLoadingQueue,
       @JsonProperty("decommissioningNodes") Object decommissioningNodes,
       @JsonProperty("decommissioningMaxPercentOfMaxSegmentsToMove") int decommissioningMaxPercentOfMaxSegmentsToMove,
       @JsonProperty("pauseCoordination") boolean pauseCoordination,
@@ -149,11 +151,12 @@
     this.maxSegmentsToMove = maxSegmentsToMove;
 
     if (percentOfSegmentsToConsiderPerMove == null) {
-      log.debug("percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
-               + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
-               + "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
-               + "desired value for percentOfSegmentsToConsideredPerMove",
-               Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
+      log.debug(
+          "percentOfSegmentsToConsiderPerMove was null! This is likely because your metastore does not "
+          + "reflect this configuration being added to Druid in a recent release. Druid is defaulting the value "
+          + "to the Druid default of %f. It is recommended that you re-submit your dynamic config with your "
+          + "desired value for percentOfSegmentsToConsideredPerMove",
+          Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE
       );
       percentOfSegmentsToConsiderPerMove = Builder.DEFAULT_PERCENT_OF_SEGMENTS_TO_CONSIDER_PER_MOVE;
     }
@@ -172,7 +175,9 @@
     this.specificDataSourcesToKillUnusedSegmentsIn = parseJsonStringOrArray(specificDataSourcesToKillUnusedSegmentsIn);
     this.dataSourcesToNotKillStalePendingSegmentsIn =
         parseJsonStringOrArray(dataSourcesToNotKillStalePendingSegmentsIn);
-    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue;
+    this.maxSegmentsInNodeLoadingQueue = maxSegmentsInNodeLoadingQueue == null
+                                         ? Builder.DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE
+                                         : maxSegmentsInNodeLoadingQueue;
     this.decommissioningNodes = parseJsonStringOrArray(decommissioningNodes);
     Preconditions.checkArgument(
         decommissioningMaxPercentOfMaxSegmentsToMove >= 0 && decommissioningMaxPercentOfMaxSegmentsToMove <= 100,
@@ -517,7 +522,7 @@
     private static final boolean DEFAULT_EMIT_BALANCING_STATS = false;
     private static final boolean DEFAULT_USE_BATCHED_SEGMENT_SAMPLER = false;
     private static final boolean DEFAULT_KILL_UNUSED_SEGMENTS_IN_ALL_DATA_SOURCES = false;
-    private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 0;
+    private static final int DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
     private static final int DEFAULT_DECOMMISSIONING_MAX_SEGMENTS_TO_MOVE_PERCENT = 70;
     private static final boolean DEFAULT_PAUSE_COORDINATION = false;
     private static final boolean DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT = false;
@@ -732,7 +737,8 @@
           : decommissioningMaxPercentOfMaxSegmentsToMove,
           pauseCoordination == null ? DEFAULT_PAUSE_COORDINATION : pauseCoordination,
           replicateAfterLoadTimeout == null ? DEFAULT_REPLICATE_AFTER_LOAD_TIMEOUT : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD : maxNonPrimaryReplicantsToLoad
+          maxNonPrimaryReplicantsToLoad == null ? DEFAULT_MAX_NON_PRIMARY_REPLICANTS_TO_LOAD
+                                                : maxNonPrimaryReplicantsToLoad
       );
     }
 
@@ -745,7 +751,9 @@
           mergeBytesLimit == null ? defaults.getMergeBytesLimit() : mergeBytesLimit,
           mergeSegmentsLimit == null ? defaults.getMergeSegmentsLimit() : mergeSegmentsLimit,
           maxSegmentsToMove == null ? defaults.getMaxSegmentsToMove() : maxSegmentsToMove,
-          percentOfSegmentsToConsiderPerMove == null ? defaults.getPercentOfSegmentsToConsiderPerMove() : percentOfSegmentsToConsiderPerMove,
+          percentOfSegmentsToConsiderPerMove == null
+          ? defaults.getPercentOfSegmentsToConsiderPerMove()
+          : percentOfSegmentsToConsiderPerMove,
           useBatchedSegmentSampler == null ? defaults.useBatchedSegmentSampler() : useBatchedSegmentSampler,
           replicantLifetime == null ? defaults.getReplicantLifetime() : replicantLifetime,
           replicationThrottleLimit == null ? defaults.getReplicationThrottleLimit() : replicationThrottleLimit,
@@ -769,7 +777,9 @@
           : decommissioningMaxPercentOfMaxSegmentsToMove,
           pauseCoordination == null ? defaults.getPauseCoordination() : pauseCoordination,
           replicateAfterLoadTimeout == null ? defaults.getReplicateAfterLoadTimeout() : replicateAfterLoadTimeout,
-          maxNonPrimaryReplicantsToLoad == null ? defaults.getMaxNonPrimaryReplicantsToLoad() : maxNonPrimaryReplicantsToLoad
+          maxNonPrimaryReplicantsToLoad == null
+          ? defaults.getMaxNonPrimaryReplicantsToLoad()
+          : maxNonPrimaryReplicantsToLoad
       );
     }
   }
diff --git a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
index b6776a8..b96591a 100644
--- a/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/http/CoordinatorDynamicConfigTest.java
@@ -31,9 +31,12 @@
 import java.util.Set;
 
 /**
+ *
  */
 public class CoordinatorDynamicConfigTest
 {
+  private static final int EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE = 100;
+
   private final ObjectMapper mapper = TestHelper.makeJsonMapper();
 
   @Test
@@ -69,25 +72,158 @@
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withPauseCoordination(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withPercentOfSegmentsToConsiderPerMove(10).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withReplicateAfterLoadTimeout(true).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withMaxNonPrimaryReplicantsToLoad(10).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 10, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, true, true, 10);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        10,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        true,
+        true,
+        10
+    );
 
   }
 
@@ -118,13 +254,70 @@
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of();
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningNodes(ImmutableSet.of("host1")).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     actual = CoordinatorDynamicConfig.builder().withDecommissioningMaxPercentOfMaxSegmentsToMove(5).build(actual);
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, ImmutableSet.of("host1"), 5, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        ImmutableSet.of("host1"),
+        5,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -271,7 +464,26 @@
     );
     ImmutableSet<String> decommissioning = ImmutableSet.of("host1", "host2");
     ImmutableSet<String> whitelist = ImmutableSet.of("test1", "test2");
-    assertConfig(actual, 1, 1, 1, 1, 100, 1, 1, 2, true, whitelist, false, 1, decommissioning, 9, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        100,
+        1,
+        1,
+        2,
+        true,
+        whitelist,
+        false,
+        1,
+        decommissioning,
+        9,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -301,7 +513,26 @@
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 1, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        ImmutableSet.of(),
+        true,
+        1,
+        ImmutableSet.of(),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
 
     //ensure whitelist is empty when killAllDataSources is true
     try {
@@ -348,7 +579,26 @@
         CoordinatorDynamicConfig.class
     );
 
-    assertConfig(actual, 1, 1, 1, 1, 1, 1, 1, 2, true, ImmutableSet.of(), true, 0, ImmutableSet.of(), 0, false, false, Integer.MAX_VALUE);
+    assertConfig(
+        actual,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        1,
+        2,
+        true,
+        ImmutableSet.of(),
+        true,
+        EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
+        ImmutableSet.of(),
+        0,
+        false,
+        false,
+        Integer.MAX_VALUE
+    );
   }
 
   @Test
@@ -369,7 +619,7 @@
         false,
         emptyList,
         false,
-        0,
+        EXPECTED_DEFAULT_MAX_SEGMENTS_IN_NODE_LOADING_QUEUE,
         emptyList,
         70,
         false,
@@ -388,9 +638,27 @@
 
     Assert.assertEquals(
         current,
-        new CoordinatorDynamicConfig
-            .Builder(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null)
-            .build(current)
+        new CoordinatorDynamicConfig.Builder(
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null
+        ).build(current)
     );
   }