blob: 2b4a4b0b2692503d615d9ffad2accdbbf30b8047 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManagerTestUtil;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.job.model.ProcessorLocality;
import org.apache.samza.job.model.LocalityModel;
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
public class TestContainerAllocatorWithHostAffinity {
private final Config config = getConfig();
private final JobModelManager jobModelManager = initializeJobModelManager(getConfig(), 1);
private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);
private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
private ContainerPlacementMetadataStore containerPlacementMetadataStore;
private ContainerManager containerManager;
private JobModelManager initializeJobModelManager(Config config, int containerCount) {
return JobModelManagerTestUtil.getJobModelManager(config, containerCount,
new MockHttpServer("/", 7777, null, new ServletHolder(DefaultServlet.class)));
}
private ContainerAllocator containerAllocator;
private ContainerAllocator spyAllocator;
private final int timeoutMillis = 1000;
private MockContainerRequestState requestState;
private Thread allocatorThread;
private Thread spyAllocatorThread;
@Before
public void setup() throws Exception {
LocalityManager mockLocalityManager = mock(LocalityManager.class);
when(mockLocalityManager.readLocality())
.thenReturn(new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc"))));
CoordinatorStreamStoreTestUtil coordinatorStreamStoreTestUtil = new CoordinatorStreamStoreTestUtil(config);
CoordinatorStreamStore coordinatorStreamStore = coordinatorStreamStoreTestUtil.getCoordinatorStreamStore();
coordinatorStreamStore.init();
containerPlacementMetadataStore = new ContainerPlacementMetadataStore(coordinatorStreamStore);
containerPlacementMetadataStore.start();
containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, mockLocalityManager);
containerAllocator =
new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);
requestState = new MockContainerRequestState(clusterResourceManager, true);
Field requestStateField = containerAllocator.getClass().getDeclaredField("resourceRequestState");
requestStateField.setAccessible(true);
requestStateField.set(containerAllocator, requestState);
allocatorThread = new Thread(containerAllocator);
}
/**
* Test request containers with no containerToHostMapping makes the right number of requests
*/
@Test
public void testRequestContainersWithNoMapping() throws Exception {
int containerCount = 4;
Map<String, String> containersToHostMapping = new HashMap<String, String>();
for (int i = 0; i < containerCount; i++) {
containersToHostMapping.put(String.valueOf(i), null);
}
allocatorThread.start();
containerAllocator.requestResources(containersToHostMapping);
assertNotNull(requestState);
assertEquals(4, requestState.numPendingRequests());
assertNotNull(requestState.getHostRequestCounts());
assertEquals(1, requestState.getHostRequestCounts().keySet().size());
assertTrue(requestState.getHostRequestCounts().keySet().contains(ResourceRequestState.ANY_HOST));
}
/**
* Add containers to the correct host in the request state
*/
@Test
public void testAddContainerWithHostAffinity() throws Exception {
containerAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "abc");
put("1", "xyz");
}
});
assertNotNull(requestState.getResourcesOnAHost("abc"));
assertEquals(0, requestState.getResourcesOnAHost("abc").size());
assertNotNull(requestState.getResourcesOnAHost("xyz"));
assertEquals(0, requestState.getResourcesOnAHost("xyz").size());
assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1"));
containerAllocator.addResource(new SamzaResource(1, 10, "def", "ID2"));
containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID3"));
assertNotNull(requestState.getResourcesOnAHost("abc"));
assertEquals(1, requestState.getResourcesOnAHost("abc").size());
assertNotNull(requestState.getResourcesOnAHost("xyz"));
assertEquals(1, requestState.getResourcesOnAHost("xyz").size());
assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 1);
assertEquals("ID2", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getContainerId());
}
/**
* Test that extra resources are buffered under ANY_HOST
*/
@Test
public void testSurplusResourcesAreBufferedUnderAnyHost() throws Exception {
containerAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "abc");
put("1", "xyz");
}
});
assertNotNull(requestState.getResourcesOnAHost("abc"));
assertEquals(0, requestState.getResourcesOnAHost("abc").size());
assertNotNull(requestState.getResourcesOnAHost("xyz"));
assertEquals(0, requestState.getResourcesOnAHost("xyz").size());
assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1"));
containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID2"));
// surplus resources for host - "abc"
containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID3"));
containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID4"));
containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID5"));
containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID6"));
assertNotNull(requestState.getResourcesOnAHost("abc"));
assertEquals(1, requestState.getResourcesOnAHost("abc").size());
assertNotNull(requestState.getResourcesOnAHost("xyz"));
assertEquals(1, requestState.getResourcesOnAHost("xyz").size());
assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
// assert that the surplus resources goto the ANY_HOST buffer
assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 4);
assertEquals("ID3", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getContainerId());
}
@Test
public void testAllocatorReleasesExtraContainers() throws Exception {
final SamzaResource resource0 = new SamzaResource(1, 1024, "abc", "id1");
final SamzaResource resource1 = new SamzaResource(1, 1024, "abc", "id2");
final SamzaResource resource2 = new SamzaResource(1, 1024, "def", "id3");
Runnable releasedAssertions = new Runnable() {
@Override
public void run() {
assertEquals(2, clusterResourceManager.releasedResources.size());
assertTrue(clusterResourceManager.releasedResources.contains(resource1));
assertTrue(clusterResourceManager.releasedResources.contains(resource2));
// Test that state is cleaned up
assertEquals(0, requestState.numPendingRequests());
assertEquals(0, requestState.getHostRequestCounts().size());
assertNull(requestState.getResourcesOnAHost("abc"));
assertNull(requestState.getResourcesOnAHost("def"));
}
};
// Set up our final asserts before starting the allocator thread
MockContainerListener listener = new MockContainerListener(3, 2, 0, 0, null, releasedAssertions,
null, null);
requestState.registerContainerListener(listener);
allocatorThread.start();
containerAllocator.requestResource("0", "abc");
containerAllocator.addResource(resource0);
containerAllocator.addResource(resource1);
containerAllocator.addResource(resource2);
listener.verify();
}
@Test
public void testRequestContainers() throws Exception {
Map<String, String> containersToHostMapping = new HashMap<String, String>() {
{
put("0", "abc");
put("1", "def");
put("2", null);
put("3", "abc");
}
};
containerAllocator.requestResources(containersToHostMapping);
assertNotNull(clusterResourceManager.resourceRequests);
assertEquals(clusterResourceManager.resourceRequests.size(), 4);
assertEquals(requestState.numPendingRequests(), 4);
Map<String, AtomicInteger> requestsMap = requestState.getHostRequestCounts();
assertNotNull(requestsMap.get("abc"));
assertEquals(2, requestsMap.get("abc").get());
assertNotNull(requestsMap.get("def"));
assertEquals(1, requestsMap.get("def").get());
assertNotNull(requestsMap.get(ResourceRequestState.ANY_HOST));
assertEquals(1, requestsMap.get(ResourceRequestState.ANY_HOST).get());
}
@Test
public void testDelayedRequestedContainers() {
containerAllocator.requestResource("0", "abc");
containerAllocator.requestResourceWithDelay("0", "efg", Duration.ofHours(2));
containerAllocator.requestResourceWithDelay("0", "hij", Duration.ofHours(3));
containerAllocator.requestResourceWithDelay("0", "klm", Duration.ofHours(4));
assertNotNull(clusterResourceManager.resourceRequests);
assertEquals(clusterResourceManager.resourceRequests.size(), 1);
assertEquals(requestState.numPendingRequests(), 1);
assertEquals(requestState.numDelayedRequests(), 3);
}
/**
* Handles expired requests correctly and assigns ANY_HOST
*/
@Test
public void testExpiredRequestAreAssignedToAnyHost() throws Exception {
final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");
final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2");
Map<String, String> containersToHostMapping = new HashMap<String, String>() {
{
put("0", "abc");
put("1", "def");
}
};
containerAllocator.requestResources(containersToHostMapping);
assertEquals(requestState.numPendingRequests(), 2);
assertNotNull(requestState.getHostRequestCounts());
assertNotNull(requestState.getHostRequestCounts().get("abc"));
assertTrue(requestState.getHostRequestCounts().get("abc").get() == 1);
assertNotNull(requestState.getHostRequestCounts().get("def"));
assertTrue(requestState.getHostRequestCounts().get("def").get() == 1);
Runnable addContainerAssertions = new Runnable() {
@Override
public void run() {
assertNull(requestState.getResourcesOnAHost("xyz"));
assertNull(requestState.getResourcesOnAHost("zzz"));
assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
assertTrue(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size() == 2);
}
};
Runnable assignContainerAssertions = new Runnable() {
@Override
public void run() {
assertEquals(requestState.numPendingRequests(), 0);
assertNotNull(requestState.getHostRequestCounts());
assertNotNull(requestState.getHostRequestCounts().get("abc"));
assertNotNull(requestState.getHostRequestCounts().get("def"));
}
};
Runnable runningContainerAssertions = new Runnable() {
@Override
public void run() {
assertTrue(clusterResourceManager.launchedResources.contains(resource0));
assertTrue(clusterResourceManager.launchedResources.contains(resource1));
}
};
MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null, assignContainerAssertions, runningContainerAssertions);
requestState.registerContainerListener(listener);
((MockClusterResourceManager) clusterResourceManager).registerContainerListener(listener);
containerAllocator.addResource(resource0);
containerAllocator.addResource(resource1);
allocatorThread.start();
listener.verify();
}
@Test
public void testExpiredRequestsAreCancelled() throws Exception {
// request one container each on host-1 and host-2
containerAllocator.requestResources(ImmutableMap.of("0", "host-1", "1", "host-2"));
// assert that the requests made it to YARN
Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 2);
// allocate one resource from YARN on a different host (host-3)
SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1");
containerAllocator.addResource(resource0);
// let the matching begin
allocatorThread.start();
// verify that a container is launched on host-3 after the request expires
if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) {
Assert.fail("Timed out waiting container launch");
}
Assert.assertEquals(1, clusterResourceManager.launchedResources.size());
Assert.assertEquals(clusterResourceManager.launchedResources.get(0).getHost(), "host-3");
Assert.assertEquals(clusterResourceManager.launchedResources.get(0).getContainerId(), "id1");
// Now, there are no more resources left to run the 2nd container. Verify that we eventually issue another request
if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) {
Assert.fail("Timed out waiting for resource requests");
}
// verify that we have cancelled previous requests and there's one outstanding request
Assert.assertEquals(clusterResourceManager.cancelledRequests.size(), 3);
}
@Test
public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() throws Exception {
ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
ContainerManager containerManager =
new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
// Mock the callback from ClusterManager to add resources to the allocator
doAnswer((InvocationOnMock invocation) -> {
SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
spyAllocator.addResource(resource);
return null;
}).when(mockCPM).onResourcesAvailable(anyList());
spyAllocator = Mockito.spy(
new ContainerAllocator(mockClusterResourceManager, config, state, true, containerManager));
// Request Resources
spyAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "abc");
put("1", "xyz");
}
});
spyAllocatorThread = new Thread(spyAllocator);
// Start the container allocator thread periodic assignment
spyAllocatorThread.start();
// Let Allocator thread periodically fulfill requests
Thread.sleep(100);
// Verify that all the request that were created were preferred host requests
ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
verify(spyAllocator, times(2)).runStreamProcessor(resourceRequestCaptor.capture(), anyString());
resourceRequestCaptor.getAllValues()
.forEach(resourceRequest -> assertNotEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
Set<String> hostNames = resourceRequestCaptor.getAllValues().stream().map(request -> request.getPreferredHost()).collect(
Collectors.toSet());
assertTrue(hostNames.contains("abc"));
assertTrue(hostNames.contains("xyz"));
// No any host requests should be made if preferred host is satisfied
assertTrue(state.anyHostRequests.get() == 0);
// State check when host affinity is enabled
assertTrue(state.matchedResourceRequests.get() == 2);
assertTrue(state.preferredHostRequests.get() == 2);
containerAllocator.stop();
}
@Test
public void testExpiredRequestAllocationOnAnyHost() throws Exception {
MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
ContainerManager spyContainerManager =
spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
// Request Preferred Resources
spyAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "hostname-0");
put("1", "hostname-1");
}
});
spyAllocatorThread = new Thread(spyAllocator);
// Start the container allocator thread periodic assignment
spyAllocatorThread.start();
// Let the request expire, expiration timeout is 3 ms
Thread.sleep(100);
// Verify that all the request that were created as preferred host requests expired
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.expiredPreferredHostRequests.get() == 2);
verify(spyContainerManager, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
verify(spyContainerManager, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
// Verify that preferred host request were cancelled and since no surplus resources were available
// requestResource was invoked with ANY_HOST requests
ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
// At least 2 preferred host requests were cancelled
verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
assertTrue(cancelledRequestCaptor.getAllValues().stream().map(resourceRequest -> resourceRequest.getPreferredHost()).collect(
Collectors.toSet()).size() > 2);
// Check that atleast 2 ANY_HOST requests were made
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.anyHostRequests.get() > 2);
containerAllocator.stop();
}
@Test
public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws Exception {
// Add Extra Resources
MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
ContainerManager spyContainerManager =
spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
spyAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1"));
spyAllocator.addResource(new SamzaResource(1, 1000, "zzz", "id2"));
// Request Preferred Resources
spyAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "hostname-0");
put("1", "hostname-1");
}
});
spyAllocatorThread = new Thread(spyAllocator);
// Start the container allocator thread periodic assignment
spyAllocatorThread.start();
// Let the request expire, expiration timeout is 3 ms
Thread.sleep(100);
// Verify that all the request that were created as preferred host requests expired
assertEquals(state.expiredPreferredHostRequests.get(), 2);
verify(spyContainerManager, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
verify(spyContainerManager, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
// Verify that runStreamProcessor was invoked with already available ANY_HOST requests
ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
ArgumentCaptor<String> hostCaptor = ArgumentCaptor.forClass(String.class);
verify(spyAllocator, times(2)).runStreamProcessor(resourceRequestCaptor.capture(), hostCaptor.capture());
// Resource request were preferred host requests
resourceRequestCaptor.getAllValues()
.forEach(resourceRequest -> assertNotEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
// Since requests expired, allocator ran the requests on surplus available ANY_HOST
hostCaptor.getAllValues()
.forEach(host -> assertEquals(host, ResourceRequestState.ANY_HOST));
// State Update check
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.anyHostRequests.get() == 0);
containerAllocator.stop();
}
@Test(timeout = 5000)
public void testExpiredAllocatedResourcesAreReleased() throws Exception {
ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
ContainerManager spyContainerManager =
spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));
SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
spyAllocator = Mockito.spy(
new ContainerAllocator(mockClusterResourceManager, config, state, true, spyContainerManager));
spyAllocator.addResource(expiredAllocatedResource);
spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "1d1"));
// Request Preferred Resources
spyAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "host-0");
put("1", "host-1");
}
});
spyAllocatorThread = new Thread(spyAllocator);
// Start the container allocator thread periodic assignment
spyAllocatorThread.start();
// Wait until allocated resource is expired
while (state.preferredHostRequests.get() != 3) {
Thread.sleep(100);
}
// Verify that handleExpiredResource was invoked once for expired allocated resource
ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
ArgumentCaptor<SamzaResource> resourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
verify(spyContainerManager, times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
resourceRequestCaptor.getAllValues()
.forEach(resourceRequest -> assertEquals(resourceRequest.getProcessorId(), "0"));
resourceArgumentCaptor.getAllValues()
.forEach(resource -> assertEquals(resource.getHost(), "host-0"));
// Verify resources were released
assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
containerAllocator.stop();
}
//@Test
public void testExpiryWithNonResponsiveClusterManager() throws Exception {
final SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1");
final SamzaResource resource1 = new SamzaResource(1, 1000, "host-4", "id2");
Map<String, String> containersToHostMapping = ImmutableMap.of("0", "host-1", "1", "host-2");
Runnable addContainerAssertions = new Runnable() {
@Override
public void run() {
// verify that resources are buffered in the right queue. ie, only those resources on previously requested hosts
// in the preferred-host queue while other resources end up in the ANY_HOST queue
assertNull(requestState.getResourcesOnAHost("host-3"));
assertNull(requestState.getResourcesOnAHost("host-4"));
assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
assertEquals(1, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
}
};
Runnable assignContainerAssertions = new Runnable() {
// verify that we processed all requests
@Override
public void run() {
assertEquals(requestState.numPendingRequests(), 0);
}
};
Runnable runningContainerAssertions = new Runnable() {
// verify that the two containers were actually launched
@Override
public void run() {
assertTrue(clusterResourceManager.launchedResources.contains(resource0));
assertTrue(clusterResourceManager.launchedResources.contains(resource1));
}
};
MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null, assignContainerAssertions, runningContainerAssertions);
requestState.registerContainerListener(listener);
clusterResourceManager.registerContainerListener(listener);
// request for resources - one each on host-1 and host-2
containerAllocator.requestResources(containersToHostMapping);
assertEquals(requestState.numPendingRequests(), 2);
assertNotNull(requestState.getHostRequestCounts());
assertNotNull(requestState.getHostRequestCounts().get("host-1"));
assertTrue(requestState.getHostRequestCounts().get("host-1").get() == 1);
assertNotNull(requestState.getHostRequestCounts().get("host-2"));
assertTrue(requestState.getHostRequestCounts().get("host-2").get() == 1);
// verify that no containers have been launched yet (since, YARN has not provided any resources)
assertEquals(0, clusterResourceManager.launchedResources.size());
// provide a resource on host-3
containerAllocator.addResource(resource0);
allocatorThread.start();
// verify that the first preferred host request should expire and container-0 should launch on host-3
if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) {
Assert.fail("Timed out waiting for container-0 to launch");
}
// verify that the second preferred host request should expire and should trigger ANY_HOST requests
// wait for 4 requests to be made (2 preferred-host requests - one each on host-1 & host-2; 2 any-host requests)
if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) {
Assert.fail("Timed out waiting for resource requests");
}
// verify 2 preferred host requests should have been made for host-1 and host-2
Assert.assertEquals(2, state.preferredHostRequests.get());
// verify both of them should have expired.
Assert.assertEquals(2, state.expiredPreferredHostRequests.get());
// verify there were at-least 2 any-host requests
Assert.assertTrue(state.anyHostRequests.get() >= 2);
Assert.assertTrue(state.expiredAnyHostRequests.get() <= state.anyHostRequests.get());
// finally, provide a container from YARN after multiple requests
containerAllocator.addResource(resource1);
// verify all the test assertions
listener.verify();
}
@After
public void teardown() throws Exception {
jobModelManager.stop();
containerAllocator.stop();
}
private static Config getConfig() {
Config config = new MapConfig(new HashMap<String, String>() {
{
put("cluster-manager.container.count", "1");
put("cluster-manager.container.retry.count", "1");
put("cluster-manager.container.retry.window.ms", "1999999999");
put("cluster-manager.container.request.timeout.ms", "3");
put("cluster-manager.allocator.sleep.ms", "1");
put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
put("job.host-affinity.enabled", "true");
put("job.name", "test-job");
put("job.coordinator.system", "test-kafka");
}
});
Map<String, String> map = new HashMap<>();
map.putAll(config);
return new MapConfig(map);
}
}