pull consumer add offset

The code implementation mimics the c++ version, depth-first traversing each message queue.
if someone wants to save offset into such as redis, the two method `get_message_queue_offset` and `set_message_queue_offset` could be overrided.
https://github.com/apache/rocketmq-client-cpp/blob/master/example/PullConsumer.cpp
diff --git a/rocketmq/client.py b/rocketmq/client.py
index 8366377..6b37990 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -395,6 +395,7 @@
 
 
 class PullConsumer(object):
+    offset_table = {}
     def __init__(self, group_id):
         self._handle = dll.CreatePullConsumer(_to_bytes(group_id))
         if self._handle is None:
@@ -432,6 +433,17 @@
             _to_bytes(access_secret),
             _to_bytes(channel)
         ))
+    
+    def _get_mq_key(self, mq):
+        key = '%s@%s' % (mq.topic, mq.queueId)
+        return key
+
+    def get_message_queue_offset(self, mq):
+        offset = self.offset_table.get(self._get_mq_key(mq), 0)
+        return offset
+
+    def set_message_queue_offset(self, mq, offset):
+        self.offset_table[self._get_mq_key(mq)] = offset
 
     def pull(self, topic, expression='*', max_num=32):
         message_queue = POINTER(_CMessageQueue)()
@@ -443,25 +455,33 @@
             ctypes.pointer(queue_size)
         ))
         for i in range(int(queue_size.value)):
-            tmp_offset = ctypes.c_longlong()
-            while True:
+            mq = message_queue[i]
+            tmp_offset = ctypes.c_longlong(self.get_message_queue_offset(mq))
+
+            has_new_msg = True
+            while has_new_msg:
                 pull_res = dll.Pull(
                     self._handle,
-                    ctypes.pointer(message_queue[i]),
+                    ctypes.pointer(mq),
                     _to_bytes(expression),
                     tmp_offset,
                     max_num,
                 )
+
                 if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
                     tmp_offset = pull_res.nextBeginOffset
+                    self.set_message_queue_offset(mq, tmp_offset)
+
                 if pull_res.pullStatus == _CPullStatus.FOUND:
                     for i in range(int(pull_res.size)):
                         yield RecvMessage(pull_res.msgFoundList[i])
-                elif pull_res.pullStatus in [_CPullStatus.NO_MATCHED_MSG, _CPullStatus.NO_NEW_MSG, _CPullStatus.OFFSET_ILLEGAL]:
-                    dll.ReleasePullResult(pull_res)  # NOTE: No need to check ffi return code here
-                    break
+                elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:
+                    pass
+                elif pull_res.pullStatus == _CPullStatus.NO_NEW_MSG:
+                    has_new_msg = False
+                elif pull_res.pullStatus == _CPullStatus.OFFSET_ILLEGAL:
+                    pass
                 else:
-                    dll.ReleasePullResult(pull_res)  # NOTE: No need to check ffi return code here
-                    break
-                dll.ReleasePullResult(pull_res)  # NOTE: No need to check ffi return code here
+                    pass
+            dll.ReleasePullResult(pull_res)  # NOTE: No need to check ffi return code here
         ffi_check(dll.ReleaseSubscriptionMessageQueue(message_queue))