remove keyBucket from checkpoint serde to allow for simpler rollback (#1625)
Symptom: Rolling back to versions that don't accept 4 parts in the checkpoint serde can throw NullPointerExceptions.
Cause: As part of elasticity, #1608 introduced keyBucket into checkpoint serde.
Fix: Remove keyBucket from the checkpoint serde, i.e. do not add it when serializing a checkpoint.
Backwards Compatible: yes.
Though checkpoints written after #1608 and before this PR will have a 4-part SSP in the checkpoint serde, they can still be read by code in this PR and vice versa.
Elasticity will not work completely once this PR is merged.
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
index b317e9d..3470fd8 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/model/SamzaObjectMapper.java
@@ -221,9 +221,7 @@
public static class SystemStreamPartitionKeySerializer extends JsonSerializer<SystemStreamPartition> {
@Override
public void serialize(SystemStreamPartition ssp, JsonGenerator jgen, SerializerProvider provider) throws IOException {
- String sspString = ssp.getSystem() + "." + ssp.getStream() + "."
- + String.valueOf(ssp.getPartition().getPartitionId()) + "."
- + String.valueOf(ssp.getKeyBucket());
+ String sspString = ssp.getSystem() + "." + ssp.getStream() + "." + String.valueOf(ssp.getPartition().getPartitionId());
jgen.writeFieldName(sspString);
}
}
@@ -232,16 +230,10 @@
@Override
public Object deserializeKey(String sspString, DeserializationContext ctxt) throws IOException {
String[] parts = sspString.split("\\.");
- if (parts.length < 3 || parts.length > 4) {
- throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' "
- + "or 'system.stream.partition.keyBucket");
+ if (parts.length < 3) {
+ throw new IllegalArgumentException("System stream partition expected in format 'system.stream.partition' ");
}
- if (parts.length == 3) {
- return new SystemStreamPartition(
- new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
- }
- // else parts.length == 4 and the 4th part is the keyBucket
- return new SystemStreamPartition(parts[0], parts[1], new Partition(Integer.parseInt(parts[2])), Integer.parseInt(parts[3]));
+ return new SystemStreamPartition(new SystemStream(parts[0], parts[1]), new Partition(Integer.parseInt(parts[2])));
}
}
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
index c817e2f..e69a973 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/CheckpointV1Serde.scala
@@ -55,12 +55,7 @@
require(partition != null, "Partition must be present in JSON-encoded SystemStreamPartition")
val offset = sspInfo.get("offset")
// allow null offsets, e.g. for changelog ssps
- var keyBucket = sspInfo.get("keyBucket")
- if (keyBucket == null) {
- keyBucket = "-1"
- }
-
- new SystemStreamPartition(system, stream, new Partition(partition.toInt), keyBucket.toInt) -> offset
+ new SystemStreamPartition(system, stream, new Partition(partition.toInt)) -> offset
}
val cpMap = jMap.values.asScala.map(deserializeJSONMap).toMap
@@ -83,7 +78,6 @@
jMap.put("system", ssp.getSystemStream.getSystem)
jMap.put("stream", ssp.getSystemStream.getStream)
jMap.put("partition", ssp.getPartition.getPartitionId.toString)
- jMap.put("keyBucket", ssp.getKeyBucket.toString)
jMap.put("offset", offset)
asMap.put(ssp.toString, jMap)