remove keyBucket from checkpoint serde to allow for simpler rollback (#1625)

Symptom: Rolling back to versions which dont accept 4 parts in checkpoint serde can throw NPE exceptions.
Cause: As part of elasticity, #1608 introduced keyBucket into checkpoint serde.
Fix: Remove keyBucket from checkpoint serde - aka do not add it when serializing checkpoint.

Backwards Compatible: yes.
Though checkpoints written after #1608 and before this PR will have 4 part SSP in checkpoint, serde, they can still be read by code in this PR and vice versa.
elasticity will not work completely once this pr is merged.
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index b317e9d..3470fd8 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -221,9 +221,7 @@
   public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
     @Override
     public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
-      String sspString = ssp.getSystem() + "." + ssp.getStream() + "."
-          + String.valueOf(ssp.getPartition().getPartitionId()) + "."
-          + String.valueOf(ssp.getKeyBucket());
+      String sspString = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId());
       jgen.writeFieldName(sspString);
     }
   }
@@ -232,16 +230,10 @@
     @Override
     public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
       String[] parts = sspString.split("\\.");
-      if (parts.length < 3 || parts.length > 4) {
-        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' "
-            + "or 'system.stream.partition.keyBucket");
+      if (parts.length < 3) {
+        throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' ");
       }
-      if (parts.length == 3) {
-        return new SystemStreamPartition(
-            new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
-      }
-      // else parts.length == 4 and the 4th part is the keyBucket
-      return new SystemStreamPartition(parts[0], parts[1], new Partition(Integer.parseInt(parts[2])), Integer.parseInt(parts[3]));
+      return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
     }
   }
 
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
index c817e2f..e69a973 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
@@ -55,12 +55,7 @@
         require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
         val offset = sspInfo.get("offset")
         // allow null offsets, e.g. for changelog ssps
-        var keyBucket = sspInfo.get("keyBucket")
-        if (keyBucket == null) {
-          keyBucket = "-1"
-        }
-
-        new SystemStreamPartition(system, stream, new Partition(partition.toInt), keyBucket.toInt) -> offset
+        new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
       }
 
       val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
@@ -83,7 +78,6 @@
         jMap.put("system", ssp.getSystemStream.getSystem)
         jMap.put("stream", ssp.getSystemStream.getStream)
         jMap.put("partition", ssp.getPartition.getPartitionId.toString)
-        jMap.put("keyBucket", ssp.getKeyBucket.toString)
         jMap.put("offset", offset)
 
         asMap.put(ssp.toString, jMap)