# blob: a09422b89a1cf02752793c49383cd4f6fc22568f [file] [log] [blame]
# -*- coding: utf-8 -*-
import ctypes
from collections import namedtuple
from .ffi import dll, _CSendResult
# Immutable record built by Producer.send_sync from the native send result:
# the send status, the message id decoded as text, and the offset.
SendResult = namedtuple(
    'SendResult',
    ('status', 'msg_id', 'offset'),
)
class Message(object):
    """Wrapper around a native message handle created by ``dll.CreateMessage``.

    Text arguments are encoded as UTF-8 before being passed to the native
    library. The setter methods return the native call's result unchanged
    (presumably a status code -- confirm against the C API).
    """

    def __init__(self, topic):
        """Create the native message for ``topic`` (a text string)."""
        self._handle = dll.CreateMessage(topic.encode('utf-8'))

    def __del__(self):
        """Destroy the native message if one is still held.

        Safe to call when ``__init__`` failed before a handle was stored, and
        safe to call more than once.
        """
        # getattr: __init__ may have raised before _handle was assigned.
        handle = getattr(self, '_handle', None)
        if handle is not None:
            # Clear first so a repeated call cannot destroy the handle twice.
            self._handle = None
            dll.DestroyMessage(handle)

    def set_keys(self, keys):
        """Set the message keys from the text string ``keys``."""
        return dll.SetMessageKeys(self._handle, keys.encode('utf-8'))

    def set_body(self, body):
        """Set the message body from the text string ``body``."""
        return dll.SetMessageBody(self._handle, body.encode('utf-8'))

    def set_property(self, key, value):
        """Set the message property ``key`` to ``value`` (both text strings)."""
        return dll.SetMessageProperty(
            self._handle,
            key.encode('utf-8'),
            value.encode('utf-8'),
        )

    @property
    def _as_parameter_(self):
        """Native handle; lets ctypes accept a ``Message`` as a call argument."""
        return self._handle
class Producer(object):
    """Wrapper around a native producer handle created by ``dll.CreateProducer``.

    Text arguments are encoded as UTF-8 before being passed to the native
    library. Unless noted otherwise, methods return the native call's result
    unchanged (presumably a status code -- confirm against the C API).
    """

    def __init__(self, group_id):
        """Create the native producer for ``group_id`` (a text string)."""
        self._handle = dll.CreateProducer(group_id.encode('utf-8'))

    def __del__(self):
        """Destroy the native producer if one is still held.

        Safe to call when ``__init__`` failed before a handle was stored, and
        safe to call more than once.
        """
        # getattr: __init__ may have raised before _handle was assigned.
        handle = getattr(self, '_handle', None)
        if handle is not None:
            # Clear first so a repeated call cannot destroy the handle twice.
            self._handle = None
            dll.DestroyProducer(handle)

    def send_sync(self, msg):
        """Send ``msg`` synchronously and return a ``SendResult``.

        The result carries the native ``sendStatus``, the ``msgId`` decoded
        from UTF-8, and the ``offset``.
        """
        cres = _CSendResult()
        # NOTE(review): the return value of SendMessageSync is ignored here;
        # only the fields written into ``cres`` are reported to the caller.
        dll.SendMessageSync(self._handle, msg, ctypes.pointer(cres))
        return SendResult(
            cres.sendStatus,
            cres.msgId.decode('utf-8'),
            cres.offset,
        )

    def send_oneway(self, msg):
        """Send ``msg`` through the native one-way send call."""
        return dll.SendMessageOneway(self._handle, msg)

    def set_group(self, group_name):
        """Set the producer group name from the text string ``group_name``."""
        # The producer handle must come first, as in every other setter;
        # without it the encoded name is passed as the producer pointer.
        return dll.SetProducerGroupName(
            self._handle, group_name.encode('utf-8'))

    def set_namesrv_addr(self, addr):
        """Set the name server address from the text string ``addr``."""
        return dll.SetProducerNameServerAddress(
            self._handle, addr.encode('utf-8'))

    def set_namesrv_domain(self, domain):
        """Set the name server domain from the text string ``domain``."""
        return dll.SetProducerNameServerDomain(
            self._handle, domain.encode('utf-8'))

    def set_session_credentials(self, access_key, access_secret, channel):
        """Set the session credentials; all three arguments are text strings."""
        return dll.SetProducerSessionCredentials(
            self._handle,
            access_key.encode('utf-8'),
            access_secret.encode('utf-8'),
            channel.encode('utf-8'),
        )

    def start(self):
        """Start the native producer."""
        return dll.StartProducer(self._handle)

    def shutdown(self):
        """Shut down the native producer."""
        return dll.ShutdownProducer(self._handle)
class PushConsumer(object):
    """Wrapper around a native handle created by ``dll.CreatePushConsumer``.

    Text arguments are encoded as UTF-8 before being passed to the native
    library. Methods return the native call's result unchanged (presumably a
    status code -- confirm against the C API).
    """

    def __init__(self, group_id):
        """Create the native push consumer for ``group_id`` (a text string)."""
        # ctypes callback thunks handed to the native library. They must stay
        # referenced from Python for as long as native code may call them.
        self._callback_refs = []
        self._handle = dll.CreatePushConsumer(group_id.encode('utf-8'))

    def __del__(self):
        """Destroy the native consumer if one is still held.

        Safe to call when ``__init__`` failed before a handle was stored, and
        safe to call more than once.
        """
        # getattr: __init__ may have raised before _handle was assigned.
        handle = getattr(self, '_handle', None)
        if handle is not None:
            # Clear first so a repeated call cannot destroy the handle twice.
            self._handle = None
            dll.DestroyPushConsumer(handle)

    def start(self):
        """Start the native consumer and return the native call's result."""
        return dll.StartPushConsumer(self._handle)

    def shutdown(self):
        """Shut down the native consumer and return the native call's result."""
        return dll.ShutdownPushConsumer(self._handle)

    def set_group(self, group_id):
        """Set the consumer group id from the text string ``group_id``."""
        # The consumer handle must come first, as in every other setter;
        # without it the encoded id is passed as the consumer pointer.
        return dll.SetPushConsumerGroupID(
            self._handle, group_id.encode('utf-8'))

    def set_namesrv_addr(self, addr):
        """Set the name server address from the text string ``addr``."""
        return dll.SetPushConsumerNameServerAddress(
            self._handle, addr.encode('utf-8'))

    def set_namesrv_domain(self, domain):
        """Set the name server domain from the text string ``domain``."""
        return dll.SetPushConsumerNameServerDomain(
            self._handle, domain.encode('utf-8'))

    def set_session_credentials(self, access_key, access_secret, channel):
        """Set the session credentials; all three arguments are text strings."""
        return dll.SetPushConsumerSessionCredentials(
            self._handle,
            access_key.encode('utf-8'),
            access_secret.encode('utf-8'),
            channel.encode('utf-8'),
        )

    def subscribe(self, topic, expression):
        """Subscribe to ``topic`` with ``expression`` (both text strings)."""
        return dll.Subscribe(
            self._handle,
            topic.encode('utf-8'),
            expression.encode('utf-8'),
        )

    def register_callback(self, callback, orderly=False):
        """Register ``callback`` as the message callback.

        ``callback`` is wrapped in ``MSG_CALLBACK_FUNC``. When ``orderly`` is
        true the orderly registration function is used instead of the regular
        one. The wrapped callback is kept on this instance so it is not
        garbage-collected while the native library may still invoke it.
        """
        from .ffi import MSG_CALLBACK_FUNC
        if orderly:
            register_func = dll.RegisterMessageCallbackOrderly
        else:
            register_func = dll.RegisterMessageCallback
        c_callback = MSG_CALLBACK_FUNC(callback)
        # Instances created without __init__ may lack the list; create lazily.
        refs = getattr(self, '_callback_refs', None)
        if refs is None:
            refs = self._callback_refs = []
        # Kept for the consumer's lifetime: a callback may still be running
        # natively even after it has been replaced or unregistered.
        refs.append(c_callback)
        return register_func(self._handle, c_callback)

    def unregister_callback(self, orderly=False):
        """Unregister the message callback (the orderly one if ``orderly``)."""
        if orderly:
            return dll.UnregisterMessageCallbackOrderly(self._handle)
        return dll.UnregisterMessageCallback(self._handle)

    def set_thread_count(self, thread_count):
        """Pass ``thread_count`` to the native thread-count setter."""
        return dll.SetPushConsumerThreadCount(self._handle, thread_count)

    def set_message_batch_max_size(self, max_size):
        """Pass ``max_size`` to the native message-batch-max-size setter."""
        return dll.SetPushConsumerMessageBatchMaxSize(self._handle, max_size)

    def set_instance_name(self, name):
        """Set the consumer instance name from the text string ``name``."""
        return dll.SetPushConsumerInstanceName(
            self._handle, name.encode('utf-8'))