https://github.com/XiaoMi/pegasus-python-client
Python 3.7+
pip3 install pypegasus3
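The pegasus python3 client is derived from the python2 client. It makes a breaking change to the parameters of the original interfaces, which now take bytes instead of str, and it no longer supports Python 2.
Apart from that, usage is the same as for the python2 client.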
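Because every key and value parameter is now bytes, code ported from the python2 client usually needs to encode its str arguments. The helpers below are a minimal, hypothetical sketch (to_bytes, to_bytes_dict and to_bytes_set are not part of pypegasus3) and assume UTF-8 is the encoding you want for your keys and values.

```python
from typing import Dict, Iterable, Set, Union

StrOrBytes = Union[str, bytes]


def to_bytes(value: StrOrBytes, encoding: str = "utf-8") -> bytes:
    """Return value as bytes, encoding str with the given encoding (UTF-8 by default)."""
    if isinstance(value, bytes):
        return value
    if isinstance(value, str):
        return value.encode(encoding)
    raise TypeError("expected str or bytes, got %s" % type(value).__name__)


def to_bytes_dict(pairs: Dict[StrOrBytes, StrOrBytes]) -> Dict[bytes, bytes]:
    """Convert a {sort_key: value} dict, e.g. before passing it to multi_set."""
    return {to_bytes(k): to_bytes(v) for k, v in pairs.items()}


def to_bytes_set(keys: Iterable[StrOrBytes]) -> Set[bytes]:
    """Convert a collection of sort keys, e.g. before passing it to multi_get or multi_del."""
    return {to_bytes(k) for k in keys}
```

For example, c.set(to_bytes(hash_key), to_bytes(sort_key), to_bytes(value)) works whether the caller still holds str or already has bytes; values read back are bytes and can be turned into text with value.decode("utf-8").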
For a complete example, see the sample in the repository. A simple example follows:
```python
#!/usr/bin/env python
# coding:utf-8
from pypegasus.pgclient import Pegasus
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks


@inlineCallbacks
def basic_test():
    # init: connect to the meta servers and open the table 'temp'
    c = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602'], 'temp')
    suc = yield c.init()
    if not suc:
        print('ERROR: connect pegasus server failed')
        reactor.stop()
        return

    # set: keys and values must be bytes; ttl=0, timeout=500 ms
    try:
        ret = yield c.set(b'hkey1', b'skey1', b'value', 0, 500)
        print('set ret: ', ret)
    except Exception as e:
        print(e)

    # get: returns a (code, value) tuple, value is bytes
    code, value = yield c.get(b'hkey1', b'skey1')
    print('get ret: ', code, value.decode() if value is not None else None)

    reactor.stop()


if __name__ == "__main__":
    reactor.callWhenRunning(basic_test)
    reactor.run()
```
The pegasus python client logs through the standard logging package. The default configuration is:
```ini
[loggers]
keys=root

[logger_root]
level=INFO
handlers=hand01
propagate=0

[handlers]
keys=hand01

[handler_hand01]
class=handlers.RotatingFileHandler
formatter=form01
args=('pegasus.log', 'a', 100*1024*1024, 10)

[formatters]
keys=form01

[formatter_form01]
format=%(asctime)s [%(thread)d] [%(levelname)s] %(filename)s:%(lineno)d %(message)s
datefmt=%Y-%m-%d %H:%M:%S
```
If you need a custom configuration, add a logger.conf configuration file to your own code directory.
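The default above writes INFO and higher to pegasus.log, rotating at 100 MB and keeping 10 backups. As a sketch of a customized logger.conf, the text below keeps the same format but sends WARNING and higher to stderr. It assumes logger.conf uses the same logging.config.fileConfig INI format as the default; how the client locates the file is not described here. The apply_logger_conf helper is hypothetical and only loads the text through fileConfig so you can check that it parses before deploying it.

```python
import io
import logging
import logging.config

# Hypothetical logger.conf contents: WARNING and above go to stderr instead of pegasus.log.
LOGGER_CONF = """
[loggers]
keys=root

[logger_root]
level=WARNING
handlers=console
propagate=0

[handlers]
keys=console

[handler_console]
class=StreamHandler
formatter=simple
args=(sys.stderr,)

[formatters]
keys=simple

[formatter_simple]
format=%(asctime)s [%(thread)d] [%(levelname)s] %(filename)s:%(lineno)d %(message)s
datefmt=%Y-%m-%d %H:%M:%S
"""


def apply_logger_conf(text: str) -> None:
    """Load INI-style logging configuration text with logging.config.fileConfig."""
    # fileConfig accepts any object with readline(), so a StringIO works like a file.
    logging.config.fileConfig(io.StringIO(text), disable_existing_loggers=False)
```

Save the LOGGER_CONF text as logger.conf; calling apply_logger_conf(LOGGER_CONF) and then logging a warning is a quick way to confirm the handler and format behave as intended.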
To initialize the client, first construct a Pegasus object, then call init to complete initialization:
```python
class Pegasus(object):
    """
    Pegasus client class.
    """

    def __init__(self, meta_addrs=None, table_name='', timeout=DEFAULT_TIMEOUT):
        """
        :param meta_addrs: (list) pegasus meta servers list.
                           example: ['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603']
        :param table_name: (bytes) table name/app name used in pegasus.
        :param timeout: (int) default timeout in milliseconds when communicating
                        with the meta server and replica server.
        """
```
```python
def init(self):
    """
    Initialize the client before you can use it.

    :return: (DeferredList) True when initialization succeeds, other values when it fails.
    """
```
Get the remaining time to live (TTL) of a key.
```python
def ttl(self, hash_key, sort_key, timeout=0):
    """
    Get ttl (time to live) of the data.

    :param hash_key: (bytes) hash key used for this API.
    :param sort_key: (bytes) sort key used for this API.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, int>) (code, ttl)
             code: error_types.ERR_OK.value when data exists,
                   error_types.ERR_OBJECT_NOT_FOUND.value when data is not found.
             ttl: in seconds, -1 means forever.
    """
```
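Because a ttl of -1 means the data never expires, comparing the raw number (for example ttl < 60) can misclassify permanent data. A minimal sketch of turning the ttl into an absolute expiry time follows; ttl_to_expiry is a hypothetical helper, and it should only be applied when the returned code is error_types.ERR_OK.value.

```python
import time
from typing import Optional

FOREVER = -1  # per the ttl() docstring: -1 means the data never expires


def ttl_to_expiry(ttl_seconds: int, now: Optional[float] = None) -> Optional[float]:
    """Convert the ttl returned by ttl() into a Unix timestamp; None means never expires."""
    if ttl_seconds == FOREVER:
        return None
    if ttl_seconds < 0:
        raise ValueError("unexpected ttl: %d" % ttl_seconds)
    if now is None:
        now = time.time()
    return now + ttl_seconds
```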
Check whether a key exists.
```python
def exist(self, hash_key, sort_key, timeout=0):
    """
    Check whether the value exists.

    :param hash_key: (bytes) hash key used for this API.
    :param sort_key: (bytes) sort key used for this API.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value when data exists,
                   error_types.ERR_OBJECT_NOT_FOUND.value when data is not found.
             ign: unused, should be ignored.
    """
```
Insert a record (overwriting it if it already exists).
```python
def set(self, hash_key, sort_key, value, ttl=0, timeout=0):
    """
    Set value to be stored in <hash_key, sort_key>.

    :param hash_key: (bytes) hash key used for this API.
    :param sort_key: (bytes) sort key used for this API.
    :param value: (bytes) value to be stored under <hash_key, sort_key>.
    :param ttl: (int) ttl (time to live) in seconds of this data.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value when data is stored successfully.
             ign: unused, should be ignored.
    """
```
Write multiple sort-key records under one hash key in a single call.
```python
def multi_set(self, hash_key, sortkey_value_dict, ttl=0, timeout=0):
    """
    Set multiple sort_key-value pairs to be stored under hash_key.

    :param hash_key: (bytes) hash key used for this API.
    :param sortkey_value_dict: (dict) <sort_key, value> pairs in dict.
    :param ttl: (int) ttl (time to live) in seconds of these data.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, _>) (code, ign)
             code: error_types.ERR_OK.value when data is stored successfully.
             ign: unused, should be ignored.
    """
```
Read a single record.
```python
def get(self, hash_key, sort_key, timeout=0):
    """
    Get value stored in <hash_key, sort_key>.

    :param hash_key: (bytes) hash key used for this API.
    :param sort_key: (bytes) sort key used for this API.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, bytes>) (code, value)
             code: error_types.ERR_OK.value when data is read successfully,
                   error_types.ERR_OBJECT_NOT_FOUND.value when data is not found.
             value: data stored in this <hash_key, sort_key>.
    """
```
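Since get returns a (code, value) tuple rather than the value itself, callers have to separate "found", "not found" and real errors. The sketch below shows one way to do that; value_or_none and PegasusError are hypothetical names, and the caller passes in error_types.ERR_OK.value and error_types.ERR_OBJECT_NOT_FOUND.value because this document does not show where error_types is imported from.

```python
from typing import Optional, Tuple


class PegasusError(Exception):
    """Hypothetical exception for error codes other than OK / not found."""

    def __init__(self, code: int):
        super().__init__("pegasus get failed, error code %d" % code)
        self.code = code


def value_or_none(result: Tuple[int, Optional[bytes]], ok_code: int,
                  not_found_code: int) -> Optional[bytes]:
    """Unpack the (code, value) tuple returned by get().

    Returns the value on success, None when the key does not exist, and raises
    PegasusError for any other code.
    """
    code, value = result
    if code == ok_code:
        return value
    if code == not_found_code:
        return None
    raise PegasusError(code)
```

Inside an inlineCallbacks function this would be used as ret = yield c.get(b'hkey1', b'skey1') followed by value = value_or_none(ret, ok_code, not_found_code).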
Read multiple sort-key records under one hash key in a single call.
```python
def multi_get(self, hash_key, sortkey_set, max_kv_count=100, max_kv_size=1000000,
              no_value=False, timeout=0):
    """
    Get multiple values stored in <hash_key, sort_key> pairs.

    :param hash_key: (bytes) hash key used for this API.
    :param sortkey_set: (set) sort keys in set.
    :param max_kv_count: (int) max count of k-v pairs to be fetched.
                         max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) max total data size of k-v pairs to be fetched.
                        max_kv_size <= 0 means no limit.
    :param no_value: (bool) if True, values are not fetched.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, dict>) (code, kvs)
             code: error_types.ERR_OK.value when data is read successfully.
             kvs: <sort_key, value> pairs in dict.
    """
```
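Read multiple sort-key records under one hash key in a single call, selecting which records to read according to the mode specified by the multi_get_options parameter.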
```python
def multi_get_opt(self, hash_key, start_sort_key, stop_sort_key, multi_get_options,
                  max_kv_count=100, max_kv_size=1000000, timeout=0):
    """
    Get multiple values stored under hash_key, with sort keys in
    [start_sort_key, stop_sort_key) by default.

    :param hash_key: (bytes) hash key used for this API.
    :param start_sort_key: (bytes) returned k-v pairs start from start_sort_key.
    :param stop_sort_key: (bytes) returned k-v pairs stop at stop_sort_key.
    :param multi_get_options: (MultiGetOptions) configurable multi_get options.
    :param max_kv_count: (int) max count of k-v pairs to be fetched.
                         max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) max total data size of k-v pairs to be fetched.
                        max_kv_size <= 0 means no limit.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, dict>) (code, kvs)
             code: error_types.ERR_OK.value when data is read successfully.
             kvs: <sort_key, value> pairs in dict.
    """
```
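MultiGetOptions controls how the sort key range is read: whether each boundary is inclusive, substring matching on sort keys, whether values are returned, whether to read in reverse order, and so on. It is defined as follows: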
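```python
class MultiGetOptions(object):
    """
    configurable options for multi_get.
    """

    def __init__(self):
        self.start_inclusive = True   # include start_sort_key
        self.stop_inclusive = False   # exclude stop_sort_key
        self.sortkey_filter_type = filter_type.FT_NO_FILTER  # how sort keys are matched
        self.sortkey_filter_pattern = ""  # substring used by the filter
        self.no_value = False  # if True, values are not returned
        self.reverse = False   # if True, read in reverse order


class filter_type:
    FT_NO_FILTER = 0       # no sort key filtering
    FT_MATCH_ANYWHERE = 1  # pattern occurs anywhere in the sort key
    FT_MATCH_PREFIX = 2    # sort key starts with pattern
    FT_MATCH_POSTFIX = 3   # sort key ends with pattern
```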
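To make the interaction of these options concrete, here is a local, in-memory model of which pairs multi_get_opt would select under one hash key. It is not the client's or server's implementation: model_multi_get_opt and sort_key_matches are hypothetical, it assumes sort keys are ordered byte by byte (as Python compares bytes), and it assumes reverse walks from the stop end so that max_kv_count keeps the last keys of the range. The real call returns a dict, so the list order here is only illustrative.

```python
from typing import Dict, List, Tuple

# Same values as the filter_type class above.
FT_NO_FILTER = 0
FT_MATCH_ANYWHERE = 1
FT_MATCH_PREFIX = 2
FT_MATCH_POSTFIX = 3


def sort_key_matches(sort_key: bytes, filter_type: int, pattern: bytes) -> bool:
    """Apply a sort key filter; an empty pattern matches every sort key."""
    if filter_type == FT_NO_FILTER:
        return True
    if filter_type == FT_MATCH_ANYWHERE:
        return pattern in sort_key
    if filter_type == FT_MATCH_PREFIX:
        return sort_key.startswith(pattern)
    if filter_type == FT_MATCH_POSTFIX:
        return sort_key.endswith(pattern)
    raise ValueError("unknown filter type: %r" % filter_type)


def model_multi_get_opt(data: Dict[bytes, bytes], start: bytes, stop: bytes,
                        start_inclusive: bool = True, stop_inclusive: bool = False,
                        filter_type: int = FT_NO_FILTER, pattern: bytes = b"",
                        reverse: bool = False,
                        max_kv_count: int = 100) -> List[Tuple[bytes, bytes]]:
    """Model which <sort_key, value> pairs are selected from one hash key's data."""

    def in_range(key: bytes) -> bool:
        above_start = key >= start if start_inclusive else key > start
        below_stop = key <= stop if stop_inclusive else key < stop
        return above_start and below_stop

    keys = sorted(k for k in data if in_range(k) and sort_key_matches(k, filter_type, pattern))
    if reverse:
        keys.reverse()  # walk from the stop end toward the start end
    if max_kv_count > 0:  # max_kv_count <= 0 means no limit
        keys = keys[:max_kv_count]
    return [(k, data[k]) for k in keys]
```

With data = {b"a1": b"v1", b"a2": b"v2", b"b1": b"v3", b"b2": b"v4"}, the default options over [b"a1", b"b2") select a1, a2 and b1, while FT_MATCH_PREFIX with pattern b"b" and stop_inclusive=True selects b1 and b2.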
Delete a record.
```python
def remove(self, hash_key, sort_key, timeout=0):
    """
    Remove the <hash_key, sort_key> entry and its value from pegasus.

    :param hash_key: (bytes) hash key used for this API.
    :param sort_key: (bytes) sort key used for this API.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, None>) (code, ign)
             code: error_types.ERR_OK.value when data is removed successfully.
             ign: unused, should be ignored.
    """
```
Delete multiple sort keys under one hash key in a batch.
```python
def multi_del(self, hash_key, sortkey_set, timeout=0):
    """
    Remove multiple <hash_key, sort_key> entries and their values from pegasus.

    :param hash_key: (bytes) hash key used for this API.
    :param sortkey_set: (set) sort keys in set.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, int>) (code, count)
             code: error_types.ERR_OK.value when data is removed successfully.
             count: number of deleted k-v pairs.
    """
```
Get the number of sort keys under a hash key.
```python
def sort_key_count(self, hash_key, timeout=0):
    """
    Get the total sort key count under the hash_key.

    :param hash_key: (bytes) hash key used for this API.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, int>) (code, count)
             code: error_types.ERR_OK.value when data is read successfully,
                   error_types.ERR_OBJECT_NOT_FOUND.value when data is not found.
             count: total sort key count under the hash_key.
    """
```
Get the sort keys under a hash key.
```python
def get_sort_keys(self, hash_key, max_kv_count=100, max_kv_size=1000000, timeout=0):
    """
    Get multiple sort keys under hash_key.

    :param hash_key: (bytes) hash key used for this API.
    :param max_kv_count: (int) max count of k-v pairs to be fetched.
                         max_kv_count <= 0 means no limit.
    :param max_kv_size: (int) max total data size of k-v pairs to be fetched.
                        max_kv_size <= 0 means no limit.
    :param timeout: (int) operation timeout in milliseconds. If timeout > 0, it applies
                    to this operation only; otherwise the timeout given when creating
                    the instance is used.
    :return: (tuple<error_types.code.value, set>) (code, ks)
             code: error_types.ERR_OK.value when data is read successfully.
             ks: <sort_key, ign> pairs in dict, ign will always be empty bytes.
    """
```
Get a scanner object for scanning data within a specified range. The scan mode can be set through the scan_options parameter.
```python
def get_scanner(self, hash_key, start_sort_key, stop_sort_key, scan_options):
    """
    Get a scanner for hash_key that starts from start_sort_key and stops at
    stop_sort_key. Whether the scan includes start_sort_key and stop_sort_key
    is configurable through scan_options.

    :param hash_key: (bytes) hash key used for this API.
    :param start_sort_key: (bytes) the returned scanner starts from start_sort_key.
    :param stop_sort_key: (bytes) the returned scanner stops at stop_sort_key.
    :param scan_options: (ScanOptions) configurable scan options.
    :return: (PegasusScanner) scanner, instance of PegasusScanner.
    """
```
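ScanOptions can specify whether the boundaries are inclusive, the timeout, the number of sortkey-value pairs fetched from the replica server in each batch, and so on. It is defined as follows: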
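```python
class ScanOptions(object):
    """
    configurable options for scan.
    """

    def __init__(self):
        self.timeout_millis = 5000  # scan timeout in milliseconds
        self.batch_size = 1000      # k-v pairs fetched from the replica server per batch
        self.start_inclusive = True   # include start_sort_key
        self.stop_inclusive = False   # exclude stop_sort_key
        self.snapshot = None  # for future use
```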
Get multiple scanners at once for scanning the whole table. The scan mode can be set through the scan_options parameter.
```python
def get_unordered_scanners(self, max_split_count, scan_options):
    """
    Get scanners for the whole pegasus table.

    :param max_split_count: (int) max count of scanners that will be returned.
    :param scan_options: (ScanOptions) configurable scan options.
    :return: (list) list of PegasusScanner instances.
             Each scanner in this list scans a separate part of the whole pegasus table.
    """
```
PegasusScanner is the object used for scanning data; it is returned by get_scanner and get_unordered_scanners. Call its get_next function to perform the scan.
```python
class PegasusScanner(object):
    """
    Pegasus scanner class, used for scanning data in pegasus table.
    """
```
get_next fetches the scanned data. Call it repeatedly until it returns None, which ends the scan.
```python
def get_next(self):
    """
    Scan the next k-v pair for the scanner.

    :return: (tuple<tuple<hash_key, sort_key>, value> or None)
             all the sort_keys returned by this API are in ascending order.
    """
```
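The "call get_next until None" loop can be written once as a generator and wrapped with twisted's inlineCallbacks, in the same style as the basic example. This is a sketch under assumptions: scan_all, FakeScanner and run_sync are hypothetical names, and FakeScanner/run_sync exist only to exercise the loop without a cluster. run_sync sends each yielded value straight back, which is what inlineCallbacks does for non-Deferred values (for Deferreds it waits for the result first), so the loop works whether or not get_next returns a Deferred.

```python
from typing import Any, Generator, List, Tuple

Row = Tuple[bytes, bytes, bytes]


def scan_all(scanner) -> Generator[Any, Any, List[Row]]:
    """Collect every k-v pair from one scanner as (hash_key, sort_key, value) rows.

    Meant to be wrapped with twisted.internet.defer.inlineCallbacks when used
    with the real client.
    """
    rows: List[Row] = []
    while True:
        item = yield scanner.get_next()
        if item is None:  # None marks the end of the scan
            return rows
        (hash_key, sort_key), value = item
        rows.append((hash_key, sort_key, value))


class FakeScanner:
    """Hypothetical in-memory stand-in for PegasusScanner, for trying scan_all offline."""

    def __init__(self, items):
        self._items = list(items)

    def get_next(self):
        return self._items.pop(0) if self._items else None


def run_sync(gen):
    """Drive a generator by sending each yielded value straight back into it."""
    try:
        value = next(gen)
        while True:
            value = gen.send(value)
    except StopIteration as stop:
        return stop.value
```

With the real client, `rows = yield inlineCallbacks(scan_all)(scanner)` inside an inlineCallbacks function collects one scanner's data; for get_unordered_scanners, run it once for each scanner in the returned list.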