Currently, when a producer sends a chunked message, it returns the message-id of the last chunk. This causes problems. For example, when we seek with this message-id, the consumer starts consuming from the position of the last chunk; it then mistakenly concludes that the previous chunks are lost and skips the current message. As a result, with an inclusive seek the consumer may skip the first message, which is incorrect behavior.
The following simple code demonstrates the problem.
    var msgId = producer.send(...); // e.g. return 0:1:-1
    var otherMsg = producer.send(...); // return 0:2:-1
    consumer.seek(msgId); // inclusive seek
    var receiveMsgId = consumer.receive().getMessageId(); // it may skip the first message and return like 0:2:-1
    Assert.assertEquals(msgId, receiveMsgId); // fail
Earlier, we tried to fix the problem by having the producer and the consumer return the firstChunkMessageID (see the discussion and draft pull requests). However, this may affect existing business logic: users who rely on the lastChunkMessageId being returned would be affected. For this reason, we propose a new solution that minimizes the impact. With this PIP, existing users are expected to see a change in behavior only when seeking to a chunk message.
We can solve the above problem by introducing chunk message ID to the producer and consumer. Here are some goals for this PIP:
Here is simple demo code for the ChunkMessageID:
    public class ChunkMessageIdImpl extends MessageIdImpl implements MessageId {
        private final MessageIdImpl firstChunkMsgId;

        public ChunkMessageIdImpl(MessageIdImpl firstChunkMsgId, MessageIdImpl lastChunkMsgId) {
            super(lastChunkMsgId.getLedgerId(), lastChunkMsgId.getEntryId(), lastChunkMsgId.getPartitionIndex());
            this.firstChunkMsgId = firstChunkMsgId;
        }

        public MessageIdImpl getFirstChunkMessageId() {
            return firstChunkMsgId;
        }

        public MessageIdImpl getLastChunkMessageId() {
            return this;
        }
    }
The chunk message-id is returned to the user when the Producer produces the chunk message or when the consumer consumes the chunk message.
In consumer.seek, the first chunk message-id carried by the chunk message-id is used as the seek position. This solves the problem caused by seeking to chunk messages. It is also the only impact of this PIP on existing business logic.
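The seek rule above can be sketched as follows. This is a minimal, self-contained illustration, not the actual Pulsar client code: `SimpleMessageId`, `SimpleChunkMessageId`, and `resolveSeekPosition` are hypothetical stand-ins introduced here so the example runs without the Pulsar libraries.

```java
class ChunkSeekSketch {
    /** Hypothetical stand-in for MessageIdImpl (ledger, entry, partition). */
    static class SimpleMessageId {
        final long ledgerId;
        final long entryId;
        final int partitionIndex;

        SimpleMessageId(long ledgerId, long entryId, int partitionIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.partitionIndex = partitionIndex;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId + ":" + partitionIndex;
        }
    }

    /** Hypothetical stand-in for ChunkMessageIdImpl: behaves as the last chunk, remembers the first. */
    static class SimpleChunkMessageId extends SimpleMessageId {
        final SimpleMessageId firstChunkMsgId;

        SimpleChunkMessageId(SimpleMessageId first, SimpleMessageId last) {
            super(last.ledgerId, last.entryId, last.partitionIndex);
            this.firstChunkMsgId = first;
        }
    }

    /** Assumed seek rule: a chunk message-id seeks to its first chunk, any other id to itself. */
    static SimpleMessageId resolveSeekPosition(SimpleMessageId target) {
        if (target instanceof SimpleChunkMessageId) {
            return ((SimpleChunkMessageId) target).firstChunkMsgId;
        }
        return target;
    }

    public static void main(String[] args) {
        SimpleMessageId chunked = new SimpleChunkMessageId(
                new SimpleMessageId(0, 1, -1), new SimpleMessageId(0, 3, -1));
        System.out.println(resolveSeekPosition(chunked));                       // prints 0:1:-1
        System.out.println(resolveSeekPosition(new SimpleMessageId(0, 5, -1))); // prints 0:5:-1
    }
}
```

Because the chunk message-id still compares and prints as the last chunk, code that only reads the ledger and entry ids keeps seeing the same values as before; only the seek path looks at the first chunk.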
To make the chunk message-id serializable and deserializable, we need to change the proto definition of MessageIdData: add the optional field first_chunk_message_id to MessageIdData in the proto file:
    message MessageIdData {
        required uint64 ledgerId = 1;
        required uint64 entryId = 2;
        optional int32 partition = 3 [default = -1];
        optional int32 batch_index = 4 [default = -1];
        repeated int64 ack_set = 5;
        optional int32 batch_size = 6;

        // For the chunk message id, we need to specify the first chunk message id.
        optional MessageIdData first_chunk_message_id = 7;
    }
Serialization and deserialization of MessageId are both forward compatible and backward compatible.
When the current client version deserializes message-id raw data produced by an old client version, the result is always a MessageIdImpl, regardless of whether the message is a chunk message. In this case, the problem of seeking chunk messages mentioned above still exists. Older client versions are not affected when they deserialize chunk message-id raw data produced by newer versions.
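The compatibility behavior described above can be illustrated with a small model. This is only a sketch under stated assumptions: it does not use protobuf or the Pulsar client, and `WireMessageId`, `DecodedId`, `decodeWithNewClient`, and `decodeWithOldClient` are hypothetical names. A `null` `firstChunk` models an absent optional `first_chunk_message_id` field, and the old client is assumed to simply ignore the field it does not know.

```java
class MessageIdCompatSketch {
    /** Hypothetical model of serialized MessageIdData; firstChunk == null means the optional field is absent. */
    static final class WireMessageId {
        final long ledgerId;
        final long entryId;
        final WireMessageId firstChunk;

        WireMessageId(long ledgerId, long entryId, WireMessageId firstChunk) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.firstChunk = firstChunk;
        }
    }

    /** Hypothetical decoded message-id; firstChunk is null for a plain (non-chunk) id. */
    static final class DecodedId {
        final long ledgerId;
        final long entryId;
        final DecodedId firstChunk;

        DecodedId(long ledgerId, long entryId, DecodedId firstChunk) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.firstChunk = firstChunk;
        }

        boolean isChunk() {
            return firstChunk != null;
        }
    }

    /** New client: yields a chunk id only when the optional field is present. */
    static DecodedId decodeWithNewClient(WireMessageId wire) {
        DecodedId first = wire.firstChunk == null
                ? null
                : new DecodedId(wire.firstChunk.ledgerId, wire.firstChunk.entryId, null);
        return new DecodedId(wire.ledgerId, wire.entryId, first);
    }

    /** Old client: does not know the new field, so it always yields a plain id. */
    static DecodedId decodeWithOldClient(WireMessageId wire) {
        return new DecodedId(wire.ledgerId, wire.entryId, null);
    }

    public static void main(String[] args) {
        WireMessageId oldData = new WireMessageId(0, 3, null);
        WireMessageId newData = new WireMessageId(0, 3, new WireMessageId(0, 1, null));

        System.out.println(decodeWithNewClient(oldData).isChunk()); // prints false
        System.out.println(decodeWithNewClient(newData).isChunk()); // prints true
        System.out.println(decodeWithOldClient(newData).isChunk()); // prints false
    }
}
```

In all three cases the decoded ledger and entry ids are those of the last chunk, which is why existing logic keeps working; only a new client reading new data gains the first chunk position needed for a correct seek.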
Here is the PR to demonstrate this PIP: https://github.com/apache/pulsar/pull/12403.