Revert elasticity commits - bug found with broadcast input (#1626)
Symptom: Broadcast input ssp is not consumed by all containers of the job.
Cause: Elasticity code changing systemconsumers and samza-core
Changes:
reverting all elasticity commits post SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled #1585 after which this issue was detected.
updating the checkpointv1 serde to accept checkpoints written with SAMZA-2743: [Elasticity] Add keybucket into SSP serde for checkpoint #1608
list of elasticity PRs being reverted -
#1625
#1610
#1608
#1607
#1603
#1598
#1597
#1596
#1589
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 4e76bb2..4dcefaa 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -19,8 +19,6 @@
package org.apache.samza.checkpoint;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.samza.container.TaskName;
/**
@@ -68,15 +66,4 @@
* Clear the checkpoints in the checkpoint stream.
*/
default void clearCheckpoints() { }
-
- /**
- * Returns the last recorded checkpoint for all tasks present in the implementation-specific location.
- * All tasks contains all the tasks within the current job model.
- * All tasks also includes tasks which may have been part of the job model during a previous deploy.
- * @return A Map of TaskName to Checkpoint object.
- * The Checkpoint object has the recorded offset data of the specified partition.
- */
- default Map<TaskName, Checkpoint> readAllCheckpoints() {
- return new HashMap<>();
- };
}
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 1f4a740..3908170 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -132,9 +132,7 @@
if (envelopeKeyorOffset == null) {
return new SystemStreamPartition(systemStreamPartition, 0);
}
- // modulo 31 first to best spread out the hashcode and then modulo elasticityFactor for actual keyBucket
- // Note: elasticityFactor <= 16 so modulo 31 is safe to do.
- int keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+ int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor;
return new SystemStreamPartition(systemStreamPartition, keyBucket);
}
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index d36ac66..b9aa82c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -190,6 +190,7 @@
public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled";
private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true;
+
// Enabled elasticity for the job
// number of (elastic) tasks in the job will be old task count X elasticity factor
public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor";
@@ -517,8 +518,8 @@
public int getElasticityFactor() {
int elasticityFactor = getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR);
- if (elasticityFactor < 1 || elasticityFactor > 16) {
- throw new ConfigException("Elasticity factor can not be less than 1 or greater than 16");
+ if (elasticityFactor < 1) {
+ throw new ConfigException("Elasticity factor can not be less than 1");
}
return elasticityFactor;
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 5bbcbae..3334adc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -860,19 +860,8 @@
IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
if (envelope.isEndOfStream()) {
- if (elasticityFactor <= 1) {
- SystemStreamPartition ssp = envelope.getSystemStreamPartition();
- processingSspSet.remove(ssp);
- } else {
- // if envelope is end of stream, the ssp of envelope should be removed from task's processing set irresp of keyBucket
- SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
- Optional<SystemStreamPartition> ssp = processingSspSet.stream()
- .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
- && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
- .findFirst();
- ssp.ifPresent(processingSspSet::remove);
- ssp.ifPresent(processingSspSetToDrain::remove);
- }
+ SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor);
+ processingSspSet.remove(ssp);
if (!hasIntermediateStreams) {
pendingEnvelopeQueue.remove();
}
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
index 53e7400..1efd172 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -64,7 +64,7 @@
int keyBucket = elasticityFactor == 1 ? -1 : i;
String taskNameStr = elasticityFactor == 1 ?
String.format("Partition %d", ssp.getPartition().getPartitionId()) :
- String.format("Partition %d_%d_%d", ssp.getPartition().getPartitionId(), keyBucket, elasticityFactor);
+ String.format("Partition %d %d", ssp.getPartition().getPartitionId(), keyBucket);
TaskName taskName = new TaskName(taskNameStr);
SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
groupedMap.putIfAbsent(taskName, new HashSet<>());
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index 652cf15..9443048 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -63,8 +63,7 @@
SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
HashSet<SystemStreamPartition> sspSet = new HashSet<SystemStreamPartition>();
sspSet.add(sspWithKeyBucket);
- String elasticitySuffix = elasticityFactor == 1 ? "" : String.format("_%d", elasticityFactor);
- groupedMap.put(new TaskName(sspWithKeyBucket.toString() + elasticitySuffix), sspSet);
+ groupedMap.put(new TaskName(sspWithKeyBucket.toString()), sspSet);
}
}
diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java b/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java
deleted file mode 100644
index 90c3748..0000000
--- a/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java
+++ /dev/null
@@ -1,493 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.elasticity.util;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStreamPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class with util methods to be used for checkpoint computation when elasticity is enabled
- * Elasticity is supported only for tasks created by either
- * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
- * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
- */
-public class ElasticityUtils {
- private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
-
- // GroupByPartition tasks have names like Partition 0_1_2
- // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
- // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
- private static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
- private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
- private static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
- private static final Pattern TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
- private static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
-
- //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
- // where 2 is the elasticity factor
- // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
- private static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
- private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
- private static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
- private static final Pattern TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
- private static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
-
- /**
- * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
- * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
- * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
- * Both tasks have names ending with _%d where %d is the elasticity factor
- * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
- * @return
- * for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
- * for other tasks returns 1 which is the default elasticity factor
- */
- static int getElasticityFactorFromTaskName(TaskName taskName) {
- return getTaskNameParts(taskName).elasticityFactor;
- }
-
- /**
- * checks if the given taskname is of a GroupByPartition task
- * @param taskName of any task
- * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
- */
- static boolean isGroupByPartitionTask(TaskName taskName) {
- return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
- }
-
- /**
- * checks if the given taskname is of a GroupBySystemStreamPartition task
- * @param taskName of any task
- * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
- */
- static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
- return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
- }
-
- /**
- * checks if given taskName is elastic aka created with an elasticity factor > 1
- * @param taskName of any task
- * @return true for following, false otherwise
- * for task created by GroupByPartition, taskName has format "Partition 0_1_2"
- * for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
- */
- static boolean isTaskNameElastic(TaskName taskName) {
- if (isGroupByPartitionTask(taskName)) {
- Matcher m = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskName.getTaskName());
- return m.find();
- } else if (isGroupBySystemStreamPartitionTask(taskName)) {
- Matcher m = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskName.getTaskName());
- return m.find();
- }
- return false;
- }
-
- /**
- * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
- * @param taskName any taskName
- * @return TaskNameComponents object containing system, stream, partition, keyBucket and elasticityFactor
- * for GroupByPartition task:
- * taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
- * system and stream are empty "" strings and partition is the input partition,
- * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
- * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
- * for GroupBySystemStreamPartition task:
- * taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
- * "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
- * system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
- * without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
- * with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
- * for tasks created with other SSP groupers:
- * default TaskNameComponents is returned which has empty system, stream,
- * -1 for partition and 0 for keyBucket and 1 for elasticity factor
- */
- static TaskNameComponents getTaskNameParts(TaskName taskName) {
- if (isGroupByPartitionTask(taskName)) {
- return getTaskNameParts_GroupByPartition(taskName);
- } else if (isGroupBySystemStreamPartitionTask(taskName)) {
- return getTaskNameParts_GroupBySSP(taskName);
- }
- log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
- + "Elasticity is not supported for this taskName. "
- + "Returning default TaskNameComponents which has default keyBucket 0,"
- + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
- return new TaskNameComponents(TaskNameComponents.INVALID_PARTITION);
- }
-
- /**
- * see doc for getTaskNameParts above
- */
- private static TaskNameComponents getTaskNameParts_GroupByPartition(TaskName taskName) {
- String taskNameStr = taskName.getTaskName();
- log.debug("GetTaskNameParts for taskName {}", taskNameStr);
-
- Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr);
- if (matcher.find()) {
- return new TaskNameComponents(Integer.valueOf(matcher.group(1)),
- Integer.valueOf(matcher.group(2)),
- Integer.valueOf(matcher.group(3)));
- }
- matcher = TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr);
- if (matcher.find()) {
- return new TaskNameComponents(Integer.valueOf(matcher.group(1)));
- }
- log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
- throw new IllegalArgumentException("TaskName format incompatible");
- }
-
- /**
- * see doc for getTaskNameParts above
- */
- private static TaskNameComponents getTaskNameParts_GroupBySSP(TaskName taskName) {
- String taskNameStr = taskName.getTaskName();
- log.debug("GetTaskNameParts for taskName {}", taskNameStr);
-
- Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr);
- if (matcher.find()) {
- return new TaskNameComponents(matcher.group(1),
- matcher.group(2),
- Integer.valueOf(matcher.group(3)),
- Integer.valueOf(matcher.group(4)),
- Integer.valueOf(matcher.group(5)));
- }
- matcher = TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr);
- if (matcher.find()) {
- return new TaskNameComponents(matcher.group(1),
- matcher.group(2),
- Integer.valueOf(matcher.group(3)));
- }
- log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
- throw new IllegalArgumentException("TaskName format incompatible");
- }
-
- /**
- * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
- * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
- * where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
- * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
- * all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
- * For example:
- * case 1: elasticityFactor 2 to 1
- * otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
- * currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
- * currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
- * SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
- * case 2: elasticityFactor 2 to 2 - no change
- * Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
- * similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
- * case 3: elasticityFactor 4 to 2
- * otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
- * currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
- * currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
- * From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
- * we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
- * Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
- * Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
- * Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
- * And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
- *
- * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
- * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
- * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
- * @param currentTask
- * @param otherTask
- * @return true if otherTask is ancestor of currentTask, false otherwise
- */
- static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
- log.debug("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
- if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
- || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
- return false;
- }
-
- TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask);
- TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask);
-
- if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system)
- || !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream)
- || otherTaskNameComponents.partition != currentTaskNameComponents.partition
- || otherTaskNameComponents.elasticityFactor > currentTaskNameComponents.elasticityFactor) {
- return false;
- }
-
- return (currentTaskNameComponents.keyBucket % otherTaskNameComponents.elasticityFactor) == otherTaskNameComponents.keyBucket;
- }
-
- /**
- * See javadoc for isOtherTaskAncestorOfCurrentTask above
- * Given currentTask and otherTask,
- * if currentTask == otherTask, then its not a descendant. (unlike ancestor)
- * else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
- * @param currentTask
- * @param otherTask
- * @return
- */
- static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
- log.debug("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
- if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
- || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
- return false;
- }
-
- TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask);
- TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask);
-
- if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system)
- || !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream)
- || otherTaskNameComponents.partition != currentTaskNameComponents.partition
- || otherTaskNameComponents.elasticityFactor <= currentTaskNameComponents.elasticityFactor) {
- return false;
- }
-
- return (
- otherTaskNameComponents.keyBucket % currentTaskNameComponents.elasticityFactor) == currentTaskNameComponents.keyBucket;
- }
-
- /**
- * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
- * All ancestor checkpoints are put into a set
- * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
- * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
- * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
- * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
- * @param taskName name of the task
- * @param checkpointMap map from taskName to checkpoint
- * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
- */
- static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
- TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
- Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
- Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
- log.debug("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
- checkpointMap.keySet().forEach(otherTaskName -> {
- Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
- if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
- log.debug("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
- ancestorCheckpoints.add(otherTaskCheckpoint);
- }
- if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
- log.debug("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
- int otherEF = getElasticityFactorFromTaskName(otherTaskName);
- if (!descendantCheckpoints.containsKey(otherEF)) {
- descendantCheckpoints.put(otherEF, new HashSet<>());
- }
- descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
- }
- });
- log.debug("done computing all ancestors and descendants of {}", taskName);
- return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
- }
-
- /**
- * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
- * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
- * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
- * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
- * @param checkpoint Checkpoint containing SSP -> offset
- * @param ssp SystemStreamPartition for which an offset needs to be fetched
- * @return offset for the ssp in the Checkpoint or null if doesnt exist.
- */
- static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
- String checkpointStr = checkpoint.getOffsets().entrySet().stream()
- .map(k -> k.getKey() + " : " + k.getValue())
- .collect(Collectors.joining(", ", "{", "}"));
- log.debug("for ssp {}, in checkpoint {}", ssp, checkpointStr);
-
- Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
- .stream()
- .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
- .getPartition()
- .equals(ssp.getPartition()))
- .map(Map.Entry::getValue)
- .findFirst();
- if (offsetFound.isPresent()) {
- return offsetFound.get();
- }
- log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
- return null;
- }
-
- /**
- * Given a set of checkpoints, find the max aka largest offset for an ssp
- * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
- * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
- * @param checkpointSet set of checkpoints
- * @param ssp for which largest offset is needed
- * @param systemAdmin of the ssp.getSystem()
- * @return offset - string if one exists else null
- */
- static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
- SystemStreamPartition ssp, SystemAdmin systemAdmin) {
- return checkpointSet.stream()
- .filter(Objects::nonNull)
- .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
- .filter(Objects::nonNull)
- .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
- .findFirst().orElse(null);
- }
-
- /**
- * Given a set of checkpoints, find the min aka smallest offset for an ssp
- * Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
- * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
- * @param checkpointSet set of checkpoints
- * @param ssp for which largest offset is needed
- * @param systemAdmin of the ssp.getSystem()
- * @return offset - string if one exists else null
- */
- static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
- SystemStreamPartition ssp, SystemAdmin systemAdmin) {
- return checkpointSet.stream()
- .filter(Objects::nonNull)
- .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
- .filter(Objects::nonNull)
- .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first
- .findFirst().orElse(null);
- }
-
- /**
- * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion
- * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
- * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2)
- * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4]
- *
- * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset.
- * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset.
- *
- * With descendants, a little care is needed. there could be descendants with different elasticity factors.
- * given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task.
- * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor.
- * Across elasticity factors, largest works just like in ancestor
- *
- * Taking a concrete example
- * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
- * Partition 0 consunmig all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
- * Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2
- * Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2
- * Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
- * Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
- * From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
- * we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
- * Thus,
- * SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
- * SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
- * If the checkpoint map has
- * Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6)
- * looking at these map and knowing that offsets are monotonically increasing, it is clear that last deploy was with elasticity factor = 4
- * to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets.
- * picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed.
- * hence we need to take min of offsets within an elasticity factor.
- *
- * Given checkpoints for all the tasks in the checkpoint stream,
- * computing the last proc offset for an ssp checkpoint for a task,
- * the following needs to be met.
- * 1. Ancestors: we need to take largest offset among ancestors for an ssp
- * 2. Descendants:
- * a. group descendants by their elasticityFactor.
- * b. among descendants of the same elasticityFactor, take the smallest offset for an ssp
- * c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set
- * 3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants)
- *
- * @param taskName
- * @param taskSSPSet
- * @param checkpointMap
- * @param systemAdmins
- * @return
- */
- public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap(
- TaskName taskName,
- Set<SystemStreamPartition> taskSSPSet,
- Map<TaskName, Checkpoint> checkpointMap,
- SystemAdmins systemAdmins) {
- Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound =
- getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
- Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft();
- Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight();
-
- Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
-
- taskSSPSet.forEach(ssp_withKeyBucket -> {
- log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket);
-
- SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(),
- ssp_withKeyBucket.getPartition());
-
- SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
-
- String currentLastOffsetForSSP = null;
-
- String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);
-
- log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}",
- taskName, ssp_withKeyBucket, ancestorLastOffsetForSSP);
-
- String descendantLastOffsetForSSP = descendantCheckpoints.entrySet().stream()
- .map(entry -> getMinOffsetForSSPInCheckpointSet(entry.getValue(), ssp, systemAdmin)) // at each ef level, find min offset
- .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
- .filter(Objects::nonNull)
- .findFirst().orElse(null);
-
- log.info("for taskName {} and ssp {} got lastoffset from descendants as {}",
- taskName, ssp_withKeyBucket, descendantLastOffsetForSSP);
-
- Integer offsetComparison = systemAdmin.offsetComparator(ancestorLastOffsetForSSP, descendantLastOffsetForSSP);
- if (descendantLastOffsetForSSP == null || (offsetComparison != null && offsetComparison > 0)) { // means ancestorLastOffsetForSSP > descendantLastOffsetForSSP
- currentLastOffsetForSSP = ancestorLastOffsetForSSP;
- } else {
- currentLastOffsetForSSP = descendantLastOffsetForSSP;
- }
- if (currentLastOffsetForSSP == null) {
- log.info("for taskName {} and ssp {} got lastoffset as null. "
- + "skipping adding this ssp to task's offsets loaded from previous checkpoint", taskName, ssp_withKeyBucket);
- } else {
- log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, ssp_withKeyBucket, currentLastOffsetForSSP);
- taskSSPOffsets.put(ssp_withKeyBucket, currentLastOffsetForSSP);
- }
- });
-
- String checkpointStr = taskSSPOffsets.entrySet().stream()
- .map(k -> k.getKey() + " : " + k.getValue())
- .collect(Collectors.joining(", ", "{", "}"));
- log.info("for taskName {}, returning checkpoint as {}", taskName, checkpointStr);
- return taskSSPOffsets;
- }
-
- public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> checkpointMap) {
- return checkpointMap.keySet().stream()
- .filter(ElasticityUtils::isTaskNameElastic) // true if the taskName has elasticityFactor in it
- .findFirst().isPresent();
- }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java b/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java
deleted file mode 100644
index 4b76b84..0000000
--- a/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.elasticity.util;
-
-/**
- * POJO class to store system, stream, partition, and keyBucket information associated with a Task,
- * that is encoded in the task's name.
- */
-public class TaskNameComponents {
-
- public static final int DEFAULT_KEY_BUCKET = 0;
- public static final int DEFAULT_ELASTICITY_FACTOR = 1;
- public static final int INVALID_PARTITION = -1;
-
- public final String system;
- public final String stream;
- public final int partition;
- public final int keyBucket;
- public final int elasticityFactor;
-
- public TaskNameComponents(int partition) {
- this(partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
- }
-
- public TaskNameComponents(int partition, int keyBucket, int elasticityFactor) {
- this("", "", partition, keyBucket, elasticityFactor);
- }
-
- public TaskNameComponents(String system, String stream, int partition) {
- this(system, stream, partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
- }
-
- public TaskNameComponents(String system, String stream, int partition, int keyBucket, int elasticityFactor) {
- this.system = system;
- this.stream = stream;
- this.partition = partition;
- this.keyBucket = keyBucket;
- this.elasticityFactor = elasticityFactor;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof TaskNameComponents)) return false;
-
- TaskNameComponents that = (TaskNameComponents) o;
-
- if (!(this.system.equals(that.system))
- || !(this.stream.equals(that.stream))
- || (this.partition != that.partition)
- || (this.keyBucket != that.keyBucket)
- || (this.elasticityFactor != that.elasticityFactor)) {
- return false;
- }
- return true;
- }
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + system.hashCode();
- result = prime * result + stream.hashCode();
- result = prime * result + partition;
- result = prime * result + keyBucket;
- result = prime * result + elasticityFactor;
- return result;
- }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 3470fd8..e04edee 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -231,9 +231,10 @@
public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
String[] parts = sspString.split("\\.");
if (parts.length < 3) {
- throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' ");
+ throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
}
- return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
+ return new SystemStreamPartition(
+ new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 311dc6a..48b681e 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -22,13 +22,12 @@
import java.util
import java.util.HashMap
import java.util.concurrent.ConcurrentHashMap
+
import org.apache.commons.lang3.StringUtils
import org.apache.samza.SamzaException
import org.apache.samza.annotation.InterfaceStability
-import org.apache.samza.checkpoint.OffsetManager.info
-import org.apache.samza.config.{Config, JobConfig, StreamConfig, SystemConfig}
+import org.apache.samza.config.{Config, StreamConfig, SystemConfig}
import org.apache.samza.container.TaskName
-import org.apache.samza.elasticity.util.ElasticityUtils
import org.apache.samza.startpoint.{Startpoint, StartpointManager}
import org.apache.samza.system.SystemStreamMetadata.OffsetType
import org.apache.samza.system._
@@ -106,9 +105,7 @@
// Build OffsetSetting so we can create a map for OffsetManager.
(systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
}.toMap
-
- new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners,
- offsetManagerMetrics)
+ new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
}
}
@@ -215,26 +212,16 @@
* Set the last processed offset for a given SystemStreamPartition.
*/
def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
- // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
- // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
- // and hence exactly one entry of an ssp with keyBucket in in the systemStreamPartitions map for a taskName
- // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
- val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,
- throw new SamzaException("No SSPs registered for task: " + taskName))
- .filter(ssp => ssp.getSystemStream.equals(systemStreamPartition.getSystemStream)
- && ssp.getPartition.equals(systemStreamPartition.getPartition))
- .toIterator.next()
-
lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
taskSSPsWithProcessedOffsetUpdated.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, Boolean]())
if (offset != null) {
if (!offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
- lastProcessedOffsets.get(taskName).put(sspWithKeyBucket, offset)
+ lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
}
// Record the spp that have received the new messages. The startpoint for each ssp should only be deleted when the
// ssp has received the new messages. More details in SAMZA-2749.
- taskSSPsWithProcessedOffsetUpdated.get(taskName).putIfAbsent(sspWithKeyBucket, true)
+ taskSSPsWithProcessedOffsetUpdated.get(taskName).putIfAbsent(systemStreamPartition, true)
}
}
@@ -500,19 +487,12 @@
val checkpoint = checkpointManager.readLastCheckpoint(taskName)
- val checkpointMap = checkpointManager.readAllCheckpoints()
- if (!ElasticityUtils.wasElasticityEnabled(checkpointMap)) {
- if (checkpoint != null) {
- return Map(taskName -> checkpoint.getOffsets.asScala.toMap)
- } else {
- info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
- return Map(taskName -> Map())
- }
+ if (checkpoint != null) {
+ Map(taskName -> checkpoint.getOffsets.asScala.toMap)
+ } else {
+ info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
+ Map(taskName -> Map())
}
- info("There was elasticity enabled in one of the previous deploys." +
- "Last processed offsets computation at container start will use elasticity checkpoints if available.")
- Map(taskName -> ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(taskName,
- systemStreamPartitions.get(taskName).get.asJava, checkpointMap, systemAdmins).asScala)
}
/**
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 606f32a..75f4a8d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -226,7 +226,7 @@
// if elasticity is enabled aka elasticity factor > 1
// then this TaskInstance processes only those envelopes whose key falls
// within the keyBucket of the SSP assigned to the task.
- val incomingMessageSsp = getIncomingMessageSSP(envelope)
+ val incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor)
if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp,
throw new SamzaException(incomingMessageSsp + " is not registered!"))) {
@@ -566,7 +566,7 @@
// if elasticity is enabled aka elasticity factor > 1
// then this TaskInstance handles only those envelopes whose key falls
// within the keyBucket of the SSP assigned to the task.
- var incomingMessageSsp = getIncomingMessageSSP(envelope)
+ val incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor)
if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
ssp2CaughtupMapping(incomingMessageSsp) = true
@@ -629,28 +629,4 @@
startingOffset
}
-
- private def getIncomingMessageSSP(envelope: IncomingMessageEnvelope): SystemStreamPartition = {
- if (elasticityFactor <= 1) {
- return envelope.getSystemStreamPartition
- }
- // if elasticityFactor > 1, find the SSP with keyBucket
- var incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor)
-
- // if envelope is end of stream or watermark or drain,
- // it needs to be routed to all tasks consuming the ssp irresp of keyBucket
- val messageType = MessageType.of(envelope.getMessage)
- if (envelope.isEndOfStream()
- || envelope.isDrain()
- || messageType == MessageType.END_OF_STREAM
- || messageType == MessageType.WATERMARK) {
-
- incomingMessageSsp = systemStreamPartitions
- .filter(ssp => ssp.getSystemStream.equals(incomingMessageSsp.getSystemStream)
- && ssp.getPartition.equals(incomingMessageSsp.getPartition))
- .toIterator.next()
- debug("for watermark or end-of-stream or drain envelope, found incoming ssp as {}".format(incomingMessageSsp))
- }
- incomingMessageSsp
- }
}
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
index e69a973..ba42264 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
@@ -40,7 +40,7 @@
// Serialize checkpoint as maps keyed by the SSP.toString() to the another map of the constituent SSP components
// and offset. Jackson can't automatically serialize the SSP since it's not a POJO and this avoids
// having to wrap it another class while maintaining readability.
- // { "SSP.toString()" -> {"system": system, "stream": stream, "partition": partition, "keyBucket": keyBucket, "offset": offset)}
+ // { "SSP.toString()" -> {"system": system, "stream": stream, "partition": partition, "offset": offset)}
def fromBytes(bytes: Array[Byte]): CheckpointV1 = {
try {
val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
@@ -55,6 +55,7 @@
require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
val offset = sspInfo.get("offset")
// allow null offsets, e.g. for changelog ssps
+
new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
}
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index f56af29..a7458db 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -707,16 +707,6 @@
}
assertTrue(exceptionCaught);
- jobConfig =
- new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(17))));
- exceptionCaught = false;
- try {
- jobConfig.getElasticityFactor();
- } catch (ConfigException e) {
- exceptionCaught = true;
- }
- assertTrue(exceptionCaught);
-
jobConfig = new JobConfig(new MapConfig());
assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
}
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index e584974..16ef93d 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -225,39 +225,6 @@
}
@Test
- public void testEndOfStreamElasticityEnabled() {
-
- TaskName taskName0 = new TaskName(p0.toString() + " 0");
- TaskName taskName1 = new TaskName(p0.toString() + " 1");
- SystemStreamPartition ssp = new SystemStreamPartition("testSystem", "testStreamA", p0);
- SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStreamA", p0, 0);
- SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStreamA", p0, 1);
-
- // create EOS IME such that its ssp keybucket maps to ssp0 and not to ssp1
- // task in the runloop should give this ime to both it tasks
- IncomingMessageEnvelope envelopeEOS = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
- when(envelopeEOS.getSystemStreamPartition(2)).thenReturn(ssp0);
-
-
- // two task in the run loop that processes ssp0 -> 0th keybucket of ssp and ssp1 -> 1st keybucket of ssp
- // EOS ime should be given to both the tasks irresp of the keybucket
- RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
- RunLoopTask task1 = getMockRunLoopTask(taskName1, ssp1);
-
- SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
- when(consumerMultiplexer.choose(false)).thenReturn(envelopeEOS).thenReturn(null);
-
- Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0, taskName1, task1);
- int maxMessagesInFlight = 1;
- RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
- callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2, null);
- runLoop.run();
-
- verify(task0).endOfStream(any());
- verify(task1).endOfStream(any());
- }
-
- @Test
public void testDrainWithElasticityEnabled() {
TaskName taskName0 = new TaskName(p0.toString() + " 0");
TaskName taskName1 = new TaskName(p0.toString() + " 1");
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
index a82a9a1..e99e3f2 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
@@ -93,12 +93,12 @@
GroupByPartition grouper = new GroupByPartition(config);
Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ab1, ab2, ac0));
Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
- .put(new TaskName("Partition 0_0_2"), ImmutableSet.of(new SystemStreamPartition(aa0, 0), new SystemStreamPartition(ac0, 0)))
- .put(new TaskName("Partition 0_1_2"), ImmutableSet.of(new SystemStreamPartition(aa0, 1), new SystemStreamPartition(ac0, 1)))
- .put(new TaskName("Partition 1_0_2"), ImmutableSet.of(new SystemStreamPartition(aa1, 0), new SystemStreamPartition(ab1, 0)))
- .put(new TaskName("Partition 1_1_2"), ImmutableSet.of(new SystemStreamPartition(aa1, 1), new SystemStreamPartition(ab1, 1)))
- .put(new TaskName("Partition 2_0_2"), ImmutableSet.of(new SystemStreamPartition(aa2, 0), new SystemStreamPartition(ab2, 0)))
- .put(new TaskName("Partition 2_1_2"), ImmutableSet.of(new SystemStreamPartition(aa2, 1), new SystemStreamPartition(ab2, 1)))
+ .put(new TaskName("Partition 0 0"), ImmutableSet.of(new SystemStreamPartition(aa0, 0), new SystemStreamPartition(ac0, 0)))
+ .put(new TaskName("Partition 0 1"), ImmutableSet.of(new SystemStreamPartition(aa0, 1), new SystemStreamPartition(ac0, 1)))
+ .put(new TaskName("Partition 1 0"), ImmutableSet.of(new SystemStreamPartition(aa1, 0), new SystemStreamPartition(ab1, 0)))
+ .put(new TaskName("Partition 1 1"), ImmutableSet.of(new SystemStreamPartition(aa1, 1), new SystemStreamPartition(ab1, 1)))
+ .put(new TaskName("Partition 2 0"), ImmutableSet.of(new SystemStreamPartition(aa2, 0), new SystemStreamPartition(ab2, 0)))
+ .put(new TaskName("Partition 2 1"), ImmutableSet.of(new SystemStreamPartition(aa2, 1), new SystemStreamPartition(ab2, 1)))
.build();
assertEquals(expectedResult, result);
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
index 90628b3..13b7678 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -84,22 +84,14 @@
Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0));
Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
- .put(new TaskName(new SystemStreamPartition(aa0, 0).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(aa0, 0)))
- .put(new TaskName(new SystemStreamPartition(aa0, 1).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(aa0, 1)))
- .put(new TaskName(new SystemStreamPartition(aa1, 0).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(aa1, 0)))
- .put(new TaskName(new SystemStreamPartition(aa1, 1).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(aa1, 1)))
- .put(new TaskName(new SystemStreamPartition(aa2, 0).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(aa2, 0)))
- .put(new TaskName(new SystemStreamPartition(aa2, 1).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(aa2, 1)))
- .put(new TaskName(new SystemStreamPartition(ac0, 0).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(ac0, 0)))
- .put(new TaskName(new SystemStreamPartition(ac0, 1).toString() + "_2"),
- ImmutableSet.of(new SystemStreamPartition(ac0, 1)))
+ .put(new TaskName(new SystemStreamPartition(aa0, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa0, 0)))
+ .put(new TaskName(new SystemStreamPartition(aa0, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa0, 1)))
+ .put(new TaskName(new SystemStreamPartition(aa1, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa1, 0)))
+ .put(new TaskName(new SystemStreamPartition(aa1, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa1, 1)))
+ .put(new TaskName(new SystemStreamPartition(aa2, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa2, 0)))
+ .put(new TaskName(new SystemStreamPartition(aa2, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa2, 1)))
+ .put(new TaskName(new SystemStreamPartition(ac0, 0).toString()), ImmutableSet.of(new SystemStreamPartition(ac0, 0)))
+ .put(new TaskName(new SystemStreamPartition(ac0, 1).toString()), ImmutableSet.of(new SystemStreamPartition(ac0, 1)))
.build();
assertEquals(expectedResult, result);
diff --git a/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java b/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java
deleted file mode 100644
index dc6bfbb..0000000
--- a/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.elasticity.util;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.Partition;
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.checkpoint.CheckpointId;
-import org.apache.samza.checkpoint.CheckpointV2;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-// #TODO: going to make this entire class parametrized.
-public class TestElasticityUtils {
- private static final TaskName TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0");
- private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0_1_2");
- private static final TaskName TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
- private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
-
- @Test
- public void testComputeLastProcessedOffsetsFromCheckpointMap() {
- // Setup :
- // there is one ssp = SystemStreamPartition [systemA, streamB, partition(0)] consumed by the job
- // Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 and has elasticityFactor 2.
- // Before elasticity, job has one task with name "Partition 0"
- // with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" and "Partition 0_1_2"
- // Partition 0_0_2 consumes SSP[systemA, stream B, partition(0), keyBucket(0)]
- // Partition 0_1_2 consumes SSP[systemA, stream B, partition(0), keyBucket(1)]
- // with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", "Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4"
- // Partition 0_0_4 consumes SSP[systemA, stream B, partition(0), keyBucket(0)]
- // Partition 0_1_4 consumes SSP[systemA, stream B, partition(0), keyBucket(1)]
- // Partition 0_2_4 consumes SSP[systemA, stream B, partition(0), keyBucket(2)]
- // Partition 0_3_4 consumes SSP[systemA, stream B, partition(0), keyBucket(3)]
-
- //
- // From the definition of keyBucket computation using elasticity factor in
- // {@link IncomingMessageEnvelope.getSystemStresamPartition(elasticityFactor) as
- // keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor
- // messages processed by 0_0_4 and 0_2_4 will be the same as those processed by 0_0_2
- // messages processed by 0_1_4 and 0_3_4 will be the same as those processed by 0_1_2
- // messages processed by 0_0_2 and 0_1_2 will be the same as those processed by Partition 0 itself
-
- TaskName taskName = new TaskName("Partition 0_0_2");
- Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
- SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
- SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
- SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 2);
-
-
- SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
- // offsets ordering 1 < 2 < 3 < 4
- Mockito.when(mockSystemAdmin.offsetComparator("1", "2")).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator("2", "1")).thenReturn(1);
- Mockito.when(mockSystemAdmin.offsetComparator("1", "3")).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator("3", "1")).thenReturn(1);
- Mockito.when(mockSystemAdmin.offsetComparator("1", "4")).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator("4", "1")).thenReturn(1);
- Mockito.when(mockSystemAdmin.offsetComparator("2", "3")).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator("3", "2")).thenReturn(1);
- Mockito.when(mockSystemAdmin.offsetComparator("2", "4")).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator("4", "2")).thenReturn(1);
- Mockito.when(mockSystemAdmin.offsetComparator("3", "4")).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator("4", "3")).thenReturn(1);
-
- SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
- Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
-
- // case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself.
- // hence "Partition 0_0_2" has the largest offset and that should be used for computing checkpoint for 0_0_2 now also
- checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
- checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "4"));
- checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "2"));
- checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "3"));
- Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
- taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
- Assert.assertEquals("4", result.get(ssp0));
-
- // case 2: for task Partition 0_0_2: last deploy was with ef =1
- // hence "Partition 0" has the largest offset. Computing checkpint for 0_0_2 should use this largest offset
- checkpointMap = new HashMap<>();
- checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "4"));
- checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "1"));
- checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
- checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "2"));
-
-
- result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
- taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
- Assert.assertEquals("4", result.get(ssp0));
-
-
- // case 3: for task partition 0_0_2: last deploy was with ef = 4
- // hence checkpoints of Partition 0_0_4 and Partition 0_3_4 are relevant.
- // since messages from both end up in 0_0_2 with ef=2, need to take min of their checkpointed offsets
-
- checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
- checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "2"));
- checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
- checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "4"));
- result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
- taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
- Assert.assertEquals("3", result.get(ssp0));
- }
- @Test
- public void testComputeLastProcessedOffsetsWithEdgeCases() {
- TaskName taskName = new TaskName("Partition 0_0_2");
- Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
- SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
-
- SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
- SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
- Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
-
- // case 1: empty checkpoint map
- Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
- taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
- Assert.assertTrue("if given checkpoint map is empty, return empty last processed offsets map", result.isEmpty());
-
- // case 2: null checkpoints given for some ancestor tasks
- checkpointMap.put(new TaskName("Partition 0"), null);
- result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
- taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
- Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty());
-
- // case 3: null checkpoints given for some descendant tasks
- checkpointMap.put(new TaskName("Partition 0_0_4"), null);
- result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
- taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
- Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty());
- }
-
- @Test
- public void testTaskIsGroupByPartitionOrGroupBySSP() {
- String msgPartition = "GroupByPartition task should start with Partition";
- String msgSsp = "GroupBySystemStreamPartition task should start with SystemStreamPartition";
-
- Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION));
- Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION));
-
- Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
- Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(
- ELASTIC_TASKNAME_GROUP_BY_PARTITION));
-
- Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP));
- Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP));
-
- Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
- Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
-
- TaskName taskName = new TaskName("FooBar");
- Assert.assertFalse(msgPartition, ElasticityUtils.isGroupByPartitionTask(taskName));
- Assert.assertFalse(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName));
- }
-
- @Test
- public void testIsTaskNameElastic() {
- Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP));
- Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP));
- Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION));
- Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
- }
-
- @Test
- public void testGetElasticTaskNameParts() {
- TaskNameComponents taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION);
- Assert.assertEquals(taskNameComponents.partition, 0);
- Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET);
- Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR);
-
- taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION);
- Assert.assertEquals(taskNameComponents.partition, 0);
- Assert.assertEquals(taskNameComponents.keyBucket, 1);
- Assert.assertEquals(taskNameComponents.elasticityFactor, 2);
-
- taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP);
- Assert.assertEquals(taskNameComponents.system, "systemA");
- Assert.assertEquals(taskNameComponents.stream, "streamB");
- Assert.assertEquals(taskNameComponents.partition, 0);
- Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET);
- Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR);
-
- taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP);
- Assert.assertEquals(taskNameComponents.system, "systemA");
- Assert.assertEquals(taskNameComponents.stream, "streamB");
- Assert.assertEquals(taskNameComponents.partition, 0);
- Assert.assertEquals(taskNameComponents.keyBucket, 1);
- Assert.assertEquals(taskNameComponents.elasticityFactor, 2);
-
- taskNameComponents = ElasticityUtils.getTaskNameParts(new TaskName("FooBar"));
- Assert.assertEquals(taskNameComponents.partition, TaskNameComponents.INVALID_PARTITION);
- }
-
- @Test
- public void testIsOtherTaskAncestorDescendantOfCurrentTask() {
- TaskName task0 = new TaskName("Partition 0");
- TaskName task1 = new TaskName("Partition 1");
- TaskName task002 = new TaskName("Partition 0_0_2");
- TaskName task012 = new TaskName("Partition 0_1_2");
- TaskName task004 = new TaskName("Partition 0_0_4");
- TaskName task014 = new TaskName("Partition 0_1_4");
- TaskName task024 = new TaskName("Partition 0_2_4");
- TaskName task034 = new TaskName("Partition 0_3_4");
-
- TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
- TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_2");
- TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
- TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_4");
- TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_4");
- TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 2]_4");
- TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 3]_4");
-
- // Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 0_1_4, 0_2_4, 0_3_4 and itself
- // and all these tasks are descendants of Partition 0 (except itself)
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task0));
- Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task1));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, task0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, task0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, task0));
-
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task002));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task012));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task004));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task014));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task024));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task034));
-
- // Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and itself
- // these tasks are descendants of 0_0_2
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task002));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task002));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task002));
-
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task004));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task024));
-
- // "SystemStreamPartition [systemA, streamB, 0]
- // is ancestor of all tasks "SystemStreamPartition [systemA, streamB, 0, 0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself
- // and all these tasks are descendants of Partition 0 (except itself)
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, sspTask0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, sspTask0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, sspTask0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask0));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, sspTask0));
-
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask002));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask012));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask004));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask014));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask024));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask034));
-
- // SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of
- // tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself
- // similarly, these tasks are descendants of SystemStreamPartition [systemA, streamB, 0, 0]_2
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask002));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask002));
- Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask002));
-
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask004));
- Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask024));
- }
-
- @Test
- public void testGetAncestorAndDescendantCheckpoints() {
- TaskName taskName = new TaskName("Partition 0_0_2");
- Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
- SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
- Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1");
- Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2");
- Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3");
- Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4");
- Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5");
- Set<Checkpoint> ansCheckpointSet = new HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2));
- Set<Checkpoint> desCheckpointSet = new HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2));
-
- checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1);
- checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2);
- checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1);
- checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2);
- checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint);
-
- Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result =
- ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
- Set<Checkpoint> anscestorCheckpointSet = result.getLeft();
- Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4);
-
- Assert.assertTrue("should contain all ancestors' checkpoints",
- anscestorCheckpointSet.containsAll(ansCheckpointSet));
- Assert.assertFalse("should not contain a descendant checkpoint in anscetor list",
- anscestorCheckpointSet.contains(desCheckpoint1));
- Assert.assertFalse("should not contain an unrelated checkpoint in ancestor list",
- anscestorCheckpointSet.contains(unrelCheckpoint));
-
- Assert.assertTrue("should contain all descendants' checkpoints",
- descendantCheckpointSetForEf4.containsAll(desCheckpointSet));
- Assert.assertFalse("should not contain a anscetor checkpoint in descendant list",
- descendantCheckpointSetForEf4.contains(ansCheckpoint1));
- Assert.assertFalse("should not contain an unrelated checkpoint in descendant list",
- descendantCheckpointSetForEf4.contains(unrelCheckpoint));
- }
-
- @Test
- public void testGetOffsetForSSPInCheckpoint() {
- String offset1 = "1111";
- String offset2 = "2222";
- // case 1: when looking for exact ssp
- SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
- Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
- Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset1);
-
- // case 2: checkpoint has ssp with key bucket but looking for the full ssp (same system stream and partition but without keybucket)
- SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", "streamB", new Partition(0), 1);
- checkpoint1 = buildCheckpointV2(sspWithKB, offset2);
- Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset2);
-
- // case 3: try getting offset for an ssp not present in the checkpoint -> should return null
- SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(1));
- Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp2), null);
- }
-
- @Test
- public void testGetMaxMinOffsetForSSPInCheckpointSet() {
- String offset1 = "1111";
- String offset2 = "2222";
-
- SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
- Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
- Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2);
- Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, checkpoint2));
-
- SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
- // offset 1 < offset2
- Mockito.when(mockSystemAdmin.offsetComparator(offset1, offset2)).thenReturn(-1);
- Mockito.when(mockSystemAdmin.offsetComparator(offset2, offset1)).thenReturn(1);
-
- // case 1: when exact ssp is in checkpoint set
- Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
- Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
-
- // case 2: when looking for ssp with keyBucket 1 whereas checkpoint set only has full ssp (same system stream and partition but without keybucket)
- SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1);
- Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
- Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
-
-
- // case 3: when ssp not in checkpoint set -> should receive null for min and max offset
- SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(0));
- Assert.assertEquals(null, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
- Assert.assertEquals(null, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
- }
-
- @Test
- public void testWasElasticityEnabled() {
- Checkpoint checkpoint1 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0)), "1");
- Checkpoint checkpoint2 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(1)), "2");
- Checkpoint checkpoint3 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 0), "3");
- Checkpoint checkpoint4 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 1), "4");
-
- // case 0: empty checkpoint map
- Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(new HashMap<>()));
-
- // case 1: no tasks with elasticity enabled in the checkpoint map
- Map<TaskName, Checkpoint> checkpointMap1 = new HashMap<>();
- checkpointMap1.put(new TaskName("Partition 0"), checkpoint1);
- checkpointMap1.put(new TaskName("Partition 2"), checkpoint2);
- Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap1));
-
- // case 2: tasks with no elasticity and tasks with elasticity both present in the checkpoint map
- Map<TaskName, Checkpoint> checkpointMap2 = new HashMap<>();
- checkpointMap2.put(new TaskName("Partition 0"), checkpoint1);
- checkpointMap2.put(new TaskName("Partition 2"), checkpoint2);
- checkpointMap2.put(new TaskName("Partition 0_0_2"), checkpoint3);
- Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap2));
-
- // case 3: only tasks with elasticity present in the checkpoint map
- Map<TaskName, Checkpoint> checkpointMap3 = new HashMap<>();
- checkpointMap3.put(new TaskName("Partition 0_0_2"), checkpoint3);
- checkpointMap3.put(new TaskName("Partition 0_1_2"), checkpoint4);
- Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap3));
-
- // case 4: repeat same checks with GroupBySSP grouper tasks
- Map<TaskName, Checkpoint> checkpointMap4 = new HashMap<>();
- checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0]"), checkpoint1);
- checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 1]"), checkpoint2);
- Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap4));
- checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 0]_2"), checkpoint3);
- checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 1]_2"), checkpoint4);
- Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap4));
-
- // case 5: repeat same checks with AllSspToSingleTask grouper tasks - no elasticity supported for this grouper
- Map<TaskName, Checkpoint> checkpointMap5 = new HashMap<>();
- checkpointMap5.put(new TaskName("Task-0"), checkpoint1);
- checkpointMap5.put(new TaskName("Task-1"), checkpoint2);
- Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap5));
- }
-
- private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) {
- return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset),
- ImmutableMap.of("backend", ImmutableMap.of("store", "10")));
- }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 6583240..2ddecf0 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -340,7 +340,7 @@
assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition"));
assertEquals(serializedSSPAsJson.get("keyBucket"), deserSSPAsJson.get("-1"));
- //Scenario 2: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper
+ //Scenario 1: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper
SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
serializedString = elasticObjectMapper.writeValueAsString(sspWithKeyBucket);
@@ -356,37 +356,6 @@
assertEquals(serializedSSPAsJson.get("system"), deserSSPAsJson.get("system"));
assertEquals(serializedSSPAsJson.get("stream"), deserSSPAsJson.get("stream"));
assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition"));
-
- // scenario 3: ssp as key serialized with preElasticMapper and deserialized by new Mapper with elasticity
- Map<SystemStreamPartition, String> offsets = new HashMap<>();
- String offset = "100";
-
- String sspmapString = preElasticObjectMapper.writeValueAsString(ImmutableMap.of(ssp, offset));
-
- TypeReference<HashMap<SystemStreamPartition, String>> typeRef
- = new TypeReference<HashMap<SystemStreamPartition, String>>() { };
-
- Map<SystemStreamPartition, String> deserSSPMap = elasticObjectMapper.readValue(sspmapString, typeRef);
- SystemStreamPartition deserSSP = deserSSPMap.keySet().stream().findAny().get();
- String deserOffset = deserSSPMap.values().stream().findFirst().get();
- assertEquals(ssp.getSystem(), deserSSP.getSystem());
- assertEquals(ssp.getStream(), deserSSP.getStream());
- assertEquals(ssp.getPartition(), deserSSP.getPartition());
- assertEquals(ssp.getKeyBucket(), deserSSP.getKeyBucket());
- assertEquals(offset, deserOffset);
-
- // Scenario 4: ssp key serialized with new elasticMapper and deserialized by old preElastic Mapper
- sspmapString = elasticObjectMapper.writeValueAsString(ImmutableMap.of(sspWithKeyBucket, offset));
-
- deserSSPMap = preElasticObjectMapper.readValue(sspmapString, typeRef);
- deserSSP = deserSSPMap.keySet().stream().findAny().get();
- deserOffset = deserSSPMap.values().stream().findFirst().get();
- assertEquals(sspWithKeyBucket.getSystem(), deserSSP.getSystem());
- assertEquals(sspWithKeyBucket.getStream(), deserSSP.getStream());
- assertEquals(sspWithKeyBucket.getPartition(), deserSSP.getPartition());
- // preElastic mapper does not know about KeyBucket so dont check for it
- assertEquals(offset, deserOffset);
-
}
private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException {
@@ -440,12 +409,14 @@
private static class OldSystemStreamPartitionKeyDeserializer extends KeyDeserializer {
@Override
public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
- String[] parts = sspString.split("\\.");
- if (parts.length < 3 || parts.length > 4) {
- throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' "
- + "or 'system.stream.partition.keyBucket");
+ int idx = sspString.indexOf('.');
+ int lastIdx = sspString.lastIndexOf('.');
+ if (idx < 0 || lastIdx < 0) {
+ throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
}
- return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
+ return new SystemStreamPartition(
+ new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)),
+ new Partition(Integer.parseInt(sspString.substring(lastIdx + 1))));
}
}
public static ObjectMapper getOldDeserForSSpKeyObjectMapper() {
@@ -483,27 +454,6 @@
}
}
- private static class PreElasticitySystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
- @Override
- public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
- String sspString = ssp.getSystem() + "." + ssp.getStream() + "."
- + String.valueOf(ssp.getPartition().getPartitionId());
- jgen.writeFieldName(sspString);
- }
- }
-
- private static class PreElasticitySystemStreamPartitionKeyDeserializer extends KeyDeserializer {
- @Override
- public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
- String[] parts = sspString.split("\\.");
- if (parts.length < 3) {
- throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
- }
- return new SystemStreamPartition(
- new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
- }
- }
-
private static final ObjectMapper OBJECT_MAPPER = getPreEleasticObjectMapper();
public static ObjectMapper getPreEleasticObjectMapper() {
ObjectMapper mapper = new ObjectMapper();
@@ -514,13 +464,13 @@
// Setup custom serdes for simple data types.
module.addSerializer(Partition.class, new SamzaObjectMapper.PartitionSerializer());
module.addSerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionSerializer());
- module.addKeySerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionKeySerializer());
+ module.addKeySerializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeySerializer());
module.addSerializer(TaskName.class, new SamzaObjectMapper.TaskNameSerializer());
module.addSerializer(TaskMode.class, new SamzaObjectMapper.TaskModeSerializer());
module.addDeserializer(TaskName.class, new SamzaObjectMapper.TaskNameDeserializer());
module.addDeserializer(Partition.class, new SamzaObjectMapper.PartitionDeserializer());
module.addDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionDeserializer());
- module.addKeyDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionKeyDeserializer());
+ module.addKeyDeserializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeyDeserializer());
module.addDeserializer(Config.class, new SamzaObjectMapper.ConfigDeserializer());
module.addDeserializer(TaskMode.class, new SamzaObjectMapper.TaskModeDeserializer());
module.addSerializer(CheckpointId.class, new SamzaObjectMapper.CheckpointIdSerializer());
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index d0039a9..c2d24a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -585,47 +585,6 @@
}
@Test
- def testElasticityUpdateWithoutKeyBucket: Unit = {
- // When elasticity is enabled, task consumes a part of the full SSP represented by SSP With KeyBucket.
- // OffsetManager tracks the set of SSP with KeyBucket consumed by a task.
- // However, after an IME processing is complete, OffsetManager.update is called without KeyBuket.
- // OffsetManager has to find and udpate the last processed offset for the task correctly for its SSP with KeyBucket.
- val taskName = new TaskName("c")
- val systemStream = new SystemStream("test-system", "test-stream")
- val partition = new Partition(0)
- val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
- val systemStreamPartitionWithKeyBucket = new SystemStreamPartition(systemStreamPartition, 0);
- val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
- val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
- val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
- val startpointManagerUtil = getStartpointManagerUtil()
- val systemAdmins = mock(classOf[SystemAdmins])
- when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
- val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
- // register task and its input SSP with KeyBucket
- offsetManager.register(taskName, Set(systemStreamPartitionWithKeyBucket))
-
- offsetManager.start
-
- // update is called with only the full SSP and no keyBucket information.
- offsetManager.update(taskName, systemStreamPartition, "46")
- // Get checkpoint snapshot like we do at the beginning of TaskInstance.commit()
- val checkpoint46 = offsetManager.getLastProcessedOffsets(taskName)
- offsetManager.update(taskName, systemStreamPartition, "47") // Offset updated before checkpoint
- offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint46))
- // OffsetManager correctly updates the lastProcessedOffset and checkpoint for the task and input SSP with KeyBucket.
- assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket))
- assertEquals("46", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
-
- // Now write the checkpoint for the latest offset
- val checkpoint47 = offsetManager.getLastProcessedOffsets(taskName)
- offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint47))
-
- assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket))
- assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
- }
-
- @Test
def testStartpointUpdate: Unit = {
val taskName = new TaskName("c")
val systemStream = new SystemStream("test-system", "test-stream")
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 77b1ee3..08b082c 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -162,7 +162,7 @@
verify(processesCounter).inc()
verify(messagesActuallyProcessedCounter).inc()
- // case 2: taskInstance processes the keyBucket=0 of the ssp and envelope is NOT from same keyBucket
+ // case 1: taskInstance processes the keyBucket=0 of the ssp and envelope is NOT from same keyBucket
// taskInstance.process should throw the exception ssp is not registered.
when(envelope.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
val thrown = intercept[Exception] {
@@ -171,26 +171,6 @@
assert(thrown.isInstanceOf[SamzaException])
assert(thrown.getMessage.contains(notProcessedSSPKeyBucket.toString))
assert(thrown.getMessage.contains("is not registered!"))
-
- // case 3: taskInstance processes the keyBucket=0 of the ssp and envelope is watermark NOT from same keyBucket
- // regular processing should happen as Watermark and end of stream should be processed by all tasks
- val watermark = spy(IncomingMessageEnvelope.buildWatermarkEnvelope(SYSTEM_STREAM_PARTITION, 1234l))
- when(watermark.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
- this.taskInstance.process(watermark, coordinator, callbackFactory)
- assertEquals(2, this.taskInstanceExceptionHandler.numTimesCalled) // case 1 and case 3
- verify(this.task).processAsync(watermark, this.collector, coordinator, callback)
- verify(processesCounter, times(3)).inc() // case 1, 2 and 3
- verify(messagesActuallyProcessedCounter, times(2)).inc() // case 1 and 3
-
- // case 4: taskInstance processes the keyBucket=0 of the ssp and envelope is EndOfStream NOT from same keyBucket
- // regular processing should happen as Watermark and end of stream should be processed by all tasks
- val endOfStream = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SYSTEM_STREAM_PARTITION))
- when(endOfStream.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
- this.taskInstance.process(endOfStream, coordinator, callbackFactory)
- assertEquals(3, this.taskInstanceExceptionHandler.numTimesCalled) // case 1 and case 3 and case 4
- verify(this.task).processAsync(endOfStream, this.collector, coordinator, callback)
- verify(processesCounter, times(4)).inc() // case 1, 2, 3 and 4
- verify(messagesActuallyProcessedCounter, times(3)).inc() // case 1 and 3 and 4
}
@Test
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala
index bb89798..02f2e59 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala
@@ -19,11 +19,10 @@
package org.apache.samza.serializers
-import com.fasterxml.jackson.databind.ObjectMapper
-
import java.util
+
import org.apache.samza.Partition
-import org.apache.samza.checkpoint.CheckpointV1
+import org.apache.samza.checkpoint.{CheckpointV1}
import org.apache.samza.container.TaskName
import org.apache.samza.system.SystemStreamPartition
import org.junit.Assert._
@@ -50,81 +49,4 @@
val checkpoint = checkpointSerde.fromBytes(checkpointBytes)
assertNull(checkpoint)
}
-
- @Test
- def testSSPWithKeyBucket {
- // case 1: write and read with serde that is aware of KeyBucket within SSP
- val serde = new CheckpointV1Serde
- var offsets = Map[SystemStreamPartition, String]()
- val ssp = new SystemStreamPartition("test-system", "test-stream",
- new Partition(777), -1)
- offsets += ssp -> "1"
- val deserializedOffsets = serde.fromBytes(serde.toBytes(new CheckpointV1(offsets.asJava)))
- assertEquals("1", deserializedOffsets.getOffsets.get(ssp))
- assertEquals(1, deserializedOffsets.getOffsets.size)
-
- // case 2: SSP was serialized by serde not aware of KeyBucket - aka did not put keyBucket into serialized form
- val deserializedOffsets2 = serde.fromBytes(toBytesWithoutKeyBucket(new CheckpointV1(offsets.asJava)))
- assertEquals("1", deserializedOffsets2.getOffsets.get(ssp))
- assertEquals(1, deserializedOffsets2.getOffsets.size)
-
- // case 3: SSP was serialized by serde aware of KeyBucket but deserialized by serde not aware of KeyBucket
- val deserializedOffsets3 = fromBytesWithoutKeyBucket(serde.toBytes(new CheckpointV1(offsets.asJava)))
- assertEquals("1", deserializedOffsets3.getOffsets.get(ssp))
- assertEquals(1, deserializedOffsets3.getOffsets.size)
-
- // case 4: SSP has keyBucket = 0 (aka not default -1) - serialize by serde NOT aware of keyBucket
- // when serialized with serde not aware of keyBucket, the info about keyBucket is lost,
- // we can only recover the system, stream and partition parts out during deserialization.
- // hence after deser, we need to look for SSP without key bucket
- val sspWithKeyBucket = new SystemStreamPartition("test-system", "test-stream",
- new Partition(777), 0)
- val sspWithoutKeyBucket = new SystemStreamPartition("test-system", "test-stream",
- new Partition(777))
- var offsets1 = Map[SystemStreamPartition, String]()
- offsets1 += sspWithKeyBucket -> "1"
-
- val deserializedOffsets4 = serde.fromBytes(toBytesWithoutKeyBucket(new CheckpointV1(offsets1.asJava)))
- assertEquals("1", deserializedOffsets4.getOffsets.get(sspWithoutKeyBucket))
- assertEquals(1, deserializedOffsets4.getOffsets.size)
-
- // case 5: SSP has KeyBucket = 0, serialized by serde aware of keyBucket
- val deserializedOffsets5 = fromBytesWithoutKeyBucket(serde.toBytes(new CheckpointV1(offsets1.asJava)))
- assertEquals("1", deserializedOffsets5.getOffsets.get(sspWithoutKeyBucket))
- assertEquals(1, deserializedOffsets5.getOffsets.size)
- }
-
- private def fromBytesWithoutKeyBucket(bytes: Array[Byte]): CheckpointV1 = {
- val jsonMapper = new ObjectMapper()
- val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
-
- def deserializeJSONMap(sspInfo:util.HashMap[String, String]) = {
- val system = sspInfo.get("system")
- val stream = sspInfo.get("stream")
- val partition = sspInfo.get("partition")
- val offset = sspInfo.get("offset")
- new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
- }
- val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
- new CheckpointV1(cpMap.asJava)
- }
-
- private def toBytesWithoutKeyBucket(checkpoint: CheckpointV1): Array[Byte] = {
- val jsonMapper = new ObjectMapper()
- val offsets = checkpoint.getOffsets
- val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size())
-
- offsets.asScala.foreach {
- case (ssp, offset) =>
- val jMap = new util.HashMap[String, String](4)
- jMap.put("system", ssp.getSystemStream.getSystem)
- jMap.put("stream", ssp.getSystemStream.getStream)
- jMap.put("partition", ssp.getPartition.getPartitionId.toString)
- jMap.put("offset", offset)
-
- asMap.put(ssp.toString, jMap)
- }
-
- jsonMapper.writeValueAsBytes(asMap)
- }
}
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index cc97a70..7793b56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -142,7 +142,16 @@
info(s"Reading checkpoint for taskName $taskName")
- populateTaskNamesToCheckpointsMap()
+ if (taskNamesToCheckpoints == null) {
+ info("Reading checkpoints for the first time")
+ taskNamesToCheckpoints = readCheckpoints()
+ if (stopConsumerAfterFirstRead) {
+ info("Stopping system consumer")
+ systemConsumer.stop()
+ }
+ } else if (!stopConsumerAfterFirstRead) {
+ taskNamesToCheckpoints ++= readCheckpoints()
+ }
val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, null)
@@ -151,14 +160,6 @@
}
/**
- * @inheritdoc
- */
- override def readAllCheckpoints(): util.Map[TaskName, Checkpoint] = {
- populateTaskNamesToCheckpointsMap()
- scala.collection.JavaConverters.mapAsJavaMapConverter(taskNamesToCheckpoints).asJava
- }
-
- /**
* @inheritdoc
*/
override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
@@ -409,17 +410,4 @@
throw new IllegalArgumentException("Unknown checkpoint key type: " + checkpointKey.getType)
}
}
-
- private def populateTaskNamesToCheckpointsMap() = {
- if (taskNamesToCheckpoints == null) {
- info("Reading checkpoints for the first time")
- taskNamesToCheckpoints = readCheckpoints()
- if (stopConsumerAfterFirstRead) {
- info("Stopping system consumer")
- systemConsumer.stop()
- }
- } else if (!stopConsumerAfterFirstRead) {
- taskNamesToCheckpoints ++= readCheckpoints()
- }
- }
}
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java
index d0e4586..fe9bfb1 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java
@@ -386,25 +386,6 @@
}
@Test
- public void testReadAllCheckpoints() throws InterruptedException {
- Config config = config(ImmutableMap.of(TaskConfig.CHECKPOINT_READ_VERSIONS, "1,2"));
- setupSystemFactory(config);
- CheckpointV2 checkpointV2ForTask0 = buildCheckpointV2(INPUT_SSP0, "0");
- CheckpointV2 checkpointV2ForTask1 = buildCheckpointV2(INPUT_SSP0, "1");
- List<IncomingMessageEnvelope> checkpointEnvelopes =
- ImmutableList.of(
- newCheckpointV2Envelope(TASK0, checkpointV2ForTask0, "0"),
- newCheckpointV2Envelope(TASK1, checkpointV2ForTask1, "1")
- );
- setupConsumer(checkpointEnvelopes);
- Map<TaskName, Checkpoint> checkpointMap = ImmutableMap.of(TASK0, checkpointV2ForTask0, TASK1, checkpointV2ForTask1);
- KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config);
- kafkaCheckpointManager.register(TASK0);
- Map<TaskName, Checkpoint> readCheckpoints = kafkaCheckpointManager.readAllCheckpoints();
- assertEquals(checkpointMap, readCheckpoints);
- }
-
- @Test
public void testWriteCheckpointV1() {
setupSystemFactory(config());
KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());