/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.clustermanager;

import com.google.common.collect.ImmutableMap;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.clustermanager.container.placement.ContainerPlacementMetadataStore;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.LocalityManager;
import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.JobModelManagerTestUtil;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
import org.apache.samza.job.model.ProcessorLocality;
import org.apache.samza.job.model.LocalityModel;
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;


@RunWith(MockitoJUnitRunner.class)
public class TestContainerAllocatorWithHostAffinity {

  // Job configuration used by the tests; initialised from getConfig().
  private final Config config = getConfig();
  // Job model manager built from a separate getConfig() result with a container count of 1.
  private final JobModelManager jobModelManager = initializeJobModelManager(getConfig(), 1);
  // Callback handed to the mock cluster resource manager declared below.
  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
  // Application state constructed from jobModelManager; tests read its request counters.
  private final SamzaApplicationState state = new SamzaApplicationState(jobModelManager);

  // Mock resource manager wired to the callback and state above.
  private final MockClusterResourceManager clusterResourceManager = new MockClusterResourceManager(callback, state);
  // Assigned and started in setup().
  private ContainerPlacementMetadataStore containerPlacementMetadataStore;
  // Assigned in setup().
  private ContainerManager containerManager;

  /**
   * Builds the {@link JobModelManager} used by this test class.
   *
   * @param config job configuration forwarded to {@link JobModelManagerTestUtil}
   * @param containerCount container count forwarded to {@link JobModelManagerTestUtil}
   * @return the job model manager returned by {@code JobModelManagerTestUtil.getJobModelManager}
   */
  private JobModelManager initializeJobModelManager(Config config, int containerCount) {
    ServletHolder defaultServlet = new ServletHolder(DefaultServlet.class);
    MockHttpServer httpServer = new MockHttpServer("/", 7777, null, defaultServlet);
    return JobModelManagerTestUtil.getJobModelManager(config, containerCount, httpServer);
  }

  // Allocator under test; created in setup().
  private ContainerAllocator containerAllocator;
  // Mockito spy of an allocator; only assigned by the tests that need to verify interactions.
  private ContainerAllocator spyAllocator;
  // NOTE(review): not referenced anywhere else in the visible part of this class.
  private final int timeoutMillis = 1000;
  // Mock request state injected into containerAllocator via reflection in setup().
  private MockContainerRequestState requestState;
  // Thread wrapping containerAllocator; created in setup() and started only by tests that need it.
  private Thread allocatorThread;
  // Thread wrapping spyAllocator; created and started by the tests that use the spy.
  private Thread spyAllocatorThread;

  /**
   * Creates the per-test fixture: a mocked {@link LocalityManager}, a started
   * {@link ContainerPlacementMetadataStore}, the {@link ContainerManager}, and the
   * {@link ContainerAllocator} under test. The allocator's private {@code resourceRequestState}
   * field is replaced with a {@link MockContainerRequestState} so tests can inspect it.
   * The allocator thread is created here but not started.
   */
  @Before
  public void setup() throws Exception {
    // Locality model with a single entry for processor "0" and "abc".
    LocalityModel localityModel = new LocalityModel(ImmutableMap.of("0", new ProcessorLocality("0", "abc")));
    LocalityManager localityManager = mock(LocalityManager.class);
    when(localityManager.readLocality()).thenReturn(localityModel);

    CoordinatorStreamStore streamStore = new CoordinatorStreamStoreTestUtil(config).getCoordinatorStreamStore();
    streamStore.init();
    containerPlacementMetadataStore = new ContainerPlacementMetadataStore(streamStore);
    containerPlacementMetadataStore.start();

    containerManager = new ContainerManager(containerPlacementMetadataStore, state, clusterResourceManager, true, false, localityManager);
    containerAllocator = new ContainerAllocator(clusterResourceManager, config, state, true, containerManager);

    // Swap the allocator's private request state for a mock the tests can observe.
    requestState = new MockContainerRequestState(clusterResourceManager, true);
    Field stateField = ContainerAllocator.class.getDeclaredField("resourceRequestState");
    stateField.setAccessible(true);
    stateField.set(containerAllocator, requestState);

    allocatorThread = new Thread(containerAllocator);
  }

  /**
   * Requesting containers whose preferred host is {@code null} should register one pending request
   * per container, all counted under {@link ResourceRequestState#ANY_HOST}.
   */
  @Test
  public void testRequestContainersWithNoMapping() throws Exception {
    final int containerCount = 4;
    Map<String, String> noPreferredHosts = new HashMap<>();
    for (int id = 0; id < containerCount; id++) {
      noPreferredHosts.put(Integer.toString(id), null);
    }

    allocatorThread.start();
    containerAllocator.requestResources(noPreferredHosts);

    assertNotNull(requestState);
    assertEquals(4, requestState.numPendingRequests());

    // The only host key present must be ANY_HOST.
    assertNotNull(requestState.getHostRequestCounts());
    assertEquals(1, requestState.getHostRequestCounts().size());
    assertTrue(requestState.getHostRequestCounts().containsKey(ResourceRequestState.ANY_HOST));
  }

  /**
   * Resources offered on a requested host are buffered under that host, while a resource on a host
   * nobody asked for ("def") is buffered under {@link ResourceRequestState#ANY_HOST}.
   */
  @Test
  public void testAddContainerWithHostAffinity() throws Exception {
    containerAllocator.requestResources(ImmutableMap.of("0", "abc", "1", "xyz"));

    // Before any resource arrives: empty buffers exist for the requested hosts, none for ANY_HOST.
    assertNotNull(requestState.getResourcesOnAHost("abc"));
    assertEquals(0, requestState.getResourcesOnAHost("abc").size());

    assertNotNull(requestState.getResourcesOnAHost("xyz"));
    assertEquals(0, requestState.getResourcesOnAHost("xyz").size());

    assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));

    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1"));
    containerAllocator.addResource(new SamzaResource(1, 10, "def", "ID2"));
    containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID3"));

    assertNotNull(requestState.getResourcesOnAHost("abc"));
    assertEquals(1, requestState.getResourcesOnAHost("abc").size());

    assertNotNull(requestState.getResourcesOnAHost("xyz"));
    assertEquals(1, requestState.getResourcesOnAHost("xyz").size());

    // The unrequested "def" resource (ID2) must be the single entry in the ANY_HOST buffer.
    assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
    assertEquals(1, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
    assertEquals("ID2", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getContainerId());
  }

  /**
   * When more resources arrive on a host than were requested for it, only the requested number
   * stays in that host's buffer; the surplus is buffered under {@link ResourceRequestState#ANY_HOST}.
   */
  @Test
  public void testSurplusResourcesAreBufferedUnderAnyHost() throws Exception {
    containerAllocator.requestResources(ImmutableMap.of("0", "abc", "1", "xyz"));

    // Before any resource arrives: empty buffers exist for the requested hosts, none for ANY_HOST.
    assertNotNull(requestState.getResourcesOnAHost("abc"));
    assertEquals(0, requestState.getResourcesOnAHost("abc").size());

    assertNotNull(requestState.getResourcesOnAHost("xyz"));
    assertEquals(0, requestState.getResourcesOnAHost("xyz").size());

    assertNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));

    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID1"));
    containerAllocator.addResource(new SamzaResource(1, 10, "xyz", "ID2"));
    // Surplus resources for host "abc": only one request is outstanding for it.
    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID3"));
    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID4"));
    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID5"));
    containerAllocator.addResource(new SamzaResource(1, 10, "abc", "ID6"));

    assertNotNull(requestState.getResourcesOnAHost("abc"));
    assertEquals(1, requestState.getResourcesOnAHost("abc").size());

    assertNotNull(requestState.getResourcesOnAHost("xyz"));
    assertEquals(1, requestState.getResourcesOnAHost("xyz").size());

    // All four surplus resources go to the ANY_HOST buffer, in arrival order starting with ID3.
    assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
    assertEquals(4, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
    assertEquals("ID3", requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).get(0).getContainerId());
  }

  /**
   * A single request is made for processor "0" on "abc" while three resources are offered.
   * The two resources that are not used must be released, and the request state must end up empty.
   */
  @Test
  public void testAllocatorReleasesExtraContainers() throws Exception {
    final SamzaResource resource0 = new SamzaResource(1, 1024, "abc", "id1");
    final SamzaResource resource1 = new SamzaResource(1, 1024, "abc", "id2");
    final SamzaResource resource2 = new SamzaResource(1, 1024, "def", "id3");

    Runnable releasedAssertions = () -> {
      assertEquals(2, clusterResourceManager.releasedResources.size());
      assertTrue(clusterResourceManager.releasedResources.contains(resource1));
      assertTrue(clusterResourceManager.releasedResources.contains(resource2));

      // Request state must be fully cleaned up once the extras are released
      assertEquals(0, requestState.numPendingRequests());
      assertEquals(0, requestState.getHostRequestCounts().size());
      assertNull(requestState.getResourcesOnAHost("abc"));
      assertNull(requestState.getResourcesOnAHost("def"));
    };

    // Register the final assertions before the allocator thread gets a chance to run.
    // NOTE(review): the leading counts (3, 2, 0, 0) presumably mean added/released/assigned/running — confirm
    // against MockContainerListener.
    MockContainerListener listener =
        new MockContainerListener(3, 2, 0, 0, null, releasedAssertions, null, null);
    requestState.registerContainerListener(listener);

    allocatorThread.start();

    containerAllocator.requestResource("0", "abc");

    containerAllocator.addResource(resource0);
    containerAllocator.addResource(resource1);
    containerAllocator.addResource(resource2);

    listener.verify();
  }

  /**
   * Requests for a mix of preferred hosts and a {@code null} host must each reach the cluster
   * resource manager and be counted per host, with the {@code null} host counted under
   * {@link ResourceRequestState#ANY_HOST}.
   */
  @Test
  public void testRequestContainers() throws Exception {
    // A plain HashMap is required here because one value is null.
    Map<String, String> containersToHostMapping = new HashMap<>();
    containersToHostMapping.put("0", "abc");
    containersToHostMapping.put("1", "def");
    containersToHostMapping.put("2", null);
    containersToHostMapping.put("3", "abc");

    containerAllocator.requestResources(containersToHostMapping);

    assertNotNull(clusterResourceManager.resourceRequests);
    assertEquals(4, clusterResourceManager.resourceRequests.size());
    assertEquals(4, requestState.numPendingRequests());

    Map<String, AtomicInteger> requestsMap = requestState.getHostRequestCounts();
    assertNotNull(requestsMap.get("abc"));
    assertEquals(2, requestsMap.get("abc").get());

    assertNotNull(requestsMap.get("def"));
    assertEquals(1, requestsMap.get("def").get());

    assertNotNull(requestsMap.get(ResourceRequestState.ANY_HOST));
    assertEquals(1, requestsMap.get(ResourceRequestState.ANY_HOST).get());
  }

  /**
   * An immediate request is sent to the cluster resource manager and counted as pending, while
   * requests issued with a delay of several hours are only counted as delayed.
   */
  @Test
  public void testDelayedRequestedContainers() {
    containerAllocator.requestResource("0", "abc");
    containerAllocator.requestResourceWithDelay("0", "efg", Duration.ofHours(2));
    containerAllocator.requestResourceWithDelay("0", "hij", Duration.ofHours(3));
    containerAllocator.requestResourceWithDelay("0", "klm", Duration.ofHours(4));

    assertNotNull(clusterResourceManager.resourceRequests);
    assertEquals(1, clusterResourceManager.resourceRequests.size());
    assertEquals(1, requestState.numPendingRequests());
    assertEquals(3, requestState.numDelayedRequests());
  }

  /**
   * Preferred-host requests for "abc" and "def" are never satisfied on those hosts. Resources that
   * arrive on other hosts ("xyz", "zzz") are buffered under {@link ResourceRequestState#ANY_HOST},
   * and once the requests expire those resources are used to launch the containers.
   */
  @Test
  public void testExpiredRequestAreAssignedToAnyHost() throws Exception {
    final SamzaResource resource0 = new SamzaResource(1, 1000, "xyz", "id1");
    final SamzaResource resource1 = new SamzaResource(1, 1000, "zzz", "id2");

    containerAllocator.requestResources(ImmutableMap.of("0", "abc", "1", "def"));
    assertEquals(2, requestState.numPendingRequests());
    assertNotNull(requestState.getHostRequestCounts());
    assertNotNull(requestState.getHostRequestCounts().get("abc"));
    assertEquals(1, requestState.getHostRequestCounts().get("abc").get());

    assertNotNull(requestState.getHostRequestCounts().get("def"));
    assertEquals(1, requestState.getHostRequestCounts().get("def").get());

    // Neither offered host was requested, so both resources land in the ANY_HOST buffer.
    Runnable addContainerAssertions = () -> {
      assertNull(requestState.getResourcesOnAHost("xyz"));
      assertNull(requestState.getResourcesOnAHost("zzz"));
      assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
      assertEquals(2, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
    };

    // After assignment no request is pending, but the per-host counters still exist.
    Runnable assignContainerAssertions = () -> {
      assertEquals(0, requestState.numPendingRequests());
      assertNotNull(requestState.getHostRequestCounts());
      assertNotNull(requestState.getHostRequestCounts().get("abc"));
      assertNotNull(requestState.getHostRequestCounts().get("def"));
    };

    Runnable runningContainerAssertions = () -> {
      assertTrue(clusterResourceManager.launchedResources.contains(resource0));
      assertTrue(clusterResourceManager.launchedResources.contains(resource1));
    };

    MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null,
        assignContainerAssertions, runningContainerAssertions);
    requestState.registerContainerListener(listener);
    clusterResourceManager.registerContainerListener(listener);
    containerAllocator.addResource(resource0);
    containerAllocator.addResource(resource1);
    allocatorThread.start();

    listener.verify();
  }

  /**
   * Two preferred-host requests are issued but only one resource, on an unrelated host, is offered.
   * After expiry the container is launched on that host, further requests are issued for the
   * remaining container, and earlier requests are cancelled.
   */
  @Test
  public void testExpiredRequestsAreCancelled() throws Exception {
    // request one container each on host-1 and host-2
    containerAllocator.requestResources(ImmutableMap.of("0", "host-1", "1", "host-2"));
    // assert that the requests made it to YARN
    assertEquals(2, clusterResourceManager.resourceRequests.size());
    // allocate one resource from YARN on a different host (host-3)
    SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1");
    containerAllocator.addResource(resource0);
    // let the matching begin
    allocatorThread.start();

    // verify that a container is launched on host-3 after the request expires
    assertTrue("Timed out waiting container launch",
        clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS));
    assertEquals(1, clusterResourceManager.launchedResources.size());
    assertEquals("host-3", clusterResourceManager.launchedResources.get(0).getHost());
    assertEquals("id1", clusterResourceManager.launchedResources.get(0).getContainerId());

    // Now, there are no more resources left to run the 2nd container. Verify that we eventually issue another request
    assertTrue("Timed out waiting for resource requests",
        clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS));
    // verify that we have cancelled previous requests and there's one outstanding request
    assertEquals(3, clusterResourceManager.cancelledRequests.size());
  }

  /**
   * When the cluster manager callback hands every offered resource straight to the allocator, both
   * preferred-host requests ("abc", "xyz") should be run via {@code runStreamProcessor} as
   * preferred-host requests and no ANY_HOST request should be counted.
   */
  @Test
  public void testRequestAllocationOnPreferredHostWithRunStreamProcessor() throws Exception {
    ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
    ClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
    // Named differently from the containerManager field to avoid shadowing it.
    ContainerManager spiedAllocatorContainerManager =
        new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class));
    // Mock the callback from ClusterManager to add resources to the allocator
    doAnswer((InvocationOnMock invocation) -> {
      SamzaResource resource = (SamzaResource) invocation.getArgumentAt(0, List.class).get(0);
      spyAllocator.addResource(resource);
      return null;
    }).when(mockCPM).onResourcesAvailable(anyList());

    spyAllocator = Mockito.spy(
        new ContainerAllocator(mockClusterResourceManager, config, state, true, spiedAllocatorContainerManager));

    // Request Resources
    spyAllocator.requestResources(ImmutableMap.of("0", "abc", "1", "xyz"));

    spyAllocatorThread = new Thread(spyAllocator);
    try {
      // Start the container allocator thread periodic assignment
      spyAllocatorThread.start();
      // Let Allocator thread periodically fulfill requests
      Thread.sleep(100);

      // Verify that all the request that were created were preferred host requests
      ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
      verify(spyAllocator, times(2)).runStreamProcessor(resourceRequestCaptor.capture(), anyString());
      resourceRequestCaptor.getAllValues()
          .forEach(resourceRequest -> assertNotEquals(ResourceRequestState.ANY_HOST, resourceRequest.getPreferredHost()));
      Set<String> hostNames = resourceRequestCaptor.getAllValues().stream()
          .map(SamzaResourceRequest::getPreferredHost)
          .collect(Collectors.toSet());
      assertTrue(hostNames.contains("abc"));
      assertTrue(hostNames.contains("xyz"));
      // No any host requests should be made if preferred host is satisfied
      assertEquals(0, state.anyHostRequests.get());
      // State check when host affinity is enabled
      assertEquals(2, state.matchedResourceRequests.get());
      assertEquals(2, state.preferredHostRequests.get());
    } finally {
      // Stop the allocator whose thread this test actually started, even when an assertion fails.
      spyAllocator.stop();
    }
  }

  /**
   * With no resources offered at all, both preferred-host requests must expire, be passed to
   * {@code handleExpiredRequest}, be cancelled, and be followed by further requests that are
   * counted as ANY_HOST requests.
   */
  @Test
  public void testExpiredRequestAllocationOnAnyHost() throws Exception {
    MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
    ContainerManager spyContainerManager =
        spy(new ContainerManager(containerPlacementMetadataStore, state, spyManager, true, false, mock(LocalityManager.class)));
    spyAllocator = Mockito.spy(
        new ContainerAllocator(spyManager, config, state, true, spyContainerManager));
    // Request Preferred Resources
    spyAllocator.requestResources(ImmutableMap.of("0", "hostname-0", "1", "hostname-1"));

    spyAllocatorThread = new Thread(spyAllocator);
    try {
      // Start the container allocator thread periodic assignment
      spyAllocatorThread.start();

      // Let the request expire, expiration timeout is 3 ms
      Thread.sleep(100);

      // Verify that all the request that were created as preferred host requests expired
      assertEquals(2, state.preferredHostRequests.get());
      assertEquals(2, state.expiredPreferredHostRequests.get());
      verify(spyContainerManager, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
          any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
      verify(spyContainerManager, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
          any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));

      // Verify that preferred host request were cancelled and since no surplus resources were available
      // requestResource was invoked with ANY_HOST requests
      ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
      // At least 2 requests were cancelled
      verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
      // The cancelled requests span more than two distinct preferred hosts, i.e. at least one host
      // besides hostname-0 and hostname-1 (presumably ANY_HOST — confirm against the allocator).
      Set<String> cancelledHosts = cancelledRequestCaptor.getAllValues().stream()
          .map(SamzaResourceRequest::getPreferredHost)
          .collect(Collectors.toSet());
      assertTrue(cancelledHosts.size() > 2);
      // Nothing was matched, and strictly more than 2 ANY_HOST requests were counted
      assertEquals(0, state.matchedResourceRequests.get());
      assertTrue(state.anyHostRequests.get() > 2);
    } finally {
      // Stop the allocator whose thread this test actually started, even when an assertion fails.
      spyAllocator.stop();
    }
  }

  /**
   * Two resources on unrequested hosts are available before the preferred-host requests are made.
   * Once the requests expire, the allocator should run both processors on those surplus resources,
   * passing {@link ResourceRequestState#ANY_HOST} as the host, without counting any ANY_HOST request.
   */
  @Test
  public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor() throws Exception {
    // Add Extra Resources
    MockClusterResourceManager spyClusterResourceManager = spy(new MockClusterResourceManager(callback, state));
    ContainerManager spyContainerManager =
        spy(new ContainerManager(containerPlacementMetadataStore, state, spyClusterResourceManager, true, false, mock(LocalityManager.class)));

    spyAllocator = Mockito.spy(
        new ContainerAllocator(spyClusterResourceManager, config, state, true, spyContainerManager));
    spyAllocator.addResource(new SamzaResource(1, 1000, "xyz", "id1"));
    spyAllocator.addResource(new SamzaResource(1, 1000, "zzz", "id2"));

    // Request Preferred Resources
    spyAllocator.requestResources(ImmutableMap.of("0", "hostname-0", "1", "hostname-1"));

    spyAllocatorThread = new Thread(spyAllocator);
    try {
      // Start the container allocator thread periodic assignment
      spyAllocatorThread.start();

      // Let the request expire, expiration timeout is 3 ms
      Thread.sleep(100);

      // Verify that all the request that were created as preferred host requests expired
      assertEquals(2, state.expiredPreferredHostRequests.get());
      verify(spyContainerManager, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
          any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));
      verify(spyContainerManager, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
          any(SamzaResourceRequest.class), any(ContainerAllocator.class), any(ResourceRequestState.class));

      // Verify that runStreamProcessor was invoked with already available ANY_HOST requests
      ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
      ArgumentCaptor<String> hostCaptor = ArgumentCaptor.forClass(String.class);
      verify(spyAllocator, times(2)).runStreamProcessor(resourceRequestCaptor.capture(), hostCaptor.capture());
      // Resource request were preferred host requests
      resourceRequestCaptor.getAllValues()
          .forEach(resourceRequest -> assertNotEquals(ResourceRequestState.ANY_HOST, resourceRequest.getPreferredHost()));

      // Since requests expired, allocator ran the requests on surplus available ANY_HOST
      hostCaptor.getAllValues()
          .forEach(host -> assertEquals(ResourceRequestState.ANY_HOST, host));

      // State Update check
      assertEquals(0, state.matchedResourceRequests.get());
      assertEquals(2, state.preferredHostRequests.get());
      assertEquals(0, state.anyHostRequests.get());
    } finally {
      // Stop the allocator whose thread this test actually started, even when an assertion fails.
      spyAllocator.stop();
    }
  }

  /**
   * A resource on "host-0" is added with a timestamp ten minutes in the past. The allocator should
   * pass it to {@code handleExpiredResource} exactly once for processor "0" and release it.
   * The test is bounded by the 5 second JUnit timeout because it polls a counter.
   */
  @Test(timeout = 5000)
  public void testExpiredAllocatedResourcesAreReleased() throws Exception {
    ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
    MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
    ContainerManager spyContainerManager =
        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false, mock(LocalityManager.class)));

    SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
        System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
    spyAllocator = Mockito.spy(
        new ContainerAllocator(mockClusterResourceManager, config, state, true, spyContainerManager));
    spyAllocator.addResource(expiredAllocatedResource);
    spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "1d1"));

    // Request Preferred Resources
    spyAllocator.requestResources(ImmutableMap.of("0", "host-0", "1", "host-1"));

    spyAllocatorThread = new Thread(spyAllocator);
    try {
      // Start the container allocator thread periodic assignment
      spyAllocatorThread.start();

      // Wait until a third preferred-host request has been counted (two initial requests plus,
      // presumably, the re-request issued after the expired resource is handled — confirm).
      while (state.preferredHostRequests.get() != 3) {
        Thread.sleep(100);
      }

      // Verify that handleExpiredResource was invoked once for expired allocated resource
      ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
      ArgumentCaptor<SamzaResource> resourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
      verify(spyContainerManager, times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
          resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
      resourceRequestCaptor.getAllValues()
          .forEach(resourceRequest -> assertEquals("0", resourceRequest.getProcessorId()));
      resourceArgumentCaptor.getAllValues()
          .forEach(resource -> assertEquals("host-0", resource.getHost()));
      // Verify resources were released
      assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
    } finally {
      // Stop the allocator whose thread this test actually started, even on failure or timeout.
      spyAllocator.stop();
    }
  }

  /**
   * Scenario: preferred-host requests are made for host-1 and host-2, but the cluster manager only
   * offers one resource on host-3 up front and a second one on host-4 much later. Both requests are
   * expected to expire, ANY_HOST requests are expected to follow, and both containers should end up
   * launched on the offered resources.
   *
   * NOTE(review): the {@code @Test} annotation below is commented out, so JUnit does not run this
   * method; the reason is not recorded in this file.
   */
  //@Test
  public void testExpiryWithNonResponsiveClusterManager() throws Exception {

    final SamzaResource resource0 = new SamzaResource(1, 1000, "host-3", "id1");
    final SamzaResource resource1 = new SamzaResource(1, 1000, "host-4", "id2");

    Map<String, String> containersToHostMapping = ImmutableMap.of("0", "host-1", "1", "host-2");

    Runnable addContainerAssertions = new Runnable() {
      @Override
      public void run() {
        // verify that resources are buffered in the right queue. ie, only those resources on previously requested hosts
        // in the preferred-host queue while other resources end up in the ANY_HOST queue
        assertNull(requestState.getResourcesOnAHost("host-3"));
        assertNull(requestState.getResourcesOnAHost("host-4"));
        assertNotNull(requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST));
        assertEquals(1, requestState.getResourcesOnAHost(ResourceRequestState.ANY_HOST).size());
      }
    };

    Runnable assignContainerAssertions = new Runnable() {
      // verify that we processed all requests
      @Override
      public void run() {
        assertEquals(requestState.numPendingRequests(), 0);
      }
    };

    Runnable runningContainerAssertions = new Runnable() {
      // verify that the two containers were actually launched
      @Override
      public void run() {
        assertTrue(clusterResourceManager.launchedResources.contains(resource0));
        assertTrue(clusterResourceManager.launchedResources.contains(resource1));
      }
    };
    // Same listener is registered on both the request state and the cluster resource manager.
    MockContainerListener listener = new MockContainerListener(2, 0, 2, 2, addContainerAssertions, null, assignContainerAssertions, runningContainerAssertions);
    requestState.registerContainerListener(listener);
    clusterResourceManager.registerContainerListener(listener);

    // request for resources - one each on host-1 and host-2
    containerAllocator.requestResources(containersToHostMapping);
    assertEquals(requestState.numPendingRequests(), 2);
    assertNotNull(requestState.getHostRequestCounts());
    assertNotNull(requestState.getHostRequestCounts().get("host-1"));
    assertTrue(requestState.getHostRequestCounts().get("host-1").get() == 1);
    assertNotNull(requestState.getHostRequestCounts().get("host-2"));
    assertTrue(requestState.getHostRequestCounts().get("host-2").get() == 1);

    // verify that no containers have been launched yet (since, YARN has not provided any resources)
    assertEquals(0, clusterResourceManager.launchedResources.size());

    // provide a resource on host-3
    containerAllocator.addResource(resource0);
    allocatorThread.start();
    // verify that the first preferred host request should expire and container-0 should launch on host-3
    if (!clusterResourceManager.awaitContainerLaunch(1, 20, TimeUnit.SECONDS)) {
      Assert.fail("Timed out waiting for container-0 to launch");
    }
    // verify that the second preferred host request should expire and should trigger ANY_HOST requests
    // wait for 4 requests to be made (2 preferred-host requests - one each on host-1 & host-2;  2 any-host requests)
    if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) {
      Assert.fail("Timed out waiting for resource requests");
    }
    // verify 2 preferred host requests should have been made for host-1 and host-2
    Assert.assertEquals(2, state.preferredHostRequests.get());
    // verify both of them should have expired.
    Assert.assertEquals(2, state.expiredPreferredHostRequests.get());
    // verify there were at-least 2 any-host requests
    Assert.assertTrue(state.anyHostRequests.get() >= 2);
    Assert.assertTrue(state.expiredAnyHostRequests.get() <= state.anyHostRequests.get());
    // finally, provide a container from YARN after multiple requests
    containerAllocator.addResource(resource1);
    // verify all the test assertions
    listener.verify();
  }

  /**
   * Stops the job model manager and every allocator a test may have created.
   * The allocators are stopped in a {@code finally} block so that a failure while stopping the job
   * model manager does not leave an allocator thread running into the next test.
   */
  @After
  public void teardown() throws Exception {
    try {
      jobModelManager.stop();
    } finally {
      // containerAllocator is null only if setup() failed before creating it.
      if (containerAllocator != null) {
        containerAllocator.stop();
      }
      // spyAllocator is only assigned by the tests that run a spy on its own thread.
      if (spyAllocator != null) {
        spyAllocator.stop();
      }
    }
  }

  /**
   * Builds the job configuration used by this test class.
   * The request timeout is set to 3 ms and the allocator sleep to 1 ms, and
   * {@code job.host-affinity.enabled} is set to {@code true}.
   *
   * @return a new {@link MapConfig} holding the test settings
   */
  private static Config getConfig() {
    Map<String, String> settings = new HashMap<>();
    settings.put("cluster-manager.container.count", "1");
    settings.put("cluster-manager.container.retry.count", "1");
    settings.put("cluster-manager.container.retry.window.ms", "1999999999");
    settings.put("cluster-manager.container.request.timeout.ms", "3");
    settings.put("cluster-manager.allocator.sleep.ms", "1");
    settings.put("cluster-manager.container.memory.mb", "512");
    settings.put("yarn.package.path", "/foo");
    settings.put("task.inputs", "test-system.test-stream");
    settings.put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
    settings.put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
    settings.put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
    settings.put("job.host-affinity.enabled", "true");
    settings.put("job.name", "test-job");
    settings.put("job.coordinator.system", "test-kafka");
    return new MapConfig(settings);
  }

}
