| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.samza.clustermanager; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableMap; |
| import java.time.Duration; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.Consumer; |
| import org.apache.commons.lang3.RandomStringUtils; |
| import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore; |
| import org.apache.samza.clustermanager.container.placement.ContainerPlacementRequestAllocator; |
| import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadata; |
| import org.apache.samza.config.ApplicationConfig; |
| import org.apache.samza.config.ClusterManagerConfig; |
| import org.apache.samza.config.Config; |
| import org.apache.samza.config.MapConfig; |
| import org.apache.samza.container.LocalityManager; |
| import org.apache.samza.container.placement.ContainerPlacementMessage; |
| import org.apache.samza.container.placement.ContainerPlacementRequestMessage; |
| import org.apache.samza.container.placement.ContainerPlacementResponseMessage; |
| import org.apache.samza.coordinator.JobModelManager; |
| import org.apache.samza.coordinator.JobModelManagerTestUtil; |
| import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore; |
| import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil; |
| import org.apache.samza.coordinator.server.HttpServer; |
| import org.apache.samza.coordinator.stream.messages.SetContainerHostMapping; |
| import org.apache.samza.job.model.JobModel; |
| import org.apache.samza.metrics.MetricsRegistryMap; |
| import org.apache.samza.testUtils.MockHttpServer; |
| import org.eclipse.jetty.servlet.DefaultServlet; |
| import org.eclipse.jetty.servlet.ServletHolder; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.mockito.invocation.InvocationOnMock; |
| import org.mockito.runners.MockitoJUnitRunner; |
| import org.mockito.stubbing.Answer; |
| |
| import static org.junit.Assert.*; |
| import static org.mockito.Mockito.*; |
| |
| /** |
| * Set of Integration tests for container placement actions |
| * |
| * Please note that semaphores are used wherever possible, there are some Thread.sleep used for the main thread to check |
| * on state changes to atomic variables or synchroized metadata objects because of difficulty of plugging semaphores to |
| * those pieces of logic |
| */ |
| @RunWith(MockitoJUnitRunner.class) |
| public class TestContainerPlacementActions { |
| |
| private HttpServer server = null; |
| |
| private Map<String, String> configVals = new HashMap<String, String>() { |
| { |
| put("cluster-manager.container.count", "1"); |
| put("cluster-manager.container.retry.count", "1"); |
| put("cluster-manager.container.retry.window.ms", "1999999999"); |
| put("cluster-manager.allocator.sleep.ms", "10"); |
| put("cluster-manager.container.request.timeout.ms", "2000"); |
| put("cluster-manager.container.memory.mb", "512"); |
| put("yarn.package.path", "/foo"); |
| put("task.inputs", "test-system.test-stream"); |
| put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory"); |
| put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde"); |
| put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde"); |
| put("job.name", "test-job"); |
| put("job.coordinator.system", "test-kafka"); |
| put("app.run.id", "appAttempt-001"); |
| put("job.standbytasks.replication.factor", "2"); |
| } |
| }; |
| |
| private Config config = new MapConfig(configVals); |
| |
| private CoordinatorStreamStore coordinatorStreamStore; |
| private ContainerPlacementMetadataStore containerPlacementMetadataStore; |
| |
| volatile private SamzaApplicationState state; |
| private ContainerManager containerManager; |
| private MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity; |
| private ContainerProcessManager cpm; |
| private ClusterResourceManager.Callback callback; |
| |
| private Config getConfig() { |
| Map<String, String> map = new HashMap<>(); |
| map.putAll(config); |
| return new MapConfig(map); |
| } |
| |
| private Config getConfigWithHostAffinityAndRetries(boolean withHostAffinity, int maxRetries, |
| boolean failAfterRetries) { |
| Map<String, String> map = new HashMap<>(); |
| map.putAll(config); |
| map.put("job.host-affinity.enabled", String.valueOf(withHostAffinity)); |
| map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_RETRY_COUNT, String.valueOf(maxRetries)); |
| map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_FAIL_JOB_AFTER_RETRIES, String.valueOf(failAfterRetries)); |
| map.put(ClusterManagerConfig.CLUSTER_MANAGER_CONTAINER_PREFERRED_HOST_LAST_RETRY_DELAY_MS, "100"); |
| return new MapConfig(map); |
| } |
| |
| private JobModelManager getJobModelManagerWithHostAffinity(Map<String, String> containerIdToHost) { |
| Map<String, Map<String, String>> localityMap = new HashMap<>(); |
| containerIdToHost.forEach((containerId, host) -> { |
| localityMap.put(containerId, |
| ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId))); |
| }); |
| LocalityManager mockLocalityManager = mock(LocalityManager.class); |
| when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap); |
| |
| return JobModelManagerTestUtil.getJobModelManagerWithLocalityManager(getConfig(), containerIdToHost.size(), |
| mockLocalityManager, this.server); |
| } |
| |
| private JobModelManager getJobModelManagerWithHostAffinityWithStandby(Map<String, String> containerIdToHost) { |
| Map<String, Map<String, String>> localityMap = new HashMap<>(); |
| containerIdToHost.forEach((containerId, host) -> { |
| localityMap.put(containerId, |
| ImmutableMap.of(SetContainerHostMapping.HOST_KEY, containerIdToHost.get(containerId))); |
| }); |
| LocalityManager mockLocalityManager = mock(LocalityManager.class); |
| when(mockLocalityManager.readContainerLocality()).thenReturn(localityMap); |
| // Generate JobModel for standby containers |
| JobModel standbyJobModel = TestStandbyAllocator.getJobModelWithStandby(2, 2, 2, Optional.of(mockLocalityManager)); |
| return new JobModelManager(standbyJobModel, server, null); |
| } |
| |
| @Before |
| public void setup() throws Exception { |
| server = new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)); |
| // Utils Related to Container Placement Metadata store |
| CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config); |
| coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore(); |
| coordinatorStreamStore.init(); |
| containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore); |
| containerPlacementMetadataStore.start(); |
| // Utils Related to Cluster manager: |
| config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, true)); |
| state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2"))); |
| callback = mock(ClusterResourceManager.Callback.class); |
| MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); |
| ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); |
| containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false)); |
| allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager); |
| cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), |
| clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager); |
| } |
| |
| @After |
| public void teardown() { |
| containerPlacementMetadataStore.stop(); |
| cpm.stop(); |
| coordinatorStreamStore.close(); |
| } |
| |
| public void setupStandby() throws Exception { |
| state = new SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0", "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1"))); |
| callback = mock(ClusterResourceManager.Callback.class); |
| MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); |
| ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); |
| // Enable standby |
| containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, true)); |
| allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager); |
| cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), |
| clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager); |
| } |
| |
| @Test(timeout = 10000) |
| public void testContainerSuccessfulMoveActionWithoutStandby() throws Exception { |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesCompleted(anyList()); |
| |
| cpm.start(); |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(2, 5, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| // App is in running state with two containers running |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.preferredHostRequests.get(), 2); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| // Initiate container placement action to move a container with container id 0 |
| ContainerPlacementRequestMessage requestMessage = |
| new ContainerPlacementRequestMessage(UUID.randomUUID(), "appAttempt-001", "0", "host-3", |
| System.currentTimeMillis()); |
| |
| ContainerPlacementMetadata metadata = |
| containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithHostAffinity); |
| |
| // Wait for the ControlAction to complete |
| if (!allocatorWithHostAffinity.awaitContainersStart(1, 3, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| Optional<ContainerPlacementResponseMessage> responseMessage = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| |
| // Wait for the placement action to be complete & get written to the underlying metastore |
| while (true) { |
| if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.SUCCEEDED && responseMessage.isPresent() |
| && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.SUCCEEDED) { |
| break; |
| } |
| Thread.sleep(100); |
| responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| } |
| |
| assertEquals(state.preferredHostRequests.get(), 3); |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-3"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.anyHostRequests.get(), 0); |
| assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| |
| assertTrue(responseMessage.isPresent()); |
| assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| assertResponseMessage(responseMessage.get(), requestMessage); |
| } |
| |
| @Test(timeout = 30000) |
| public void testActionQueuingForConsecutivePlacementActions() throws Exception { |
| // Spawn a Request Allocator Thread |
| ContainerPlacementRequestAllocator requestAllocator = |
| new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100); |
| Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread"); |
| |
| requestAllocatorThread.start(); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesCompleted(anyList()); |
| |
| cpm.start(); |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(2, 5, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| // App is in running state with two containers running |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.preferredHostRequests.get(), 2); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| // Initiate container placement action to move a container with container id 0 |
| |
| UUID requestUUIDMove1 = containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001", "0", "host-3", |
| null, System.currentTimeMillis()); |
| |
| UUID requestUUIDMoveBad = containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-002", "0", "host-4", |
| null, System.currentTimeMillis()); |
| |
| UUID requestUUIDMove2 = containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001", "0", "host-4", |
| null, System.currentTimeMillis()); |
| |
| // Wait for the ControlAction to complete |
| while (true) { |
| if (containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2).isPresent() && |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2).get().getStatusCode() |
| == ContainerPlacementMessage.StatusCode.SUCCEEDED) { |
| break; |
| } |
| Thread.sleep(100); |
| } |
| |
| assertEquals(state.preferredHostRequests.get(), 4); |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-4"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| Optional<ContainerPlacementResponseMessage> responseMessageMove1 = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove1); |
| |
| Optional<ContainerPlacementResponseMessage> responseMessageMove2 = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMove2); |
| |
| assertTrue(responseMessageMove1.isPresent()); |
| assertEquals(responseMessageMove1.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| |
| assertTrue(responseMessageMove2.isPresent()); |
| assertEquals(responseMessageMove2.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| |
| // Request should be deleted as soon as ita accepted / being acted upon |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMove1).isPresent()); |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMove2).isPresent()); |
| |
| // Requests from Previous deploy must be cleaned |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent()); |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent()); |
| |
| // Cleanup Request Allocator Thread |
| cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread); |
| } |
| |
| @Test(timeout = 10000) |
| public void testContainerMoveActionExpiredRequestNotAffectRunningContainers() throws Exception { |
| |
| // Mimic the behavior of Expired request |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| List<SamzaResource> resources = (List<SamzaResource>) args[0]; |
| if (resources.get(0).getHost().equals("host-1") || resources.get(0).getHost().equals("host-2")) { |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| } |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| cpm.start(); |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(2, 3, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| // App is in running state with two containers running |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.preferredHostRequests.get(), 2); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| // Initiate container placement action to move a container with container id 0 |
| ContainerPlacementRequestMessage requestMessage = |
| new ContainerPlacementRequestMessage(UUID.randomUUID(), "appAttempt-001", "0", "host-3", Duration.ofMillis(10), |
| System.currentTimeMillis()); |
| ContainerPlacementMetadata metadata = |
| containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithHostAffinity); |
| |
| |
| Optional<ContainerPlacementResponseMessage> responseMessage = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| |
| // Wait for the placement action to be complete & get written to the underlying metastore |
| while (true) { |
| if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.FAILED |
| && responseMessage.isPresent() |
| && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.FAILED) { |
| break; |
| } |
| Thread.sleep(100); |
| responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| } |
| |
| assertEquals(state.preferredHostRequests.get(), 3); |
| assertEquals(state.runningProcessors.size(), 2); |
| // Container should not be stooped |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| assertTrue(responseMessage.isPresent()); |
| assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.FAILED); |
| assertResponseMessage(responseMessage.get(), requestMessage); |
| // Request shall be deleted as soon as it is acted upon |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent()); |
| } |
| |
| @Test(timeout = 10000) |
| public void testActiveContainerLaunchFailureOnControlActionShouldFallbackToSourceHost() throws Exception { |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| // Mimic stream processor launch failure only on host-3 |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| SamzaResource host3Resource = (SamzaResource) args[0]; |
| if (host3Resource.getHost().equals("host-3")) { |
| cpm.onStreamProcessorLaunchFailure(host3Resource, new Throwable("Custom Exception for Host-3")); |
| } else { |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| } |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesCompleted(anyList()); |
| |
| cpm.start(); |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(2, 5, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| // App is in running state with two containers running |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.preferredHostRequests.get(), 2); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| // Take a container placement action to move a container with container id 0 |
| ContainerPlacementRequestMessage requestMessage = |
| new ContainerPlacementRequestMessage(UUID.randomUUID(), "app-attempt-001", "0", "host-3", |
| System.currentTimeMillis()); |
| ContainerPlacementMetadata metadata = |
| containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithHostAffinity); |
| |
| // Wait for the ControlAction to complete |
| if (!allocatorWithHostAffinity.awaitContainersStart(1, 3, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| assertEquals(state.preferredHostRequests.get(), 4); |
| assertEquals(state.runningProcessors.size(), 2); |
| // Container 0 should fallback to source host |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.anyHostRequests.get(), 0); |
| // Control Action should be failed in this case |
| assertEquals(metadata.getActionStatus(), ContainerPlacementMessage.StatusCode.FAILED); |
| |
| Optional<ContainerPlacementResponseMessage> responseMessage = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| |
| assertTrue(responseMessage.isPresent()); |
| assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.FAILED); |
| assertResponseMessage(responseMessage.get(), requestMessage); |
| |
| // Request shall be deleted as soon as it is acted upon |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent()); |
| } |
| |
| |
| @Test(timeout = 20000) |
| public void testContainerPlacementsForJobRunningInDegradedState() throws Exception { |
| // Set failure after retries to false to enable job running in degraded state |
| config = new MapConfig(configVals, getConfigWithHostAffinityAndRetries(true, 1, false)); |
| state = new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2"))); |
| callback = mock(ClusterResourceManager.Callback.class); |
| MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); |
| ClusterManagerConfig clusterManagerConfig = new ClusterManagerConfig(config); |
| containerManager = spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false)); |
| allocatorWithHostAffinity = new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager); |
| cpm = new ContainerProcessManager(clusterManagerConfig, state, new MetricsRegistryMap(), |
| clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| // Mimic stream processor launch failure only on host-2, |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesCompleted(anyList()); |
| |
| cpm.start(); |
| |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(2, 5, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| // App is in running state with two containers running |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-2"); |
| assertEquals(state.preferredHostRequests.get(), 2); |
| assertEquals(state.anyHostRequests.get(), 0); |
| |
| |
| // Trigger a container failure |
| clusterResourceManager.stopStreamProcessor(state.runningProcessors.get("1"), -103); |
| // Wait for container to start |
| if (!allocatorWithHostAffinity.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| // Trigger a container failure again |
| clusterResourceManager.stopStreamProcessor(state.runningProcessors.get("1"), -103); |
| // Ensure that this container has exhausted all retires |
| while (state.failedProcessors.size() != 1 && state.runningProcessors.size() != 1) { |
| Thread.sleep(100); |
| } |
| |
| // At this point the application should only have one container running |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.size(), 1); |
| assertEquals(state.pendingProcessors.size(), 0); |
| assertTrue(state.failedProcessors.containsKey("1")); |
| |
| ContainerPlacementRequestMessage requestMessage = |
| new ContainerPlacementRequestMessage(UUID.randomUUID(), "app-attempt-001", "1", "host-3", |
| System.currentTimeMillis()); |
| |
| ContainerPlacementMetadata metadata = |
| containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithHostAffinity); |
| |
| // Wait for the ControlAction to complete |
| if (!allocatorWithHostAffinity.awaitContainersStart(1, 2, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| // Wait for both the containers to be in running state & control action metadata to succeed |
| while (state.runningProcessors.size() != 2 |
| && metadata.getActionStatus() != ContainerPlacementMessage.StatusCode.SUCCEEDED) { |
| Thread.sleep(100); |
| } |
| |
| assertEquals(state.preferredHostRequests.get(), 4); |
| assertEquals(state.runningProcessors.size(), 2); |
| // Container 1 should not go to host-3 |
| assertEquals(state.runningProcessors.get("0").getHost(), "host-1"); |
| assertEquals(state.runningProcessors.get("1").getHost(), "host-3"); |
| assertEquals(state.anyHostRequests.get(), 0); |
| // Failed processors must be empty |
| assertEquals(state.failedProcessors.size(), 0); |
| } |
| |
| @Test(timeout = 10000) |
| public void testAlwaysMoveToAnyHostForHostAffinityDisabled() throws Exception { |
| Map<String, String> conf = new HashMap<>(); |
| conf.putAll(getConfigWithHostAffinityAndRetries(false, 1, true)); |
| SamzaApplicationState state = |
| new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2"))); |
| ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class); |
| MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); |
| ContainerManager containerManager = |
| new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, false, false); |
| MockContainerAllocatorWithoutHostAffinity allocatorWithoutHostAffinity = |
| new MockContainerAllocatorWithoutHostAffinity(clusterResourceManager, new MapConfig(conf), state, |
| containerManager); |
| |
| ContainerProcessManager cpm = new ContainerProcessManager( |
| new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(false, 1, true))), state, |
| new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithoutHostAffinity), containerManager); |
| |
| // Mimic Cluster Manager returning any request |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| List<SamzaResource> resources = (List<SamzaResource>) args[0]; |
| SamzaResource preferredResource = resources.get(0); |
| SamzaResource anyResource = |
| new SamzaResource(preferredResource.getNumCores(), preferredResource.getMemoryMb(), |
| "host-" + RandomStringUtils.randomAlphanumeric(5), preferredResource.getContainerId()); |
| cpm.onResourcesAvailable(ImmutableList.of(anyResource)); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesCompleted(anyList()); |
| |
| cpm.start(); |
| |
| // This spawns async start request and waits for async requests to complete |
| if (!allocatorWithoutHostAffinity.awaitContainersStart(2, 3, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 2) { |
| Thread.sleep(100); |
| } |
| |
| // App is in running state with two containers running |
| assertEquals(state.runningProcessors.size(), 2); |
| assertEquals(state.preferredHostRequests.get(), 0); |
| assertEquals(state.anyHostRequests.get(), 2); |
| |
| String previousHostOfContainer1 = state.runningProcessors.get("0").getHost(); |
| String previousHostOfContainer2 = state.runningProcessors.get("1").getHost(); |
| |
| // Initiate container placement action to move a container with container id 0 |
| ContainerPlacementRequestMessage requestMessage = |
| new ContainerPlacementRequestMessage(UUID.randomUUID(), "app-attempt-001", "0", "host-3", |
| System.currentTimeMillis()); |
| ContainerPlacementMetadata metadata = |
| containerManager.registerContainerPlacementActionForTest(requestMessage, allocatorWithoutHostAffinity); |
| |
| // Wait for the ControlAction to complete and spawn an async request |
| if (!allocatorWithoutHostAffinity.awaitContainersStart(1, 3, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| Optional<ContainerPlacementResponseMessage> responseMessage = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| |
| while (true) { |
| if (metadata.getActionStatus() == ContainerPlacementMessage.StatusCode.SUCCEEDED && responseMessage.isPresent() |
| && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.SUCCEEDED) { |
| break; |
| } |
| Thread.sleep(100); |
| responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| } |
| |
| // We should have no preferred host request |
| assertEquals(0, state.preferredHostRequests.get()); |
| // We should have one more ANY_HOST request |
| assertEquals(3, state.anyHostRequests.get()); |
| assertEquals(2, state.runningProcessors.size()); |
| assertNotEquals(previousHostOfContainer1, state.runningProcessors.get("0").getHost()); |
| // Container 2 should not be affected |
| assertEquals(previousHostOfContainer2, state.runningProcessors.get("1").getHost()); |
| assertEquals(3, state.anyHostRequests.get()); |
| // Action should success |
| assertEquals(ContainerPlacementMessage.StatusCode.SUCCEEDED, metadata.getActionStatus()); |
| |
| assertTrue(responseMessage.isPresent()); |
| assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| assertResponseMessage(responseMessage.get(), requestMessage); |
| |
| /** |
| * Inject a duplicate request and check it is not accepted |
| */ |
| ContainerPlacementRequestMessage duplicateRequestToBeIgnored = |
| new ContainerPlacementRequestMessage(requestMessage.getUuid(), "app-attempt-001", "1", |
| "host-3", System.currentTimeMillis()); |
| |
| // Request with a dup uuid should not be accepted |
| metadata = containerManager.registerContainerPlacementActionForTest(duplicateRequestToBeIgnored, |
| allocatorWithoutHostAffinity); |
| // metadata should be from the previous completed action |
| assertTrue(metadata == null || metadata.getUuid() != duplicateRequestToBeIgnored.getUuid()); |
| |
| responseMessage = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| |
| assertTrue(responseMessage.isPresent()); |
| assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST); |
| assertResponseMessage(responseMessage.get(), duplicateRequestToBeIgnored); |
| |
| // Request shall be deleted as soon as it is acted upon |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent()); |
| } |
| |
| @Test(expected = NullPointerException.class) |
| public void testBadControlRequestRejected() throws Exception { |
| SamzaApplicationState state = |
| new SamzaApplicationState(getJobModelManagerWithHostAffinity(ImmutableMap.of("0", "host-1", "1", "host-2"))); |
| ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class); |
| MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state); |
| ContainerManager containerManager = |
| spy(new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false)); |
| MockContainerAllocatorWithHostAffinity allocatorWithHostAffinity = |
| new MockContainerAllocatorWithHostAffinity(clusterResourceManager, config, state, containerManager); |
| ContainerProcessManager cpm = new ContainerProcessManager( |
| new ClusterManagerConfig(new MapConfig(getConfig(), getConfigWithHostAffinityAndRetries(true, 1, true))), state, |
| new MetricsRegistryMap(), clusterResourceManager, Optional.of(allocatorWithHostAffinity), containerManager); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| cpm.start(); |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(2, 3, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| assertBadRequests(null, "host2", containerManager, allocatorWithHostAffinity); |
| assertBadRequests("0", null, containerManager, allocatorWithHostAffinity); |
| assertBadRequests("2", "host8", containerManager, allocatorWithHostAffinity); |
| } |
| |
| |
| @Test(timeout = 30000) |
| public void testContainerSuccessfulMoveActionWithStandbyEnabled() throws Exception { |
| // Setup standby for job |
| setupStandby(); |
| |
| // Spawn a Request Allocator Thread |
| ContainerPlacementRequestAllocator requestAllocator = |
| new ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new ApplicationConfig(config), 100); |
| Thread requestAllocatorThread = new Thread(requestAllocator, "ContainerPlacement Request Allocator Thread"); |
| |
| requestAllocatorThread.start(); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesAvailable((List<SamzaResource>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesAvailable(anyList()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onStreamProcessorLaunchSuccess((SamzaResource) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onStreamProcessorLaunchSuccess(any()); |
| |
| doAnswer(new Answer<Void>() { |
| public Void answer(InvocationOnMock invocation) { |
| new Thread(() -> { |
| Object[] args = invocation.getArguments(); |
| cpm.onResourcesCompleted((List<SamzaResourceStatus>) args[0]); |
| }, "AMRMClientAsync").start(); |
| return null; |
| } |
| }).when(callback).onResourcesCompleted(anyList()); |
| |
| cpm.start(); |
| |
| if (!allocatorWithHostAffinity.awaitContainersStart(4, 4, TimeUnit.SECONDS)) { |
| fail("timed out waiting for the containers to start"); |
| } |
| |
| while (state.runningProcessors.size() != 4) { |
| Thread.sleep(100); |
| } |
| |
| // First running state of the app |
| Consumer<SamzaApplicationState> stateCheck = (SamzaApplicationState state) -> { |
| assertEquals(4, state.runningProcessors.size()); |
| assertEquals("host-1", state.runningProcessors.get("0").getHost()); |
| assertEquals("host-2", state.runningProcessors.get("1").getHost()); |
| assertEquals("host-2", state.runningProcessors.get("0-0").getHost()); |
| assertEquals("host-1", state.runningProcessors.get("1-0").getHost()); |
| assertEquals(4, state.preferredHostRequests.get()); |
| assertEquals(0, state.failedStandbyAllocations.get()); |
| assertEquals(0, state.anyHostRequests.get()); |
| }; |
| // Invoke a state check |
| stateCheck.accept(state); |
| |
| // Initiate a bad container placement action to move a standby to its active host and vice versa |
| // which should fail because this violates standby constraints |
| UUID badRequest1 = containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001", "0-0", "host-1", |
| null, System.currentTimeMillis()); |
| |
| UUID badRequest2 = containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001", "0", "host-2", |
| null, System.currentTimeMillis() + 100); |
| |
| // Wait for the ControlActions to complete |
| while (true) { |
| if (containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2).isPresent() && |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2).get().getStatusCode() |
| == ContainerPlacementMessage.StatusCode.BAD_REQUEST) { |
| break; |
| } |
| Thread.sleep(100); |
| } |
| |
| // App running state should remain the same |
| stateCheck.accept(state); |
| |
| Optional<ContainerPlacementResponseMessage> responseMessageMove1 = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest1); |
| Optional<ContainerPlacementResponseMessage> responseMessageMove2 = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(badRequest2); |
| |
| // Assert that both the requests were bad |
| assertTrue(responseMessageMove1.isPresent()); |
| assertEquals(responseMessageMove1.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST); |
| assertTrue(responseMessageMove2.isPresent()); |
| assertEquals(responseMessageMove2.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST); |
| |
| |
| // Initiate a standby failover which is supposed to be done in two steps |
| // Step 1. Move the standby container to any other host: move 0-0 to say host-3 |
| // Step 2. Move the active container to the standby's host: move 0 to host-1 |
| |
| // Action will get executed first |
| UUID standbyMoveRequest = |
| containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001", "0-0", "host-3", |
| null, System.currentTimeMillis()); |
| // Action will get executed when standbyMoveRequest move request is complete |
| UUID activeMoveRequest = containerPlacementMetadataStore.writeContainerPlacementRequestMessage("appAttempt-001", "0", "host-2", null, |
| System.currentTimeMillis() + 100); |
| |
| // Wait for the ControlActions to complete |
| while (true) { |
| if (containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest).isPresent() && |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest).get().getStatusCode() |
| == ContainerPlacementMessage.StatusCode.SUCCEEDED) { |
| break; |
| } |
| Thread.sleep(100); |
| } |
| |
| assertEquals(4, state.runningProcessors.size()); |
| assertEquals("host-2", state.runningProcessors.get("0").getHost()); |
| assertEquals("host-2", state.runningProcessors.get("1").getHost()); |
| assertEquals("host-3", state.runningProcessors.get("0-0").getHost()); |
| assertEquals("host-1", state.runningProcessors.get("1-0").getHost()); |
| assertEquals(6, state.preferredHostRequests.get()); |
| assertEquals(0, state.failedStandbyAllocations.get()); |
| assertEquals(0, state.anyHostRequests.get()); |
| |
| |
| Optional<ContainerPlacementResponseMessage> responseStandbyMove = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(standbyMoveRequest); |
| |
| Optional<ContainerPlacementResponseMessage> responseActiveMove = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(activeMoveRequest); |
| |
| assertTrue(responseStandbyMove.isPresent()); |
| assertEquals(responseStandbyMove.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| |
| assertTrue(responseActiveMove.isPresent()); |
| assertEquals(responseActiveMove.get().getStatusCode(), ContainerPlacementMessage.StatusCode.SUCCEEDED); |
| |
| // Request should be deleted as soon as ita accepted / being acted upon |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent()); |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent()); |
| |
| // Cleanup Request Allocator Thread |
| cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread); |
| } |
| |
| private void assertResponseMessage(ContainerPlacementResponseMessage responseMessage, |
| ContainerPlacementRequestMessage requestMessage) { |
| assertEquals(responseMessage.getProcessorId(), requestMessage.getProcessorId()); |
| assertEquals(responseMessage.getDeploymentId(), requestMessage.getDeploymentId()); |
| assertEquals(responseMessage.getDestinationHost(), requestMessage.getDestinationHost()); |
| } |
| |
| private void assertBadRequests(String processorId, String destinationHost, ContainerManager containerManager, |
| ContainerAllocator allocator) throws InterruptedException { |
| ContainerPlacementRequestMessage requestMessage = |
| new ContainerPlacementRequestMessage(UUID.randomUUID(), "app-Attemp-001", processorId, destinationHost, |
| System.currentTimeMillis()); |
| ContainerPlacementMetadata metadata = |
| containerManager.registerContainerPlacementActionForTest(requestMessage, allocator); |
| assertNull(metadata); |
| |
| Optional<ContainerPlacementResponseMessage> responseMessage = |
| containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| |
| while (true) { |
| if (responseMessage.isPresent() |
| && responseMessage.get().getStatusCode() == ContainerPlacementMessage.StatusCode.BAD_REQUEST) { |
| break; |
| } |
| Thread.sleep(100); |
| responseMessage = containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestMessage.getUuid()); |
| } |
| |
| assertEquals(responseMessage.get().getStatusCode(), ContainerPlacementMessage.StatusCode.BAD_REQUEST); |
| assertResponseMessage(responseMessage.get(), requestMessage); |
| // Request shall be deleted as soon as it is acted upon |
| assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent()); |
| } |
| |
| private void cleanUpRequestAllocatorThread(ContainerPlacementRequestAllocator requestAllocator, Thread containerPlacementRequestAllocatorThread) { |
| requestAllocator.stop(); |
| try { |
| containerPlacementRequestAllocatorThread.join(); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |