blob: 187754745388e63b4332fe5fcd897353603a84c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
 * Unit tests for {@link ResourceRequestState}.
 *
 * <p>The tests cover the bookkeeping done when resource requests are added, when resources are
 * allocated, assigned and released, and the ordering of requests in the pending and delayed
 * request queues.
 */
public class TestContainerRequestState {

  private static final String ANY_HOST = ResourceRequestState.ANY_HOST;

  private final MockClusterResourceManagerCallback callback = new MockClusterResourceManagerCallback();
  private final MockClusterResourceManager manager =
      new MockClusterResourceManager(callback, new SamzaApplicationState(null));

  /**
   * Test state after a request is submitted
   */
  @Test
  public void testUpdateRequestState() {
    // Host-affinity is enabled
    ResourceRequestState state = new ResourceRequestState(true, manager);
    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
    state.addResourceRequest(request);

    assertNotNull(manager.resourceRequests);
    assertEquals(1, manager.resourceRequests.size());
    // Assert on the count itself; asserting non-null on a boxed comparison could never fail.
    assertEquals(1, state.numPendingRequests());
    assertNotNull(state.getHostRequestCounts());
    assertNotNull(state.getHostRequestCounts().get("abc"));
    assertEquals(1, state.getHostRequestCounts().get("abc").get());
    assertNotNull(state.getResourcesOnAHost("abc"));
    assertEquals(0, state.getResourcesOnAHost("abc").size());

    // Host-affinity is not enabled
    ResourceRequestState state1 = new ResourceRequestState(false, manager);
    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, null, "1");
    state1.addResourceRequest(request1);

    // The manager is shared by both states, so it has now seen two requests in total
    assertNotNull(manager.resourceRequests);
    assertEquals(2, manager.resourceRequests.size());
    assertTrue(state1.numPendingRequests() == 1);
    assertNotNull(state1.getHostRequestCounts());
    assertNull(state1.getHostRequestCounts().get(ANY_HOST));
  }

  /**
   * Test addContainer() updates the state correctly
   */
  @Test
  public void testAddContainer() {
    // Add container to ANY_LIST when host-affinity is not enabled
    ResourceRequestState state = new ResourceRequestState(false, manager);
    SamzaResource resource = new SamzaResource(1, 1024, "abc", "id1");
    state.addResource(resource);

    assertNotNull(state.getHostRequestCounts());
    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(resource, state.getResourcesOnAHost(ANY_HOST).get(0));

    // Container Allocated when there is no request in queue
    ResourceRequestState state1 = spy(new ResourceRequestState(true, manager));
    SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id2");
    state1.addResource(container1);

    assertEquals(0, state1.numPendingRequests());
    assertNull(state1.getResourcesOnAHost("zzz"));
    assertNotNull(state1.getResourcesOnAHost(ANY_HOST));
    assertEquals(1, state1.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container1, state1.getResourcesOnAHost(ANY_HOST).get(0));

    // Container Allocated on a Requested Host
    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "abc", "0"));

    // Delayed request
    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "def", "1",
        Instant.now().plus(Duration.ofHours(1))));
    state1.addResourceRequest(new SamzaResourceRequest(1, 1024, "ghi", "2",
        Instant.now().plus(Duration.ofHours(2))));

    assertEquals(1, state1.numPendingRequests());
    assertEquals(2, state1.numDelayedRequests());

    // Verify request sent only once for the non-delayed request
    verify(state1).sendResourceRequest(any(SamzaResourceRequest.class));

    assertNotNull(state1.getHostRequestCounts());
    assertNotNull(state1.getHostRequestCounts().get("abc"));
    assertEquals(1, state1.getHostRequestCounts().get("abc").get());

    state1.addResource(resource);

    assertNotNull(state1.getResourcesOnAHost("abc"));
    assertEquals(1, state1.getResourcesOnAHost("abc").size());
    assertEquals(resource, state1.getResourcesOnAHost("abc").get(0));

    // Container Allocated on host that was not requested
    SamzaResource container2 = new SamzaResource(1, 1024, "xyz", "id2");
    state1.addResource(container2);

    assertNull(state1.getResourcesOnAHost("xyz"));
    assertNotNull(state1.getResourcesOnAHost(ANY_HOST));
    assertEquals(2, state1.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container2, state1.getResourcesOnAHost(ANY_HOST).get(1));

    // Extra containers were allocated on a host that was requested
    SamzaResource container3 = new SamzaResource(1, 1024, "abc", "id3");
    state1.addResource(container3);

    assertEquals(3, state1.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container3, state1.getResourcesOnAHost(ANY_HOST).get(2));
  }

  /**
   * Test request state after container is assigned to a host
   * * Assigned on requested host
   * * Assigned on any host
   */
  @Test
  public void testContainerAssignment() throws Exception {
    // Host-affinity enabled
    ResourceRequestState state = new ResourceRequestState(true, manager);
    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", "0");
    state.addResourceRequest(request);
    state.addResourceRequest(request1);

    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
    SamzaResource container1 = new SamzaResource(1, 1024, "zzz", "id1");
    state.addResource(container);
    state.addResource(container1);

    assertEquals(2, state.numPendingRequests());
    assertEquals(2, state.getHostRequestCounts().size());

    assertNotNull(state.getResourcesOnAHost("abc"));
    assertEquals(1, state.getResourcesOnAHost("abc").size());
    assertEquals(container, state.getResourcesOnAHost("abc").get(0));

    assertNotNull(state.getResourcesOnAHost("def"));
    assertEquals(0, state.getResourcesOnAHost("def").size());

    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());
    assertEquals(container1, state.getResourcesOnAHost(ANY_HOST).get(0));

    // Container assigned on the requested host
    state.updateStateAfterAssignment(request, "abc", container);

    assertEquals(request1, state.peekPendingRequest());
    assertNotNull(state.getHostRequestCounts().get("abc"));
    assertEquals(0, state.getHostRequestCounts().get("abc").get());
    assertNotNull(state.getResourcesOnAHost("abc"));
    assertEquals(0, state.getResourcesOnAHost("abc").size());

    // Container assigned on any host
    state.updateStateAfterAssignment(request1, ANY_HOST, container1);

    assertEquals(0, state.numPendingRequests());
    assertNotNull(state.getHostRequestCounts().get("def"));
    assertEquals(0, state.getHostRequestCounts().get("def").get());
    assertNotNull(state.getResourcesOnAHost(ANY_HOST));
    assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
  }

  /**
   * Test that releasing a resource by its id removes it from the list of resources
   * tracked for its host, for both a requested host and ANY_HOST
   */
  @Test
  public void testReleaseResource() {
    // Host-affinity is enabled
    ResourceRequestState state = new ResourceRequestState(true, manager);
    SamzaResourceRequest request = new SamzaResourceRequest(1, 1024, "abc", "0");
    SamzaResourceRequest request1 = new SamzaResourceRequest(1, 1024, "def", "0");
    state.addResourceRequest(request);
    state.addResourceRequest(request1);

    SamzaResource container = new SamzaResource(1, 1024, "abc", "id0");
    SamzaResource container1 = new SamzaResource(1, 1024, ANY_HOST, "id1");
    state.addResource(container);
    state.addResource(container1);

    state.releaseResource("id0");
    assertEquals(0, state.getResourcesOnAHost("abc").size());
    assertEquals(1, state.getResourcesOnAHost(ANY_HOST).size());

    state.releaseResource("id1");
    assertEquals(0, state.getResourcesOnAHost("abc").size());
    assertEquals(0, state.getResourcesOnAHost(ANY_HOST).size());
  }

  /**
   * Test the natural ordering of requests in a priority queue: requests for active
   * processors are expected ahead of requests for standby processors, and within each
   * group the requests are expected in order of their request time
   */
  @Test
  public void testPriorityQueueOrdering() {
    PriorityQueue<SamzaResourceRequest> pq = new PriorityQueue<>();
    Instant now = Instant.now();

    ImmutableList<SamzaResourceRequest> expectedOrder = ImmutableList.of(
        createRequestForActive(now.minusSeconds(120)),
        createRequestForActive(now),
        createRequestForActive(now.plusSeconds(120)),
        createRequestForActive(now.plusSeconds(240)),
        createRequestForStandby(now.minusSeconds(120)),
        createRequestForStandby(now),
        createRequestForStandby(now.plusSeconds(120)),
        createRequestForStandby(now.plusSeconds(240)));

    // Keep the seed so that a failing insertion order can be reproduced
    long seed = Instant.now().toEpochMilli();
    pq.addAll(shuffledCopy(expectedOrder, seed));

    List<SamzaResourceRequest> priorityQueueOrder = new ArrayList<>();
    for (int i = 0; i < expectedOrder.size(); ++i) {
      priorityQueueOrder.add(pq.poll());
    }

    assertEquals("Unexpected poll order (shuffle seed " + seed + ")", expectedOrder, priorityQueueOrder);
  }

  /**
   * Test the ordering of requests in the delayed request queue: requests are expected
   * in order of their request time only
   */
  @Test
  public void testDelayedQueueOrdering() {
    ResourceRequestState.DelayedRequestQueue delayedRequestQueue = new ResourceRequestState.DelayedRequestQueue();
    Instant now = Instant.now();

    // Expected priority by request timestamp only, regardless of active or standby
    ImmutableList<SamzaResourceRequest> expectedOrder = ImmutableList.of(
        createRequestForActive(now),
        createRequestForStandby(now.plusSeconds(60)),
        createRequestForActive(now.plusSeconds(120)),
        createRequestForStandby(now.plusSeconds(121)),
        createRequestForActive(now.plusSeconds(240)),
        createRequestForStandby(now.plusSeconds(241)));

    // Keep the seed so that a failing insertion order can be reproduced
    long seed = Instant.now().toEpochMilli();
    delayedRequestQueue.addAll(shuffledCopy(expectedOrder, seed));

    List<SamzaResourceRequest> priorityQueueOrder = new ArrayList<>();
    for (int i = 0; i < expectedOrder.size(); ++i) {
      priorityQueueOrder.add(delayedRequestQueue.poll());
    }

    assertEquals("Unexpected poll order (shuffle seed " + seed + ")", expectedOrder, priorityQueueOrder);
  }

  /**
   * Creates a request with a random host and a random processor id that contains no hyphen.
   *
   * @param requestTime the request time to pass to the created request
   * @return the new request
   */
  SamzaResourceRequest createRequestForActive(Instant requestTime) {
    String randomHost = RandomStringUtils.randomAlphanumeric(4);
    String randomId = RandomStringUtils.randomAlphanumeric(8);
    return new SamzaResourceRequest(1, 1, randomHost, randomId, requestTime);
  }

  /**
   * Creates a request with a random host and a random processor id that ends in "-standby".
   *
   * @param requestTime the request time to pass to the created request
   * @return the new request
   */
  SamzaResourceRequest createRequestForStandby(Instant requestTime) {
    String randomHost = RandomStringUtils.randomAlphanumeric(4);
    String randomId = RandomStringUtils.randomAlphanumeric(8) + "-standby"; // hyphen in ID denotes a standby processor
    return new SamzaResourceRequest(1, 1, randomHost, randomId, requestTime);
  }

  /**
   * Returns a shuffled copy of the given requests, leaving the input list untouched.
   *
   * @param requests the requests to copy
   * @param seed the seed for the random generator that drives the shuffle
   * @return a fixed-size list holding the same requests in shuffled order
   */
  private static List<SamzaResourceRequest> shuffledCopy(List<SamzaResourceRequest> requests, long seed) {
    // Arrays.asList is a fixed-size view over the fresh array; shuffling only swaps elements in place
    List<SamzaResourceRequest> copy = Arrays.asList(requests.toArray(new SamzaResourceRequest[0]));
    Collections.shuffle(copy, new Random(seed));
    return copy;
  }
}