blob: 5fd9ceb8111c1dfa3105f69235230b3b6a634feb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.container.grouper.task;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.samza.clustermanager.StandbyTaskUtil;
import org.apache.samza.container.TaskName;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.TaskMode;
import org.apache.samza.job.model.TaskModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This TaskNameGrouperProxy wraps around any given TaskNameGrouper.
* In addition to apply the provided grouper, this grouper also generates
* generates Standby-tasks and adds them to separate dedicated containers.
* It adds (r-1) Standby-Tasks for each active task, where r is the replication factor.
* Hence it adds r-1 additional containers for each regular container.
*
* All Standby-tasks are assigned a TaskName with a "Standby" prefix.
* The new containers carrying Standby tasks that are added are assigned containerIDs corresponding to its
* active container, e.g., activeContainerID-replicaNumber
* For e.g.,
*
* If the initial container model map is:
*
* Container 0 : (Partition 0, Partition 1)
* Container 1 : (Partition 2, Partition 3)
* with replicationFactor = 3
*
* The generated containerModel map is:
* Container 0 : (Partition 0, Partition 1)
* Container 1 : (Partition 2, Partition 3)
* Container 0-0 : (Standby-Partition 0-0, Standby-Partition 1-0)
* Container 1-0 : (Standby-Partition 2-0, Standby-Partition 3-0)
* Container 0-1 : (Standby-Partition 0-1, Standby-Partition 1-1)
* Container 1-1 : (Standby-Partition 2-1, Standby-Partition 3-1)
*/
public class TaskNameGrouperProxy {
private static final Logger LOG = LoggerFactory.getLogger(TaskNameGrouperProxy.class);
private final TaskNameGrouper taskNameGrouper;
private final boolean standbyTasksEnabled;
private final int replicationFactor;
public TaskNameGrouperProxy(TaskNameGrouper taskNameGrouper, boolean standbyTasksEnabled,
int replicationFactor) {
this.taskNameGrouper = taskNameGrouper;
this.standbyTasksEnabled = standbyTasksEnabled;
this.replicationFactor = replicationFactor;
}
public Set<ContainerModel> group(Set<TaskModel> taskModels, GrouperMetadata grouperMetadata) {
if (this.standbyTasksEnabled) {
return generateStandbyTasks(this.taskNameGrouper.group(taskModels, grouperMetadata), replicationFactor);
} else {
return this.taskNameGrouper.group(taskModels, grouperMetadata);
}
}
public Set<ContainerModel> group(Set<TaskModel> taskModels, List<String> containersIds) {
if (this.standbyTasksEnabled) {
return generateStandbyTasks(this.taskNameGrouper.group(taskModels, containersIds), replicationFactor);
} else {
return this.taskNameGrouper.group(taskModels, containersIds);
}
}
/**
* Generate a container model map with standby tasks added and grouped into buddy containers.
* Package-private for testing.
*
* @param containerModels The initial container model map.
* @param replicationFactor The desired replication factor, if the replication-factor is n, we add n-1 standby tasks for each active task.
* @return The generated map of containerModels with added containers, and the initial regular containers
*/
Set<ContainerModel> generateStandbyTasks(Set<ContainerModel> containerModels, int replicationFactor) {
LOG.info("Received current containerModel map : {}, replicationFactor : {}", containerModels, replicationFactor);
Set<ContainerModel> buddyContainers = new HashSet<>();
for (ContainerModel activeContainer : containerModels) {
for (int replicaNum = 0; replicaNum < replicationFactor - 1; replicaNum++) {
String buddyContainerId = StandbyTaskUtil.getStandbyContainerId(activeContainer.getId(), replicaNum);
ContainerModel buddyContainerModel =
new ContainerModel(buddyContainerId, getTaskModelForBuddyContainer(activeContainer.getTasks(), replicaNum));
buddyContainers.add(buddyContainerModel);
}
}
LOG.info("Adding buddy containers : {}", buddyContainers);
buddyContainers.addAll(containerModels);
return buddyContainers;
}
// Helper method to populate the container model for a buddy container.
private static Map<TaskName, TaskModel> getTaskModelForBuddyContainer(
Map<TaskName, TaskModel> activeContainerTaskModel, int replicaNum) {
Map<TaskName, TaskModel> standbyTaskModels = new HashMap<>();
for (TaskName taskName : activeContainerTaskModel.keySet()) {
TaskName standbyTaskName = StandbyTaskUtil.getStandbyTaskName(taskName, replicaNum);
TaskModel standbyTaskModel =
new TaskModel(standbyTaskName, activeContainerTaskModel.get(taskName).getSystemStreamPartitions(),
activeContainerTaskModel.get(taskName).getChangelogPartition(), TaskMode.Standby);
standbyTaskModels.put(standbyTaskName, standbyTaskModel);
}
LOG.info("Generated standbyTaskModels : {} for active task models : {}", standbyTaskModels,
activeContainerTaskModel);
return standbyTaskModels;
}
}