tree 08db728443ec6355e59ce5b00a9a8ac17fe0f8ac
parent 493de64bb8c0228bbda11822ae3c84c2085263bf
author Alan Zhang <shuai.xyz@gmail.com> 1656546469 -0700
committer GitHub <noreply@github.com> 1656546469 -0700
gpgsig -----BEGIN PGP SIGNATURE-----
 
 wsBcBAABCAAQBQJivOSlCRBK7hj4Ov3rIwAANfUIAE+RKrpJ3T5EURJFScTnq25s
 FyRjTsOWs4BHz4q0wDSdDpPYL9z1vlHVgJgoMur/m+xSzTxmfE6tk+03JMvPcKUt
 MfqpfQt08rfuhRPeGTbYkEtJCi6Qbxtv6jjhQyCwDi4Eyl0bGnrgyJ8sSSLQ3rj2
 250jx1De+H49Qz2puAddmGuTZ3cjxulcedHs/Wfihv/s3JouXdUn8T26q6AgDnIi
 WRCzYXAe98yqpv82dSHfNlzEkriJHyA7TJ6T7P6v5cwp38FbFPed8L2mm1g3SyUw
 xyEGeaALEI8GBY1jIYbPl4t83r3VWlIr8sSd+Io1QBxwigRP+zLltn59jcYQ7LE=
 =5ee7
 -----END PGP SIGNATURE-----
 

SAMZA-2749: Startpoint bug fix (#1615)

Symptom:
Using startpoints to trigger full bootstrapping is not reliable in the current implementation. We observed that bootstrapping happened on only a subset of the expected partitions.

Cause:
Within Samza (the main class to look at is OffsetManager.scala), a startpoint can be deleted before it is actually used for message consumption. When a container gets into this situation, the startpoint is ignored and consumption continues from the last message processed before the startpoint was applied.

1. Load last processed offsets and startpoints.
2. Use startpoints to register starting offsets for consumers.
3. Message processing starts, but messages are received for only some of the partitions.
4. Write a checkpoint using the last processed offsets.
   - If a partition did not get messages, its last processed offset is still the offset from before the startpoint.
5. Delete startpoints.
6. Container dies (e.g. due to running out of memory).
7. On restart, load last processed offsets (startpoints have been deleted).
   - The partitions that did have messages in the previous deployment will have the correct checkpoint.
   - The partitions that did not have messages will have the checkpoint set to the offset from before the startpoint was applied. This is unexpected, and it means that bootstrapping does not happen for these partitions.

Changes:

Keep track of the partitions whose processed offsets have been updated, and delete the startpoint only for those partitions after checkpointing.
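
The idea can be illustrated with a small standalone model. This is only a sketch and not the actual OffsetManager code: the class StartpointCleanupSketch, its methods, and the use of integer partition ids are all hypothetical and chosen for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Toy model of the fix. None of these names are Samza APIs (assumption). */
final class StartpointCleanupSketch {
    // Pending startpoint per partition: partition id -> requested starting offset.
    private final Map<Integer, Long> startpoints = new HashMap<>();
    // Last processed offset per partition, initially loaded from the old checkpoint.
    private final Map<Integer, Long> lastProcessed = new HashMap<>();
    // Partitions whose processed offset has been updated since the last checkpoint.
    private final Set<Integer> updated = new HashSet<>();

    /** Loads the checkpointed offset and the pending startpoint for a partition. */
    void load(int partition, long checkpointedOffset, long startpointOffset) {
        lastProcessed.put(partition, checkpointedOffset);
        startpoints.put(partition, startpointOffset);
    }

    /** Records that a message at the given offset was processed. */
    void onMessageProcessed(int partition, long offset) {
        lastProcessed.put(partition, offset);
        updated.add(partition);
    }

    /** Writes a checkpoint, then deletes startpoints only for updated partitions. */
    Map<Integer, Long> checkpoint() {
        Map<Integer, Long> snapshot = new HashMap<>(lastProcessed);
        // Partitions that received no messages keep their startpoint, so it is
        // still available if the container restarts.
        startpoints.keySet().removeAll(updated);
        updated.clear();
        return snapshot;
    }

    boolean hasStartpoint(int partition) {
        return startpoints.containsKey(partition);
    }

    public static void main(String[] args) {
        StartpointCleanupSketch sketch = new StartpointCleanupSketch();
        sketch.load(0, 100L, 0L);
        sketch.load(1, 200L, 0L);
        sketch.onMessageProcessed(0, 5L); // only partition 0 receives messages
        sketch.checkpoint();
        System.out.println("partition 0 startpoint kept: " + sketch.hasStartpoint(0));
        System.out.println("partition 1 startpoint kept: " + sketch.hasStartpoint(1));
    }
}
```

In this model, the startpoint for a partition that received no messages survives the checkpoint, so a restarted container can still apply it.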

API Changes:

Added a new API, removeFanOutForTaskSSPs, to StartpointManager to allow cleaning up fan-outs at partition granularity.
