Improve Auto scaler pendingTaskBased provisioning strategy to handle when there are no currently running worker node better (#11440)
* fix pendingTaskBased
* fix doc
* address comments
* address comments
* address comments
* address comments
* address comments
* address comments
* address comments
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index f010792..dd0a4f2 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1015,6 +1015,7 @@
|`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S|
|`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null|
|`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080|
+|`druid.indexer.autoscale.workerCapacityHint`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. This value should typically be equal to `druid.worker.capacity` when you have a homogeneous cluster and the average of `druid.worker.capacity` across the workers when you have a heterogeneous cluster. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1|
##### Supervisors
diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
index 3c8f529..e307b32 100644
--- a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
+++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java
@@ -80,8 +80,8 @@
@JsonProperty("envConfig") GceEnvironmentConfig envConfig
)
{
- Preconditions.checkArgument(minNumWorkers > 0,
- "minNumWorkers must be greater than 0");
+ Preconditions.checkArgument(minNumWorkers >= 0,
+ "minNumWorkers must be greater than or equal to 0");
this.minNumWorkers = minNumWorkers;
Preconditions.checkArgument(maxNumWorkers > 0,
"maxNumWorkers must be greater than 0");
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java
index 3ef70e9..4ab98ed 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java
@@ -29,6 +29,8 @@
@JsonProperty
private int maxScalingStep = 10;
+ @JsonProperty
+ private int workerCapacityHint = -1;
public int getMaxScalingStep()
{
@@ -76,4 +78,14 @@
return this;
}
+ public int getWorkerCapacityHint()
+ {
+ return workerCapacityHint;
+ }
+
+ public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityHint(int workerCapacityHint)
+ {
+ this.workerCapacityHint = workerCapacityHint;
+ return this;
+ }
}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
index cf562f1..28d1bef 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.overlord.autoscaling;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
@@ -60,11 +61,14 @@
{
private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class);
+ public static final String ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET = "As minNumWorkers is set to 0, workerCapacityHint must be greater than 0. workerCapacityHint value set is %d";
private static final String SCHEME = "http";
+ @VisibleForTesting
@Nullable
- static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
+ public static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig(
Supplier<WorkerBehaviorConfig> workerConfigRef,
+ SimpleWorkerProvisioningConfig config,
String action,
EmittingLogger log
)
@@ -87,6 +91,13 @@
log.error("No autoScaler available, cannot %s workers", action);
return null;
}
+ if (config instanceof PendingTaskBasedWorkerProvisioningConfig
+ && workerConfig.getAutoScaler().getMinNumWorkers() == 0
+ && ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint() <= 0
+ ) {
+ log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint());
+ return null;
+ }
return workerConfig;
}
@@ -157,7 +168,7 @@
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
log.debug("Workers: %d %s", workers.size(), workers);
boolean didProvision = false;
- final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
+ final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log);
if (workerConfig == null) {
return false;
}
@@ -246,14 +257,19 @@
log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount);
final int currValidWorkers = getCurrValidWorkers(workers);
- // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need
- // since we are not aware of the expectedWorkerCapacity.
- int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks(
- remoteTaskRunnerConfig,
- workerConfig,
- pendingTasks,
- workers
- );
+ // If there are no worker and workerCapacityHint config is not set (-1) or invalid (<= 0), then spin up minWorkerCount
+ // as we cannot determine the exact capacity here to fulfill the need.
+ // However, if there are no worker but workerCapacityHint config is set (>0), then we can
+ // determine the number of workers needed using workerCapacityHint config as expected worker capacity
+ int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityHint() <= 0
+ ? minWorkerCount
+ : getWorkersNeededToAssignTasks(
+ remoteTaskRunnerConfig,
+ workerConfig,
+ pendingTasks,
+ workers,
+ config.getWorkerCapacityHint()
+ );
log.debug("More workers needed: %d", moreWorkersNeeded);
int want = Math.max(
@@ -280,7 +296,8 @@
final WorkerTaskRunnerConfig workerTaskRunnerConfig,
final DefaultWorkerBehaviorConfig workerConfig,
final Collection<Task> pendingTasks,
- final Collection<ImmutableWorkerInfo> workers
+ final Collection<ImmutableWorkerInfo> workers,
+ final int workerCapacityHint
)
{
final Collection<ImmutableWorkerInfo> validWorkers = Collections2.filter(
@@ -295,7 +312,7 @@
}
WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy();
int need = 0;
- int capacity = getExpectedWorkerCapacity(workers);
+ int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint);
log.info("Expected worker capacity: %d", capacity);
// Simulate assigning tasks to dummy workers using configured workerSelectStrategy
@@ -333,7 +350,7 @@
{
Collection<ImmutableWorkerInfo> zkWorkers = runner.getWorkers();
log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers);
- final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
+ final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log);
if (workerConfig == null) {
return false;
}
@@ -441,12 +458,18 @@
return currValidWorkers;
}
- private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers)
+ private static int getExpectedWorkerCapacity(final Collection<ImmutableWorkerInfo> workers, final int workerCapacityHint)
{
int size = workers.size();
if (size == 0) {
- // No existing workers assume capacity per worker as 1
- return 1;
+ // No existing workers
+ if (workerCapacityHint > 0) {
+ // Return workerCapacityHint if it is set in config
+ return workerCapacityHint;
+ } else {
+ // Assume capacity per worker as 1
+ return 1;
+ }
} else {
// Assume all workers have same capacity
return workers.iterator().next().getWorker().getCapacity();
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java
index a17014c..afdaa57 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java
@@ -121,7 +121,7 @@
Collection<ImmutableWorkerInfo> workers = runner.getWorkers();
boolean didProvision = false;
final DefaultWorkerBehaviorConfig workerConfig =
- PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log);
+ PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log);
if (workerConfig == null) {
return false;
}
@@ -186,7 +186,7 @@
{
Collection<? extends TaskRunnerWorkItem> pendingTasks = runner.getPendingTasks();
final DefaultWorkerBehaviorConfig workerConfig =
- PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log);
+ PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log);
if (workerConfig == null) {
return false;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
index 4520794..34af0ca 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.overlord.autoscaling;
import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.indexer.TaskLocation;
@@ -43,6 +44,7 @@
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEventBuilder;
+import org.easymock.Capture;
import org.easymock.EasyMock;
import org.joda.time.DateTime;
import org.joda.time.Period;
@@ -107,9 +109,34 @@
}
@Test
+ public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet()
+ {
+ EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class);
+ Capture<String> capturedArgument = Capture.newInstance();
+ mockLogger.error(EasyMock.capture(capturedArgument), EasyMock.anyInt());
+
+ PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.replay(autoScaler, mockLogger);
+ DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(
+ DSuppliers.of(workerConfig),
+ config,
+ "test",
+ mockLogger
+ );
+ Assert.assertNull(defaultWorkerBehaviorConfig);
+ Assert.assertEquals(PendingTaskBasedWorkerProvisioningStrategy.ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, capturedArgument.getValue());
+ }
+
+ @Test
public void testSuccessfulInitialMinWorkersProvision()
{
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>());
@@ -138,9 +165,104 @@
}
@Test
+ public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum()
+ {
+ PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2)
+ .setWorkerCapacityHint(30);
+ strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+ config,
+ DSuppliers.of(workerConfig),
+ new ProvisioningSchedulerConfig(),
+ new Supplier<ScheduledExecutorService>()
+ {
+ @Override
+ public ScheduledExecutorService get()
+ {
+ return executorService;
+ }
+ }
+ );
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
+ EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
+ EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+ .andReturn(new ArrayList<String>());
+ RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+ // No pending tasks
+ EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+ new ArrayList<>()
+ );
+ EasyMock.expect(runner.getWorkers()).andReturn(
+ Collections.emptyList()
+ );
+ EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
+ EasyMock.expect(autoScaler.provision()).andReturn(
+ new AutoScalingData(Collections.singletonList("aNode"))
+ ).times(3);
+ EasyMock.replay(runner, autoScaler);
+ Provisioner provisioner = strategy.makeProvisioner(runner);
+ boolean provisionedSomething = provisioner.doProvision();
+ Assert.assertTrue(provisionedSomething);
+ Assert.assertTrue(provisioner.getStats().toList().size() == 3);
+ for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) {
+ Assert.assertTrue(
+ event.getEvent() == ScalingStats.EVENT.PROVISION
+ );
+ }
+ }
+
+ @Test
+ public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero()
+ {
+ PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2)
+ .setWorkerCapacityHint(30);
+ strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+ config,
+ DSuppliers.of(workerConfig),
+ new ProvisioningSchedulerConfig(),
+ new Supplier<ScheduledExecutorService>()
+ {
+ @Override
+ public ScheduledExecutorService get()
+ {
+ return executorService;
+ }
+ }
+ );
+ // minWorkerCount is 0
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2);
+ EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
+ EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+ .andReturn(new ArrayList<String>());
+ RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+ // No pending tasks
+ EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+ new ArrayList<>()
+ );
+ EasyMock.expect(runner.getWorkers()).andReturn(
+ Collections.emptyList()
+ );
+ EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig());
+ EasyMock.replay(runner, autoScaler);
+ Provisioner provisioner = strategy.makeProvisioner(runner);
+ boolean provisionedSomething = provisioner.doProvision();
+ Assert.assertFalse(provisionedSomething);
+ Assert.assertEquals(0, provisioner.getStats().toList().size());
+ }
+
+ @Test
public void testSuccessfulMinWorkersProvision()
{
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>());
@@ -174,7 +296,7 @@
@Test
public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning()
{
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>());
@@ -207,9 +329,9 @@
}
@Test
- public void testSomethingProvisioning()
+ public void testProvisioning()
{
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>()).times(2);
@@ -258,6 +380,153 @@
}
@Test
+ public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker()
+ {
+ PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2)
+ .setWorkerCapacityHint(30);
+ strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+ config,
+ DSuppliers.of(workerConfig),
+ new ProvisioningSchedulerConfig(),
+ new Supplier<ScheduledExecutorService>()
+ {
+ @Override
+ public ScheduledExecutorService get()
+ {
+ return executorService;
+ }
+ }
+ );
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3);
+ EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
+ EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+ .andReturn(new ArrayList<String>()).times(2);
+ EasyMock.expect(autoScaler.provision()).andReturn(
+ new AutoScalingData(Collections.singletonList("fake"))
+ ).times(2);
+ RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+ // two pending tasks
+ EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+ ImmutableList.of(
+ NoopTask.create(),
+ NoopTask.create()
+ )
+ ).times(2);
+ // Capacity for current worker is 1
+ EasyMock.expect(runner.getWorkers()).andReturn(
+ Arrays.asList(
+ new TestZkWorker(testTask).toImmutable(),
+ new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node
+ )
+ ).times(2);
+ EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
+ EasyMock.replay(runner);
+ EasyMock.replay(autoScaler);
+
+ Provisioner provisioner = strategy.makeProvisioner(runner);
+ boolean provisionedSomething = provisioner.doProvision();
+
+ // Expect to use capacity from current worker (which is 1)
+ // and since there are two pending tasks, we will need two more workers
+ Assert.assertTrue(provisionedSomething);
+ Assert.assertEquals(2, provisioner.getStats().toList().size());
+ DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
+ Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
+ Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent());
+
+ provisionedSomething = provisioner.doProvision();
+
+ Assert.assertFalse(provisionedSomething);
+ Assert.assertTrue(
+ provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
+ );
+ DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
+ Assert.assertTrue(
+ createdTime.equals(anotherCreatedTime)
+ );
+
+ EasyMock.verify(autoScaler);
+ EasyMock.verify(runner);
+ }
+
+ @Test
+ public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromHintConfig()
+ {
+ PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig()
+ .setMaxScalingDuration(new Period(1000))
+ .setNumEventsToTrack(10)
+ .setPendingTaskTimeout(new Period(0))
+ .setWorkerVersion(MIN_VERSION)
+ .setMaxScalingStep(2)
+ .setWorkerCapacityHint(30);
+ strategy = new PendingTaskBasedWorkerProvisioningStrategy(
+ config,
+ DSuppliers.of(workerConfig),
+ new ProvisioningSchedulerConfig(),
+ new Supplier<ScheduledExecutorService>()
+ {
+ @Override
+ public ScheduledExecutorService get()
+ {
+ return executorService;
+ }
+ }
+ );
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3);
+ EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1);
+ EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
+ .andReturn(new ArrayList<String>()).times(2);
+ EasyMock.expect(autoScaler.provision()).andReturn(
+ new AutoScalingData(Collections.singletonList("fake"))
+ ).times(1);
+ RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
+ // two pending tasks
+ EasyMock.expect(runner.getPendingTaskPayloads()).andReturn(
+ ImmutableList.of(
+ NoopTask.create(),
+ NoopTask.create()
+ )
+ ).times(2);
+ // No currently running worker node
+ EasyMock.expect(runner.getWorkers()).andReturn(
+ Collections.emptyList()
+ ).times(2);
+
+ EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1);
+ EasyMock.replay(runner);
+ EasyMock.replay(autoScaler);
+
+ Provisioner provisioner = strategy.makeProvisioner(runner);
+ boolean provisionedSomething = provisioner.doProvision();
+
+ // Expect to use capacity from workerCapacityHint config (which is 30)
+ // and since there are two pending tasks, we will need one more worker
+ Assert.assertTrue(provisionedSomething);
+ Assert.assertEquals(1, provisioner.getStats().toList().size());
+ DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp();
+ Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent());
+
+ provisionedSomething = provisioner.doProvision();
+
+ Assert.assertFalse(provisionedSomething);
+ Assert.assertTrue(
+ provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION
+ );
+ DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp();
+ Assert.assertTrue(
+ createdTime.equals(anotherCreatedTime)
+ );
+
+ EasyMock.verify(autoScaler);
+ EasyMock.verify(runner);
+ }
+
+ @Test
public void testProvisionAlert() throws Exception
{
ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
@@ -266,7 +535,7 @@
EasyMock.expectLastCall();
EasyMock.replay(emitter);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>()).times(2);
@@ -323,7 +592,7 @@
@Test
public void testDoSuccessfulTerminate()
{
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(new ArrayList<String>());
EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn(
@@ -343,7 +612,8 @@
)
).times(2);
EasyMock.expect(runner.getWorkers()).andReturn(
- Collections.singletonList(
+ ImmutableList.of(
+ new TestZkWorker(testTask).toImmutable(),
new TestZkWorker(testTask).toImmutable()
)
).times(2);
@@ -367,7 +637,7 @@
@Test
public void testSomethingTerminating()
{
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(Collections.singletonList("ip")).times(2);
EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn(
@@ -377,7 +647,9 @@
RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class);
EasyMock.expect(runner.getWorkers()).andReturn(
- Collections.singletonList(
+ ImmutableList.of(
+ new TestZkWorker(testTask).toImmutable(),
+ new TestZkWorker(testTask).toImmutable(),
new TestZkWorker(testTask).toImmutable()
)
).times(2);
@@ -411,7 +683,7 @@
public void testNoActionNeeded()
{
EasyMock.reset(autoScaler);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(Collections.singletonList("ip"));
EasyMock.replay(autoScaler);
@@ -442,7 +714,7 @@
EasyMock.verify(autoScaler);
EasyMock.reset(autoScaler);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(Collections.singletonList("ip"));
@@ -460,7 +732,7 @@
{
// Don't terminate anything
EasyMock.reset(autoScaler);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(Collections.singletonList("ip"));
EasyMock.replay(autoScaler);
@@ -487,7 +759,7 @@
// Don't provision anything
EasyMock.reset(autoScaler);
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(Collections.singletonList("ip"));
@@ -498,7 +770,7 @@
EasyMock.reset(autoScaler);
// Increase minNumWorkers
- EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3);
+ EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2);
EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5);
EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject()))
.andReturn(Collections.singletonList("ip"));