| /** |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.aurora.scheduler.maintenance; |
| |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.Executor; |
| |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.inject.AbstractModule; |
| import com.google.inject.Guice; |
| import com.google.inject.Injector; |
| import com.google.inject.TypeLiteral; |
| |
| import org.apache.aurora.common.quantity.Amount; |
| import org.apache.aurora.common.quantity.Time; |
| import org.apache.aurora.common.stats.StatsProvider; |
| import org.apache.aurora.common.testing.easymock.EasyMockTest; |
| import org.apache.aurora.gen.CountSlaPolicy; |
| import org.apache.aurora.gen.HostAttributes; |
| import org.apache.aurora.gen.HostMaintenanceRequest; |
| import org.apache.aurora.gen.HostStatus; |
| import org.apache.aurora.gen.MaintenanceMode; |
| import org.apache.aurora.gen.PercentageSlaPolicy; |
| import org.apache.aurora.gen.ScheduledTask; |
| import org.apache.aurora.gen.SlaPolicy; |
| import org.apache.aurora.scheduler.SchedulerModule.TaskEventBatchWorker; |
| import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor; |
| import org.apache.aurora.scheduler.base.Query; |
| import org.apache.aurora.scheduler.base.TaskTestUtil; |
| import org.apache.aurora.scheduler.config.types.TimeAmount; |
| import org.apache.aurora.scheduler.events.EventSink; |
| import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange; |
| import org.apache.aurora.scheduler.events.PubsubEventModule; |
| import org.apache.aurora.scheduler.sla.SlaManager; |
| import org.apache.aurora.scheduler.state.PubsubTestUtil; |
| import org.apache.aurora.scheduler.state.StateManager; |
| import org.apache.aurora.scheduler.storage.Storage; |
| import org.apache.aurora.scheduler.storage.entities.IHostAttributes; |
| import org.apache.aurora.scheduler.storage.entities.IHostMaintenanceRequest; |
| import org.apache.aurora.scheduler.storage.entities.IHostStatus; |
| import org.apache.aurora.scheduler.storage.entities.IScheduledTask; |
| import org.apache.aurora.scheduler.storage.entities.ISlaPolicy; |
| import org.apache.aurora.scheduler.storage.testing.StorageTestUtil; |
| import org.apache.aurora.scheduler.testing.FakeStatsProvider; |
| import org.apache.mesos.v1.Protos; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| import static org.apache.aurora.gen.MaintenanceMode.DRAINED; |
| import static org.apache.aurora.gen.MaintenanceMode.DRAINING; |
| import static org.apache.aurora.gen.MaintenanceMode.NONE; |
| import static org.apache.aurora.gen.MaintenanceMode.SCHEDULED; |
| import static org.apache.aurora.gen.ScheduleStatus.KILLED; |
| import static org.apache.aurora.gen.ScheduleStatus.RUNNING; |
| import static org.apache.aurora.scheduler.testing.BatchWorkerUtil.expectBatchExecute; |
| import static org.easymock.EasyMock.anyObject; |
| import static org.easymock.EasyMock.eq; |
| import static org.easymock.EasyMock.expect; |
| import static org.junit.Assert.assertEquals; |
| |
| public class MaintenanceControllerImplTest extends EasyMockTest { |
| |
| private static final String HOST_A = "a"; |
| private static final Set<String> A = ImmutableSet.of(HOST_A); |
| private static final Protos.OfferID OFFER_ID = Protos.OfferID.newBuilder() |
| .setValue("offer-id") |
| .build(); |
| private static final Protos.AgentID AGENT_ID = Protos.AgentID.newBuilder() |
| .setValue("agent-id") |
| .build(); |
| private static final Protos.FrameworkID FRAMEWORK_ID = Protos.FrameworkID.newBuilder() |
| .setValue("framework-id") |
| .build(); |
| private static final Protos.URL AGENT_URL = Protos.URL.newBuilder() |
| .setAddress(Protos.Address.newBuilder() |
| .setHostname(HOST_A) |
| .setPort(5051)) |
| .setScheme("http") |
| .build(); |
| private static final Protos.Unavailability UNAVAILABILITY = Protos.Unavailability.newBuilder() |
| .setStart(Protos.TimeInfo.newBuilder() |
| .setNanoseconds(Amount.of(1L, Time.MINUTES).as(Time.NANOSECONDS))) |
| .build(); |
| private static final SlaPolicy SLA_POLICY = SlaPolicy.percentageSlaPolicy( |
| new PercentageSlaPolicy(95, 1800)); |
| |
| private static final Protos.InverseOffer INVERSE_OFFER = Protos.InverseOffer.newBuilder() |
| .setId(OFFER_ID) |
| .setAgentId(AGENT_ID) |
| .setUrl(AGENT_URL) |
| .setFrameworkId(FRAMEWORK_ID) |
| .setUnavailability(UNAVAILABILITY) |
| .build(); |
| |
| private static final SlaPolicy COUNT_SLA_POLICY = SlaPolicy.countSlaPolicy( |
| new CountSlaPolicy() |
| .setCount(2) |
| .setDurationSecs(1800) |
| ); |
| |
| private StorageTestUtil storageUtil; |
| private StateManager stateManager; |
| private SlaManager slaManager; |
| private MaintenanceController.MaintenanceControllerImpl maintenance; |
| private EventSink eventSink; |
| |
| @Before |
| public void setUp() throws Exception { |
| storageUtil = new StorageTestUtil(this); |
| storageUtil.expectOperations(); |
| stateManager = createMock(StateManager.class); |
| slaManager = createMock(SlaManager.class); |
| TaskEventBatchWorker batchWorker = createMock(TaskEventBatchWorker.class); |
| expectBatchExecute(batchWorker, storageUtil.storage, control).anyTimes(); |
| |
| Injector injector = Guice.createInjector( |
| new PubsubEventModule(), |
| new MaintenanceModule(new MaintenanceModule.Options()), |
| new AbstractModule() { |
| @Override |
| protected void configure() { |
| bind(Storage.class).toInstance(storageUtil.storage); |
| bind(StateManager.class).toInstance(stateManager); |
| bind(SlaManager.class).toInstance(slaManager); |
| bind(StatsProvider.class).toInstance(new FakeStatsProvider()); |
| bind(Executor.class).annotatedWith(AsyncExecutor.class) |
| .toInstance(MoreExecutors.directExecutor()); |
| bind(TaskEventBatchWorker.class).toInstance(batchWorker); |
| bind(new TypeLiteral<Amount<Long, Time>>() { }) |
| .annotatedWith( |
| MaintenanceController.MaintenanceControllerImpl.PollingInterval.class) |
| .toInstance(new TimeAmount(1, Time.MINUTES)); |
| } |
| }); |
| maintenance = injector.getInstance(MaintenanceController.MaintenanceControllerImpl.class); |
| eventSink = PubsubTestUtil.startPubsub(injector); |
| } |
| |
| private static IScheduledTask makeTask(String host, String taskId) { |
| ScheduledTask builder = TaskTestUtil.addStateTransition( |
| TaskTestUtil.makeTask(taskId, TaskTestUtil.JOB), |
| RUNNING, |
| 1000).newBuilder(); |
| builder.getAssignedTask().setSlaveHost(host); |
| return IScheduledTask.build(builder); |
| } |
| |
| @Test |
| public void testMaintenanceCycle() { |
| IScheduledTask task1 = makeTask(HOST_A, "taskA"); |
| IScheduledTask task2 = makeTask(HOST_A, "taskB"); |
| |
| expectMaintenanceModeChange(HOST_A, SCHEDULED); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); |
| expectTaskDraining(task1); |
| expectTaskDraining(task2); |
| expectMaintenanceModeChange(HOST_A, DRAINING); |
| IHostAttributes attributes = |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); |
| IHostMaintenanceRequest maintenanceRequest = |
| IHostMaintenanceRequest.build(new HostMaintenanceRequest() |
| .setHost(HOST_A) |
| .setCreatedTimestampMs(System.currentTimeMillis()) |
| .setTimeoutSecs(7200) |
| .setDefaultSlaPolicy(SLA_POLICY)); |
| |
| storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( |
| anyObject(IHostMaintenanceRequest.class)); |
| expect(storageUtil.attributeStore.getHostAttributes(HOST_A)) |
| .andReturn(Optional.of(attributes)).times(2); |
| expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) |
| .andReturn(Optional.of(maintenanceRequest)).times(2); |
| |
| expect(storageUtil.attributeStore.getHostAttributes()).andReturn(ImmutableSet.of(attributes)); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of(task2)); |
| // TaskA is KILLED and therefore no longer active |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of()); |
| expectMaintenanceModeChange(HOST_A, DRAINED); |
| expectMaintenanceModeChange(HOST_A, NONE); |
| storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A); |
| // end maintenance |
| storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A); |
| |
| control.replay(); |
| |
| assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); |
| assertStatus(HOST_A, DRAINING, maintenance.drain(A)); |
| assertStatus(HOST_A, DRAINING, maintenance.getStatus(A)); |
| eventSink.post( |
| TaskStateChange.transition( |
| IScheduledTask.build(task1.newBuilder().setStatus(KILLED)), RUNNING)); |
| eventSink.post( |
| TaskStateChange.transition( |
| IScheduledTask.build(task2.newBuilder().setStatus(KILLED)), RUNNING)); |
| assertStatus(HOST_A, NONE, maintenance.endMaintenance(A)); |
| } |
| |
| @Test |
| public void testUnknownHost() { |
| expect(storageUtil.attributeStore.getHostAttributes("b")) |
| .andReturn(Optional.empty()); |
| |
| control.replay(); |
| |
| assertEquals(ImmutableSet.of(), |
| maintenance.startMaintenance(ImmutableSet.of("b"))); |
| } |
| |
| @Test |
| public void testDrainEmptyHost() { |
| storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( |
| anyObject(IHostMaintenanceRequest.class)); |
| expectMaintenanceModeChange(HOST_A, SCHEDULED); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of()); |
| expectMaintenanceModeChange(HOST_A, DRAINED); |
| |
| control.replay(); |
| |
| assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); |
| assertStatus(HOST_A, DRAINED, maintenance.drain(A)); |
| } |
| |
| @Test |
| public void testEndEarly() { |
| expectMaintenanceModeChange(HOST_A, SCHEDULED); |
| expectMaintenanceModeChange(HOST_A, NONE); |
| expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of( |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(NONE)))); |
| storageUtil.hostMaintenanceStore.removeHostMaintenanceRequest(HOST_A); |
| |
| control.replay(); |
| |
| assertStatus(HOST_A, SCHEDULED, maintenance.startMaintenance(A)); |
| |
| // End maintenance without DRAINING. |
| assertStatus(HOST_A, NONE, maintenance.endMaintenance(A)); |
| |
| // Make sure a later transition on the host does not cause any ill effects that could surface |
| // from stale internal state. |
| eventSink.post(TaskStateChange.transition( |
| IScheduledTask.build(makeTask(HOST_A, "taskA").newBuilder().setStatus(KILLED)), RUNNING)); |
| } |
| |
| @Test |
| public void testSlaDrain() { |
| storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( |
| anyObject(IHostMaintenanceRequest.class)); |
| expectMaintenanceModeChange(HOST_A, DRAINING); |
| |
| control.replay(); |
| |
| assertStatus( |
| HOST_A, |
| DRAINING, |
| maintenance.slaDrain(ImmutableSet.of(HOST_A), COUNT_SLA_POLICY, 1800)); |
| } |
| |
| @Test |
| public void testSlaDrainUnknownHost() { |
| storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( |
| anyObject(IHostMaintenanceRequest.class)); |
| expect(storageUtil.attributeStore.getHostAttributes("unknown")) |
| .andReturn(Optional.empty()); |
| |
| control.replay(); |
| |
| assertEquals(ImmutableSet.of(), |
| maintenance.slaDrain(ImmutableSet.of("unknown"), COUNT_SLA_POLICY, 1800)); |
| } |
| |
| @Test |
| public void testIteration() { |
| IScheduledTask task1 = makeTask(HOST_A, "taskA"); |
| IScheduledTask task2 = makeTask(HOST_A, "taskB"); |
| |
| IHostAttributes attributes = |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); |
| expect(storageUtil.attributeStore.getHostAttributes()) |
| .andReturn(ImmutableSet.of(attributes)); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); |
| IHostMaintenanceRequest maintenanceRequest = |
| IHostMaintenanceRequest.build(new HostMaintenanceRequest() |
| .setHost(HOST_A) |
| .setCreatedTimestampMs(System.currentTimeMillis()) |
| .setTimeoutSecs(7200) |
| .setDefaultSlaPolicy(SLA_POLICY)); |
| expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) |
| .andReturn(Optional.of(maintenanceRequest)).times(2); |
| expectTaskDraining(task1); |
| expectTaskDraining(task2); |
| |
| control.replay(); |
| |
| maintenance.runForTest(); |
| } |
| |
| @Test |
| public void testIterationEmptyHost() { |
| IHostAttributes attributes = |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); |
| expect(storageUtil.attributeStore.getHostAttributes()) |
| .andReturn(ImmutableSet.of(attributes)); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of()); |
| expectMaintenanceModeChange(HOST_A, DRAINED); |
| |
| control.replay(); |
| |
| maintenance.runForTest(); |
| } |
| |
| @Test |
| public void testIterationMaintenanceTimeout() { |
| IScheduledTask task1 = makeTask(HOST_A, "taskA"); |
| IScheduledTask task2 = makeTask(HOST_A, "taskB"); |
| |
| IHostAttributes attributes = |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); |
| expect(storageUtil.attributeStore.getHostAttributes()) |
| .andReturn(ImmutableSet.of(attributes)); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); |
| IHostMaintenanceRequest maintenanceRequest = |
| IHostMaintenanceRequest.build(new HostMaintenanceRequest() |
| .setHost(HOST_A) |
| .setCreatedTimestampMs(0) |
| .setDefaultSlaPolicy(SLA_POLICY)); |
| expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) |
| .andReturn(Optional.of(maintenanceRequest)).times(2); |
| expectTaskDraining(task1, true); |
| expectTaskDraining(task2, true); |
| |
| control.replay(); |
| |
| maintenance.runForTest(); |
| } |
| |
| @Test |
| public void testIterationNoMaintenanceRequest() { |
| IScheduledTask task1 = makeTask(HOST_A, "taskA"); |
| IScheduledTask task2 = makeTask(HOST_A, "taskB"); |
| |
| IHostAttributes attributes = |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)); |
| expect(storageUtil.attributeStore.getHostAttributes()) |
| .andReturn(ImmutableSet.of(attributes)); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1, task2)); |
| expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) |
| .andReturn(Optional.empty()).times(2); |
| |
| control.replay(); |
| |
| maintenance.runForTest(); |
| } |
| |
| @Test |
| public void testGetMode() { |
| expect(storageUtil.attributeStore.getHostAttributes(HOST_A)).andReturn(Optional.of( |
| IHostAttributes.build(new HostAttributes().setHost(HOST_A).setMode(DRAINING)))); |
| expect(storageUtil.attributeStore.getHostAttributes("unknown")).andReturn(Optional.empty()); |
| |
| control.replay(); |
| |
| assertEquals(DRAINING, maintenance.getMode(HOST_A)); |
| assertEquals(NONE, maintenance.getMode("unknown")); |
| } |
| |
| @Test |
| public void testInverseOfferDrain() { |
| IScheduledTask task1 = makeTask(HOST_A, "taskA"); |
| storageUtil.hostMaintenanceStore.saveHostMaintenanceRequest( |
| anyObject(IHostMaintenanceRequest.class)); |
| expectFetchTasksByHost(HOST_A, ImmutableSet.of(task1)); |
| expectTaskDraining(task1); |
| IHostMaintenanceRequest maintenanceRequest = |
| IHostMaintenanceRequest.build(new HostMaintenanceRequest() |
| .setHost(HOST_A) |
| .setCreatedTimestampMs(System.currentTimeMillis()) |
| .setTimeoutSecs(7200) |
| .setDefaultSlaPolicy(SLA_POLICY)); |
| expect(storageUtil.hostMaintenanceStore.getHostMaintenanceRequest(HOST_A)) |
| .andReturn(Optional.of(maintenanceRequest)).times(1); |
| |
| control.replay(); |
| maintenance.drainForInverseOffer(INVERSE_OFFER); |
| } |
| |
| private void expectTaskDraining(IScheduledTask task) { |
| expectTaskDraining(task, false); |
| } |
| |
| private void expectTaskDraining(IScheduledTask task, boolean force) { |
| slaManager.checkSlaThenAct( |
| eq(task), |
| eq(ISlaPolicy.build(SLA_POLICY)), |
| anyObject(Storage.MutateWork.class), |
| anyObject(ImmutableMap.class), |
| eq(force)); |
| } |
| |
| private void expectFetchTasksByHost(String hostName, Set<IScheduledTask> tasks) { |
| expect(storageUtil.taskStore.fetchTasks(Query.slaveScoped(hostName).active())).andReturn(tasks); |
| } |
| |
| private void expectMaintenanceModeChange(String hostName, MaintenanceMode mode) { |
| IHostAttributes attributes = IHostAttributes.build(new HostAttributes().setHost(hostName)); |
| |
| expect(storageUtil.attributeStore.getHostAttributes(hostName)) |
| .andReturn(Optional.of(attributes)); |
| IHostAttributes updated = IHostAttributes.build(attributes.newBuilder().setMode(mode)); |
| expect(storageUtil.attributeStore.saveHostAttributes(updated)).andReturn(true); |
| } |
| |
| private void assertStatus(String host, MaintenanceMode mode, Set<IHostStatus> statuses) { |
| assertEquals(ImmutableSet.of(IHostStatus.build(new HostStatus(host, mode))), statuses); |
| } |
| } |