Improve Auto scaler pendingTaskBased provisioning strategy to handle when there are no currently running worker node better (#11440)

* fix pendingTaskBased

* fix doc

* address comments

* address comments

* address comments

* address comments

* address comments

* address comments

* address comments
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f010792..dd0a4f2 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1015,6 +1015,7 @@
 |`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S|
 |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null|
 |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080|
+|`druid.indexer.autoscale.workerCapacityHint`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. This value should typically be equal to `druid.worker.capacity` when you have a homogeneous cluster and the average of `druid.worker.capacity` across the workers when you have a heterogeneous cluster. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1|
 
 ##### Supervisors
 
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
index 3c8f529..e307b32 100644
--- a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
@@ -80,8 +80,8 @@
           @JsonProperty("envConfig") GceEnvironmentConfig envConfig
   )
   {
-    Preconditions.checkArgument(minNumWorkers > 0,
-                                "minNumWorkers must be greater than 0");
+    Preconditions.checkArgument(minNumWorkers >= 0,
+                                "minNumWorkers must be greater than or equal to 0");
     this.minNumWorkers = minNumWorkers;
     Preconditions.checkArgument(maxNumWorkers > 0,
                                 "maxNumWorkers must be greater than 0");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java
index 3ef70e9..4ab98ed 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java
@@ -29,6 +29,8 @@
   @JsonProperty
   private int maxScalingStep = 10;
 
+  @JsonProperty
+  private int workerCapacityHint = -1;
 
   public int getMaxScalingStep()
   {
@@ -76,4 +78,14 @@
     return this;
   }
 
+  public int getWorkerCapacityHint()
+  {
+    return workerCapacityHint;
+  }
+
+  public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityHint(int workerCapacityHint)
+  {
+    this.workerCapacityHint = workerCapacityHint;
+    return this;
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index cf562f1..28d1bef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.autoscaling;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
@@ -60,11 +61,14 @@
 {
   private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class);
 
+  public static final String ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET = "As minNumWorkers is set to 0, workerCapacityHint must be greater than 0. workerCapacityHint value set is %d";
   private static final String SCHEME = "http";
 
+  @VisibleForTesting
   @Nullable
-  static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
+  public static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
       Supplier<WorkerBehaviorConfig> workerConfigRef,
+      SimpleWorkerProvisioningConfig config,
       String action,
       EmittingLogger log
   )
@@ -87,6 +91,13 @@
       log.error("No autoScaler available, cannot %s workers", action);
       return null;
     }
+    if (config instanceof PendingTaskBasedWorkerProvisioningConfig
+        && workerConfig.getAutoScaler().getMinNumWorkers() == 0
+        && ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint() <= 0
+    ) {
+      log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint());
+      return null;
+    }
     return workerConfig;
   }
 
@@ -157,7 +168,7 @@
       Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
       log.debug("Workers: %d %s", workers.size(), workers);
       boolean didProvision = false;
-      final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
+      final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log);
       if (workerConfig == null) {
         return false;
       }
@@ -246,14 +257,19 @@
       log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
       final int currValidWorkers = getCurrValidWorkers(workers);
 
-      // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
-      // since we are not aware of the expectedWorkerCapacity.
-      int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
-          remoteTaskRunnerConfig,
-          workerConfig,
-          pendingTasks,
-          workers
-      );
+      // If there are no worker and workerCapacityHint config is not set (-1) or invalid (<= 0), then spin up minWorkerCount
+      // as we cannot determine the exact capacity here to fulfill the need.
+      // However, if there are no worker but workerCapacityHint config is set (>0), then we can
+      // determine the number of workers needed using workerCapacityHint config as expected worker capacity
+      int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityHint() <= 0
+                              ? minWorkerCount
+                              : getWorkersNeededToAssignTasks(
+                                  remoteTaskRunnerConfig,
+                                  workerConfig,
+                                  pendingTasks,
+                                  workers,
+                                  config.getWorkerCapacityHint()
+                              );
       log.debug("More workers needed: %d", moreWorkersNeeded);
 
       int want = Math.max(
@@ -280,7 +296,8 @@
         final WorkerTaskRunnerConfig workerTaskRunnerConfig,
         final DefaultWorkerBehaviorConfig workerConfig,
         final Collection<Task> pendingTasks,
-        final Collection<ImmutableWorkerInfo> workers
+        final Collection<ImmutableWorkerInfo> workers,
+        final int workerCapacityHint
     )
     {
       final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
@@ -295,7 +312,7 @@
       }
       WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
       int need = 0;
-      int capacity = getExpectedWorkerCapacity(workers);
+      int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint);
       log.info("Expected worker capacity: %d", capacity);
 
       // Simulate assigning tasks to dummy workers using configured workerSelectStrategy
@@ -333,7 +350,7 @@
     {
       Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
       log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers);
-      final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
+      final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log);
       if (workerConfig == null) {
         return false;
       }
@@ -441,12 +458,18 @@
     return currValidWorkers;
   }
 
-  private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
+  private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers, final int workerCapacityHint)
   {
     int size = workers.size();
     if (size == 0) {
-      // No existing workers assume capacity per worker as 1
-      return 1;
+      // No existing workers
+      if (workerCapacityHint > 0) {
+        // Return workerCapacityHint if it is set in config
+        return workerCapacityHint;
+      } else {
+        // Assume capacity per worker as 1
+        return 1;
+      }
     } else {
       // Assume all workers have same capacity
       return workers.iterator().next().getWorker().getCapacity();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java
index a17014c..afdaa57 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java
@@ -121,7 +121,7 @@
       Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
       boolean didProvision = false;
       final DefaultWorkerBehaviorConfig workerConfig =
-          PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
+          PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log);
       if (workerConfig == null) {
         return false;
       }
@@ -186,7 +186,7 @@
     {
       Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
       final DefaultWorkerBehaviorConfig workerConfig =
-          PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
+          PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log);
       if (workerConfig == null) {
         return false;
       }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
index 4520794..34af0ca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.overlord.autoscaling;
 
 import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.common.guava.DSuppliers;
 import org.apache.druid.indexer.TaskLocation;
@@ -43,6 +44,7 @@
 import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
@@ -107,9 +109,34 @@
   }
 
   @Test
+  public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet()
+  {
+    EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class);
+    Capture<String> capturedArgument = Capture.newInstance();
+    mockLogger.error(EasyMock.capture(capturedArgument), EasyMock.anyInt());
+
+    PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.replay(autoScaler, mockLogger);
+    DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(
+        DSuppliers.of(workerConfig),
+        config,
+        "test",
+        mockLogger
+    );
+    Assert.assertNull(defaultWorkerBehaviorConfig);
+    Assert.assertEquals(PendingTaskBasedWorkerProvisioningStrategy.ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, capturedArgument.getValue());
+  }
+
+  @Test
   public void testSuccessfulInitialMinWorkersProvision()
   {
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(new ArrayList<String>());
@@ -138,9 +165,104 @@
   }
 
   @Test
+  public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum()
+  {
+    PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2)
+        .setWorkerCapacityHint(30);
+    strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+        config,
+        DSuppliers.of(workerConfig),
+        new ProvisioningSchedulerConfig(),
+        new Supplier<ScheduledExecutorService>()
+        {
+          @Override
+          public ScheduledExecutorService get()
+          {
+            return executorService;
+          }
+        }
+    );
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
+    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
+    EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+            .andReturn(new ArrayList<String>());
+    RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+    // No pending tasks
+    EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+        new ArrayList<>()
+    );
+    EasyMock.expect(runner.getWorkers()).andReturn(
+        Collections.emptyList()
+    );
+    EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
+    EasyMock.expect(autoScaler.provision()).andReturn(
+        new AutoScalingData(Collections.singletonList("aNode"))
+    ).times(3);
+    EasyMock.replay(runner, autoScaler);
+    Provisioner provisioner = strategy.makeProvisioner(runner);
+    boolean provisionedSomething = provisioner.doProvision();
+    Assert.assertTrue(provisionedSomething);
+    Assert.assertTrue(provisioner.getStats().toList().size() == 3);
+    for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
+      Assert.assertTrue(
+          event.getEvent() == ScalingStats.EVENT.PROVISION
+      );
+    }
+  }
+
+  @Test
+  public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero()
+  {
+    PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2)
+        .setWorkerCapacityHint(30);
+    strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+        config,
+        DSuppliers.of(workerConfig),
+        new ProvisioningSchedulerConfig(),
+        new Supplier<ScheduledExecutorService>()
+        {
+          @Override
+          public ScheduledExecutorService get()
+          {
+            return executorService;
+          }
+        }
+    );
+    // minWorkerCount is 0
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
+    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
+    EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+            .andReturn(new ArrayList<String>());
+    RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+    // No pending tasks
+    EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+        new ArrayList<>()
+    );
+    EasyMock.expect(runner.getWorkers()).andReturn(
+        Collections.emptyList()
+    );
+    EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
+    EasyMock.replay(runner, autoScaler);
+    Provisioner provisioner = strategy.makeProvisioner(runner);
+    boolean provisionedSomething = provisioner.doProvision();
+    Assert.assertFalse(provisionedSomething);
+    Assert.assertEquals(0, provisioner.getStats().toList().size());
+  }
+
+  @Test
   public void testSuccessfulMinWorkersProvision()
   {
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(new ArrayList<String>());
@@ -174,7 +296,7 @@
   @Test
   public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning()
   {
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(new ArrayList<String>());
@@ -207,9 +329,9 @@
   }
 
   @Test
-  public void testSomethingProvisioning()
+  public void testProvisioning()
   {
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(new ArrayList<String>()).times(2);
@@ -258,6 +380,153 @@
   }
 
   @Test
+  public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker()
+  {
+    PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2)
+        .setWorkerCapacityHint(30);
+    strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+        config,
+        DSuppliers.of(workerConfig),
+        new ProvisioningSchedulerConfig(),
+        new Supplier<ScheduledExecutorService>()
+        {
+          @Override
+          public ScheduledExecutorService get()
+          {
+            return executorService;
+          }
+        }
+    );
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3);
+    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
+    EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+            .andReturn(new ArrayList<String>()).times(2);
+    EasyMock.expect(autoScaler.provision()).andReturn(
+        new AutoScalingData(Collections.singletonList("fake"))
+    ).times(2);
+    RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+    // two pending tasks
+    EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+        ImmutableList.of(
+            NoopTask.create(),
+            NoopTask.create()
+        )
+    ).times(2);
+    // Capacity for current worker is 1
+    EasyMock.expect(runner.getWorkers()).andReturn(
+        Arrays.asList(
+            new TestZkWorker(testTask).toImmutable(),
+            new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node
+        )
+    ).times(2);
+    EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
+    EasyMock.replay(runner);
+    EasyMock.replay(autoScaler);
+
+    Provisioner provisioner = strategy.makeProvisioner(runner);
+    boolean provisionedSomething = provisioner.doProvision();
+
+    // Expect to use capacity from current worker (which is 1)
+    // and since there are two pending tasks, we will need two more workers
+    Assert.assertTrue(provisionedSomething);
+    Assert.assertEquals(2, provisioner.getStats().toList().size());
+    DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
+    Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
+    Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent());
+
+    provisionedSomething = provisioner.doProvision();
+
+    Assert.assertFalse(provisionedSomething);
+    Assert.assertTrue(
+        provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
+    );
+    DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
+    Assert.assertTrue(
+        createdTime.equals(anotherCreatedTime)
+    );
+
+    EasyMock.verify(autoScaler);
+    EasyMock.verify(runner);
+  }
+
+  @Test
+  public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromHintConfig()
+  {
+    PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+        .setMaxScalingDuration(new Period(1000))
+        .setNumEventsToTrack(10)
+        .setPendingTaskTimeout(new Period(0))
+        .setWorkerVersion(MIN_VERSION)
+        .setMaxScalingStep(2)
+        .setWorkerCapacityHint(30);
+    strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+        config,
+        DSuppliers.of(workerConfig),
+        new ProvisioningSchedulerConfig(),
+        new Supplier<ScheduledExecutorService>()
+        {
+          @Override
+          public ScheduledExecutorService get()
+          {
+            return executorService;
+          }
+        }
+    );
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3);
+    EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
+    EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+            .andReturn(new ArrayList<String>()).times(2);
+    EasyMock.expect(autoScaler.provision()).andReturn(
+        new AutoScalingData(Collections.singletonList("fake"))
+    ).times(1);
+    RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+    // two pending tasks
+    EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+        ImmutableList.of(
+            NoopTask.create(),
+            NoopTask.create()
+        )
+    ).times(2);
+    // No currently running worker node
+    EasyMock.expect(runner.getWorkers()).andReturn(
+        Collections.emptyList()
+    ).times(2);
+
+    EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
+    EasyMock.replay(runner);
+    EasyMock.replay(autoScaler);
+
+    Provisioner provisioner = strategy.makeProvisioner(runner);
+    boolean provisionedSomething = provisioner.doProvision();
+
+    // Expect to use capacity from workerCapacityHint config (which is 30)
+    // and since there are two pending tasks, we will need one more worker
+    Assert.assertTrue(provisionedSomething);
+    Assert.assertEquals(1, provisioner.getStats().toList().size());
+    DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
+    Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
+
+    provisionedSomething = provisioner.doProvision();
+
+    Assert.assertFalse(provisionedSomething);
+    Assert.assertTrue(
+        provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
+    );
+    DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
+    Assert.assertTrue(
+        createdTime.equals(anotherCreatedTime)
+    );
+
+    EasyMock.verify(autoScaler);
+    EasyMock.verify(runner);
+  }
+
+  @Test
   public void testProvisionAlert() throws Exception
   {
     ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
@@ -266,7 +535,7 @@
     EasyMock.expectLastCall();
     EasyMock.replay(emitter);
 
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(new ArrayList<String>()).times(2);
@@ -323,7 +592,7 @@
   @Test
   public void testDoSuccessfulTerminate()
   {
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(new ArrayList<String>());
     EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn(
@@ -343,7 +612,8 @@
         )
     ).times(2);
     EasyMock.expect(runner.getWorkers()).andReturn(
-        Collections.singletonList(
+        ImmutableList.of(
+            new TestZkWorker(testTask).toImmutable(),
             new TestZkWorker(testTask).toImmutable()
         )
     ).times(2);
@@ -367,7 +637,7 @@
   @Test
   public void testSomethingTerminating()
   {
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(Collections.singletonList("ip")).times(2);
     EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn(
@@ -377,7 +647,9 @@
 
     RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
     EasyMock.expect(runner.getWorkers()).andReturn(
-        Collections.singletonList(
+        ImmutableList.of(
+            new TestZkWorker(testTask).toImmutable(),
+            new TestZkWorker(testTask).toImmutable(),
             new TestZkWorker(testTask).toImmutable()
         )
     ).times(2);
@@ -411,7 +683,7 @@
   public void testNoActionNeeded()
   {
     EasyMock.reset(autoScaler);
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(Collections.singletonList("ip"));
     EasyMock.replay(autoScaler);
@@ -442,7 +714,7 @@
     EasyMock.verify(autoScaler);
 
     EasyMock.reset(autoScaler);
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(Collections.singletonList("ip"));
@@ -460,7 +732,7 @@
   {
     // Don't terminate anything
     EasyMock.reset(autoScaler);
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(Collections.singletonList("ip"));
     EasyMock.replay(autoScaler);
@@ -487,7 +759,7 @@
 
     // Don't provision anything
     EasyMock.reset(autoScaler);
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(Collections.singletonList("ip"));
@@ -498,7 +770,7 @@
 
     EasyMock.reset(autoScaler);
     // Increase minNumWorkers
-    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+    EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
     EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
     EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
             .andReturn(Collections.singletonList("ip"));