blob: 90c37480d7b6c8dc95639803bb6377c02aca4991 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.elasticity.util;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.checkpoint.Checkpoint;
import org.apache.samza.container.TaskName;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemAdmins;
import org.apache.samza.system.SystemStreamPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class with util methods to be used for checkpoint computation when elasticity is enabled
* Elasticity is supported only for tasks created by either
* the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
* the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
*/
public class ElasticityUtils {
private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
// GroupByPartition tasks have names like Partition 0_1_2
// where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
// see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
private static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
private static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
private static final Pattern TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
private static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
//GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
// where 2 is the elasticity factor
// see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
private static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
private static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
private static final Pattern TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
private static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
/**
* Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
* When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
* When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
* Both tasks have names ending with _%d where %d is the elasticity factor
* @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
* @return
* for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
* for other tasks returns 1 which is the default elasticity factor
*/
static int getElasticityFactorFromTaskName(TaskName taskName) {
return getTaskNameParts(taskName).elasticityFactor;
}
/**
* checks if the given taskname is of a GroupByPartition task
* @param taskName of any task
* @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
*/
static boolean isGroupByPartitionTask(TaskName taskName) {
return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
}
/**
* checks if the given taskname is of a GroupBySystemStreamPartition task
* @param taskName of any task
* @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
*/
static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
}
/**
* checks if given taskName is elastic aka created with an elasticity factor > 1
* @param taskName of any task
* @return true for following, false otherwise
* for task created by GroupByPartition, taskName has format "Partition 0_1_2"
* for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
*/
static boolean isTaskNameElastic(TaskName taskName) {
if (isGroupByPartitionTask(taskName)) {
Matcher m = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskName.getTaskName());
return m.find();
} else if (isGroupBySystemStreamPartitionTask(taskName)) {
Matcher m = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskName.getTaskName());
return m.find();
}
return false;
}
/**
* From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
* @param taskName any taskName
* @return TaskNameComponents object containing system, stream, partition, keyBucket and elasticityFactor
* for GroupByPartition task:
* taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
* system and stream are empty "" strings and partition is the input partition,
* without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
* with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
* for GroupBySystemStreamPartition task:
* taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
* "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
* system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
* without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
* with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
* for tasks created with other SSP groupers:
* default TaskNameComponents is returned which has empty system, stream,
* -1 for partition and 0 for keyBucket and 1 for elasticity factor
*/
static TaskNameComponents getTaskNameParts(TaskName taskName) {
if (isGroupByPartitionTask(taskName)) {
return getTaskNameParts_GroupByPartition(taskName);
} else if (isGroupBySystemStreamPartitionTask(taskName)) {
return getTaskNameParts_GroupBySSP(taskName);
}
log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
+ "Elasticity is not supported for this taskName. "
+ "Returning default TaskNameComponents which has default keyBucket 0,"
+ " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
return new TaskNameComponents(TaskNameComponents.INVALID_PARTITION);
}
/**
* see doc for getTaskNameParts above
*/
private static TaskNameComponents getTaskNameParts_GroupByPartition(TaskName taskName) {
String taskNameStr = taskName.getTaskName();
log.debug("GetTaskNameParts for taskName {}", taskNameStr);
Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr);
if (matcher.find()) {
return new TaskNameComponents(Integer.valueOf(matcher.group(1)),
Integer.valueOf(matcher.group(2)),
Integer.valueOf(matcher.group(3)));
}
matcher = TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr);
if (matcher.find()) {
return new TaskNameComponents(Integer.valueOf(matcher.group(1)));
}
log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
throw new IllegalArgumentException("TaskName format incompatible");
}
/**
* see doc for getTaskNameParts above
*/
private static TaskNameComponents getTaskNameParts_GroupBySSP(TaskName taskName) {
String taskNameStr = taskName.getTaskName();
log.debug("GetTaskNameParts for taskName {}", taskNameStr);
Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr);
if (matcher.find()) {
return new TaskNameComponents(matcher.group(1),
matcher.group(2),
Integer.valueOf(matcher.group(3)),
Integer.valueOf(matcher.group(4)),
Integer.valueOf(matcher.group(5)));
}
matcher = TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr);
if (matcher.find()) {
return new TaskNameComponents(matcher.group(1),
matcher.group(2),
Integer.valueOf(matcher.group(3)));
}
log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
throw new IllegalArgumentException("TaskName format incompatible");
}
/**
* Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
* With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
* where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
* Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
* all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
* For example:
* case 1: elasticityFactor 2 to 1
* otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
* currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
* currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
* SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
* case 2: elasticityFactor 2 to 2 - no change
* Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
* similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
* case 3: elasticityFactor 4 to 2
* otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
* currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
* currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
* From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
* we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
* Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
* Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
* Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
* And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
*
* This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
* aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
* If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
* @param currentTask
* @param otherTask
* @return true if otherTask is ancestor of currentTask, false otherwise
*/
static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
log.debug("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
|| (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
return false;
}
TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask);
TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask);
if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system)
|| !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream)
|| otherTaskNameComponents.partition != currentTaskNameComponents.partition
|| otherTaskNameComponents.elasticityFactor > currentTaskNameComponents.elasticityFactor) {
return false;
}
return (currentTaskNameComponents.keyBucket % otherTaskNameComponents.elasticityFactor) == otherTaskNameComponents.keyBucket;
}
/**
* See javadoc for isOtherTaskAncestorOfCurrentTask above
* Given currentTask and otherTask,
* if currentTask == otherTask, then its not a descendant. (unlike ancestor)
* else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
* @param currentTask
* @param otherTask
* @return
*/
static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
log.debug("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
|| (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
return false;
}
TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask);
TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask);
if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system)
|| !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream)
|| otherTaskNameComponents.partition != currentTaskNameComponents.partition
|| otherTaskNameComponents.elasticityFactor <= currentTaskNameComponents.elasticityFactor) {
return false;
}
return (
otherTaskNameComponents.keyBucket % currentTaskNameComponents.elasticityFactor) == currentTaskNameComponents.keyBucket;
}
/**
* For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
* All ancestor checkpoints are put into a set
* Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
* For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
* the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
* See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
* @param taskName name of the task
* @param checkpointMap map from taskName to checkpoint
* @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
*/
static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
log.debug("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
checkpointMap.keySet().forEach(otherTaskName -> {
Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
log.debug("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
ancestorCheckpoints.add(otherTaskCheckpoint);
}
if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
log.debug("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
int otherEF = getElasticityFactorFromTaskName(otherTaskName);
if (!descendantCheckpoints.containsKey(otherEF)) {
descendantCheckpoints.put(otherEF, new HashSet<>());
}
descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
}
});
log.debug("done computing all ancestors and descendants of {}", taskName);
return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
}
/**
* Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
* Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
* A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
* or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
* @param checkpoint Checkpoint containing SSP -> offset
* @param ssp SystemStreamPartition for which an offset needs to be fetched
* @return offset for the ssp in the Checkpoint or null if doesnt exist.
*/
static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
String checkpointStr = checkpoint.getOffsets().entrySet().stream()
.map(k -> k.getKey() + " : " + k.getValue())
.collect(Collectors.joining(", ", "{", "}"));
log.debug("for ssp {}, in checkpoint {}", ssp, checkpointStr);
Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
.stream()
.filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
.getPartition()
.equals(ssp.getPartition()))
.map(Map.Entry::getValue)
.findFirst();
if (offsetFound.isPresent()) {
return offsetFound.get();
}
log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
return null;
}
/**
* Given a set of checkpoints, find the max aka largest offset for an ssp
* Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
* Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
* @param checkpointSet set of checkpoints
* @param ssp for which largest offset is needed
* @param systemAdmin of the ssp.getSystem()
* @return offset - string if one exists else null
*/
static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
SystemStreamPartition ssp, SystemAdmin systemAdmin) {
return checkpointSet.stream()
.filter(Objects::nonNull)
.map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
.filter(Objects::nonNull)
.sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
.findFirst().orElse(null);
}
/**
* Given a set of checkpoints, find the min aka smallest offset for an ssp
* Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
* Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
* @param checkpointSet set of checkpoints
* @param ssp for which largest offset is needed
* @param systemAdmin of the ssp.getSystem()
* @return offset - string if one exists else null
*/
static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
SystemStreamPartition ssp, SystemAdmin systemAdmin) {
return checkpointSet.stream()
.filter(Objects::nonNull)
.map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
.filter(Objects::nonNull)
.sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first
.findFirst().orElse(null);
}
/**
* Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion
* Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
* (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2)
* For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4]
*
* If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset.
* for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset.
*
* With descendants, a little care is needed. there could be descendants with different elasticity factors.
* given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task.
* hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor.
* Across elasticity factors, largest works just like in ancestor
*
* Taking a concrete example
* From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
* Partition 0 consunmig all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
* Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2
* Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2
* Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
* Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
* From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
* we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
* Thus,
* SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
* SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
* If the checkpoint map has
* Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6)
* looking at these map and knowing that offsets are monotonically increasing, it is clear that last deploy was with elasticity factor = 4
* to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets.
* picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed.
* hence we need to take min of offsets within an elasticity factor.
*
* Given checkpoints for all the tasks in the checkpoint stream,
* computing the last proc offset for an ssp checkpoint for a task,
* the following needs to be met.
* 1. Ancestors: we need to take largest offset among ancestors for an ssp
* 2. Descendants:
* a. group descendants by their elasticityFactor.
* b. among descendants of the same elasticityFactor, take the smallest offset for an ssp
* c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set
* 3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants)
*
* @param taskName
* @param taskSSPSet
* @param checkpointMap
* @param systemAdmins
* @return
*/
public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap(
TaskName taskName,
Set<SystemStreamPartition> taskSSPSet,
Map<TaskName, Checkpoint> checkpointMap,
SystemAdmins systemAdmins) {
Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound =
getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft();
Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight();
Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
taskSSPSet.forEach(ssp_withKeyBucket -> {
log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket);
SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(),
ssp_withKeyBucket.getPartition());
SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
String currentLastOffsetForSSP = null;
String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);
log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}",
taskName, ssp_withKeyBucket, ancestorLastOffsetForSSP);
String descendantLastOffsetForSSP = descendantCheckpoints.entrySet().stream()
.map(entry -> getMinOffsetForSSPInCheckpointSet(entry.getValue(), ssp, systemAdmin)) // at each ef level, find min offset
.sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
.filter(Objects::nonNull)
.findFirst().orElse(null);
log.info("for taskName {} and ssp {} got lastoffset from descendants as {}",
taskName, ssp_withKeyBucket, descendantLastOffsetForSSP);
Integer offsetComparison = systemAdmin.offsetComparator(ancestorLastOffsetForSSP, descendantLastOffsetForSSP);
if (descendantLastOffsetForSSP == null || (offsetComparison != null && offsetComparison > 0)) { // means ancestorLastOffsetForSSP > descendantLastOffsetForSSP
currentLastOffsetForSSP = ancestorLastOffsetForSSP;
} else {
currentLastOffsetForSSP = descendantLastOffsetForSSP;
}
if (currentLastOffsetForSSP == null) {
log.info("for taskName {} and ssp {} got lastoffset as null. "
+ "skipping adding this ssp to task's offsets loaded from previous checkpoint", taskName, ssp_withKeyBucket);
} else {
log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, ssp_withKeyBucket, currentLastOffsetForSSP);
taskSSPOffsets.put(ssp_withKeyBucket, currentLastOffsetForSSP);
}
});
String checkpointStr = taskSSPOffsets.entrySet().stream()
.map(k -> k.getKey() + " : " + k.getValue())
.collect(Collectors.joining(", ", "{", "}"));
log.info("for taskName {}, returning checkpoint as {}", taskName, checkpointStr);
return taskSSPOffsets;
}
public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> checkpointMap) {
return checkpointMap.keySet().stream()
.filter(ElasticityUtils::isTaskNameElastic) // true if the taskName has elasticityFactor in it
.findFirst().isPresent();
}
}