blob: d93c95769a29bdda14bfca711400c14ecc023d74 [file] [log] [blame]
# -*- coding: utf-8 -*-
import ctypes
from collections import namedtuple
from .ffi import dll, _CSendResult, MSG_CALLBACK_FUNC, _CMessageQueue, _CPullStatus
SendResult = namedtuple('SendResult', ['status', 'msg_id', 'offset'])
class Message(object):
def __init__(self, topic):
self._handle = dll.CreateMessage(topic.encode('utf-8'))
def __del__(self):
if self._handle is not None:
dll.DestroyMessage(self._handle)
def set_keys(self, keys):
return dll.SetMessageKeys(self._handle, keys.encode('utf-8'))
def set_tags(self, tags):
return dll.SetMessageTags(self._handle, tags.encode('utf-8'))
def set_body(self, body):
return dll.SetMessageBody(self._handle, body.encode('utf-8'))
def set_property(self, key, value):
return dll.SetMessageProperty(self._handle, key.encode('utf-8'), value.encode('utf-8'))
def set_delay_time_level(self, delay_time_level):
return dll.SetDelayTimeLevel(self._handle, delay_time_level)
@property
def _as_parameter_(self):
return self._handle
def maybe_decode(val):
if val:
return val.decode('utf-8')
class RecvMessage(object):
def __init__(self, handle):
self.topic = maybe_decode(dll.GetMessageTopic(handle))
self.tags = maybe_decode(dll.GetMessageTags(handle))
self.keys = maybe_decode(dll.GetMessageKeys(handle))
self.body = dll.GetMessageBody(handle)
self.id = maybe_decode(dll.GetMessageId(handle))
self.delay_time_level = dll.GetMessageDelayTimeLevel(handle)
self.queue_id = dll.GetMessageQueueId(handle)
self.reconsume_times = dll.GetMessageReconsumeTimes(handle)
self.store_size = dll.GetMessageStoreSize(handle)
self.born_timestamp = dll.GetMessageBornTimestamp(handle)
self.store_timestamp = dll.GetMessageStoreTimestamp(handle)
self.queue_offset = dll.GetMessageQueueOffset(handle)
self.commit_log_offset = dll.GetMessageCommitLogOffset(handle)
self.prepared_transaction_offset = dll.GetMessagePreparedTransactionOffset(handle)
class Producer(object):
def __init__(self, group_id):
self._handle = dll.CreateProducer(group_id.encode('utf-8'))
def __del__(self):
if self._handle is not None:
dll.DestroyProducer(self._handle)
def send_sync(self, msg):
cres = _CSendResult()
dll.SendMessageSync(self._handle, msg, ctypes.pointer(cres))
return SendResult(cres.sendStatus, cres.msgId.decode('utf-8'), cres.offset)
def send_oneway(self, msg):
return dll.SendMessageOneway(self._handle, msg)
def set_group(self, group_name):
return dll.SetProducerGroupName(group_name.encode('utf-8'))
def set_namesrv_addr(self, addr):
return dll.SetProducerNameServerAddress(self._handle, addr.encode('utf-8'))
def set_namesrv_domain(self, domain):
return dll.SetProducerNameServerDomain(self._handle, domain.encode('utf-8'))
def set_session_credentials(self, access_key, access_secret, channel):
return dll.SetProducerSessionCredentials(self._handle, access_key.encode('utf-8'), access_secret.encode('utf-8'), channel.encode('utf-8'))
def start(self):
return dll.StartProducer(self._handle)
def shutdown(self):
return dll.ShutdownProducer(self._handle)
class PushConsumer(object):
def __init__(self, group_id, orderly=False):
self._handle = dll.CreatePushConsumer(group_id.encode('utf-8'))
self._orderly = orderly
def __del__(self):
if self._handle is not None:
dll.DestroyPushConsumer(self._handle)
def start(self):
return dll.StartPushConsumer(self._handle)
def shutdown(self):
return dll.ShutdownPushConsumer(self._handle)
def set_group(self, group_id):
return dll.SetPushConsumerGroupID(group_id.encode('utf-8'))
def set_namesrv_addr(self, addr):
return dll.SetPushConsumerNameServerAddress(self._handle, addr.encode('utf-8'))
def set_namesrv_domain(self, domain):
return dll.SetPushConsumerNameServerDomain(self._handle, domain.encode('utf-8'))
def set_session_credentials(self, access_key, access_secret, channel):
return dll.SetPushConsumerSessionCredentials(self._handle, access_key.encode('utf-8'), access_secret.encode('utf-8'), channel.encode('utf-8'))
def subscribe(self, topic, callback, expression='*'):
from .ffi import _CConsumeStatus
def _on_message(consumer, msg):
try:
callback(msg)
except Exception:
return _CConsumeStatus.CONSUME_SUCCESS.value
return _CConsumeStatus.RECONSUME_LATER.value
dll.Subscribe(self._handle, topic.encode('utf-8'), expression.encode('utf-8'))
self._register_callback(MSG_CALLBACK_FUNC(_on_message))
def _register_callback(self, callback):
if self._orderly:
register_func = dll.RegisterMessageCallbackOrderly
else:
register_func = dll.RegisterMessageCallback
return register_func(self._handle, MSG_CALLBACK_FUNC(callback))
def _unregister_callback(self):
if self._orderly:
return dll.UnregisterMessageCallbackOrderly(self._handle)
return dll.UnregisterMessageCallback(self._handle)
def set_thread_count(self, thread_count):
return dll.SetPushConsumerThreadCount(self._handle, thread_count)
def set_message_batch_max_size(self, max_size):
return dll.SetPushConsumerMessageBatchMaxSize(self._handle, max_size)
def set_instance_name(self, name):
return dll.SetPushConsumerInstanceName(self._handle, name.encode('utf-8'))
class PullConsumer(object):
def __init__(self, group_id):
self._handle = dll.CreatePullConsumer(group_id.encode('utf-8'))
def __del__(self):
if self._handle is not None:
dll.DestroyPullConsumer(self._handle)
def start(self):
return dll.StartPullConsumer(self._handle)
def shutdown(self):
return dll.ShutdownPullConsumer(self._handle)
def set_group(self, group_id):
return dll.SetPullConsumerGroupID(group_id.encode('utf-8'))
def set_namesrv_addr(self, addr):
return dll.SetPullConsumerNameServerAddress(self._handle, addr.encode('utf-8'))
def set_namesrv_domain(self, domain):
return dll.SetPullConsumerNameServerDomain(self._handle, domain.encode('utf-8'))
def set_session_credentials(self, access_key, access_secret, channel):
return dll.SetPullConsumerSessionCredentials(self._handle, access_key.encode('utf-8'), access_secret.encode('utf-8'), channel.encode('utf-8'))
def pull(self, topic):
message_queue = ctypes.POINTER(_CMessageQueue)()
queue_size = ctypes.c_int()
dll.FetchSubscriptionMessageQueues(self._handle, topic.encode('utf-8'), ctypes.pointer(message_queue), ctypes.pointer(queue_size))
for i in range(int(queue_size.value)):
tmp_offset = ctypes.c_longlong()
while True:
pull_res = dll.Pull(self._handle, ctypes.pointer(message_queue[i]), b'*', tmp_offset, 32)
if pull_res.pullStatus != _CPullStatus.BROKER_TIMEOUT:
tmp_offset = pull_res.nextBeginOffset
if pull_res.pullStatus == _CPullStatus.FOUND:
for i in range(int(pull_res.size)):
yield RecvMessage(pull_res.msgFoundList[i])
elif pull_res.pullStatus == _CPullStatus.NO_MATCHED_MSG:
break
dll.ReleasePullResult(pull_res)
dll.ReleaseSubscriptionMessageQueue(message_queue)