Add basic PushConsumer
diff --git a/rocketmq/client.py b/rocketmq/client.py
index a90a033..a09422b 100644
--- a/rocketmq/client.py
+++ b/rocketmq/client.py
@@ -63,3 +63,56 @@
def shutdown(self):
return dll.ShutdownProducer(self._handle)
+
+
+class PushConsumer(object):
+ def __init__(self, group_id):
+ self._handle = dll.CreatePushConsumer(group_id.encode('utf-8'))
+
+ def __del__(self):
+ if self._handle is not None:
+ dll.DestroyPushConsumer(self._handle)
+
+ def start(self):
+ dll.StartPushConsumer(self._handle)
+
+ def shutdown(self):
+ dll.ShutdownPushConsumer(self._handle)
+
+ def set_group(self, group_id):
+ return dll.SetPushConsumerGroupID(group_id.encode('utf-8'))
+
+ def set_namesrv_addr(self, addr):
+ return dll.SetPushConsumerNameServerAddress(self._handle, addr.encode('utf-8'))
+
+ def set_namesrv_domain(self, domain):
+ return dll.SetPushConsumerNameServerDomain(self._handle, domain.encode('utf-8'))
+
+ def set_session_credentials(self, access_key, access_secret, channel):
+ return dll.SetPushConsumerSessionCredentials(self._handle, access_key.encode('utf-8'), access_secret.encode('utf-8'), channel.encode('utf-8'))
+
+ def subscribe(self, topic, expression):
+ return dll.Subscribe(self._handle, topic.encode('utf-8'), expression.encode('utf-8'))
+
+ def register_callback(self, callback, orderly=False):
+ from .ffi import MSG_CALLBACK_FUNC
+
+ if orderly:
+ register_func = dll.RegisterMessageCallbackOrderly
+ else:
+ register_func = dll.RegisterMessageCallback
+ return register_func(self._handle, MSG_CALLBACK_FUNC(callback))
+
+ def unregister_callback(self, orderly=False):
+ if orderly:
+ return dll.UnregisterMessageCallbackOrderly(self._handle)
+ return dll.UnregisterMessageCallback(self._handle)
+
+ def set_thread_count(self, thread_count):
+ return dll.SetPushConsumerThreadCount(self._handle, thread_count)
+
+ def set_message_batch_max_size(self, max_size):
+ return dll.SetPushConsumerMessageBatchMaxSize(self._handle, max_size)
+
+ def set_instance_name(self, name):
+ return dll.SetPushConsumerInstanceName(self._handle, name.encode('utf-8'))