blob: cb3a7a795efca37bea9b82fab20b57c3638a8c61 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.samza.Partition;
import org.apache.samza.config.MapConfig;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.apache.samza.system.SystemStreamPartition;
import org.junit.Assert;
import org.junit.Test;
public class TestStandbyAllocator {
@Test
public void testWithNoStandby() {
JobModel jobModel = getJobModelWithStandby(1, 1, 1);
List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints("0", jobModel);
Assert.assertEquals("Constrained container count should be 0", 0, containerConstraints.size());
}
@Test
public void testWithStandby() {
testWithStandby(2, 1, 2);
testWithStandby(10, 1, 2);
testWithStandby(2, 10, 2);
testWithStandby(2, 10, 4);
testWithStandby(10, 1, 4);
testWithStandby(10, 10, 4);
}
public void testWithStandby(int nContainers, int nTasks, int replicationFactor) {
JobModel jobModel = getJobModelWithStandby(nContainers, nTasks, replicationFactor);
for (String containerID : jobModel.getContainers().keySet()) {
List<String> containerConstraints = StandbyTaskUtil.getStandbyContainerConstraints(containerID, jobModel);
Assert.assertTrue("Constrained container should be valid containers",
jobModel.getContainers().keySet().containsAll(containerConstraints));
Assert.assertEquals("Constrained container count should be (replication factor-1)", replicationFactor - 1,
containerConstraints.size());
Assert.assertFalse("Constrained containers list should not have the container itself",
containerConstraints.contains(containerID));
containerConstraints.forEach(containerConstraintID -> {
Assert.assertTrue("Constrained containers IDs should correspond to the active container",
containerID.split("-")[0].equals(containerConstraintID.split("-")[0]));
});
}
}
// Helper method to create a jobmodel with given number of containers, tasks and replication factor
public static JobModel getJobModelWithStandby(int nContainers, int nTasks, int replicationFactor) {
Map<String, ContainerModel> containerModels = new HashMap<>();
int taskID = 0;
for (int j = 0; j < nContainers; j++) {
Map<TaskName, TaskModel> tasks = new HashMap<>();
for (int i = 0; i < nTasks; i++) {
TaskModel taskModel = getTaskModel(taskID++);
tasks.put(taskModel.getTaskName(), taskModel);
}
containerModels.put(String.valueOf(j), new ContainerModel(String.valueOf(j), tasks));
}
Map<String, ContainerModel> standbyContainerModels = new HashMap<>();
for (int i = 0; i < replicationFactor - 1; i++) {
for (String containerID : containerModels.keySet()) {
String standbyContainerId = StandbyTaskUtil.getStandbyContainerId(containerID, i);
Map<TaskName, TaskModel> standbyTasks = getStandbyTasks(containerModels.get(containerID).getTasks(), i);
standbyContainerModels.put(standbyContainerId, new ContainerModel(standbyContainerId, standbyTasks));
}
}
containerModels.putAll(standbyContainerModels);
return new JobModel(new MapConfig(), containerModels);
}
// Helper method that creates a taskmodel with one input ssp
private static TaskModel getTaskModel(int partitionNum) {
return new TaskModel(new TaskName("Partition " + partitionNum),
Collections.singleton(new SystemStreamPartition("test-system", "test-stream", new Partition(partitionNum))),
new Partition(partitionNum), TaskMode.Active);
}
// Helper method to create standby-taskModels from active-taskModels
private static Map<TaskName, TaskModel> getStandbyTasks(Map<TaskName, TaskModel> tasks, int replicaNum) {
Map<TaskName, TaskModel> standbyTasks = new HashMap<>();
tasks.forEach((taskName, taskModel) -> {
TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
standbyTasks.put(standbyTaskName,
new TaskModel(standbyTaskName, taskModel.getSystemStreamPartitions(), taskModel.getChangelogPartition(),
TaskMode.Standby));
});
return standbyTasks;
}
}