Reader
belongs to exclusive subscription type, and it uses nonDurable
cursor. After receiving messages, Reader
will ack cumulatively immediately. The flowPermits
are triggered in multiple scenarios from the client side and it is isolated from seek
of Consumer
. Therefore, it is possibile that flowPermits
will execute after seek
from the client side, like the following flow chart.
When handleSeek
processing is delay from the server side, the MarkDelete position
is modified in a wrong way. The expected result is that Reader
can re-consume messages from mark delete:(1,1)
after seek
. But it doesn't work.
Pulsar read message and seek position is not a synchronous operation, the seek request can't prevent an in-process entry reading operation. The client-side also has an opportunity to receive messages after the seek position.
Pulsar client make read messages operation and seek position operation synchronized so add an epoch into server and client consumer. After client reader consumer invoke seek
, the epoch increase 1 and send seek
command carry the epoch and then server consumer will update the epoch. When dispatcher messages to client will carry the epoch which the cursor read at the time. Client consumer will filter the send messages command which is smaller than current epoch. In this way, after the client consumer send seek
command successfully, because it has passed the epoch filtering, the consumer will not receive a message with a messageID greater than the user previously seek position.
// Reset an existing consumer to a particular message id message CommandSeek { required uint64 consumer_id = 1; required uint64 request_id = 2; optional MessageIdData message_id = 3; optional uint64 message_publish_time = 4; }
message CommandMessage { required uint64 consumer_id = 1; required MessageIdData message_id = 2; optional uint32 redelivery_count = 3 [default = 0]; repeated int64 ack_set = 4; optional uint64 epoch = 5 [default = 0]; }
CommandMessage
already add epoch by PIP-84 , when client receive CommandMessage
will compare the command epoch and local epoch to handle this command.
Add epoch into seek command.
// Reset an existing consumer to a particular message id message CommandSeek { required uint64 consumer_id = 1; required uint64 request_id = 2; optional MessageIdData message_id = 3; optional uint64 message_publish_time = 4; optional uint64 consumer_epoch = 5; }
CommandSeek
command add epoch field, when client send seek command to server successfully, the server will change the server consumer epoch to the command epoch. The epoch only can bigger than the old epoch in server. Now the client can filter out the message which contains less consumer epoch.
None yet.