| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexing.overlord.autoscaling; |
| |
| import com.google.common.base.Supplier; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import org.apache.druid.common.guava.DSuppliers; |
| import org.apache.druid.indexer.TaskLocation; |
| import org.apache.druid.indexer.TaskStatus; |
| import org.apache.druid.indexing.common.TestTasks; |
| import org.apache.druid.indexing.common.task.NoopTask; |
| import org.apache.druid.indexing.common.task.Task; |
| import org.apache.druid.indexing.overlord.RemoteTaskRunner; |
| import org.apache.druid.indexing.overlord.RemoteTaskRunnerWorkItem; |
| import org.apache.druid.indexing.overlord.ZkWorker; |
| import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; |
| import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; |
| import org.apache.druid.indexing.overlord.setup.FillCapacityWorkerSelectStrategy; |
| import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; |
| import org.apache.druid.indexing.worker.TaskAnnouncement; |
| import org.apache.druid.indexing.worker.Worker; |
| import org.apache.druid.indexing.worker.config.WorkerConfig; |
| import org.apache.druid.jackson.DefaultObjectMapper; |
| import org.apache.druid.java.util.common.DateTimes; |
| import org.apache.druid.java.util.common.concurrent.Execs; |
| import org.apache.druid.java.util.emitter.EmittingLogger; |
| import org.apache.druid.java.util.emitter.service.ServiceEmitter; |
| import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; |
| import org.easymock.Capture; |
| import org.easymock.EasyMock; |
| import org.joda.time.DateTime; |
| import org.joda.time.Period; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| */ |
| public class PendingTaskBasedProvisioningStrategyTest |
| { |
| private AutoScaler autoScaler; |
| private Task testTask; |
| private PendingTaskBasedWorkerProvisioningStrategy strategy; |
| private AtomicReference<WorkerBehaviorConfig> workerConfig; |
| private ScheduledExecutorService executorService = Execs.scheduledSingleThreaded("test service"); |
| private static final String MIN_VERSION = "2014-01-00T00:01:00Z"; |
| private static final String INVALID_VERSION = "0"; |
| |
| @Before |
| public void setUp() |
| { |
| autoScaler = EasyMock.createMock(AutoScaler.class); |
| |
| testTask = TestTasks.immediateSuccess("task1"); |
| |
| PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() |
| .setMaxScalingDuration(new Period(1000)) |
| .setNumEventsToTrack(10) |
| .setPendingTaskTimeout(new Period(0)) |
| .setWorkerVersion(MIN_VERSION) |
| .setMaxScalingStep(2); |
| |
| workerConfig = new AtomicReference<>( |
| new DefaultWorkerBehaviorConfig( |
| new FillCapacityWorkerSelectStrategy(null), |
| autoScaler |
| ) |
| ); |
| |
| strategy = new PendingTaskBasedWorkerProvisioningStrategy( |
| config, |
| DSuppliers.of(workerConfig), |
| new ProvisioningSchedulerConfig(), |
| new Supplier<ScheduledExecutorService>() |
| { |
| @Override |
| public ScheduledExecutorService get() |
| { |
| return executorService; |
| } |
| } |
| ); |
| } |
| |
| @Test |
| public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet() |
| { |
| EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class); |
| Capture<String> capturedArgument = Capture.newInstance(); |
| mockLogger.error(EasyMock.capture(capturedArgument), EasyMock.anyInt()); |
| |
| PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() |
| .setMaxScalingDuration(new Period(1000)) |
| .setNumEventsToTrack(10) |
| .setPendingTaskTimeout(new Period(0)) |
| .setWorkerVersion(MIN_VERSION) |
| .setMaxScalingStep(2); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); |
| EasyMock.replay(autoScaler, mockLogger); |
| DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig( |
| DSuppliers.of(workerConfig), |
| config, |
| "test", |
| mockLogger |
| ); |
| Assert.assertNull(defaultWorkerBehaviorConfig); |
| Assert.assertEquals(PendingTaskBasedWorkerProvisioningStrategy.ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, capturedArgument.getValue()); |
| } |
| |
| @Test |
| public void testSuccessfulInitialMinWorkersProvision() |
| { |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // No pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| new ArrayList<>() |
| ); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.emptyList() |
| ); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("aNode")) |
| ).times(3); |
| EasyMock.replay(runner, autoScaler); |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 3); |
| for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { |
| Assert.assertTrue( |
| event.getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| } |
| } |
| |
| @Test |
| public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum() |
| { |
| PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() |
| .setMaxScalingDuration(new Period(1000)) |
| .setNumEventsToTrack(10) |
| .setPendingTaskTimeout(new Period(0)) |
| .setWorkerVersion(MIN_VERSION) |
| .setMaxScalingStep(2) |
| .setWorkerCapacityHint(30); |
| strategy = new PendingTaskBasedWorkerProvisioningStrategy( |
| config, |
| DSuppliers.of(workerConfig), |
| new ProvisioningSchedulerConfig(), |
| new Supplier<ScheduledExecutorService>() |
| { |
| @Override |
| public ScheduledExecutorService get() |
| { |
| return executorService; |
| } |
| } |
| ); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // No pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| new ArrayList<>() |
| ); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.emptyList() |
| ); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("aNode")) |
| ).times(3); |
| EasyMock.replay(runner, autoScaler); |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 3); |
| for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { |
| Assert.assertTrue( |
| event.getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| } |
| } |
| |
| @Test |
| public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero() |
| { |
| PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() |
| .setMaxScalingDuration(new Period(1000)) |
| .setNumEventsToTrack(10) |
| .setPendingTaskTimeout(new Period(0)) |
| .setWorkerVersion(MIN_VERSION) |
| .setMaxScalingStep(2) |
| .setWorkerCapacityHint(30); |
| strategy = new PendingTaskBasedWorkerProvisioningStrategy( |
| config, |
| DSuppliers.of(workerConfig), |
| new ProvisioningSchedulerConfig(), |
| new Supplier<ScheduledExecutorService>() |
| { |
| @Override |
| public ScheduledExecutorService get() |
| { |
| return executorService; |
| } |
| } |
| ); |
| // minWorkerCount is 0 |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // No pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| new ArrayList<>() |
| ); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.emptyList() |
| ); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| EasyMock.replay(runner, autoScaler); |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| Assert.assertFalse(provisionedSomething); |
| Assert.assertEquals(0, provisioner.getStats().toList().size()); |
| } |
| |
| @Test |
| public void testSuccessfulMinWorkersProvision() |
| { |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // No pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| new ArrayList<>() |
| ); |
| // 1 node already running, only provision 2 more. |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.singletonList( |
| new TestZkWorker(testTask).toImmutable() |
| ) |
| ); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("aNode")) |
| ).times(2); |
| EasyMock.replay(runner, autoScaler); |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 2); |
| for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { |
| Assert.assertTrue( |
| event.getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| } |
| } |
| |
| @Test |
| public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() |
| { |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // No pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| new ArrayList<>() |
| ); |
| // 1 node already running, only provision 2 more. |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Arrays.asList( |
| new TestZkWorker(testTask).toImmutable(), |
| new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node |
| ) |
| ); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("aNode")) |
| ).times(2); |
| EasyMock.replay(runner, autoScaler); |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 2); |
| for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { |
| Assert.assertTrue( |
| event.getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| } |
| } |
| |
| @Test |
| public void testProvisioning() |
| { |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()).times(2); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("fake")) |
| ); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| Collections.singletonList( |
| NoopTask.create() |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Arrays.asList( |
| new TestZkWorker(testTask).toImmutable(), |
| new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); |
| EasyMock.replay(runner); |
| EasyMock.replay(autoScaler); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 1); |
| DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| |
| provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertFalse(provisionedSomething); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertTrue( |
| createdTime.equals(anotherCreatedTime) |
| ); |
| |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker() |
| { |
| PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() |
| .setMaxScalingDuration(new Period(1000)) |
| .setNumEventsToTrack(10) |
| .setPendingTaskTimeout(new Period(0)) |
| .setWorkerVersion(MIN_VERSION) |
| .setMaxScalingStep(2) |
| .setWorkerCapacityHint(30); |
| strategy = new PendingTaskBasedWorkerProvisioningStrategy( |
| config, |
| DSuppliers.of(workerConfig), |
| new ProvisioningSchedulerConfig(), |
| new Supplier<ScheduledExecutorService>() |
| { |
| @Override |
| public ScheduledExecutorService get() |
| { |
| return executorService; |
| } |
| } |
| ); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()).times(2); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("fake")) |
| ).times(2); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // two pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| ImmutableList.of( |
| NoopTask.create(), |
| NoopTask.create() |
| ) |
| ).times(2); |
| // Capacity for current worker is 1 |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Arrays.asList( |
| new TestZkWorker(testTask).toImmutable(), |
| new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); |
| EasyMock.replay(runner); |
| EasyMock.replay(autoScaler); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| |
| // Expect to use capacity from current worker (which is 1) |
| // and since there are two pending tasks, we will need two more workers |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertEquals(2, provisioner.getStats().toList().size()); |
| DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent()); |
| Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent()); |
| |
| provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertFalse(provisionedSomething); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertTrue( |
| createdTime.equals(anotherCreatedTime) |
| ); |
| |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromHintConfig() |
| { |
| PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() |
| .setMaxScalingDuration(new Period(1000)) |
| .setNumEventsToTrack(10) |
| .setPendingTaskTimeout(new Period(0)) |
| .setWorkerVersion(MIN_VERSION) |
| .setMaxScalingStep(2) |
| .setWorkerCapacityHint(30); |
| strategy = new PendingTaskBasedWorkerProvisioningStrategy( |
| config, |
| DSuppliers.of(workerConfig), |
| new ProvisioningSchedulerConfig(), |
| new Supplier<ScheduledExecutorService>() |
| { |
| @Override |
| public ScheduledExecutorService get() |
| { |
| return executorService; |
| } |
| } |
| ); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()).times(2); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("fake")) |
| ).times(1); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| // two pending tasks |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| ImmutableList.of( |
| NoopTask.create(), |
| NoopTask.create() |
| ) |
| ).times(2); |
| // No currently running worker node |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.emptyList() |
| ).times(2); |
| |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); |
| EasyMock.replay(runner); |
| EasyMock.replay(autoScaler); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| |
| // Expect to use capacity from workerCapacityHint config (which is 30) |
| // and since there are two pending tasks, we will need one more worker |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertEquals(1, provisioner.getStats().toList().size()); |
| DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent()); |
| |
| provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertFalse(provisionedSomething); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertTrue( |
| createdTime.equals(anotherCreatedTime) |
| ); |
| |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testProvisionAlert() throws Exception |
| { |
| ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class); |
| EmittingLogger.registerEmitter(emitter); |
| emitter.emit(EasyMock.<ServiceEventBuilder>anyObject()); |
| EasyMock.expectLastCall(); |
| EasyMock.replay(emitter); |
| |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()).times(2); |
| EasyMock.expect(autoScaler.terminateWithIds(EasyMock.anyObject())) |
| .andReturn(null); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("fake")) |
| ); |
| EasyMock.replay(autoScaler); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| Collections.singletonList( |
| NoopTask.create() |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Arrays.asList( |
| new TestZkWorker(testTask, "http", "hi", "lo", MIN_VERSION, 1).toImmutable(), |
| new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable(), // Invalid version node |
| new TestZkWorker(testTask, "http", "h2", "n1", INVALID_VERSION).toImmutable() // Invalid version node |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| EasyMock.replay(runner); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertTrue(provisionedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 1); |
| DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| |
| Thread.sleep(2000); |
| |
| provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertFalse(provisionedSomething); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION |
| ); |
| DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); |
| Assert.assertTrue( |
| createdTime.equals(anotherCreatedTime) |
| ); |
| |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(emitter); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testDoSuccessfulTerminate() |
| { |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(new ArrayList<String>()); |
| EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn( |
| new AutoScalingData(new ArrayList<>()) |
| ); |
| EasyMock.replay(autoScaler); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getPendingTasks()).andReturn( |
| Collections.singletonList( |
| new RemoteTaskRunnerWorkItem( |
| testTask.getId(), |
| testTask.getType(), |
| null, |
| TaskLocation.unknown(), |
| testTask.getDataSource() |
| ).withQueueInsertionTime(DateTimes.nowUtc()) |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| ImmutableList.of( |
| new TestZkWorker(testTask).toImmutable(), |
| new TestZkWorker(testTask).toImmutable() |
| ) |
| ).times(2); |
| EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt())) |
| .andReturn(Collections.singletonList(new TestZkWorker(testTask).getWorker())); |
| EasyMock.expect(runner.getLazyWorkers()).andReturn(new ArrayList<>()); |
| EasyMock.replay(runner); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean terminatedSomething = provisioner.doTerminate(); |
| |
| Assert.assertTrue(terminatedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 1); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE |
| ); |
| |
| EasyMock.verify(autoScaler); |
| } |
| |
| @Test |
| public void testSomethingTerminating() |
| { |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(Collections.singletonList("ip")).times(2); |
| EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn( |
| new AutoScalingData(Collections.singletonList("ip")) |
| ); |
| EasyMock.replay(autoScaler); |
| |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| ImmutableList.of( |
| new TestZkWorker(testTask).toImmutable(), |
| new TestZkWorker(testTask).toImmutable(), |
| new TestZkWorker(testTask).toImmutable() |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getLazyWorkers()).andReturn(new ArrayList<>()).times(2); |
| EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt())) |
| .andReturn(Collections.singletonList(new TestZkWorker(testTask).toImmutable().getWorker())); |
| EasyMock.replay(runner); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean terminatedSomething = provisioner.doTerminate(); |
| |
| Assert.assertTrue(terminatedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 1); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE |
| ); |
| |
| terminatedSomething = provisioner.doTerminate(); |
| |
| Assert.assertFalse(terminatedSomething); |
| Assert.assertTrue(provisioner.getStats().toList().size() == 1); |
| Assert.assertTrue( |
| provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.TERMINATE |
| ); |
| |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testNoActionNeeded() |
| { |
| EasyMock.reset(autoScaler); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(Collections.singletonList("ip")); |
| EasyMock.replay(autoScaler); |
| |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| Collections.singletonList( |
| (Task) NoopTask.create() |
| ) |
| ).times(1); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Arrays.asList( |
| new TestZkWorker(NoopTask.create()).toImmutable(), |
| new TestZkWorker(NoopTask.create()).toImmutable() |
| ) |
| ).times(2); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); |
| |
| EasyMock.expect(runner.getLazyWorkers()).andReturn(new ArrayList<>()); |
| EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt())) |
| .andReturn(Collections.emptyList()); |
| EasyMock.replay(runner); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean terminatedSomething = provisioner.doTerminate(); |
| |
| Assert.assertFalse(terminatedSomething); |
| EasyMock.verify(autoScaler); |
| |
| EasyMock.reset(autoScaler); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(Collections.singletonList("ip")); |
| EasyMock.replay(autoScaler); |
| |
| boolean provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertFalse(provisionedSomething); |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testMinCountIncrease() |
| { |
| // Don't terminate anything |
| EasyMock.reset(autoScaler); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(Collections.singletonList("ip")); |
| EasyMock.replay(autoScaler); |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| Collections.emptyList() |
| ).times(2); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.singletonList( |
| new TestZkWorker(NoopTask.create(), "http", "h1", "i1", MIN_VERSION).toImmutable() |
| ) |
| ).times(3); |
| EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(2); |
| |
| EasyMock.expect(runner.getLazyWorkers()).andReturn(new ArrayList<>()); |
| EasyMock.expect(runner.markWorkersLazy(EasyMock.anyObject(), EasyMock.anyInt())) |
| .andReturn(Collections.emptyList()); |
| EasyMock.replay(runner); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean terminatedSomething = provisioner.doTerminate(); |
| Assert.assertFalse(terminatedSomething); |
| EasyMock.verify(autoScaler); |
| |
| // Don't provision anything |
| EasyMock.reset(autoScaler); |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(Collections.singletonList("ip")); |
| EasyMock.replay(autoScaler); |
| boolean provisionedSomething = provisioner.doProvision(); |
| Assert.assertFalse(provisionedSomething); |
| EasyMock.verify(autoScaler); |
| |
| EasyMock.reset(autoScaler); |
| // Increase minNumWorkers |
| EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); |
| EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); |
| EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) |
| .andReturn(Collections.singletonList("ip")); |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("h3")) |
| ); |
| // Should provision two new workers |
| EasyMock.expect(autoScaler.provision()).andReturn( |
| new AutoScalingData(Collections.singletonList("h4")) |
| ); |
| EasyMock.replay(autoScaler); |
| provisionedSomething = provisioner.doProvision(); |
| Assert.assertTrue(provisionedSomething); |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| @Test |
| public void testNullWorkerConfig() |
| { |
| workerConfig.set(null); |
| EasyMock.replay(autoScaler); |
| |
| RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); |
| EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( |
| Collections.singletonList( |
| NoopTask.create() |
| ) |
| ).times(1); |
| EasyMock.expect(runner.getWorkers()).andReturn( |
| Collections.singletonList( |
| new TestZkWorker(null).toImmutable() |
| ) |
| ).times(2); |
| EasyMock.replay(runner); |
| |
| Provisioner provisioner = strategy.makeProvisioner(runner); |
| boolean terminatedSomething = provisioner.doTerminate(); |
| |
| boolean provisionedSomething = provisioner.doProvision(); |
| |
| Assert.assertFalse(terminatedSomething); |
| Assert.assertFalse(provisionedSomething); |
| |
| EasyMock.verify(autoScaler); |
| EasyMock.verify(runner); |
| } |
| |
| private static class TestZkWorker extends ZkWorker |
| { |
| private final Task testTask; |
| |
| public TestZkWorker( |
| Task testTask |
| ) |
| { |
| this(testTask, "http", "host", "ip", MIN_VERSION); |
| } |
| |
| public TestZkWorker( |
| Task testTask, |
| String scheme, |
| String host, |
| String ip, |
| String version |
| ) |
| { |
| this(testTask, scheme, host, ip, version, 1); |
| } |
| |
| public TestZkWorker( |
| Task testTask, |
| String scheme, |
| String host, |
| String ip, |
| String version, |
| int capacity |
| ) |
| { |
| super(new Worker(scheme, host, ip, capacity, version, WorkerConfig.DEFAULT_CATEGORY), null, new DefaultObjectMapper()); |
| |
| this.testTask = testTask; |
| } |
| |
| @Override |
| public Map<String, TaskAnnouncement> getRunningTasks() |
| { |
| if (testTask == null) { |
| return new HashMap<>(); |
| } |
| return ImmutableMap.of( |
| testTask.getId(), |
| TaskAnnouncement.create( |
| testTask, |
| TaskStatus.running(testTask.getId()), |
| TaskLocation.unknown() |
| ) |
| ); |
| } |
| } |
| } |