blob: c993445b5c96080fd485eb164e632fc177c93851 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.clustermanager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.job.model.TaskMode;
/**
 * Collection of util methods used for performing Standby-aware Container allocation in YARN.
 *
 * <p>Naming conventions produced and parsed by this class:
 * <ul>
 *   <li>standby container ID: {@code <activeContainerId>-<replicaNumber>}</li>
 *   <li>standby task name: {@code Standby-<activeTaskName>-<replicaNumber>}</li>
 * </ul>
 * Because {@link #isStandbyContainer(String)} treats any ID containing {@code "-"} as a standby
 * container, active container IDs are assumed not to contain {@code "-"}.
 */
public class StandbyTaskUtil {
  private static final String STANDBY_CONTAINER_ID_SEPARATOR = "-";
  private static final String TASKNAME_SEPARATOR = "-";
  private static final String STANDBY_TASKNAME_PREFIX = "Standby";

  /**
   * Returns true if the containerID implies a standby container, false otherwise.
   *
   * @param containerID the container ID to inspect
   * @return true if {@code containerID} contains the standby container-ID separator ({@code "-"})
   */
  public static boolean isStandbyContainer(String containerID) {
    return containerID.contains(STANDBY_CONTAINER_ID_SEPARATOR);
  }

  /**
   * Generates a buddy (standby) container ID by appending the replica number to the active
   * container's ID, e.g. {@code ("0", 1) -> "0-1"}.
   *
   * @param activeContainerId the active container's ID
   * @param replicaNumber the standby replica number
   * @return the standby container ID
   */
  public static final String getStandbyContainerId(String activeContainerId, int replicaNumber) {
    return activeContainerId.concat(STANDBY_CONTAINER_ID_SEPARATOR).concat(String.valueOf(replicaNumber));
  }

  /**
   * Recovers the active container's ID by removing the trailing {@code -<replicaNumber>} from a
   * standby container ID. This is the inverse of {@link #getStandbyContainerId(String, int)}.
   *
   * @param standbyContainerID the standby container ID
   * @return everything before the last separator, or the input unchanged if it has no separator
   */
  public static final String getActiveContainerId(String standbyContainerID) {
    int replicaSeparatorIndex = standbyContainerID.lastIndexOf(STANDBY_CONTAINER_ID_SEPARATOR);
    return replicaSeparatorIndex < 0
        ? standbyContainerID
        : standbyContainerID.substring(0, replicaSeparatorIndex);
  }

  /**
   * Builds the standby task name for an active task,
   * e.g. {@code ("Partition 0", 1) -> "Standby-Partition 0-1"}.
   *
   * @param activeTaskName the active task's name
   * @param replicaNum the standby replica number
   * @return the standby task name
   */
  public static final TaskName getStandbyTaskName(TaskName activeTaskName, int replicaNum) {
    return new TaskName(STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR)
        .concat(activeTaskName.getTaskName())
        .concat(TASKNAME_SEPARATOR)
        .concat(String.valueOf(replicaNum)));
  }

  /**
   * Recovers the active task name from a standby task name by stripping the leading
   * {@code "Standby-"} prefix and the trailing {@code -<replicaNum>} suffix. This is the inverse of
   * {@link #getStandbyTaskName(TaskName, int)}. It is correct even when the active task name itself
   * contains {@code "-"}.
   *
   * @param standbyTaskName a task name produced by {@link #getStandbyTaskName(TaskName, int)}
   * @return the corresponding active task name
   * @throws IllegalArgumentException if {@code standbyTaskName} does not follow the standby naming format
   */
  public static final TaskName getActiveTaskName(TaskName standbyTaskName) {
    String name = standbyTaskName.getTaskName();
    String prefix = STANDBY_TASKNAME_PREFIX.concat(TASKNAME_SEPARATOR);
    // The replica number is always the last segment, so strip from the LAST separator. Splitting on
    // every separator would truncate active task names that themselves contain "-".
    int replicaSeparatorIndex = name.lastIndexOf(TASKNAME_SEPARATOR);
    if (!name.startsWith(prefix) || replicaSeparatorIndex < prefix.length()) {
      throw new IllegalArgumentException("Not a standby task name: " + name);
    }
    return new TaskName(name.substring(prefix.length(), replicaSeparatorIndex));
  }

  /**
   * Given a containerID and job model, it returns the containerIDs of all other containers that either
   * a. have standby tasks corresponding to active tasks on the given container, or
   * b. have active tasks corresponding to standby tasks on the given container.
   * Containers hosting standbys of the same active task are also included.
   * This is used to ensure that an active task and all its corresponding standby tasks are on separate
   * hosts, and that standby tasks corresponding to the same active task are on separate hosts.
   *
   * @param containerID the ID of the container whose constraints are requested
   * @param jobModel the job model holding all container models
   * @return IDs of the other containers whose (active or standby) tasks overlap with the given container's
   * @throws IllegalArgumentException if {@code containerID} is not present in {@code jobModel}
   */
  public static List<String> getStandbyContainerConstraints(String containerID, JobModel jobModel) {
    ContainerModel givenContainerModel = jobModel.getContainers().get(containerID);
    if (givenContainerModel == null) {
      throw new IllegalArgumentException("Container " + containerID + " is not present in the job model");
    }
    // The given container's task set is loop-invariant; compute it once.
    Set<TaskName> activeTasksOnGivenContainer = getCorrespondingActiveTasks(givenContainerModel);
    List<String> containerIDsWithStandbyConstraints = new ArrayList<>();
    for (ContainerModel containerModel : jobModel.getContainers().values()) {
      // add to list if the active-task views of the two containers overlap
      if (!givenContainerModel.equals(containerModel)
          && !Collections.disjoint(activeTasksOnGivenContainer, getCorrespondingActiveTasks(containerModel))) {
        containerIDsWithStandbyConstraints.add(containerModel.getId());
      }
    }
    return containerIDsWithStandbyConstraints;
  }

  /**
   * Returns the active tasks on the container plus the active task corresponding to every standby
   * task on the container.
   */
  private static Set<TaskName> getCorrespondingActiveTasks(ContainerModel containerModel) {
    // Copy into a HashSet: Collectors.toSet() makes no mutability guarantee, so it must not be mutated.
    Set<TaskName> correspondingActiveTasks = new HashSet<>(getAllTasks(containerModel, TaskMode.Active));
    for (TaskName standbyTaskName : getAllTasks(containerModel, TaskMode.Standby)) {
      correspondingActiveTasks.add(getActiveTaskName(standbyTaskName));
    }
    return correspondingActiveTasks;
  }

  /** Returns the names of all tasks on this container whose task mode equals {@code taskMode}. */
  private static Set<TaskName> getAllTasks(ContainerModel containerModel, TaskMode taskMode) {
    return containerModel.getTasks()
        .values()
        .stream()
        .filter(taskModel -> taskModel.getTaskMode() == taskMode)
        .map(taskModel -> taskModel.getTaskName())
        .collect(Collectors.toSet());
  }
}