blob: a451b8cacf643d0a803ee7d4c77bb8c10fd87720 [file] [log] [blame]
package org.apache.helix.task.assigner;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.TaskConstants;
import org.testng.Assert;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
public class TestAssignableInstance extends AssignerTestBase {
@Test
public void testInvalidInitialization() {
try {
AssignableInstance ai = new AssignableInstance(null, null, null);
Assert.fail("Expecting IllegalArgumentException");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("cannot be null"));
}
try {
ClusterConfig clusterConfig = new ClusterConfig("testCluster");
InstanceConfig instanceConfig = new InstanceConfig("instance");
LiveInstance liveInstance = new LiveInstance("another-instance");
AssignableInstance ai = new AssignableInstance(clusterConfig, instanceConfig, liveInstance);
Assert.fail("Expecting IllegalArgumentException");
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("don't match"));
}
}
@Test
public void testInitializationWithQuotaUnset() {
int expectedCurrentTaskThreadPoolSize = 100;
LiveInstance liveInstance = createLiveInstance(null, null);
liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
// Initialize AssignableInstance with neither resource capacity nor quota ratio provided
AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
new InstanceConfig(testInstanceName), liveInstance);
Assert.assertEquals(ai.getUsedCapacity().size(), 1);
Assert.assertEquals(
(int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
.get(AssignableInstance.DEFAULT_QUOTA_TYPE), 0);
Assert.assertEquals(
(int) ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
.get(AssignableInstance.DEFAULT_QUOTA_TYPE), expectedCurrentTaskThreadPoolSize);
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@Test
public void testInitializationWithOnlyCapacity() {
// Initialize AssignableInstance with only resource capacity provided
AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, false),
new InstanceConfig(testInstanceName),
createLiveInstance(testResourceTypes, testResourceCapacity));
Assert.assertEquals(ai.getTotalCapacity().size(), testResourceTypes.length);
Assert.assertEquals(ai.getUsedCapacity().size(), testResourceTypes.length);
for (int i = 0; i < testResourceTypes.length; i++) {
Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]).size(), 1);
Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]).size(), 1);
Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i])
.get(AssignableInstance.DEFAULT_QUOTA_TYPE), Integer.valueOf(testResourceCapacity[i]));
Assert.assertEquals(
ai.getUsedCapacity().get(testResourceTypes[i]).get(AssignableInstance.DEFAULT_QUOTA_TYPE),
Integer.valueOf(0));
}
}
@Test
public void testInitializationWithOnlyQuotaType() {
int expectedCurrentTaskThreadPoolSize = 100;
LiveInstance liveInstance = createLiveInstance(null, null);
liveInstance.setCurrentTaskThreadPoolSize(expectedCurrentTaskThreadPoolSize);
// Initialize AssignableInstance with only quota type provided
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
new InstanceConfig(testInstanceName), liveInstance);
Assert.assertEquals(ai.getTotalCapacity().size(), 1);
Assert.assertEquals(ai.getUsedCapacity().size(), 1);
Assert.assertEquals(
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(),
testQuotaTypes.length);
Assert.assertEquals(
ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()).size(),
testQuotaTypes.length);
Assert.assertEquals(
ai.getTotalCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()),
calculateExpectedQuotaPerType(expectedCurrentTaskThreadPoolSize, testQuotaTypes,
testQuotaRatio));
Assert.assertEquals(ai.getCurrentAssignments().size(), 0);
}
@Test
public void testInitializationWithQuotaAndCapacity() {
// Initialize AssignableInstance with both capacity and quota type provided
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
new InstanceConfig(testInstanceName),
createLiveInstance(testResourceTypes, testResourceCapacity));
Map<String, Integer> usedResourcePerType =
createResourceQuotaPerTypeMap(testQuotaTypes, new int[] {
0, 0, 0
});
for (int i = 0; i < testResourceTypes.length; i++) {
Assert.assertEquals(ai.getTotalCapacity().get(testResourceTypes[i]),
calculateExpectedQuotaPerType(Integer.valueOf(testResourceCapacity[i]), testQuotaTypes,
testQuotaRatio));
Assert.assertEquals(ai.getUsedCapacity().get(testResourceTypes[i]), usedResourcePerType);
}
}
@Test
public void testAssignableInstanceUpdateConfigs() {
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
new InstanceConfig(testInstanceName),
createLiveInstance(testResourceTypes, testResourceCapacity));
String[] newResources = new String[] {
"Resource2", "Resource3", "Resource4"
};
String[] newResourceCapacities = new String[] {
"100", "150", "50"
};
String[] newTypes = new String[] {
"Type3", "Type4", "Type5", "Type6"
};
String[] newTypeRatio = new String[] {
"20", "40", "25", "25"
};
LiveInstance newLiveInstance = createLiveInstance(newResources, newResourceCapacities);
ClusterConfig newClusterConfig = createClusterConfig(newTypes, newTypeRatio, false);
ai.updateConfigs(newClusterConfig, null, newLiveInstance);
Assert.assertEquals(ai.getUsedCapacity().size(), newResourceCapacities.length);
Assert.assertEquals(ai.getTotalCapacity().size(), newResourceCapacities.length);
for (int i = 0; i < newResources.length; i++) {
Assert.assertEquals(ai.getTotalCapacity().get(newResources[i]), calculateExpectedQuotaPerType(
Integer.valueOf(newResourceCapacities[i]), newTypes, newTypeRatio));
Assert.assertEquals(ai.getUsedCapacity().get(newResources[i]),
createResourceQuotaPerTypeMap(newTypes, new int[] {
0, 0, 0, 0
}));
}
}
@Test
public void testNormalTryAssign() {
int testCurrentTaskThreadPoolSize = 100;
LiveInstance liveInstance = createLiveInstance(null, null);
liveInstance.setCurrentTaskThreadPoolSize(testCurrentTaskThreadPoolSize);
AssignableInstance ai = new AssignableInstance(createClusterConfig(null, null, true),
new InstanceConfig(testInstanceName), liveInstance);
// When nothing is configured, we should use default quota type to assign
Map<String, TaskAssignResult> results = new HashMap<>();
for (int i = 0; i < testCurrentTaskThreadPoolSize; i++) {
String taskId = Integer.toString(i);
TaskConfig task = new TaskConfig("", null, taskId, null);
TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertTrue(result.isSuccessful());
ai.assign(result);
results.put(taskId, result);
}
// We are out of quota now and we should not be able to assign
String taskId = "TaskCannotAssign";
TaskConfig task = new TaskConfig("", null, taskId, null);
TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertFalse(result.isSuccessful());
Assert.assertEquals(result.getFailureReason(),
TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
try {
ai.assign(result);
Assert.fail("Expecting IllegalStateException");
} catch (IllegalStateException e) {
// OK
}
// After releasing 1 task, we should be able to schedule
ai.release(results.get("1").getTaskConfig(), AssignableInstance.DEFAULT_QUOTA_TYPE);
result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertTrue(result.isSuccessful());
// release all tasks, check remaining resources
for (TaskAssignResult rst : results.values()) {
ai.release(rst.getTaskConfig(), AssignableInstance.DEFAULT_QUOTA_TYPE);
}
Assert.assertEquals(
(int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
.get(AssignableInstance.DEFAULT_QUOTA_TYPE),
0);
}
@Test
public void testTryAssignFailure() {
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, false),
new InstanceConfig(testInstanceName),
createLiveInstance(testResourceTypes, testResourceCapacity));
// No such resource type
String taskId = "testTask";
TaskConfig task = new TaskConfig("", null, taskId, "");
TaskAssignResult result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertFalse(result.isSuccessful());
Assert.assertEquals(result.getFailureReason(),
TaskAssignResult.FailureReason.NO_SUCH_RESOURCE_TYPE);
ai.updateConfigs(null, null, createLiveInstance(new String[] {
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
}, new String[] {
"1"
}));
ai.updateConfigs(createClusterConfig(testQuotaTypes, testQuotaRatio, true), null, null);
result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertTrue(result.isSuccessful());
ai.assign(result);
try {
ai.assign(result);
Assert.fail("Expecting IllegalArgumentException");
} catch (IllegalStateException e) {
// OK
}
// Duplicate assignment
result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertFalse(result.isSuccessful());
Assert.assertEquals(result.getFailureReason(),
TaskAssignResult.FailureReason.TASK_ALREADY_ASSIGNED);
// Insufficient quota
ai.release(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
ai.updateConfigs(null, null, createLiveInstance(new String[] {
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
}, new String[] {
"0"
}));
result = ai.tryAssign(task, AssignableInstance.DEFAULT_QUOTA_TYPE);
Assert.assertFalse(result.isSuccessful());
Assert.assertEquals(result.getFailureReason(),
TaskAssignResult.FailureReason.INSUFFICIENT_QUOTA);
}
@Test
public void testRestoreTaskAssignResult() {
AssignableInstance ai =
new AssignableInstance(createClusterConfig(testQuotaTypes, testQuotaRatio, true),
new InstanceConfig(testInstanceName), createLiveInstance(new String[] {
LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name()
}, new String[] {
"40"
}));
Map<String, TaskConfig> currentAssignments = new HashMap<>();
TaskConfig supportedTask = new TaskConfig("", null, "supportedTask", "");
currentAssignments.put("supportedTask", supportedTask);
TaskConfig unsupportedTask = new TaskConfig("", null, "unsupportedTask", "");
currentAssignments.put("unsupportedTask", unsupportedTask);
Map<String, TaskAssignResult> results = Maps.newHashMap();
for (Map.Entry<String, TaskConfig> entry : currentAssignments.entrySet()) {
String taskID = entry.getKey();
TaskConfig taskConfig = entry.getValue();
String quotaType = (taskID.equals("supportedTask")) ? AssignableInstance.DEFAULT_QUOTA_TYPE
: "UnsupportedQuotaType";
// Restore TaskAssignResult
TaskAssignResult taskAssignResult = ai.restoreTaskAssignResult(taskID, taskConfig, quotaType);
if (taskAssignResult.isSuccessful()) {
results.put(taskID, taskAssignResult);
}
}
for (TaskAssignResult rst : results.values()) {
Assert.assertTrue(rst.isSuccessful());
Assert.assertEquals(rst.getAssignableInstance(), ai);
}
Assert.assertEquals(ai.getCurrentAssignments().size(), 2);
// The expected value for the following should be 2, not 1 because the unsupported task should
// also have been assigned as a DEFAULT task
Assert.assertEquals(
(int) ai.getUsedCapacity().get(LiveInstance.InstanceResourceType.TASK_EXEC_THREAD.name())
.get(AssignableInstance.DEFAULT_QUOTA_TYPE),
2);
}
private Map<String, Integer> createResourceQuotaPerTypeMap(String[] types, int[] quotas) {
Map<String, Integer> ret = new HashMap<>();
for (int i = 0; i < types.length; i++) {
ret.put(types[i], quotas[i]);
}
return ret;
}
private Map<String, Integer> calculateExpectedQuotaPerType(int capacity, String[] quotaTypes,
String[] quotaRatios) {
Integer totalQuota = 0;
Map<String, Integer> expectedQuotaPerType = new HashMap<>();
for (String ratio : quotaRatios) {
totalQuota += Integer.valueOf(ratio);
}
for (int i = 0; i < quotaRatios.length; i++) {
expectedQuotaPerType.put(quotaTypes[i],
Math.round((float) capacity * Integer.valueOf(quotaRatios[i]) / totalQuota));
}
return expectedQuotaPerType;
}
}