Revert elasticity commits - bug found with broadcast input (#1626)

Symptom: Broadcast input ssp is not consumed by all containers of the job.
Cause: Elasticity code changing systemconsumers and samza-core
Changes:

reverting all elasticity commits post SAMZA-2719: [Elasticity] fix container level metrics when elasticity is enabled #1585 after which this issue was detected.
updating the checkpointv1 serde to accept checkpoints written with SAMZA-2743: [Elasticity] Add keybucket into SSP serde for checkpoint #1608 

list of elasticity PRs being reverted -
#1625
#1610
#1608
#1607
#1603
#1598
#1597
#1596
#1589
diff --git a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
index 4e76bb2..4dcefaa 100644
--- a/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
+++ b/samza-api/src/main/java/org/apache/samza/checkpoint/CheckpointManager.java
@@ -19,8 +19,6 @@
 
 package org.apache.samza.checkpoint;
 
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.samza.container.TaskName;
 
 /**
@@ -68,15 +66,4 @@
    * Clear the checkpoints in the checkpoint stream.
    */
   default void clearCheckpoints() { }
-
-  /**
-   * Returns the last recorded checkpoint for all tasks present in the implementation-specific location.
-   * All tasks contains all the tasks within the current job model.
-   * All tasks also includes tasks which may have been part of the job model during a previous deploy.
-   * @return A Map of TaskName to Checkpoint object.
-   *         The Checkpoint object has the recorded offset data of the specified partition.
-   */
-  default Map<TaskName, Checkpoint> readAllCheckpoints() {
-    return new HashMap<>();
-  };
 }
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 1f4a740..3908170 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -132,9 +132,7 @@
     if (envelopeKeyorOffset == null) {
       return new SystemStreamPartition(systemStreamPartition, 0);
     }
-    // modulo 31 first to best spread out the hashcode and then modulo elasticityFactor for actual keyBucket
-    // Note: elasticityFactor <= 16 so modulo 31 is safe to do.
-    int keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
+    int keyBucket = Math.abs(envelopeKeyorOffset.hashCode()) % elasticityFactor;
     return new SystemStreamPartition(systemStreamPartition, keyBucket);
   }
 
diff --git a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
index d36ac66..b9aa82c 100644
--- a/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/JobConfig.java
@@ -190,6 +190,7 @@
   public static final String CONTAINER_HEARTBEAT_MONITOR_ENABLED = "job.container.heartbeat.monitor.enabled";
   private static final boolean CONTAINER_HEARTBEAT_MONITOR_ENABLED_DEFAULT = true;
 
+
   // Enabled elasticity for the job
   // number of (elastic) tasks in the job will be old task count X elasticity factor
   public static final String JOB_ELASTICITY_FACTOR = "job.elasticity.factor";
@@ -517,8 +518,8 @@
 
   public int getElasticityFactor() {
     int elasticityFactor = getInt(JOB_ELASTICITY_FACTOR, DEFAULT_JOB_ELASTICITY_FACTOR);
-    if (elasticityFactor < 1 || elasticityFactor > 16) {
-      throw new ConfigException("Elasticity factor can not be less than 1 or greater than 16");
+    if (elasticityFactor < 1) {
+      throw new ConfigException("Elasticity factor can not be less than 1");
     }
     return elasticityFactor;
   }
diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
index 5bbcbae..3334adc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -860,19 +860,8 @@
         IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isEndOfStream()) {
-          if (elasticityFactor <= 1) {
-            SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-            processingSspSet.remove(ssp);
-          } else {
-            // if envelope is end of stream, the ssp of envelope should be removed from task's processing set irresp of keyBucket
-            SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-            Optional<SystemStreamPartition> ssp = processingSspSet.stream()
-                .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                    && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                .findFirst();
-            ssp.ifPresent(processingSspSet::remove);
-            ssp.ifPresent(processingSspSetToDrain::remove);
-          }
+          SystemStreamPartition ssp = envelope.getSystemStreamPartition(elasticityFactor);
+          processingSspSet.remove(ssp);
           if (!hasIntermediateStreams) {
             pendingEnvelopeQueue.remove();
           }
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
index 53e7400..1efd172 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
@@ -64,7 +64,7 @@
         int keyBucket = elasticityFactor == 1 ? -1 : i;
         String taskNameStr = elasticityFactor == 1 ?
             String.format("Partition %d", ssp.getPartition().getPartitionId()) :
-            String.format("Partition %d_%d_%d", ssp.getPartition().getPartitionId(), keyBucket, elasticityFactor);
+            String.format("Partition %d %d", ssp.getPartition().getPartitionId(), keyBucket);
         TaskName taskName = new TaskName(taskNameStr);
         SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
         groupedMap.putIfAbsent(taskName, new HashSet<>());
diff --git a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
index 652cf15..9443048 100644
--- a/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
+++ b/samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupBySystemStreamPartition.java
@@ -63,8 +63,7 @@
         SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
         HashSet<SystemStreamPartition> sspSet = new HashSet<SystemStreamPartition>();
         sspSet.add(sspWithKeyBucket);
-        String elasticitySuffix = elasticityFactor == 1 ? "" : String.format("_%d", elasticityFactor);
-        groupedMap.put(new TaskName(sspWithKeyBucket.toString() + elasticitySuffix), sspSet);
+        groupedMap.put(new TaskName(sspWithKeyBucket.toString()), sspSet);
       }
     }
 
diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java b/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java
deleted file mode 100644
index 90c3748..0000000
--- a/samza-core/src/main/java/org/apache/samza/elasticity/util/ElasticityUtils.java
+++ /dev/null
@@ -1,493 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.elasticity.util;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStreamPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class with util methods to be used for checkpoint computation when elasticity is enabled
- * Elasticity is supported only  for tasks created by either
- * the {@link org.apache.samza.container.grouper.stream.GroupByPartition} SSP grouper or
- * the {@link org.apache.samza.container.grouper.stream.GroupBySystemStreamPartition} SSP grouper
- */
-public class ElasticityUtils {
-  private static final Logger log = LoggerFactory.getLogger(ElasticityUtils.class);
-
-  // GroupByPartition tasks have names like Partition 0_1_2
-  // where 0 is the partition number, 1 is the key bucket and 2 is the elasticity factor
-  // see {@link GroupByPartition.ELASTIC_TASK_NAME_FORMAT}
-  private static final String ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)_(\\d+)_(\\d+)";
-  private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_PARTITION_REGEX);
-  private static final String TASK_NAME_GROUP_BY_PARTITION_REGEX = "Partition (\\d+)";
-  private static final Pattern TASK_NAME_GROUP_BY_PARTITION_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_PARTITION_REGEX);
-  private static final String TASK_NAME_GROUP_BY_PARTITION_PREFIX = "Partition ";
-
-  //GroupBySSP tasks have names like "SystemStreamPartition [<system>, <Stream>, <partition>, keyBucket]_2"
-  // where 2 is the elasticity factor
-  // see {@link GroupBySystemStreamPartition} and {@link SystemStreamPartition.toString}
-  private static final String ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+), (\\d+)\\]_(\\d+)";
-  private static final Pattern ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(ELASTIC_TASK_NAME_GROUP_BY_SSP_REGEX);
-  private static final String TASK_NAME_GROUP_BY_SSP_REGEX = "SystemStreamPartition \\[(\\S+), (\\S+), (\\d+)\\]";
-  private static final Pattern TASK_NAME_GROUP_BY_SSP_PATTERN = Pattern.compile(TASK_NAME_GROUP_BY_SSP_REGEX);
-  private static final String TASK_NAME_GROUP_BY_SSP_PREFIX = "SystemStreamPartition ";
-
-  /**
-   * Elasticity is supported for GroupByPartition tasks and GroupBySystemStreamPartition tasks
-   * When elasticity is enabled, GroupByPartition tasks have names Partition 0_1_2
-   * When elasticity is enabled, GroupBySystemStreamPartition tasks have names SystemStreamPartition [systemA, streamB, 0, 1]_2
-   * Both tasks have names ending with _%d where %d is the elasticity factor
-   * @param taskName of either GroupByPartition or GroupBySystemStreamPartition task
-   * @return
-   *   for GroupByPartition and GroupBySystemStreamPartition tasks returns elasticity factor from the task name
-   *   for other tasks returns 1 which is the default elasticity factor
-   */
-  static int getElasticityFactorFromTaskName(TaskName taskName) {
-    return getTaskNameParts(taskName).elasticityFactor;
-  }
-
-  /**
-   * checks if the given taskname is of a GroupByPartition task
-   * @param taskName of any task
-   * @return true if GroupByPartition (starts with prefix "Partition ") or false otherwise
-   */
-  static boolean isGroupByPartitionTask(TaskName taskName) {
-    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_PARTITION_PREFIX);
-  }
-
-  /**
-   * checks if the given taskname is of a GroupBySystemStreamPartition task
-   * @param taskName of any task
-   * @return true if GroupBySystemStreamPartition (starts with prefix "SystemStreamPartition ") or false otherwise
-   */
-  static boolean isGroupBySystemStreamPartitionTask(TaskName taskName) {
-    return taskName.getTaskName().startsWith(TASK_NAME_GROUP_BY_SSP_PREFIX);
-  }
-
-  /**
-   * checks if given taskName is elastic aka created with an elasticity factor > 1
-   * @param taskName of any task
-   * @return true for following, false otherwise
-   *    for task created by GroupByPartition, taskName has format "Partition 0_1_2"
-   *    for task created by GroupBySystemStreamPartition, taskName has format "SystemStreamPartition [systemA, streamB, 0, 1]_2"
-   */
-  static boolean isTaskNameElastic(TaskName taskName) {
-    if (isGroupByPartitionTask(taskName)) {
-      Matcher m = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskName.getTaskName());
-      return m.find();
-    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
-      Matcher m = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskName.getTaskName());
-      return m.find();
-    }
-    return false;
-  }
-
-  /**
-   * From given taskName extract the values for system, stream, partition, keyBucket and elasticityFactor
-   * @param taskName any taskName
-   * @return TaskNameComponents object containing system, stream, partition, keyBucket and elasticityFactor
-   *    for GroupByPartition task:
-   *         taskNames are of the format "Partition 0_1_2" (with elasticity) or "Partition 0" (without elasticity)
-   *         system and stream are empty "" strings and partition is the input partition,
-   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
-   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
-   *    for GroupBySystemStreamPartition task:
-   *         taskNames are of the format "SystemStreamPartition [systemA, streamB, 0, 1]_2" (with elasticity) or
-   *         "SystemStreamPartition [systemA, streamB, 0]" (without elasticity)
-   *         system and stream and partition are from the name (ex system = systemA, steram = streamB, partition =0 above)
-   *         without elasticity, keyBucket = 0 and elasticityFactor = 1 (the default values)
-   *         with elasticity, keyBucket from name (ex 1 above) and elasticityFactor (ex 2 from above)
-   *   for tasks created with other SSP groupers:
-   *        default TaskNameComponents is returned which has empty system, stream,
-   *        -1 for partition and 0 for keyBucket and 1 for elasticity factor
-   */
-  static TaskNameComponents getTaskNameParts(TaskName taskName) {
-    if (isGroupByPartitionTask(taskName)) {
-      return getTaskNameParts_GroupByPartition(taskName);
-    } else if (isGroupBySystemStreamPartitionTask(taskName)) {
-      return getTaskNameParts_GroupBySSP(taskName);
-    }
-    log.warn("TaskName {} is neither GroupByPartition nor GroupBySystemStreamPartition task. "
-        + "Elasticity is not supported for this taskName. "
-        + "Returning default TaskNameComponents which has default keyBucket 0,"
-        + " default elasticityFactor 1 and invalid partition -1", taskName.getTaskName());
-    return new TaskNameComponents(TaskNameComponents.INVALID_PARTITION);
-  }
-
-  /**
-   * see doc for getTaskNameParts above
-   */
-  private static TaskNameComponents getTaskNameParts_GroupByPartition(TaskName taskName) {
-    String taskNameStr = taskName.getTaskName();
-    log.debug("GetTaskNameParts for taskName {}", taskNameStr);
-
-    Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr);
-    if (matcher.find()) {
-      return new TaskNameComponents(Integer.valueOf(matcher.group(1)),
-          Integer.valueOf(matcher.group(2)),
-          Integer.valueOf(matcher.group(3)));
-    }
-    matcher = TASK_NAME_GROUP_BY_PARTITION_PATTERN.matcher(taskNameStr);
-    if (matcher.find()) {
-      return new TaskNameComponents(Integer.valueOf(matcher.group(1)));
-    }
-    log.error("Could not extract partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
-    throw new IllegalArgumentException("TaskName format incompatible");
-  }
-
-  /**
-   * see doc for getTaskNameParts above
-   */
-  private static TaskNameComponents getTaskNameParts_GroupBySSP(TaskName taskName) {
-    String taskNameStr = taskName.getTaskName();
-    log.debug("GetTaskNameParts for taskName {}", taskNameStr);
-
-    Matcher matcher = ELASTIC_TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr);
-    if (matcher.find()) {
-      return new TaskNameComponents(matcher.group(1),
-          matcher.group(2),
-          Integer.valueOf(matcher.group(3)),
-          Integer.valueOf(matcher.group(4)),
-          Integer.valueOf(matcher.group(5)));
-    }
-    matcher = TASK_NAME_GROUP_BY_SSP_PATTERN.matcher(taskNameStr);
-    if (matcher.find()) {
-      return new TaskNameComponents(matcher.group(1),
-          matcher.group(2),
-          Integer.valueOf(matcher.group(3)));
-    }
-    log.warn("Could not extract system, stream, partition, keybucket and elasticity factor from taskname for task {}.", taskNameStr);
-    throw new IllegalArgumentException("TaskName format incompatible");
-  }
-
-  /**
-   * Without elasticity, a task consumes an entire (full) SSP = [System, stream, partition].
-   * With elasticity, a task consumes a portion of the SSP_withKeyBucket = [system, stream, partition, keyBucket]
-   *    where 0 <= keyBucket < elasticityFactor and contains a subset of the IncomingMessageEnvelope(IME) from the full SSP
-   * Given two tasks currentTask and otherTask, the task otherTask is called ancestor of currentTask if the following is true
-   *    all IME consumed by currentTask will be consumed by otherTask when elasticityFactor decreases or stays same
-   *    For example:
-   *      case 1: elasticityFactor 2 to 1
-   *            otherTask = Partition 0 consuming all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
-   *            currentTask1 = Partition 0_0_2 consumes IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 2
-   *            currentTask2 = Partition 0_1_2 consumes IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor = 2
-   *            SSP =  SSP_withKeyBucket0 + SSP_withKeyBucket1. Thus, Partition 0 is ancestor of Partition 0_0_2 and Partition 0_1_2
-   *      case 2: elasticityFactor 2 to 2 - no change
-   *            Partition 0_0_2 is an ancestor of itself since the input SSP_withKeyBucket0 doesnt change
-   *            similarly Partition 0_1_2 is an ancestor of itself. This applies to all elasticityFactors
-   *      case 3: elasticityFactor 4 to 2
-   *            otherTask = Partition 0_0_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 0] when elasticityFactor=2
-   *            currentTask1 = Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
-   *            currentTask2 = Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
-   *            From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
-   *            we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
-   *            Thus, SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
-   *            Thus, Partition 0_0_2 is ancestor of Partition 0_0_4 and Partition 0_2_4
-   *            Similarly, Partition 0_1_2 is ancestor of Partition 0_1_4 and Partition 0_3_4
-   *            And transitively, Partition 0 is ancestor of Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
-   *
-   * This applies to tasks created by GroupByPartition and GroupBySystemStreamPartition SSPGroupers.
-   * aka this applies if both currentTask and otherTask are created by GroupByPartition or both are created by GroupBySystemStreamPartition
-   * If either currentTask and/or otherTask were created by other SSPGroupers then false is returned.
-   * @param currentTask
-   * @param otherTask
-   * @return true if otherTask is ancestor of currentTask, false otherwise
-   */
-  static boolean isOtherTaskAncestorOfCurrentTask(TaskName currentTask, TaskName otherTask) {
-    log.debug("isOtherTaskAncestorOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
-    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
-        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
-      return false;
-    }
-
-    TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask);
-    TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask);
-
-    if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system)
-        || !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream)
-        || otherTaskNameComponents.partition != currentTaskNameComponents.partition
-        || otherTaskNameComponents.elasticityFactor > currentTaskNameComponents.elasticityFactor) {
-      return false;
-    }
-
-    return (currentTaskNameComponents.keyBucket % otherTaskNameComponents.elasticityFactor) == otherTaskNameComponents.keyBucket;
-  }
-
-  /**
-   * See javadoc for isOtherTaskAncestorOfCurrentTask above
-   * Given currentTask and otherTask,
-   *   if currentTask == otherTask, then its not a descendant. (unlike ancestor)
-   *   else, if isOtherTaskAncestorOfCurrentTask(otherTask, currentTask) then otherTask is descendant of currentTask
-   * @param currentTask
-   * @param otherTask
-   * @return
-   */
-  static boolean isOtherTaskDescendantOfCurrentTask(TaskName currentTask, TaskName otherTask) {
-    log.debug("isOtherTaskDescendantOfCurrentTask with currentTask {} and otherTask {}", currentTask, otherTask);
-    if (!((isGroupByPartitionTask(currentTask) && isGroupByPartitionTask(otherTask))
-        || (isGroupBySystemStreamPartitionTask(currentTask) && isGroupBySystemStreamPartitionTask(otherTask)))) {
-      return false;
-    }
-
-    TaskNameComponents currentTaskNameComponents = getTaskNameParts(currentTask);
-    TaskNameComponents otherTaskNameComponents = getTaskNameParts(otherTask);
-
-    if (!otherTaskNameComponents.system.equals(currentTaskNameComponents.system)
-        || !otherTaskNameComponents.stream.equals(currentTaskNameComponents.stream)
-        || otherTaskNameComponents.partition != currentTaskNameComponents.partition
-        || otherTaskNameComponents.elasticityFactor <= currentTaskNameComponents.elasticityFactor) {
-      return false;
-    }
-
-    return (
-        otherTaskNameComponents.keyBucket % currentTaskNameComponents.elasticityFactor) == currentTaskNameComponents.keyBucket;
-  }
-
-  /**
-   * For a given taskName and a map of task names to checkpoints, returns the taskName's ancestor and descendants checkpoints
-   * All ancestor checkpoints are put into a set
-   * Descendant checkpoins are put into a map of elasticityFactor to descendant checkpoint where the elastictyFactor is of the descendant.
-   * For example, given taskName Partition 0_0_2 and checkpoint Map (Partition 0->C1, Partition 0_0_4-> C2, Partition 0_1_4 -> C3, Partition 0_2_4 ->C4)
-   * the return value is AncestorSet = <C1> and descendantMap = (4 -> <C2, C4>)
-   * See javadoc of isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask for definition of ancestor and descendant
-   * @param taskName name of the task
-   * @param checkpointMap map from taskName to checkpoint
-   * @return Pair of AncestorCheckpoint set and Descendant Checkpoint Map
-   */
-  static Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> getAncestorAndDescendantCheckpoints(
-      TaskName taskName, Map<TaskName, Checkpoint> checkpointMap) {
-    Set<Checkpoint> ancestorCheckpoints = new HashSet<>();
-    Map<Integer, Set<Checkpoint>> descendantCheckpoints = new HashMap<>();
-    log.debug("starting to parse the checkpoint map to find ancestors and descendants for taskName {}", taskName.getTaskName());
-    checkpointMap.keySet().forEach(otherTaskName -> {
-      Checkpoint otherTaskCheckpoint = checkpointMap.get(otherTaskName);
-      if (isOtherTaskAncestorOfCurrentTask(taskName, otherTaskName)) {
-        log.debug("current task name is {} and other task name is {} and other task is ancestor", taskName, otherTaskName);
-        ancestorCheckpoints.add(otherTaskCheckpoint);
-      }
-      if (isOtherTaskDescendantOfCurrentTask(taskName, otherTaskName)) {
-        log.debug("current task name is {} and other task name is {} and other task is descendant", taskName, otherTaskName);
-        int otherEF = getElasticityFactorFromTaskName(otherTaskName);
-        if (!descendantCheckpoints.containsKey(otherEF)) {
-          descendantCheckpoints.put(otherEF, new HashSet<>());
-        }
-        descendantCheckpoints.get(otherEF).add(otherTaskCheckpoint);
-      }
-    });
-    log.debug("done computing all ancestors and descendants of {}", taskName);
-    return new ImmutablePair<>(ancestorCheckpoints, descendantCheckpoints);
-  }
-
-  /**
-   * Given a checkpoint with offset map from SystemStreamPartition to offset, returns the offset for the desired ssp
-   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
-   * A checkpoint belongs to one task and a task would consume either the full SSP (aka no keyBucket)
-   * or consume exactly one of the keyBuckets of an SSP. Hence there will be at most one entry for an SSP in a checkpoint
-   * @param checkpoint Checkpoint containing SSP -> offset
-   * @param ssp SystemStreamPartition for which an offset needs to be fetched
-   * @return offset for the ssp in the Checkpoint or null if doesnt exist.
-   */
-  static String getOffsetForSSPInCheckpoint(Checkpoint checkpoint, SystemStreamPartition ssp) {
-    String checkpointStr = checkpoint.getOffsets().entrySet().stream()
-        .map(k -> k.getKey() + " : " + k.getValue())
-        .collect(Collectors.joining(", ", "{", "}"));
-    log.debug("for ssp {}, in checkpoint {}", ssp, checkpointStr);
-
-    Optional<String> offsetFound = checkpoint.getOffsets().entrySet()
-        .stream()
-        .filter(entry -> entry.getKey().getSystemStream().equals(ssp.getSystemStream()) && entry.getKey()
-            .getPartition()
-            .equals(ssp.getPartition()))
-        .map(Map.Entry::getValue)
-        .findFirst();
-    if (offsetFound.isPresent()) {
-      return offsetFound.get();
-    }
-    log.warn("Could not find offset for ssp {} in checkpoint {}. returning null string as offset", ssp, checkpoint);
-    return null;
-  }
-
-  /**
-   * Given a set of checkpoints, find the max aka largest offset for an ssp
-   * Largest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
-   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
-   * @param checkpointSet set of checkpoints
-   * @param ssp for which largest offset is needed
-   * @param systemAdmin of the ssp.getSystem()
-   * @return offset - string if one exists else null
-   */
-  static String getMaxOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
-      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
-    return checkpointSet.stream()
-        .filter(Objects::nonNull)
-        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
-        .filter(Objects::nonNull)
-        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
-        .findFirst().orElse(null);
-  }
-
-  /**
-   * Given a set of checkpoints, find the min aka smallest offset for an ssp
-   * Smallest is determined by the SystemAdmin.offsetCompartor of the ssp's system.
-   * Only the system, stream and partition portions of the SSP are matched, the keyBucket is not considered.
-   * @param checkpointSet set of checkpoints
-   * @param ssp for which largest offset is needed
-   * @param systemAdmin of the ssp.getSystem()
-   * @return offset - string if one exists else null
-   */
-  static String getMinOffsetForSSPInCheckpointSet(Set<Checkpoint> checkpointSet,
-      SystemStreamPartition ssp, SystemAdmin systemAdmin) {
-    return checkpointSet.stream()
-        .filter(Objects::nonNull)
-        .map(checkpoint -> getOffsetForSSPInCheckpoint(checkpoint, ssp))
-        .filter(Objects::nonNull)
-        .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset1, offset2)) //confirm ascending sort - aka smallest offset first
-        .findFirst().orElse(null);
-  }
-
-  /**
-   * Prereq: See javadoc for isOtherTaskAncestorOfCurrentTask and isOtherTaskDescendantOfCurrentTask to fully understand ancestor and descendant notion
-   * Briefly, Given tasks - Partition 0, Partition 0_0_2, Partition 0_1_2 and Partition 0_0_4, Partition 0_1_4, Partition 0_2_4 and Partition 0_3_4
-   * (recall Partition 0_1_2 means reads input partition 0, keyBucket 1 and elasticityFactor 2)
-   * For task Partition 0_0_2: ancestors = [Partition 0, Partition 0_0_2] and descendants = [Partition 0_0_4, Partition 0_2_4]
-   *
-   * If a task has no descendants, then we just need to pick the largest offset among all the ancestors to get the last processed offset.
-   * for example above, if Partition 0_0_2 only had ancestors and no descendants, taking largest offset among Partition 0 and 0_0_2 gives last proc offset.
-   *
-   * With descendants, a little care is needed. there could be descendants with different elasticity factors.
-   * given one elasticity factor, each the descendant within the elasticity factor consumes a sub-portion (aka keyBucket) of the task.
-   * hence, to avoid data loss, we need to pick the lowest offset across descendants of the same elasticity factor.
-   * Across elasticity factors, largest works just like in ancestor
-   *
-   * Taking a concrete example
-   * From {@link org.apache.samza.system.IncomingMessageEnvelope} (IME)
-   *    Partition 0 consunmig all IME in SSP = [systemA, streamB, 0] when elasticityFactor=1
-   *    Partition 0_1_2 consuming all IME in SSP_withKeyBucket0 = [systemA, streamB, 0, 1 (keyBucket)] when elasticityFactor=2
-   *    Partition 0_0_2 consuming all IME in SSP_withKeyBucket1 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor=2
-   *    Partition 0_0_4 consumes IME in SSP_withKeyBucket00 = [systemA, streamB, 0, 0 (keyBucket)] when elasticityFactor = 4
-   *    Partition 0_2_4 consumes IME in SSP_withKeyBucket01 = [systemA, streamB, 0, 2 (keyBucket)] when elasticityFactor = 4
-   *    From the computation of SSP_withkeyBucket in {@link org.apache.samza.system.IncomingMessageEnvelope}
-   *    we have getSystemStreamPartition(int elasticityFactor) which does keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor;
-   *    Thus,
-   *       SSP = SSP_withKeyBucket0 + SSP_withKeyBucket1.
-   *       SSP_withKeyBucket0 = SSP_withKeyBucket00 + SSP_withKeyBucket01.
-   *    If the checkpoint map has
-   *      Partition 0: (SSP : 1), Partition 0_0_2: (SSP0 : 2), Partition 0_1_2: (SSP1 : 3), Partition 0_0_4: (SSP0 : 4), Partition 0_2_4: (SSP1 : 6)
-   *      looking at these map and knowing that offsets are monotonically increasing, it is clear that last deploy was with elasticity factor = 4
-   *      to get checkpoint for Partition 0_0_2, we need to consider last deploy's offsets.
-   *      picking 6 (offset for Partition 0_2_4) means that 0_0_2 will start proc from 6 but offset 5 was never processed.
-   *      hence we need to take min of offsets within an elasticity factor.
-   *
-   * Given checkpoints for all the tasks in the checkpoint stream,
-   * computing the last proc offset for an ssp checkpoint for a task,
-   * the following needs to be met.
-   *    1. Ancestors: we need to take largest offset among ancestors for an ssp
-   *    2. Descendants:
-   *         a. group descendants by their elasticityFactor.
-   *         b. among descendants of the same elasticityFactor, take the smallest offset for an ssp
-   *         c. once step b is done, we have (elasticityFactor : smallest-offset-for-ssp) set, pick the largest in this set
-   *    3. Pick the larger among the offsets received from step 1 (for ancestors) and step 2 (for descendants)
-   *
-   * @param taskName
-   * @param taskSSPSet
-   * @param checkpointMap
-   * @param systemAdmins
-   * @return
-   */
-  public static Map<SystemStreamPartition, String> computeLastProcessedOffsetsFromCheckpointMap(
-      TaskName taskName,
-      Set<SystemStreamPartition> taskSSPSet,
-      Map<TaskName, Checkpoint> checkpointMap,
-      SystemAdmins systemAdmins) {
-    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> acnestorsAndDescendantsFound =
-        getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
-    Set<Checkpoint> ancestorCheckpoints = acnestorsAndDescendantsFound.getLeft();
-    Map<Integer, Set<Checkpoint>> descendantCheckpoints = acnestorsAndDescendantsFound.getRight();
-
-    Map<SystemStreamPartition, String> taskSSPOffsets = new HashMap<>();
-
-    taskSSPSet.forEach(ssp_withKeyBucket -> {
-      log.info("for taskName {} and ssp of the task {}, finding its last proc offset", taskName, ssp_withKeyBucket);
-
-      SystemStreamPartition ssp = new SystemStreamPartition(ssp_withKeyBucket.getSystemStream(),
-          ssp_withKeyBucket.getPartition());
-
-      SystemAdmin systemAdmin = systemAdmins.getSystemAdmin(ssp.getSystem());
-
-      String currentLastOffsetForSSP = null;
-
-      String ancestorLastOffsetForSSP = getMaxOffsetForSSPInCheckpointSet(ancestorCheckpoints, ssp, systemAdmin);
-
-      log.info("for taskName {} and ssp {} got lastoffset from ancestors as {}",
-          taskName, ssp_withKeyBucket, ancestorLastOffsetForSSP);
-
-      String descendantLastOffsetForSSP = descendantCheckpoints.entrySet().stream()
-          .map(entry -> getMinOffsetForSSPInCheckpointSet(entry.getValue(), ssp, systemAdmin)) // at each ef level, find min offset
-          .sorted((offset1, offset2) -> systemAdmin.offsetComparator(offset2, offset1)) //confirm reverse sort - aka largest offset first
-          .filter(Objects::nonNull)
-          .findFirst().orElse(null);
-
-      log.info("for taskName {} and ssp {} got lastoffset from descendants as {}",
-          taskName, ssp_withKeyBucket, descendantLastOffsetForSSP);
-
-      Integer offsetComparison = systemAdmin.offsetComparator(ancestorLastOffsetForSSP, descendantLastOffsetForSSP);
-      if (descendantLastOffsetForSSP == null || (offsetComparison != null && offsetComparison > 0)) { // means ancestorLastOffsetForSSP > descendantLastOffsetForSSP
-        currentLastOffsetForSSP = ancestorLastOffsetForSSP;
-      } else {
-        currentLastOffsetForSSP = descendantLastOffsetForSSP;
-      }
-      if (currentLastOffsetForSSP == null) {
-        log.info("for taskName {} and ssp {} got lastoffset as null. "
-            + "skipping adding this ssp to task's offsets loaded from previous checkpoint", taskName, ssp_withKeyBucket);
-      } else {
-        log.info("for taskName {} and ssp {} got lastoffset as {}", taskName, ssp_withKeyBucket, currentLastOffsetForSSP);
-        taskSSPOffsets.put(ssp_withKeyBucket, currentLastOffsetForSSP);
-      }
-    });
-
-    String checkpointStr = taskSSPOffsets.entrySet().stream()
-        .map(k -> k.getKey() + " : " + k.getValue())
-        .collect(Collectors.joining(", ", "{", "}"));
-    log.info("for taskName {}, returning checkpoint as {}", taskName, checkpointStr);
-    return taskSSPOffsets;
-  }
-
-  public static boolean wasElasticityEnabled(Map<TaskName, Checkpoint> checkpointMap) {
-    return checkpointMap.keySet().stream()
-        .filter(ElasticityUtils::isTaskNameElastic) // true if the taskName has elasticityFactor in it
-        .findFirst().isPresent();
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java b/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java
deleted file mode 100644
index 4b76b84..0000000
--- a/samza-core/src/main/java/org/apache/samza/elasticity/util/TaskNameComponents.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.elasticity.util;
-
-/**
- * POJO class to store system, stream, partition, and keyBucket information associated with a Task,
- *  that is encoded in the task's name.
- */
-public class TaskNameComponents {
-
-  public static final int DEFAULT_KEY_BUCKET = 0;
-  public static final int DEFAULT_ELASTICITY_FACTOR = 1;
-  public static final int INVALID_PARTITION = -1;
-
-  public final String system;
-  public final String stream;
-  public final int partition;
-  public final int keyBucket;
-  public final int elasticityFactor;
-
-  public TaskNameComponents(int partition) {
-    this(partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
-  }
-
-  public TaskNameComponents(int partition, int keyBucket, int elasticityFactor) {
-    this("", "", partition, keyBucket, elasticityFactor);
-  }
-
-  public TaskNameComponents(String system, String stream, int partition) {
-    this(system, stream, partition, DEFAULT_KEY_BUCKET, DEFAULT_ELASTICITY_FACTOR);
-  }
-
-  public TaskNameComponents(String system, String stream, int partition, int keyBucket, int elasticityFactor) {
-    this.system = system;
-    this.stream = stream;
-    this.partition = partition;
-    this.keyBucket = keyBucket;
-    this.elasticityFactor = elasticityFactor;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) return true;
-    if (!(o instanceof TaskNameComponents)) return false;
-
-    TaskNameComponents that = (TaskNameComponents) o;
-
-    if (!(this.system.equals(that.system))
-        || !(this.stream.equals(that.stream))
-        || (this.partition != that.partition)
-        || (this.keyBucket != that.keyBucket)
-        || (this.elasticityFactor != that.elasticityFactor)) {
-      return false;
-    }
-    return true;
-  }
-  @Override
-  public int hashCode() {
-    final int prime = 31;
-    int result = 1;
-    result = prime * result + system.hashCode();
-    result = prime * result + stream.hashCode();
-    result = prime * result + partition;
-    result = prime * result + keyBucket;
-    result = prime * result + elasticityFactor;
-    return result;
-  }
-}
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index 3470fd8..e04edee 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -231,9 +231,10 @@
     public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
       String[] parts = sspString.split("\\.");
       if (parts.length < 3) {
-        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' ");
+        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
       }
-      return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
+      return new SystemStreamPartition(
+          new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
     }
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
index 311dc6a..48b681e 100644
--- a/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
+++ b/samza-core/src/main/scala/org/apache/samza/checkpoint/OffsetManager.scala
@@ -22,13 +22,12 @@
 import java.util
 import java.util.HashMap
 import java.util.concurrent.ConcurrentHashMap
+
 import org.apache.commons.lang3.StringUtils
 import org.apache.samza.SamzaException
 import org.apache.samza.annotation.InterfaceStability
-import org.apache.samza.checkpoint.OffsetManager.info
-import org.apache.samza.config.{Config, JobConfig, StreamConfig, SystemConfig}
+import org.apache.samza.config.{Config, StreamConfig, SystemConfig}
 import org.apache.samza.container.TaskName
-import org.apache.samza.elasticity.util.ElasticityUtils
 import org.apache.samza.startpoint.{Startpoint, StartpointManager}
 import org.apache.samza.system.SystemStreamMetadata.OffsetType
 import org.apache.samza.system._
@@ -106,9 +105,7 @@
           // Build OffsetSetting so we can create a map for OffsetManager.
           (systemStream, OffsetSetting(systemStreamMetadata, defaultOffsetType, resetOffset))
       }.toMap
-
-    new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners,
-      offsetManagerMetrics)
+    new OffsetManager(offsetSettings, checkpointManager, startpointManager, systemAdmins, checkpointListeners, offsetManagerMetrics)
   }
 }
 
@@ -215,26 +212,16 @@
    * Set the last processed offset for a given SystemStreamPartition.
    */
   def update(taskName: TaskName, systemStreamPartition: SystemStreamPartition, offset: String) {
-    // without elasticity enabled, there is exactly one entry of an ssp in the systemStreamPartitions map for a taskName
-    // with elasticity enabled, there is exactly one of the keyBuckets of an ssp that a task consumes
-    // and hence exactly one entry of an ssp with keyBucket in in the systemStreamPartitions map for a taskName
-    // hence from the given ssp, find its sspWithKeybucket for the task and use that for updating lastProcessedOffsets
-    val sspWithKeyBucket = systemStreamPartitions.getOrElse(taskName,
-      throw new SamzaException("No SSPs registered for task: " + taskName))
-      .filter(ssp => ssp.getSystemStream.equals(systemStreamPartition.getSystemStream)
-        && ssp.getPartition.equals(systemStreamPartition.getPartition))
-      .toIterator.next()
-
     lastProcessedOffsets.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, String]())
     taskSSPsWithProcessedOffsetUpdated.putIfAbsent(taskName, new ConcurrentHashMap[SystemStreamPartition, Boolean]())
 
     if (offset != null) {
       if (!offset.equals(IncomingMessageEnvelope.END_OF_STREAM_OFFSET)) {
-        lastProcessedOffsets.get(taskName).put(sspWithKeyBucket, offset)
+        lastProcessedOffsets.get(taskName).put(systemStreamPartition, offset)
       }
       // Record the spp that have received the new messages. The startpoint for each ssp should only be deleted when the
       // ssp has received the new messages. More details in SAMZA-2749.
-      taskSSPsWithProcessedOffsetUpdated.get(taskName).putIfAbsent(sspWithKeyBucket, true)
+      taskSSPsWithProcessedOffsetUpdated.get(taskName).putIfAbsent(systemStreamPartition, true)
     }
   }
 
@@ -500,19 +487,12 @@
 
     val checkpoint = checkpointManager.readLastCheckpoint(taskName)
 
-    val checkpointMap = checkpointManager.readAllCheckpoints()
-    if (!ElasticityUtils.wasElasticityEnabled(checkpointMap)) {
-      if (checkpoint != null) {
-        return Map(taskName -> checkpoint.getOffsets.asScala.toMap)
-      } else {
-        info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
-        return Map(taskName -> Map())
-      }
+    if (checkpoint != null) {
+      Map(taskName -> checkpoint.getOffsets.asScala.toMap)
+    } else {
+      info("Did not receive a checkpoint for taskName %s. Proceeding without a checkpoint." format taskName)
+      Map(taskName -> Map())
     }
-    info("There was elasticity enabled in one of the previous deploys." +
-      "Last processed offsets computation at container start will use elasticity checkpoints if available.")
-    Map(taskName -> ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(taskName,
-      systemStreamPartitions.get(taskName).get.asJava, checkpointMap, systemAdmins).asScala)
   }
 
   /**
diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
index 606f32a..75f4a8d 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
@@ -226,7 +226,7 @@
     // if elasticity is enabled aka elasticity factor > 1
     // then this TaskInstance processes only those envelopes whose key falls
     // within the keyBucket of the SSP assigned to the task.
-    val incomingMessageSsp = getIncomingMessageSSP(envelope)
+    val incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor)
 
     if (!ssp2CaughtupMapping.getOrElse(incomingMessageSsp,
       throw new SamzaException(incomingMessageSsp + " is not registered!"))) {
@@ -566,7 +566,7 @@
     // if elasticity is enabled aka elasticity factor > 1
     // then this TaskInstance handles only those envelopes whose key falls
     // within the keyBucket of the SSP assigned to the task.
-    var incomingMessageSsp = getIncomingMessageSSP(envelope)
+    val incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor)
 
     if (IncomingMessageEnvelope.END_OF_STREAM_OFFSET.equals(envelope.getOffset)) {
       ssp2CaughtupMapping(incomingMessageSsp) = true
@@ -629,28 +629,4 @@
 
     startingOffset
   }
-
-  private def getIncomingMessageSSP(envelope: IncomingMessageEnvelope): SystemStreamPartition = {
-    if (elasticityFactor <= 1) {
-      return envelope.getSystemStreamPartition
-    }
-    // if elasticityFactor > 1, find the SSP with keyBucket
-    var incomingMessageSsp = envelope.getSystemStreamPartition(elasticityFactor)
-
-    // if envelope is end of stream or watermark or drain,
-    // it needs to be routed to all tasks consuming the ssp irresp of keyBucket
-    val messageType = MessageType.of(envelope.getMessage)
-    if (envelope.isEndOfStream()
-      || envelope.isDrain()
-      || messageType == MessageType.END_OF_STREAM
-      || messageType == MessageType.WATERMARK) {
-
-      incomingMessageSsp = systemStreamPartitions
-        .filter(ssp => ssp.getSystemStream.equals(incomingMessageSsp.getSystemStream)
-          && ssp.getPartition.equals(incomingMessageSsp.getPartition))
-        .toIterator.next()
-      debug("for watermark or end-of-stream or drain envelope, found incoming ssp as {}".format(incomingMessageSsp))
-    }
-    incomingMessageSsp
-  }
 }
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
index e69a973..ba42264 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
@@ -40,7 +40,7 @@
   // Serialize checkpoint as maps keyed by the SSP.toString() to the another map of the constituent SSP components
   // and offset. Jackson can't automatically serialize the SSP since it's not a POJO and this avoids
   // having to wrap it another class while maintaining readability.
-  // { "SSP.toString()" -> {"system": system, "stream": stream, "partition": partition, "keyBucket": keyBucket, "offset": offset)}
+  // { "SSP.toString()" -> {"system": system, "stream": stream, "partition": partition, "offset": offset)}
   def fromBytes(bytes: Array[Byte]): CheckpointV1 = {
     try {
       val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
@@ -55,6 +55,7 @@
         require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
         val offset = sspInfo.get("offset")
         // allow null offsets, e.g. for changelog ssps
+
         new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
       }
 
diff --git a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
index f56af29..a7458db 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestJobConfig.java
@@ -707,16 +707,6 @@
     }
     assertTrue(exceptionCaught);
 
-    jobConfig =
-        new JobConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_ELASTICITY_FACTOR, Integer.toString(17))));
-    exceptionCaught = false;
-    try {
-      jobConfig.getElasticityFactor();
-    } catch (ConfigException e) {
-      exceptionCaught = true;
-    }
-    assertTrue(exceptionCaught);
-
     jobConfig = new JobConfig(new MapConfig());
     assertEquals(JobConfig.DEFAULT_JOB_ELASTICITY_FACTOR, jobConfig.getElasticityFactor());
   }
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
index e584974..16ef93d 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java
@@ -225,39 +225,6 @@
   }
 
   @Test
-  public void testEndOfStreamElasticityEnabled() {
-
-    TaskName taskName0 = new TaskName(p0.toString() + " 0");
-    TaskName taskName1 = new TaskName(p0.toString() + " 1");
-    SystemStreamPartition ssp = new SystemStreamPartition("testSystem", "testStreamA", p0);
-    SystemStreamPartition ssp0 = new SystemStreamPartition("testSystem", "testStreamA", p0, 0);
-    SystemStreamPartition ssp1 = new SystemStreamPartition("testSystem", "testStreamA", p0, 1);
-
-    // create EOS IME such that its ssp keybucket maps to ssp0 and not to ssp1
-    // task in the runloop should give this ime to both it tasks
-    IncomingMessageEnvelope envelopeEOS = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
-    when(envelopeEOS.getSystemStreamPartition(2)).thenReturn(ssp0);
-
-
-    // two task in the run loop that processes ssp0 -> 0th keybucket of ssp and ssp1 -> 1st keybucket of ssp
-    // EOS ime should be given to both the tasks irresp of the keybucket
-    RunLoopTask task0 = getMockRunLoopTask(taskName0, ssp0);
-    RunLoopTask task1 = getMockRunLoopTask(taskName1, ssp1);
-
-    SystemConsumers consumerMultiplexer = mock(SystemConsumers.class);
-    when(consumerMultiplexer.choose(false)).thenReturn(envelopeEOS).thenReturn(null);
-
-    Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0, taskName1, task1);
-    int maxMessagesInFlight = 1;
-    RunLoop runLoop = new RunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs,
-        callbackTimeoutMs, maxThrottlingDelayMs, 0, containerMetrics, () -> 0L, false, 2, null);
-    runLoop.run();
-
-    verify(task0).endOfStream(any());
-    verify(task1).endOfStream(any());
-  }
-
-  @Test
   public void testDrainWithElasticityEnabled() {
     TaskName taskName0 = new TaskName(p0.toString() + " 0");
     TaskName taskName1 = new TaskName(p0.toString() + " 1");
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
index a82a9a1..e99e3f2 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupByPartition.java
@@ -93,12 +93,12 @@
     GroupByPartition grouper = new GroupByPartition(config);
     Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ab1, ab2, ac0));
     Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
-        .put(new TaskName("Partition 0_0_2"), ImmutableSet.of(new SystemStreamPartition(aa0, 0), new SystemStreamPartition(ac0, 0)))
-        .put(new TaskName("Partition 0_1_2"), ImmutableSet.of(new SystemStreamPartition(aa0, 1), new SystemStreamPartition(ac0, 1)))
-        .put(new TaskName("Partition 1_0_2"), ImmutableSet.of(new SystemStreamPartition(aa1, 0), new SystemStreamPartition(ab1, 0)))
-        .put(new TaskName("Partition 1_1_2"), ImmutableSet.of(new SystemStreamPartition(aa1, 1), new SystemStreamPartition(ab1, 1)))
-        .put(new TaskName("Partition 2_0_2"), ImmutableSet.of(new SystemStreamPartition(aa2, 0), new SystemStreamPartition(ab2, 0)))
-        .put(new TaskName("Partition 2_1_2"), ImmutableSet.of(new SystemStreamPartition(aa2, 1), new SystemStreamPartition(ab2, 1)))
+        .put(new TaskName("Partition 0 0"), ImmutableSet.of(new SystemStreamPartition(aa0, 0), new SystemStreamPartition(ac0, 0)))
+        .put(new TaskName("Partition 0 1"), ImmutableSet.of(new SystemStreamPartition(aa0, 1), new SystemStreamPartition(ac0, 1)))
+        .put(new TaskName("Partition 1 0"), ImmutableSet.of(new SystemStreamPartition(aa1, 0), new SystemStreamPartition(ab1, 0)))
+        .put(new TaskName("Partition 1 1"), ImmutableSet.of(new SystemStreamPartition(aa1, 1), new SystemStreamPartition(ab1, 1)))
+        .put(new TaskName("Partition 2 0"), ImmutableSet.of(new SystemStreamPartition(aa2, 0), new SystemStreamPartition(ab2, 0)))
+        .put(new TaskName("Partition 2 1"), ImmutableSet.of(new SystemStreamPartition(aa2, 1), new SystemStreamPartition(ab2, 1)))
         .build();
 
     assertEquals(expectedResult, result);
diff --git a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
index 90628b3..13b7678 100644
--- a/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
+++ b/samza-core/src/test/java/org/apache/samza/container/grouper/stream/TestGroupBySystemStreamPartition.java
@@ -84,22 +84,14 @@
 
     Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(ImmutableSet.of(aa0, aa1, aa2, ac0));
     Map<TaskName, Set<SystemStreamPartition>> expectedResult = ImmutableMap.<TaskName, Set<SystemStreamPartition>>builder()
-        .put(new TaskName(new SystemStreamPartition(aa0, 0).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(aa0, 0)))
-        .put(new TaskName(new SystemStreamPartition(aa0, 1).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(aa0, 1)))
-        .put(new TaskName(new SystemStreamPartition(aa1, 0).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(aa1, 0)))
-        .put(new TaskName(new SystemStreamPartition(aa1, 1).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(aa1, 1)))
-        .put(new TaskName(new SystemStreamPartition(aa2, 0).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(aa2, 0)))
-        .put(new TaskName(new SystemStreamPartition(aa2, 1).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(aa2, 1)))
-        .put(new TaskName(new SystemStreamPartition(ac0, 0).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(ac0, 0)))
-        .put(new TaskName(new SystemStreamPartition(ac0, 1).toString() + "_2"),
-            ImmutableSet.of(new SystemStreamPartition(ac0, 1)))
+        .put(new TaskName(new SystemStreamPartition(aa0, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa0, 0)))
+        .put(new TaskName(new SystemStreamPartition(aa0, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa0, 1)))
+        .put(new TaskName(new SystemStreamPartition(aa1, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa1, 0)))
+        .put(new TaskName(new SystemStreamPartition(aa1, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa1, 1)))
+        .put(new TaskName(new SystemStreamPartition(aa2, 0).toString()), ImmutableSet.of(new SystemStreamPartition(aa2, 0)))
+        .put(new TaskName(new SystemStreamPartition(aa2, 1).toString()), ImmutableSet.of(new SystemStreamPartition(aa2, 1)))
+        .put(new TaskName(new SystemStreamPartition(ac0, 0).toString()), ImmutableSet.of(new SystemStreamPartition(ac0, 0)))
+        .put(new TaskName(new SystemStreamPartition(ac0, 1).toString()), ImmutableSet.of(new SystemStreamPartition(ac0, 1)))
         .build();
 
     assertEquals(expectedResult, result);
diff --git a/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java b/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java
deleted file mode 100644
index dc6bfbb..0000000
--- a/samza-core/src/test/java/org/apache/samza/elasticity/util/TestElasticityUtils.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.samza.elasticity.util;
-
-import com.google.common.collect.ImmutableMap;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.samza.Partition;
-import org.apache.samza.checkpoint.Checkpoint;
-import org.apache.samza.checkpoint.CheckpointId;
-import org.apache.samza.checkpoint.CheckpointV2;
-import org.apache.samza.container.TaskName;
-import org.apache.samza.system.SystemAdmin;
-import org.apache.samza.system.SystemAdmins;
-import org.apache.samza.system.SystemStreamPartition;
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-
-// #TODO: going to make this entire class parametrized.
-public class TestElasticityUtils {
-  private static final TaskName TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0");
-  private static final TaskName ELASTIC_TASKNAME_GROUP_BY_PARTITION = new TaskName("Partition 0_1_2");
-  private static final TaskName TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
-  private static final TaskName ELASTIC_TASKNAME_GROUP_BY_SSP = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
-
-  @Test
-  public void testComputeLastProcessedOffsetsFromCheckpointMap() {
-    // Setup :
-    // there is one ssp = SystemStreamPartition [systemA, streamB, partition(0)] consumed by the job
-    // Note: Partition 0_1_2 means task consumes keyBucket 1 of partition 0 and has elasticityFactor 2.
-    // Before elasticity, job has one task with name "Partition 0"
-    // with elasticity factor 2, job has 2 tasks with names "Partition 0_0_2" and "Partition 0_1_2"
-    //         Partition 0_0_2 consumes SSP[systemA, stream B, partition(0), keyBucket(0)]
-    //         Partition 0_1_2 consumes SSP[systemA, stream B, partition(0), keyBucket(1)]
-    // with elasticity factor 4, job has 4 tasks with names "Partition 0_0_4", "Partition 0_1_4", "Partition 0_2_4" and "Partition 0_3_4"
-    //         Partition 0_0_4 consumes SSP[systemA, stream B, partition(0), keyBucket(0)]
-    //         Partition 0_1_4 consumes SSP[systemA, stream B, partition(0), keyBucket(1)]
-    //         Partition 0_2_4 consumes SSP[systemA, stream B, partition(0), keyBucket(2)]
-    //         Partition 0_3_4 consumes SSP[systemA, stream B, partition(0), keyBucket(3)]
-
-    //
-    // From the definition of keyBucket computation using elasticity factor in
-    // {@link IncomingMessageEnvelope.getSystemStresamPartition(elasticityFactor) as
-    // keyBucket = (Math.abs(envelopeKeyorOffset.hashCode()) % 31) % elasticityFactor
-    // messages processed by 0_0_4 and 0_2_4 will be the same as those processed by 0_0_2
-    // messages processed by 0_1_4 and 0_3_4 will be the same as those processed by 0_1_2
-    // messages processed by 0_0_2 and 0_1_2 will be the same as those processed by Partition 0 itself
-
-    TaskName taskName = new TaskName("Partition 0_0_2");
-    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
-    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
-    SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
-    SystemStreamPartition ssp2 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 2);
-
-
-    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
-    // offsets ordering 1 < 2 < 3 < 4
-    Mockito.when(mockSystemAdmin.offsetComparator("1", "2")).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator("2", "1")).thenReturn(1);
-    Mockito.when(mockSystemAdmin.offsetComparator("1", "3")).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator("3", "1")).thenReturn(1);
-    Mockito.when(mockSystemAdmin.offsetComparator("1", "4")).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator("4", "1")).thenReturn(1);
-    Mockito.when(mockSystemAdmin.offsetComparator("2", "3")).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator("3", "2")).thenReturn(1);
-    Mockito.when(mockSystemAdmin.offsetComparator("2", "4")).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator("4", "2")).thenReturn(1);
-    Mockito.when(mockSystemAdmin.offsetComparator("3", "4")).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator("4", "3")).thenReturn(1);
-
-    SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
-    Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
-
-    // case 1: for task Partition 0_0_2: last deploy was with ef = 2 itself.
-    // hence "Partition 0_0_2" has the largest offset and that should be used for computing checkpoint for 0_0_2 now also
-    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
-    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "4"));
-    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "2"));
-    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "3"));
-    Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
-        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
-    Assert.assertEquals("4", result.get(ssp0));
-
-    // case 2: for task Partition 0_0_2: last deploy was with ef =1
-    // hence "Partition 0" has the largest offset. Computing checkpint for 0_0_2 should use this largest offset
-    checkpointMap = new HashMap<>();
-    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "4"));
-    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "1"));
-    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
-    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "2"));
-
-
-    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
-        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
-    Assert.assertEquals("4", result.get(ssp0));
-
-
-    // case 3: for task partition 0_0_2: last deploy was with ef = 4
-    // hence checkpoints of Partition 0_0_4 and Partition 0_3_4 are relevant.
-    // since messages from both end up in 0_0_2 with ef=2, need to take min of their checkpointed offsets
-
-    checkpointMap.put(new TaskName("Partition 0"), buildCheckpointV2(ssp, "1"));
-    checkpointMap.put(new TaskName("Partition 0_0_2"), buildCheckpointV2(ssp0, "2"));
-    checkpointMap.put(new TaskName("Partition 0_0_4"), buildCheckpointV2(ssp0, "3"));
-    checkpointMap.put(new TaskName("Partition 0_2_4"), buildCheckpointV2(ssp2, "4"));
-    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
-        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
-    Assert.assertEquals("3", result.get(ssp0));
-  }
-  @Test
-  public void testComputeLastProcessedOffsetsWithEdgeCases() {
-    TaskName taskName = new TaskName("Partition 0_0_2");
-    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
-    SystemStreamPartition ssp0 = new SystemStreamPartition("systemA", "streamB", new Partition(0), 0);
-
-    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
-    SystemAdmins mockSystemAdmins = Mockito.mock(SystemAdmins.class);
-    Mockito.when(mockSystemAdmins.getSystemAdmin(ssp0.getSystem())).thenReturn(mockSystemAdmin);
-
-    // case 1: empty checkpoint map
-    Map<SystemStreamPartition, String> result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
-        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
-    Assert.assertTrue("if given checkpoint map is empty, return empty last processed offsets map", result.isEmpty());
-
-    // case 2: null checkpoints given for some ancestor tasks
-    checkpointMap.put(new TaskName("Partition 0"), null);
-    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
-        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
-    Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty());
-
-    // case 3: null checkpoints given for some descendant tasks
-    checkpointMap.put(new TaskName("Partition 0_0_4"), null);
-    result = ElasticityUtils.computeLastProcessedOffsetsFromCheckpointMap(
-        taskName, Collections.singleton(ssp0), checkpointMap, mockSystemAdmins);
-    Assert.assertTrue("if given checkpoint map has null checkpoint, return empty last processed offsets map", result.isEmpty());
-  }
-
-  @Test
-  public void testTaskIsGroupByPartitionOrGroupBySSP() {
-    String msgPartition = "GroupByPartition task should start with Partition";
-    String msgSsp = "GroupBySystemStreamPartition task should start with SystemStreamPartition";
-
-    Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_PARTITION));
-    Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_PARTITION));
-
-    Assert.assertTrue(msgPartition, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
-    Assert.assertFalse(msgPartition, ElasticityUtils.isGroupBySystemStreamPartitionTask(
-        ELASTIC_TASKNAME_GROUP_BY_PARTITION));
-
-    Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(TASKNAME_GROUP_BY_SSP));
-    Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(TASKNAME_GROUP_BY_SSP));
-
-    Assert.assertTrue(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
-    Assert.assertFalse(msgSsp, ElasticityUtils.isGroupByPartitionTask(ELASTIC_TASKNAME_GROUP_BY_SSP));
-
-    TaskName taskName = new TaskName("FooBar");
-    Assert.assertFalse(msgPartition, ElasticityUtils.isGroupByPartitionTask(taskName));
-    Assert.assertFalse(msgSsp, ElasticityUtils.isGroupBySystemStreamPartitionTask(taskName));
-  }
-
-  @Test
-  public void testIsTaskNameElastic() {
-    Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_SSP));
-    Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_SSP));
-    Assert.assertFalse(ElasticityUtils.isTaskNameElastic(TASKNAME_GROUP_BY_PARTITION));
-    Assert.assertTrue(ElasticityUtils.isTaskNameElastic(ELASTIC_TASKNAME_GROUP_BY_PARTITION));
-  }
-
-  @Test
-  public void testGetElasticTaskNameParts() {
-    TaskNameComponents taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_PARTITION);
-    Assert.assertEquals(taskNameComponents.partition, 0);
-    Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET);
-    Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR);
-
-    taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_PARTITION);
-    Assert.assertEquals(taskNameComponents.partition, 0);
-    Assert.assertEquals(taskNameComponents.keyBucket, 1);
-    Assert.assertEquals(taskNameComponents.elasticityFactor, 2);
-
-    taskNameComponents = ElasticityUtils.getTaskNameParts(TASKNAME_GROUP_BY_SSP);
-    Assert.assertEquals(taskNameComponents.system, "systemA");
-    Assert.assertEquals(taskNameComponents.stream, "streamB");
-    Assert.assertEquals(taskNameComponents.partition, 0);
-    Assert.assertEquals(taskNameComponents.keyBucket, TaskNameComponents.DEFAULT_KEY_BUCKET);
-    Assert.assertEquals(taskNameComponents.elasticityFactor, TaskNameComponents.DEFAULT_ELASTICITY_FACTOR);
-
-    taskNameComponents = ElasticityUtils.getTaskNameParts(ELASTIC_TASKNAME_GROUP_BY_SSP);
-    Assert.assertEquals(taskNameComponents.system, "systemA");
-    Assert.assertEquals(taskNameComponents.stream, "streamB");
-    Assert.assertEquals(taskNameComponents.partition, 0);
-    Assert.assertEquals(taskNameComponents.keyBucket, 1);
-    Assert.assertEquals(taskNameComponents.elasticityFactor, 2);
-
-    taskNameComponents = ElasticityUtils.getTaskNameParts(new TaskName("FooBar"));
-    Assert.assertEquals(taskNameComponents.partition, TaskNameComponents.INVALID_PARTITION);
-  }
-
-  @Test
-  public void testIsOtherTaskAncestorDescendantOfCurrentTask() {
-    TaskName task0 = new TaskName("Partition 0");
-    TaskName task1 = new TaskName("Partition 1");
-    TaskName task002 = new TaskName("Partition 0_0_2");
-    TaskName task012 = new TaskName("Partition 0_1_2");
-    TaskName task004 = new TaskName("Partition 0_0_4");
-    TaskName task014 = new TaskName("Partition 0_1_4");
-    TaskName task024 = new TaskName("Partition 0_2_4");
-    TaskName task034 = new TaskName("Partition 0_3_4");
-
-    TaskName sspTask0 = new TaskName("SystemStreamPartition [systemA, streamB, 0]");
-    TaskName sspTask002 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_2");
-    TaskName sspTask012 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_2");
-    TaskName sspTask004 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 0]_4");
-    TaskName sspTask014 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 1]_4");
-    TaskName sspTask024 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 2]_4");
-    TaskName sspTask034 = new TaskName("SystemStreamPartition [systemA, streamB, 0, 3]_4");
-
-    // Partition 0 is ancestor of all tasks Partition 0_0_2, 0_1_2, 0_0_4, 0_1_4, 0_2_4, 0_3_4 and itself
-    // and all these tasks are descendants of Partition 0 (except itself)
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task0));
-    Assert.assertFalse(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task0, task1));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task012, task0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task014, task0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task034, task0));
-
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task002));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task012));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task004));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task014));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task024));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task0, task034));
-
-    // Partition 0_0_2 is ancestor of tasks Partition 0_0_4 and 0_2_4 and itself
-    // these tasks are descendants of 0_0_2
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task004, task002));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task024, task002));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(task002, task002));
-
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task004));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(task002, task024));
-
-    // "SystemStreamPartition [systemA, streamB, 0]
-    // is ancestor of all tasks "SystemStreamPartition [systemA, streamB, 0, 0]_2, [systemA, streamB, 0, 1]_2 and the rest incl itself
-    // and all these tasks are descendants of Partition 0 (except itself)
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask0, sspTask0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask012, sspTask0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask014, sspTask0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask0));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask034, sspTask0));
-
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask002));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask012));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask004));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask014));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask024));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask0, sspTask034));
-
-    // SystemStreamPartition [systemA, streamB, 0, 0]_2 is ancestor of
-    // tasks SystemStreamPartition [systemA, streamB, 0, 0]_4, SystemStreamPartition [systemA, streamB, 0, 2]_4 and itself
-    // similarly, these tasks are descendants of SystemStreamPartition [systemA, streamB, 0, 0]_2
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask004, sspTask002));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask024, sspTask002));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskAncestorOfCurrentTask(sspTask002, sspTask002));
-
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask004));
-    Assert.assertTrue(ElasticityUtils.isOtherTaskDescendantOfCurrentTask(sspTask002, sspTask024));
-  }
-
-  @Test
-  public void testGetAncestorAndDescendantCheckpoints() {
-    TaskName taskName = new TaskName("Partition 0_0_2");
-    Map<TaskName, Checkpoint> checkpointMap = new HashMap<>();
-    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
-    Checkpoint ansCheckpoint1 = buildCheckpointV2(ssp, "1");
-    Checkpoint ansCheckpoint2 = buildCheckpointV2(ssp, "2");
-    Checkpoint desCheckpoint1 = buildCheckpointV2(ssp, "3");
-    Checkpoint desCheckpoint2 = buildCheckpointV2(ssp, "4");
-    Checkpoint unrelCheckpoint = buildCheckpointV2(ssp, "5");
-    Set<Checkpoint> ansCheckpointSet = new HashSet<>(Arrays.asList(ansCheckpoint1, ansCheckpoint2));
-    Set<Checkpoint> desCheckpointSet = new HashSet<>(Arrays.asList(desCheckpoint1, desCheckpoint2));
-
-    checkpointMap.put(new TaskName("Partition 0"), ansCheckpoint1);
-    checkpointMap.put(new TaskName("Partition 0_0_2"), ansCheckpoint2);
-    checkpointMap.put(new TaskName("Partition 0_0_4"), desCheckpoint1);
-    checkpointMap.put(new TaskName("Partition 0_2_4"), desCheckpoint2);
-    checkpointMap.put(new TaskName("Partition 0_1_4"), unrelCheckpoint);
-
-    Pair<Set<Checkpoint>, Map<Integer, Set<Checkpoint>>> result =
-        ElasticityUtils.getAncestorAndDescendantCheckpoints(taskName, checkpointMap);
-    Set<Checkpoint> anscestorCheckpointSet = result.getLeft();
-    Set<Checkpoint> descendantCheckpointSetForEf4 = result.getRight().get(4);
-
-    Assert.assertTrue("should contain all ancestors' checkpoints",
-        anscestorCheckpointSet.containsAll(ansCheckpointSet));
-    Assert.assertFalse("should not contain a descendant checkpoint in anscetor list",
-        anscestorCheckpointSet.contains(desCheckpoint1));
-    Assert.assertFalse("should not contain an unrelated checkpoint in ancestor list",
-        anscestorCheckpointSet.contains(unrelCheckpoint));
-
-    Assert.assertTrue("should contain all descendants' checkpoints",
-        descendantCheckpointSetForEf4.containsAll(desCheckpointSet));
-    Assert.assertFalse("should not contain a anscetor checkpoint in descendant list",
-        descendantCheckpointSetForEf4.contains(ansCheckpoint1));
-    Assert.assertFalse("should not contain an unrelated checkpoint in descendant list",
-        descendantCheckpointSetForEf4.contains(unrelCheckpoint));
-  }
-
-  @Test
-  public void testGetOffsetForSSPInCheckpoint() {
-    String offset1 = "1111";
-    String offset2 = "2222";
-    // case 1: when looking for exact ssp
-    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
-    Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
-    Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset1);
-
-    // case 2: checkpoint has ssp with key bucket but looking for the full ssp (same system stream and partition but without keybucket)
-    SystemStreamPartition sspWithKB = new SystemStreamPartition("systemA", "streamB", new Partition(0), 1);
-    checkpoint1 = buildCheckpointV2(sspWithKB, offset2);
-    Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp), offset2);
-
-    // case 3: try getting offset for an ssp not present in the checkpoint -> should return null
-    SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(1));
-    Assert.assertEquals(ElasticityUtils.getOffsetForSSPInCheckpoint(checkpoint1, ssp2), null);
-  }
-
-  @Test
-  public void testGetMaxMinOffsetForSSPInCheckpointSet() {
-    String offset1 = "1111";
-    String offset2 = "2222";
-
-    SystemStreamPartition ssp = new SystemStreamPartition("systemA", "streamB", new Partition(0));
-    Checkpoint checkpoint1 = buildCheckpointV2(ssp, offset1);
-    Checkpoint checkpoint2 = buildCheckpointV2(ssp, offset2);
-    Set<Checkpoint> checkpointSet = new HashSet<>(Arrays.asList(checkpoint1, checkpoint2));
-
-    SystemAdmin mockSystemAdmin = Mockito.mock(SystemAdmin.class);
-    // offset 1 < offset2
-    Mockito.when(mockSystemAdmin.offsetComparator(offset1, offset2)).thenReturn(-1);
-    Mockito.when(mockSystemAdmin.offsetComparator(offset2, offset1)).thenReturn(1);
-
-    // case 1: when exact ssp is in checkpoint set
-    Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
-    Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp, mockSystemAdmin));
-
-    // case 2: when looking for ssp with keyBucket 1 whereas checkpoint set only has full ssp (same system stream and partition but without keybucket)
-    SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, 1);
-    Assert.assertEquals(offset2, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
-    Assert.assertEquals(offset1, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, sspWithKeyBucket, mockSystemAdmin));
-
-
-    // case 3: when ssp not in checkpoint set -> should receive null for min and max offset
-    SystemStreamPartition ssp2 = new SystemStreamPartition("A", "B", new Partition(0));
-    Assert.assertEquals(null, ElasticityUtils.getMaxOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
-    Assert.assertEquals(null, ElasticityUtils.getMinOffsetForSSPInCheckpointSet(checkpointSet, ssp2, mockSystemAdmin));
-  }
-
-  @Test
-  public void testWasElasticityEnabled() {
-    Checkpoint checkpoint1 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0)), "1");
-    Checkpoint checkpoint2 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(1)), "2");
-    Checkpoint checkpoint3 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 0), "3");
-    Checkpoint checkpoint4 = buildCheckpointV2(new SystemStreamPartition("A", "B", new Partition(0), 1), "4");
-
-    // case 0: empty checkpoint map
-    Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(new HashMap<>()));
-
-    // case 1: no tasks with elasticity enabled in the checkpoint map
-    Map<TaskName, Checkpoint> checkpointMap1 = new HashMap<>();
-    checkpointMap1.put(new TaskName("Partition 0"), checkpoint1);
-    checkpointMap1.put(new TaskName("Partition 2"), checkpoint2);
-    Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap1));
-
-    // case 2: tasks with no elasticity and tasks with elasticity both present in the checkpoint map
-    Map<TaskName, Checkpoint> checkpointMap2 = new HashMap<>();
-    checkpointMap2.put(new TaskName("Partition 0"), checkpoint1);
-    checkpointMap2.put(new TaskName("Partition 2"), checkpoint2);
-    checkpointMap2.put(new TaskName("Partition 0_0_2"), checkpoint3);
-    Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap2));
-
-    // case 3: only tasks with elasticity present in the checkpoint map
-    Map<TaskName, Checkpoint> checkpointMap3 = new HashMap<>();
-    checkpointMap3.put(new TaskName("Partition 0_0_2"), checkpoint3);
-    checkpointMap3.put(new TaskName("Partition 0_1_2"), checkpoint4);
-    Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap3));
-
-    // case 4: repeat same checks with GroupBySSP grouper tasks
-    Map<TaskName, Checkpoint> checkpointMap4 = new HashMap<>();
-    checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0]"), checkpoint1);
-    checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 1]"), checkpoint2);
-    Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap4));
-    checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 0]_2"), checkpoint3);
-    checkpointMap4.put(new TaskName("SystemStreamPartition [A, B, 0, 1]_2"), checkpoint4);
-    Assert.assertTrue(ElasticityUtils.wasElasticityEnabled(checkpointMap4));
-
-    // case 5: repeat same checks with AllSspToSingleTask grouper tasks - no elasticity supported for this grouper
-    Map<TaskName, Checkpoint> checkpointMap5 = new HashMap<>();
-    checkpointMap5.put(new TaskName("Task-0"), checkpoint1);
-    checkpointMap5.put(new TaskName("Task-1"), checkpoint2);
-    Assert.assertFalse(ElasticityUtils.wasElasticityEnabled(checkpointMap5));
-  }
-
-  private static CheckpointV2 buildCheckpointV2(SystemStreamPartition ssp, String offset) {
-    return new CheckpointV2(CheckpointId.create(), ImmutableMap.of(ssp, offset),
-        ImmutableMap.of("backend", ImmutableMap.of("store", "10")));
-  }
-}
diff --git a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
index 6583240..2ddecf0 100644
--- a/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
+++ b/samza-core/src/test/java/org/apache/samza/serializers/model/TestSamzaObjectMapper.java
@@ -340,7 +340,7 @@
     assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition"));
     assertEquals(serializedSSPAsJson.get("keyBucket"), deserSSPAsJson.get("-1"));
 
-    //Scenario 2: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper
+    //Scenario 1: ssp serialized with new elasticMapper and deserialized by old preElastic Mapper
     SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition("foo", "bar", new Partition(1), 1);
     serializedString = elasticObjectMapper.writeValueAsString(sspWithKeyBucket);
 
@@ -356,37 +356,6 @@
     assertEquals(serializedSSPAsJson.get("system"), deserSSPAsJson.get("system"));
     assertEquals(serializedSSPAsJson.get("stream"), deserSSPAsJson.get("stream"));
     assertEquals(serializedSSPAsJson.get("partition"), deserSSPAsJson.get("partition"));
-
-    // scenario 3: ssp as key serialized with preElasticMapper and deserialized by new Mapper with elasticity
-    Map<SystemStreamPartition, String> offsets = new HashMap<>();
-    String offset = "100";
-
-    String sspmapString = preElasticObjectMapper.writeValueAsString(ImmutableMap.of(ssp, offset));
-
-    TypeReference<HashMap<SystemStreamPartition, String>> typeRef
-        = new TypeReference<HashMap<SystemStreamPartition, String>>() { };
-
-    Map<SystemStreamPartition, String> deserSSPMap = elasticObjectMapper.readValue(sspmapString, typeRef);
-    SystemStreamPartition deserSSP = deserSSPMap.keySet().stream().findAny().get();
-    String deserOffset = deserSSPMap.values().stream().findFirst().get();
-    assertEquals(ssp.getSystem(), deserSSP.getSystem());
-    assertEquals(ssp.getStream(), deserSSP.getStream());
-    assertEquals(ssp.getPartition(), deserSSP.getPartition());
-    assertEquals(ssp.getKeyBucket(), deserSSP.getKeyBucket());
-    assertEquals(offset, deserOffset);
-
-    // Scenario 4: ssp key serialized with new elasticMapper and deserialized by old preElastic Mapper
-    sspmapString = elasticObjectMapper.writeValueAsString(ImmutableMap.of(sspWithKeyBucket, offset));
-
-    deserSSPMap = preElasticObjectMapper.readValue(sspmapString, typeRef);
-    deserSSP = deserSSPMap.keySet().stream().findAny().get();
-    deserOffset = deserSSPMap.values().stream().findFirst().get();
-    assertEquals(sspWithKeyBucket.getSystem(), deserSSP.getSystem());
-    assertEquals(sspWithKeyBucket.getStream(), deserSSP.getStream());
-    assertEquals(sspWithKeyBucket.getPartition(), deserSSP.getPartition());
-    // preElastic mapper does not know about KeyBucket so dont check for it
-    assertEquals(offset, deserOffset);
-
   }
 
   private JobModel deserializeFromObjectNode(ObjectNode jobModelJson) throws IOException {
@@ -440,12 +409,14 @@
   private static class OldSystemStreamPartitionKeyDeserializer extends KeyDeserializer {
     @Override
     public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
-      String[] parts = sspString.split("\\.");
-      if (parts.length < 3 || parts.length > 4) {
-        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' "
-            + "or 'system.stream.partition.keyBucket");
+      int idx = sspString.indexOf('.');
+      int lastIdx = sspString.lastIndexOf('.');
+      if (idx < 0 || lastIdx < 0) {
+        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
       }
-      return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
+      return new SystemStreamPartition(
+          new SystemStream(sspString.substring(0, idx), sspString.substring(idx + 1, lastIdx)),
+          new Partition(Integer.parseInt(sspString.substring(lastIdx + 1))));
     }
   }
   public static ObjectMapper getOldDeserForSSpKeyObjectMapper() {
@@ -483,27 +454,6 @@
     }
   }
 
-  private static class PreElasticitySystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
-    @Override
-    public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
-      String sspString = ssp.getSystem() + "." + ssp.getStream() + "."
-          + String.valueOf(ssp.getPartition().getPartitionId());
-      jgen.writeFieldName(sspString);
-    }
-  }
-
-  private static class PreElasticitySystemStreamPartitionKeyDeserializer extends KeyDeserializer {
-    @Override
-    public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
-      String[] parts = sspString.split("\\.");
-      if (parts.length < 3) {
-        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition");
-      }
-      return new SystemStreamPartition(
-          new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
-    }
-  }
-
   private static final ObjectMapper OBJECT_MAPPER = getPreEleasticObjectMapper();
   public static ObjectMapper getPreEleasticObjectMapper() {
     ObjectMapper mapper = new ObjectMapper();
@@ -514,13 +464,13 @@
     // Setup custom serdes for simple data types.
     module.addSerializer(Partition.class, new SamzaObjectMapper.PartitionSerializer());
     module.addSerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionSerializer());
-    module.addKeySerializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionKeySerializer());
+    module.addKeySerializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeySerializer());
     module.addSerializer(TaskName.class, new SamzaObjectMapper.TaskNameSerializer());
     module.addSerializer(TaskMode.class, new SamzaObjectMapper.TaskModeSerializer());
     module.addDeserializer(TaskName.class, new SamzaObjectMapper.TaskNameDeserializer());
     module.addDeserializer(Partition.class, new SamzaObjectMapper.PartitionDeserializer());
     module.addDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionDeserializer());
-    module.addKeyDeserializer(SystemStreamPartition.class, new PreElasticitySystemStreamPartitionKeyDeserializer());
+    module.addKeyDeserializer(SystemStreamPartition.class, new SamzaObjectMapper.SystemStreamPartitionKeyDeserializer());
     module.addDeserializer(Config.class, new SamzaObjectMapper.ConfigDeserializer());
     module.addDeserializer(TaskMode.class, new SamzaObjectMapper.TaskModeDeserializer());
     module.addSerializer(CheckpointId.class, new SamzaObjectMapper.CheckpointIdSerializer());
diff --git a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
index d0039a9..c2d24a9 100644
--- a/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
+++ b/samza-core/src/test/scala/org/apache/samza/checkpoint/TestOffsetManager.scala
@@ -585,47 +585,6 @@
   }
 
   @Test
-  def testElasticityUpdateWithoutKeyBucket: Unit = {
-    // When elasticity is enabled, task consumes a part of the full SSP represented by SSP With KeyBucket.
-    // OffsetManager tracks the set of SSP with KeyBucket consumed by a task.
-    // However, after an IME processing is complete, OffsetManager.update is called without KeyBuket.
-    // OffsetManager has to find and udpate the last processed offset for the task correctly for its SSP with KeyBucket.
-    val taskName = new TaskName("c")
-    val systemStream = new SystemStream("test-system", "test-stream")
-    val partition = new Partition(0)
-    val systemStreamPartition = new SystemStreamPartition(systemStream, partition)
-    val systemStreamPartitionWithKeyBucket = new SystemStreamPartition(systemStreamPartition, 0);
-    val testStreamMetadata = new SystemStreamMetadata(systemStream.getStream, Map(partition -> new SystemStreamPartitionMetadata("0", "1", "2")).asJava)
-    val systemStreamMetadata = Map(systemStream -> testStreamMetadata)
-    val checkpointManager = getCheckpointManager(systemStreamPartition, taskName)
-    val startpointManagerUtil = getStartpointManagerUtil()
-    val systemAdmins = mock(classOf[SystemAdmins])
-    when(systemAdmins.getSystemAdmin("test-system")).thenReturn(getSystemAdmin)
-    val offsetManager = OffsetManager(systemStreamMetadata, new MapConfig, checkpointManager, startpointManagerUtil.getStartpointManager, systemAdmins, Map(), new OffsetManagerMetrics)
-    // register task and its input SSP with KeyBucket
-    offsetManager.register(taskName, Set(systemStreamPartitionWithKeyBucket))
-
-    offsetManager.start
-
-    // update is called with only the full SSP and no keyBucket information.
-    offsetManager.update(taskName, systemStreamPartition, "46")
-    // Get checkpoint snapshot like we do at the beginning of TaskInstance.commit()
-    val checkpoint46 = offsetManager.getLastProcessedOffsets(taskName)
-    offsetManager.update(taskName, systemStreamPartition, "47") // Offset updated before checkpoint
-    offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint46))
-    // OffsetManager correctly updates the lastProcessedOffset and checkpoint for the task and input SSP with KeyBucket.
-    assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket))
-    assertEquals("46", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
-
-    // Now write the checkpoint for the latest offset
-    val checkpoint47 = offsetManager.getLastProcessedOffsets(taskName)
-    offsetManager.writeCheckpoint(taskName, new CheckpointV1(checkpoint47))
-
-    assertEquals(Some("47"), offsetManager.getLastProcessedOffset(taskName, systemStreamPartitionWithKeyBucket))
-    assertEquals("47", offsetManager.offsetManagerMetrics.checkpointedOffsets.get(systemStreamPartitionWithKeyBucket).getValue)
-  }
-
-  @Test
   def testStartpointUpdate: Unit = {
     val taskName = new TaskName("c")
     val systemStream = new SystemStream("test-system", "test-stream")
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
index 77b1ee3..08b082c 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestTaskInstance.scala
@@ -162,7 +162,7 @@
     verify(processesCounter).inc()
     verify(messagesActuallyProcessedCounter).inc()
 
-    // case 2: taskInstance processes the keyBucket=0 of the ssp and envelope is NOT from same keyBucket
+    // case 1: taskInstance processes the keyBucket=0 of the ssp and envelope is NOT from same keyBucket
     // taskInstance.process should throw the exception ssp is not registered.
     when(envelope.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
     val thrown = intercept[Exception] {
@@ -171,26 +171,6 @@
     assert(thrown.isInstanceOf[SamzaException])
     assert(thrown.getMessage.contains(notProcessedSSPKeyBucket.toString))
     assert(thrown.getMessage.contains("is not registered!"))
-
-    // case 3: taskInstance processes the keyBucket=0 of the ssp and envelope is watermark NOT from same keyBucket
-    // regular processing should happen as Watermark and end of stream should be processed by all tasks
-    val watermark = spy(IncomingMessageEnvelope.buildWatermarkEnvelope(SYSTEM_STREAM_PARTITION, 1234l))
-    when(watermark.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
-    this.taskInstance.process(watermark, coordinator, callbackFactory)
-    assertEquals(2, this.taskInstanceExceptionHandler.numTimesCalled) // case 1 and case 3
-    verify(this.task).processAsync(watermark, this.collector, coordinator, callback)
-    verify(processesCounter, times(3)).inc() // case 1, 2 and 3
-    verify(messagesActuallyProcessedCounter, times(2)).inc() // case 1 and 3
-
-    // case 4: taskInstance processes the keyBucket=0 of the ssp and envelope is EndOfStream NOT from same keyBucket
-    // regular processing should happen as Watermark and end of stream should be processed by all tasks
-    val endOfStream = spy(IncomingMessageEnvelope.buildEndOfStreamEnvelope(SYSTEM_STREAM_PARTITION))
-    when(endOfStream.getSystemStreamPartition(2)).thenReturn(notProcessedSSPKeyBucket)
-    this.taskInstance.process(endOfStream, coordinator, callbackFactory)
-    assertEquals(3, this.taskInstanceExceptionHandler.numTimesCalled) // case 1 and case 3 and case 4
-    verify(this.task).processAsync(endOfStream, this.collector, coordinator, callback)
-    verify(processesCounter, times(4)).inc() // case 1, 2, 3 and 4
-    verify(messagesActuallyProcessedCounter, times(3)).inc() // case 1 and 3 and 4
   }
 
   @Test
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala
index bb89798..02f2e59 100644
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala
+++ b/samza-core/src/test/scala/org/apache/samza/serializers/TestCheckpointV1Serde.scala
@@ -19,11 +19,10 @@
 
 package org.apache.samza.serializers
 
-import com.fasterxml.jackson.databind.ObjectMapper
-
 import java.util
+
 import org.apache.samza.Partition
-import org.apache.samza.checkpoint.CheckpointV1
+import org.apache.samza.checkpoint.{CheckpointV1}
 import org.apache.samza.container.TaskName
 import org.apache.samza.system.SystemStreamPartition
 import org.junit.Assert._
@@ -50,81 +49,4 @@
     val checkpoint = checkpointSerde.fromBytes(checkpointBytes)
     assertNull(checkpoint)
   }
-
-  @Test
-  def testSSPWithKeyBucket {
-    // case 1: write and read with serde that is aware of KeyBucket within SSP
-    val serde = new CheckpointV1Serde
-    var offsets = Map[SystemStreamPartition, String]()
-    val ssp = new SystemStreamPartition("test-system", "test-stream",
-      new Partition(777), -1)
-    offsets += ssp -> "1"
-    val deserializedOffsets = serde.fromBytes(serde.toBytes(new CheckpointV1(offsets.asJava)))
-    assertEquals("1", deserializedOffsets.getOffsets.get(ssp))
-    assertEquals(1, deserializedOffsets.getOffsets.size)
-
-    // case 2: SSP was serialized by serde not aware of KeyBucket - aka did not put keyBucket into serialized form
-    val deserializedOffsets2 = serde.fromBytes(toBytesWithoutKeyBucket(new CheckpointV1(offsets.asJava)))
-    assertEquals("1", deserializedOffsets2.getOffsets.get(ssp))
-    assertEquals(1, deserializedOffsets2.getOffsets.size)
-
-    // case 3: SSP was serialized by serde aware of KeyBucket but deserialized by serde not aware of KeyBucket
-    val deserializedOffsets3 = fromBytesWithoutKeyBucket(serde.toBytes(new CheckpointV1(offsets.asJava)))
-    assertEquals("1", deserializedOffsets3.getOffsets.get(ssp))
-    assertEquals(1, deserializedOffsets3.getOffsets.size)
-
-    // case 4: SSP has keyBucket = 0 (aka not default -1) - serialize by serde NOT aware of keyBucket
-    // when serialized with serde not aware of keyBucket, the info about keyBucket is lost,
-    // we can only recover the system, stream and partition parts out during deserialization.
-    // hence after deser, we need to look for SSP without key bucket
-    val sspWithKeyBucket = new SystemStreamPartition("test-system", "test-stream",
-      new Partition(777), 0)
-    val sspWithoutKeyBucket = new SystemStreamPartition("test-system", "test-stream",
-      new Partition(777))
-    var offsets1 = Map[SystemStreamPartition, String]()
-    offsets1 += sspWithKeyBucket -> "1"
-
-    val deserializedOffsets4 = serde.fromBytes(toBytesWithoutKeyBucket(new CheckpointV1(offsets1.asJava)))
-    assertEquals("1", deserializedOffsets4.getOffsets.get(sspWithoutKeyBucket))
-    assertEquals(1, deserializedOffsets4.getOffsets.size)
-
-    // case 5: SSP has KeyBucket = 0, serialized by serde aware of keyBucket
-    val deserializedOffsets5 = fromBytesWithoutKeyBucket(serde.toBytes(new CheckpointV1(offsets1.asJava)))
-    assertEquals("1", deserializedOffsets5.getOffsets.get(sspWithoutKeyBucket))
-    assertEquals(1, deserializedOffsets5.getOffsets.size)
-  }
-
-  private  def fromBytesWithoutKeyBucket(bytes: Array[Byte]): CheckpointV1 = {
-    val jsonMapper = new ObjectMapper()
-    val jMap = jsonMapper.readValue(bytes, classOf[util.HashMap[String, util.HashMap[String, String]]])
-
-    def deserializeJSONMap(sspInfo:util.HashMap[String, String]) = {
-      val system = sspInfo.get("system")
-      val stream = sspInfo.get("stream")
-      val partition = sspInfo.get("partition")
-      val offset = sspInfo.get("offset")
-      new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
-    }
-    val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
-    new CheckpointV1(cpMap.asJava)
-  }
-
-  private   def toBytesWithoutKeyBucket(checkpoint: CheckpointV1): Array[Byte] = {
-    val jsonMapper = new ObjectMapper()
-    val offsets = checkpoint.getOffsets
-    val asMap = new util.HashMap[String, util.HashMap[String, String]](offsets.size())
-
-    offsets.asScala.foreach {
-      case (ssp, offset) =>
-        val jMap = new util.HashMap[String, String](4)
-        jMap.put("system", ssp.getSystemStream.getSystem)
-        jMap.put("stream", ssp.getSystemStream.getStream)
-        jMap.put("partition", ssp.getPartition.getPartitionId.toString)
-        jMap.put("offset", offset)
-
-        asMap.put(ssp.toString, jMap)
-    }
-
-    jsonMapper.writeValueAsBytes(asMap)
-  }
 }
diff --git a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
index cc97a70..7793b56 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/checkpoint/kafka/KafkaCheckpointManager.scala
@@ -142,7 +142,16 @@
 
     info(s"Reading checkpoint for taskName $taskName")
 
-    populateTaskNamesToCheckpointsMap()
+    if (taskNamesToCheckpoints == null) {
+      info("Reading checkpoints for the first time")
+      taskNamesToCheckpoints = readCheckpoints()
+      if (stopConsumerAfterFirstRead) {
+        info("Stopping system consumer")
+        systemConsumer.stop()
+      }
+    } else if (!stopConsumerAfterFirstRead) {
+      taskNamesToCheckpoints ++= readCheckpoints()
+    }
 
     val checkpoint: Checkpoint = taskNamesToCheckpoints.getOrElse(taskName, null)
 
@@ -151,14 +160,6 @@
   }
 
   /**
-   * @inheritdoc
-   */
-  override def readAllCheckpoints(): util.Map[TaskName, Checkpoint] = {
-    populateTaskNamesToCheckpointsMap()
-    scala.collection.JavaConverters.mapAsJavaMapConverter(taskNamesToCheckpoints).asJava
-  }
-
-  /**
     * @inheritdoc
     */
   override def writeCheckpoint(taskName: TaskName, checkpoint: Checkpoint) {
@@ -409,17 +410,4 @@
       throw new IllegalArgumentException("Unknown checkpoint key type: " + checkpointKey.getType)
     }
   }
-
-  private def populateTaskNamesToCheckpointsMap() = {
-    if (taskNamesToCheckpoints == null) {
-      info("Reading checkpoints for the first time")
-      taskNamesToCheckpoints = readCheckpoints()
-      if (stopConsumerAfterFirstRead) {
-        info("Stopping system consumer")
-        systemConsumer.stop()
-      }
-    } else if (!stopConsumerAfterFirstRead) {
-      taskNamesToCheckpoints ++= readCheckpoints()
-    }
-  }
 }
diff --git a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java
index d0e4586..fe9bfb1 100644
--- a/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java
+++ b/samza-kafka/src/test/java/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.java
@@ -386,25 +386,6 @@
   }
 
   @Test
-  public void testReadAllCheckpoints() throws InterruptedException {
-    Config config = config(ImmutableMap.of(TaskConfig.CHECKPOINT_READ_VERSIONS, "1,2"));
-    setupSystemFactory(config);
-    CheckpointV2 checkpointV2ForTask0 = buildCheckpointV2(INPUT_SSP0, "0");
-    CheckpointV2 checkpointV2ForTask1 = buildCheckpointV2(INPUT_SSP0, "1");
-    List<IncomingMessageEnvelope> checkpointEnvelopes =
-        ImmutableList.of(
-            newCheckpointV2Envelope(TASK0, checkpointV2ForTask0, "0"),
-            newCheckpointV2Envelope(TASK1, checkpointV2ForTask1, "1")
-            );
-    setupConsumer(checkpointEnvelopes);
-    Map<TaskName, Checkpoint> checkpointMap = ImmutableMap.of(TASK0, checkpointV2ForTask0, TASK1, checkpointV2ForTask1);
-    KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config);
-    kafkaCheckpointManager.register(TASK0);
-    Map<TaskName, Checkpoint> readCheckpoints = kafkaCheckpointManager.readAllCheckpoints();
-    assertEquals(checkpointMap, readCheckpoints);
-  }
-
-  @Test
   public void testWriteCheckpointV1() {
     setupSystemFactory(config());
     KafkaCheckpointManager kafkaCheckpointManager = buildKafkaCheckpointManager(true, config());