pull consumer add offset
The code implementation mimics the c++ version, depth-first traversing each message queue.
if someone wants to save offset into such as redis, the two method `get_message_queue_offset` and `set_message_queue_offset` could be overrided.
https://github.com/apache/rocketmq-client-cpp/blob/master/example/PullConsumer.cpp
diff --git a/rocketmq/client.py b/rocketmq/client.py
index 8366377..6b37990 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -395,6 +395,7 @@
class PullConsumer(object):
+ offset_table = {}
def __init__(self, group_id):
self._handle = dll.CreatePullConsumer(_to_bytes(group_id))
if self._handle is None:
@@ -432,6 +433,17 @@
_to_bytes(access_secret),
_to_bytes(channel)
))
+
+ def _get_mq_key(self, mq):
+ key = '%s@%s' % (mq.topic, mq.queueId)
+ return key
+
+ def get_message_queue_offset(self, mq):
+ offset = self.offset_table.get(self._get_mq_key(mq), 0)
+ return offset
+
+ def set_message_queue_offset(self, mq, offset):
+ self.offset_table[self._get_mq_key(mq)] = offset
def pull(self, topic, expression='*', max_num=32):
message_queue = POINTER(_CMessageQueue)()
@@ -443,25 +455,33 @@
ctypes.pointer(queue_size)
))
for i in range(int(queue_size.value)):
- tmp_offset = ctypes.c_longlong()
- while True:
+ mq = message_queue[i]
+ tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq))
+
+ has_new_msg = True
+ while has_new_msg:
pull_res = dll.Pull(
self._handle,
- ctypes.pointer(message_queue[i]),
+ ctypes.pointer(mq),
_to_bytes(expression),
tmp_offset,
max_num,
)
+
if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
tmp_offset = pull_res.nextBeginOffset
+ self.set_message_queue_offset(mq, tmp_offset)
+
if pull_res.pullStatus == _CPullStatus.FOUND:
for i in range(int(pull_res.size)):
yield RecvMessage(pull_res.msgFoundList[i])
- elif pull_res.pullStatus in [_CPullStatus.NO_MATCHED_MSG, _CPullStatus.NO_NEW_MSG, _CPullStatus.OFFSET_ILLEGAL]:
- dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here
- break
+ elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:
+ pass
+ elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:
+ has_new_msg = False
+ elif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:
+ pass
else:
- dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here
- break
- dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here
+ pass
+ dll.ReleasePullResult(pull_res) # NOTE: No need to check ffi return code here
ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue))