| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import print_function |
| from __future__ import with_statement |
| |
| import os |
| import ctypes |
| import struct |
| import logging.config |
| import six |
| |
| from thrift.Thrift import TMessageType, TApplicationException |
| from twisted.internet import defer |
| from twisted.internet import reactor |
| from twisted.internet.defer import inlineCallbacks |
| from twisted.internet.protocol import ClientCreator |
| |
| from pypegasus import rrdb, replication |
| from pypegasus.base.ttypes import * |
| from pypegasus.operate.packet import * |
| from pypegasus.replication.ttypes import query_cfg_request |
| from pypegasus.rrdb import * |
| from pypegasus.rrdb.ttypes import scan_request, get_scanner_request, update_request, key_value, multi_put_request, \ |
| multi_get_request, multi_remove_request |
| from pypegasus.transport.protocol import * |
| from pypegasus.utils.tools import restore_key, get_ttl, bytes_cmp, ScanOptions |
| |
| try: |
| from thrift.protocol import fastbinary |
| except ImportError: |
| fastbinary = None |
| |
| |
| logging.config.fileConfig(os.path.join(os.path.dirname(__file__), "logger.conf")) |
| logger = logging.getLogger("pgclient") |
| |
| DEFAULT_TIMEOUT = 2000 # ms |
| META_CHECK_INTERVAL = 2 # s |
| MAX_TIMEOUT_THRESHOLD = 5 # times |
| |
| |
| class BaseSession(object): |
| |
| def __init__(self, transport, oprot_factory, container, timeout): |
| self._transport = transport # TPegasusTransport |
| self._oprot_factory = oprot_factory |
| self._container = container |
| self._seqid = 0 |
| self._requests = {} |
| self._default_timeout = timeout |
| |
| def __del__(self): |
| self.close() |
| |
| def get_peer_addr(self): |
| return self._transport.get_peer_addr() |
| |
| def cb_send(self, _, seqid): |
| return self._requests[seqid] |
| |
| def eb_send(self, f, seqid): |
| d = self._requests.pop(seqid) |
| d.errback(f) |
| logger.warning('peer: %s, failure: %s', |
| self.get_peer_addr(), f) |
| return d |
| |
| def eb_recv(self, f): |
| logger.warning('peer: %s, failure: %s', |
| self.get_peer_addr(), f) |
| |
| def on_timeout(self, _, seconds): |
| ec = error_types.ERR_TIMEOUT |
| self._container.update_state(ec) |
| logger.warning('peer: %s, time: %s s, timeout', |
| self.get_peer_addr(), seconds) |
| return ec.value, None |
| |
| def operate(self, op, timeout=None): |
| if not isinstance(timeout, int) or timeout <= 0: |
| timeout = self._default_timeout |
| |
| seqid = self._seqid = self._seqid + 1 # TODO should keep atomic |
| dr = defer.Deferred() |
| dr.addErrback(self.eb_recv) |
| self._requests[seqid] = dr |
| |
|         # ds (deferred send) will wait for dr (deferred receive) |
| ds = defer.maybeDeferred(self.send_req, op, seqid) |
| ds.addCallbacks( |
| callback=self.cb_send, |
| callbackArgs=(seqid,), |
| errback=self.eb_send, |
| errbackArgs=(seqid,)) |
| ds.addTimeout(timeout/1000.0, reactor, self.on_timeout) |
| return ds |
| |
| def send_req(self, op, seqid): |
| oprot = self._oprot_factory.getProtocol(self._transport) |
| oprot.trans.seek(ThriftHeader.HEADER_LENGTH) # skip header |
| op.send_data(oprot, seqid) |
| body_length = oprot.trans.tell() - ThriftHeader.HEADER_LENGTH |
| oprot.trans.seek(0) # back to header |
| oprot.trans.write(op.prepare_thrift_header(body_length)) |
| oprot.trans.flush() |
| |
| def recv_ACK(self, iprot, mtype, rseqid, errno, result_type, parser): |
| if rseqid not in self._requests: |
| logger.warning('peer: %s rseqid: %s not found', |
| self.get_peer_addr(), rseqid) |
| return |
| d = self._requests.pop(rseqid) |
| if errno != 'ERR_OK': |
| rc = error_code.value_of(errno) |
| self._container.update_state(rc) |
| return d.callback(rc) |
| else: |
| if mtype == TMessageType.EXCEPTION: |
| x = TApplicationException() |
| x.read(iprot) |
| iprot.readMessageEnd() |
| return d.errback(x) |
| |
| result = result_type() |
| result.read(iprot) |
| if result.success: |
| return d.callback(parser(result.success)) |
| |
|         return d.errback(TApplicationException(TApplicationException.MISSING_RESULT, |
|                                                "%s failed: unknown result" % result_type.__name__)) |
| |
| def close(self): |
| self._transport.close() |
| |
| |
| class MetaSession(BaseSession): |
| |
| def __init__(self, transport, oprot_factory, container, timeout): |
| BaseSession.__init__(self, transport, oprot_factory, container, timeout) |
| |
| def recv_RPC_CM_QUERY_PARTITION_CONFIG_BY_INDEX_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| meta.query_cfg_result, |
| QueryCfgOperator.parse_result) |
| |
| |
| class ReplicaSession(BaseSession): |
| |
| def __init__(self, transport, oprot_factory, container, timeout): |
| BaseSession.__init__(self, transport, oprot_factory, container, timeout) |
| |
| def recv_RPC_RRDB_RRDB_PUT_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.put_result, RrdbPutOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_TTL_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.ttl_result, |
| RrdbTtlOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_GET_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.get_result, |
| RrdbGetOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_REMOVE_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.put_result, |
| RrdbRemoveOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_SORTKEY_COUNT_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.sortkey_count_result, |
| RrdbSortkeyCountOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_MULTI_PUT_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.put_result, |
| RrdbMultiPutOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_MULTI_GET_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.multi_get_result, |
| RrdbMultiGetOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_MULTI_REMOVE_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.multi_remove_result, |
| RrdbMultiRemoveOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_GET_SCANNER_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.get_scanner_result, |
| RrdbGetScannerOperator.parse_result) |
| |
| def recv_RPC_RRDB_RRDB_SCAN_ACK(self, iprot, mtype, rseqid, errno): |
| self.recv_ACK(iprot, mtype, rseqid, errno, |
| rrdb.scan_result, |
| RrdbScanOperator.parse_result) |
| |
| |
| class SessionManager(object): |
| def __init__(self, name, timeout): |
| self.name = name |
| self.session_dict = {} # rpc_addr => session |
| self.timeout = timeout |
| |
| def __del__(self): |
| self.close() |
| |
| def got_conn(self, conn): |
| addr = rpc_address() |
| addr.from_string(conn.transport.addr[0] + ':' + str(conn.transport.addr[1])) |
| self.session_dict[addr] = conn.client |
| return conn.client |
| |
| def got_err(self, err): |
| logger.error('table: %s, connect err: %s', |
| self.name, err) |
| |
| def close(self): |
| for session in self.session_dict.values(): |
| session.close() |
| |
| def update_state(self, ec): |
| pass |
| |
| |
| class MetaSessionManager(SessionManager): |
| |
| def __init__(self, table_name, timeout): |
| SessionManager.__init__(self, table_name, timeout) |
| self.addr_list = [] |
| |
| def add_meta_server(self, meta_addr): |
| rpc_addr = rpc_address() |
| if rpc_addr.from_string(meta_addr): |
| ip_port = meta_addr.split(':') |
|             if len(ip_port) != 2: |
| return False |
| |
| ip, port = ip_port[0], int(ip_port[1]) |
| self.addr_list.append((ip, port)) |
| |
| return True |
| else: |
| return False |
| |
| @inlineCallbacks |
| def query_one(self, session): |
| req = query_cfg_request(self.name, []) |
| op = QueryCfgOperator(gpid(0, 0), req, 0) |
| ret = yield session.operate(op) |
| defer.returnValue(ret) |
| |
| def got_results(self, res): |
| for (suc, result) in res: |
| if suc \ |
| and result.__class__.__name__ == "query_cfg_response" \ |
| and result.is_stateful: |
| logger.info('table: %s, partition result: %s', |
| self.name, result) |
| return result |
| |
| logger.error('query partition info err. table: %s err: %s', |
| self.name, res) |
| |
| def query(self): |
| ds = [] |
| for (ip, port) in self.addr_list: |
| rpc_addr = rpc_address() |
| rpc_addr.from_string(ip + ':' + str(port)) |
| if rpc_addr in self.session_dict: |
| self.session_dict[rpc_addr].close() |
| |
| d = ClientCreator(reactor, |
| TPegasusThriftClientProtocol, |
| MetaSession, |
| TBinaryProtocol.TBinaryProtocolFactory(), |
| None, |
| self, |
| self.timeout |
| ).connectTCP(ip, port, self.timeout) |
| d.addCallbacks(self.got_conn, self.got_err) |
| d.addCallbacks(self.query_one, self.got_err) |
| ds.append(d) |
| |
| dlist = defer.DeferredList(ds, consumeErrors=True) |
| dlist.addCallback(self.got_results) |
| return dlist |
| |
| |
| class Table(SessionManager): |
| def __init__(self, name, container, timeout): |
| SessionManager.__init__(self, name, timeout) |
| self.app_id = 0 |
| self.partition_count = 0 |
| self.query_cfg_response = None |
| self.partition_dict = {} # partition_index => rpc_addr |
| self.partition_ballot = {} # partition_index => ballot |
| self.container = container |
| |
| def got_results(self, res): |
| logger.info('table: %s, replica session: %s', |
| self.name, res) |
| return True |
| |
| def update_cfg(self, resp): |
| if resp.__class__.__name__ != "query_cfg_response": |
|             logger.error('table: %s, invalid query_cfg_response', |
| self.name) |
| return None |
| |
| self.query_cfg_response = resp |
| self.app_id = self.query_cfg_response.app_id |
| self.partition_count = self.query_cfg_response.partition_count |
| |
| ds = [] |
| connected_rpc_addrs = {} |
| for partition in self.query_cfg_response.partitions: |
| rpc_addr = partition.primary |
| self.partition_dict[partition.pid.get_pidx()] = rpc_addr |
| self.partition_ballot[partition.pid.get_pidx()] = partition.ballot |
| |
|             # The table is undergoing a partition split and the child partition |
|             # is not ready, so requests for the child should be redirected to |
|             # its parent partition. This can happen when the meta server is |
|             # queried during a partition split. |
| if partition.ballot < 0: |
| continue |
| |
| if rpc_addr in connected_rpc_addrs or rpc_addr.address == 0: |
| continue |
| |
| ip, port = rpc_addr.to_ip_port() |
| if rpc_addr in self.session_dict: |
| self.session_dict[rpc_addr].close() |
| |
| d = ClientCreator(reactor, |
| TPegasusThriftClientProtocol, |
| ReplicaSession, |
| TBinaryProtocol.TBinaryProtocolFactory(), |
| None, |
| self.container, |
| self.timeout |
| ).connectTCP(ip, port, self.timeout) |
| connected_rpc_addrs[rpc_addr] = 1 |
| d.addCallbacks(self.got_conn, self.got_err) |
| ds.append(d) |
| |
| dlist = defer.DeferredList(ds, consumeErrors=True) |
| dlist.addCallback(self.got_results) |
| return dlist |
| |
| def get_hashkey_hash(self, hash_key): |
| if six.PY3 and isinstance(hash_key, six.string_types): |
| hash_key = hash_key.encode("UTF-8") |
| return PegasusHash.default_hash(hash_key) |
| |
| def get_blob_hash(self, blob_key): |
| return PegasusHash.hash(blob_key) |
| |
| def get_gpid_by_hash(self, partition_hash): |
| pidx = partition_hash % self.get_partition_count() |
| if self.partition_ballot[pidx] < 0: |
|             logger.warning("table[%s] partition[%d] is not ready, requests will be sent to parent partition[%d]", |
| self.name, |
| pidx, |
| pidx - int(self.partition_count / 2)) |
| pidx -= int(self.partition_count / 2) |
| return gpid(self.app_id, pidx) |
| |
| def get_all_gpid(self): |
| return [gpid(self.app_id, pidx) |
| for pidx in range(self.get_partition_count())] |
| |
| def get_partition_count(self): |
| return self.query_cfg_response.partition_count |
| |
| def get_session(self, peer_gpid): |
|         pidx = peer_gpid.get_pidx() |
|         if pidx in self.partition_dict: |
|             replica_addr = self.partition_dict[pidx] |
|             if replica_addr in self.session_dict: |
|                 return self.session_dict[replica_addr] |
| |
| self.container.update_state(error_types.ERR_OBJECT_NOT_FOUND) |
| return None |
| |
| |
| class PegasusScanner(object): |
| """ |
|     Pegasus scanner class, used for scanning data in a Pegasus table. |
| """ |
| |
| CONTEXT_ID_VALID_MIN = 0 |
| CONTEXT_ID_COMPLETED = -1 |
| CONTEXT_ID_NOT_EXIST = -2 |
| |
| def __init__(self, table, gpid_list, scan_options, partition_hash_list, check_hash, |
| start_key=blob(b'\x00\x00'), stop_key=blob(b'\xFF\xFF')): |
| self._table = table |
| self._gpid = gpid(0) |
| self._gpid_list = gpid_list |
| self._scan_options = scan_options |
| self._start_key = start_key |
| self._stop_key = stop_key |
| self._p = -1 |
| self._context_id = self.CONTEXT_ID_COMPLETED |
| self._kvs = [] |
| self._partition_hash = 0 |
| self._partition_hash_list = partition_hash_list |
| self._check_hash = check_hash |
| |
| def __repr__(self): |
| lst = ['%s=%r' % (key, value) |
| for key, value in self.__dict__.items()] |
| return '%s(%s)' % (self.__class__.__name__, ', '.join(lst)) |
| |
| @inlineCallbacks |
| def get_next(self): |
| """ |
|         Scan the next k-v pair of this scanner. |
|         :return: (tuple<tuple<hash_key, sort_key>, value> or None) |
|                  all sort_keys returned by this API are in ascending order. |
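| |
|         A minimal iteration sketch (illustrative), run inside an |
|         inlineCallbacks function with a scanner obtained from |
|         Pegasus.get_scanner or Pegasus.get_unordered_scanners: |
| |
|             while True: |
|                 ret = yield scanner.get_next() |
|                 if ret is None: |
|                     break |
|                 (hash_key, sort_key), value = ret |
|             scanner.close() |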
| """ |
| self._p += 1 |
| while self._p >= len(self._kvs): |
| if self._context_id == self.CONTEXT_ID_COMPLETED: |
| # reach the end of one partition |
| if len(self._gpid_list) == 0: |
| defer.returnValue(None) |
| else: |
| self._gpid = self._gpid_list.pop() |
| self._partition_hash = self._partition_hash_list.pop() |
| self.split_reset() |
| elif self._context_id == self.CONTEXT_ID_NOT_EXIST: |
| # no valid context_id found |
| yield self.start_scan() |
| else: |
| ret = yield self.next_batch() |
| if ret == 1: |
| # context not found |
| self._context_id = self.CONTEXT_ID_NOT_EXIST |
| elif ret != 0: |
| raise Exception("Rocksdb error: " + ret) |
| |
| self._p += 1 |
| |
| defer.returnValue((restore_key(self._kvs[self._p].key.data), |
| self._kvs[self._p].value.data)) |
| |
| def split_reset(self): |
| self._kvs = [] |
| self._p = -1 |
| self._context_id = self.CONTEXT_ID_NOT_EXIST |
| |
|     def scan_cb(self, ctx): |
|         if isinstance(ctx, dict) and ctx['error'] == 0: |
|             self._kvs = ctx['kvs'] |
|             self._p = -1 |
|             self._context_id = ctx['context_id'] |
| |
|             return ctx['error'] |
|         elif isinstance(ctx, dict) and ctx['error'] == 1: |
|             # rocksdb kNotFound: the server-side scan context is gone; |
|             # surface the code so get_next() can re-open the scanner |
|             return ctx['error'] |
|         else: |
|             raise Exception('operate error: %s' % (ctx,)) |
| |
| def scan_err_cb(self, err): |
| logger.error('scan table: %s start_key: %s stop_key: %s is error: %s', |
| self._table.name, self._start_key, self._stop_key, err) |
| return err |
| |
| def start_scan(self): |
| request = get_scanner_request() |
| if len(self._kvs) == 0: |
| request.start_key = self._start_key |
| request.start_inclusive = self._scan_options.start_inclusive |
| else: |
| request.start_key = self._kvs[-1].key |
| request.start_inclusive = False |
| |
| request.stop_key = self._stop_key |
| request.stop_inclusive = self._scan_options.stop_inclusive |
| request.batch_size = self._scan_options.batch_size |
| request.need_check_hash = self._check_hash |
| |
| op = RrdbGetScannerOperator(self._gpid, request, self._partition_hash) |
| session = self._table.get_session(self._gpid) |
| if not session or not op: |
| raise Exception('session or packet error!') |
| |
| ret = session.operate(op, self._scan_options.timeout_millis) |
| ret.addCallbacks(self.scan_cb, self.scan_err_cb) |
| return ret |
| |
| def next_batch(self): |
| request = scan_request(self._context_id) |
| op = RrdbScanOperator(self._gpid, request, self._partition_hash) |
| session = self._table.get_session(self._gpid) |
| if not session or not op: |
| raise Exception('session or packet error!') |
| |
| ret = session.operate(op, self._scan_options.timeout_millis) |
| ret.addCallbacks(self.scan_cb, self.scan_err_cb) |
| return ret |
| |
| def close(self): |
| if self._context_id >= self.CONTEXT_ID_VALID_MIN: |
| op = RrdbClearScannerOperator(self._gpid, self._context_id, self._partition_hash) |
| session = self._table.get_session(self._gpid) |
| self._context_id = self.CONTEXT_ID_COMPLETED |
| if not session or not op: |
| raise Exception('session or packet error!') |
| |
| session.operate(op, self._scan_options.timeout_millis) |
| |
| |
| class PegasusHash(object): |
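|     # Table-driven, bit-reflected CRC-64 used for partition hashing. The |
|     # signed 64-bit arithmetic (see unsigned_right_shift) deliberately |
|     # emulates fixed-width integer semantics, so the result is intended |
|     # to match the hash the Pegasus servers compute for the same key. |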
| polynomial, = struct.unpack('<q', struct.pack('<Q', 0x9a6c9329ac4bc9b5)) |
| table_forward = [0] * 256 |
| |
| @classmethod |
| def unsigned_right_shift(cls, val, n): |
| if val >= 0: |
| val >>= n |
| else: |
| val = ((val + 0x10000000000000000) >> n) |
| return val |
| |
| @classmethod |
| def populate_table(cls): |
| for i in range(256): |
| crc = i |
| for j in range(8): |
| if crc & 1: |
| crc = cls.unsigned_right_shift(crc, 1) |
| crc ^= cls.polynomial |
| else: |
| crc = cls.unsigned_right_shift(crc, 1) |
| cls.table_forward[i] = crc |
| |
| @classmethod |
| def crc64(cls, data, offset, length): |
| crc = 0xffffffffffffffff |
| end = offset + length |
| |
|         for c in data[offset:end]: |
| crc = cls.table_forward[(c ^ crc) & 0xFF] ^ cls.unsigned_right_shift(crc, 8) |
| return ~crc |
| |
| @classmethod |
| def default_hash(cls, hash_key): |
| return cls.crc64(hash_key, 0, len(hash_key)) |
| |
| @classmethod |
| def hash(cls, blob_key): |
| assert blob_key is not None and len(blob_key) >= 2, 'blob_key is invalid!' |
| |
| # hash_key_len is in big endian |
| s = struct.Struct('>H') |
| hash_key_len = s.unpack(blob_key.data[:2])[0] |
| |
| assert hash_key_len != 0xFFFF and (2 + hash_key_len <= len(blob_key)), 'blob_key hash_key_len is invalid!' |
| if hash_key_len == 0: |
| return cls.crc64(blob_key.data, 2, len(blob_key) - 2) |
| else: |
| return cls.crc64(blob_key.data, 2, hash_key_len) |
| |
| |
| class Pegasus(object): |
| """ |
| Pegasus client class. |
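| |
|     A minimal end-to-end sketch (illustrative; the addresses and table |
|     name below are assumptions), driven by the Twisted reactor: |
| |
|         @inlineCallbacks |
|         def demo(): |
|             client = Pegasus(['127.0.0.1:34601', '127.0.0.1:34602'], 'temp') |
|             yield client.init() |
|             yield client.set(b'hkey', b'skey', b'value') |
|             code, value = yield client.get(b'hkey', b'skey') |
|             client.close() |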
| """ |
| |
| @classmethod |
| def generate_key(cls, hash_key, sort_key): |
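|         # Key layout: a 2-byte big-endian hash_key length, followed by the |
|         # hash_key bytes, then the sort_key bytes (sort_key may be empty). |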
| # assert(len(hash_key) < sys.maxsize > 1) |
| if six.PY3 and isinstance(hash_key, six.string_types): |
| hash_key = hash_key.encode("UTF-8") |
| if six.PY3 and isinstance(sort_key, six.string_types): |
| sort_key = sort_key.encode("UTF-8") |
| |
| # hash_key_len is in big endian |
| hash_key_len = len(hash_key) |
| sort_key_len = len(sort_key) |
| |
| if sort_key_len > 0: |
| values = (hash_key_len, hash_key, sort_key) |
| s = struct.Struct('>H'+str(hash_key_len)+'s'+str(sort_key_len)+'s') |
| else: |
| values = (hash_key_len, hash_key) |
| s = struct.Struct('>H'+str(hash_key_len)+'s') |
| |
| buff = ctypes.create_string_buffer(s.size) |
| s.pack_into(buff, 0, *values) |
| |
|         return blob(buff.raw)  # pass packed bytes, not the ctypes buffer |
| |
| @classmethod |
|     def generate_next_bytes(cls, buff): |
|         # Return the next byte string after buff: increment the last byte |
|         # that is not 0xFF; if every byte is 0xFF, append a zero byte. |
|         if six.PY3 and isinstance(buff, six.string_types): |
|             buff = buff.encode("UTF-8") |
|         data = bytearray(buff) |
|         pos = len(data) - 1 |
|         found = False |
|         while pos >= 0: |
|             if data[pos] != 0xFF: |
|                 data[pos] += 1 |
|                 found = True |
|                 break |
|             pos -= 1 |
|         if found: |
|             return bytes(data) |
|         return bytes(data) + b'\x00' |
| |
| @classmethod |
| def generate_stop_key(cls, hash_key, stop_sort_key): |
| if stop_sort_key: |
| return cls.generate_key(hash_key, stop_sort_key), True |
| else: |
|             # callers expect a blob here, matching what generate_key returns |
|             return blob(cls.generate_next_bytes(hash_key)), False |
| |
| def __init__(self, meta_addrs=None, table_name='', |
| timeout=DEFAULT_TIMEOUT): |
| """ |
|         :param meta_addrs: (list) pegasus meta server list. |
|                example: ['127.0.0.1:34601', '127.0.0.1:34602', '127.0.0.1:34603'] |
|         :param table_name: (str) table name/app name used in pegasus. |
|         :param timeout: (int) default timeout in milliseconds when communicating with the meta server and replica servers. |
| """ |
| self.name = table_name |
| self.table = Table(table_name, self, timeout) |
| self.meta_session_manager = MetaSessionManager(table_name, timeout) |
| if isinstance(meta_addrs, list): |
| for meta_addr in meta_addrs: |
| self.meta_session_manager.add_meta_server(meta_addr) |
| PegasusHash.populate_table() |
| self.timeout_times = 0 |
| self.update_partition = False |
| self.timer = reactor.callLater(META_CHECK_INTERVAL, self.check_state) |
| |
| def init(self): |
| """ |
|         Initialize the client. It must be called before the client is used. |
| |
|         :return: (DeferredList) True when initialization succeeded, other values when it failed. |
| """ |
| dlist = self.meta_session_manager.query() |
| dlist.addCallback(self.table.update_cfg) |
| return dlist |
| |
| def close(self): |
| """ |
|         Close the client. The client cannot be used again after it is closed. |
| """ |
| self.timer.cancel() |
| self.table.close() |
| self.meta_session_manager.close() |
| |
| @inlineCallbacks |
| def check_state(self): |
| logger.info('table: %s, checking meta ...', |
| self.name) |
| if self.update_partition: |
| self.update_partition = False |
| yield self.init() |
| self.timer = reactor.callLater(META_CHECK_INTERVAL, self.check_state) |
| |
| def update_state(self, ec): |
| if ec == error_types.ERR_TIMEOUT: |
| self.timeout_times += 1 |
| if self.timeout_times >= MAX_TIMEOUT_THRESHOLD: |
| self.update_partition = True |
| self.timeout_times = 0 |
| elif ec == error_types.ERR_INVALID_DATA: |
| logger.error('table: %s, ignore ec: %s:%s', |
|                          self.name, ec.name, ec.value)  # TODO when does this happen? |
| elif ec == error_types.ERR_SESSION_RESET: |
| pass |
| elif (ec == error_types.ERR_OBJECT_NOT_FOUND |
| or ec == error_types.ERR_INACTIVE_STATE |
| or ec == error_types.ERR_INVALID_STATE |
| or ec == error_types.ERR_NOT_ENOUGH_MEMBER |
| or ec == error_types.ERR_PARENT_PARTITION_MISUSED): |
| self.update_partition = True |
| else: |
| logger.error('table: %s, ignore ec: %s:%s', |
| self.name, ec.name, ec.value) |
| |
| def ttl(self, hash_key, sort_key, timeout=0): |
| """ |
|         Get the TTL (time to live) of the data. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sort_key: (bytes) the sort key used for this operation. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, int>) (code, ttl) |
|                  code: error_types.ERR_OK.value when the data exists, error_types.ERR_OBJECT_NOT_FOUND.value when not found. |
|                  ttl: remaining time to live in seconds, -1 means never expire. |
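| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             code, ttl = yield client.ttl(b'hkey', b'skey') |
|             if code == error_types.ERR_OK.value: |
|                 print('remaining ttl: %d s' % ttl) |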
| """ |
| blob_key = self.generate_key(hash_key, sort_key) |
| partition_hash = self.table.get_blob_hash(blob_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| op = RrdbTtlOperator(peer_gpid, blob_key, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def exist(self, hash_key, sort_key, timeout=0): |
| """ |
|         Check whether the value exists. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sort_key: (bytes) the sort key used for this operation. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, None>) (code, ign) |
|                  code: error_types.ERR_OK.value when the data exists, error_types.ERR_OBJECT_NOT_FOUND.value when not found. |
|                  ign: useless, should be ignored. |
| """ |
| return self.ttl(hash_key, sort_key, timeout) |
| |
| def get(self, hash_key, sort_key, timeout=0): |
| """ |
|         Get the value stored at <hash_key, sort_key>. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sort_key: (bytes) the sort key used for this operation. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, bytes>) (code, value). |
|                  code: error_types.ERR_OK.value when the data was fetched successfully, error_types.ERR_OBJECT_NOT_FOUND.value when not found. |
|                  value: the data stored at this <hash_key, sort_key>. |
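| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             code, value = yield client.get(b'hkey', b'skey') |
|             if code == error_types.ERR_OK.value: |
|                 print(value) |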
| """ |
| blob_key = self.generate_key(hash_key, sort_key) |
| partition_hash = self.table.get_blob_hash(blob_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| op = RrdbGetOperator(peer_gpid, blob_key, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def set(self, hash_key, sort_key, value, ttl=0, timeout=0): |
| """ |
|         Set the value to be stored at <hash_key, sort_key>. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sort_key: (bytes) the sort key used for this operation. |
|         :param value: (bytes) the value to be stored under <hash_key, sort_key>. |
|         :param ttl: (int) TTL (time to live) in seconds of this data. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, None>) (code, ign) |
|                  code: error_types.ERR_OK.value when the data was stored successfully. |
|                  ign: useless, should be ignored. |
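| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             code, _ = yield client.set(b'hkey', b'skey', b'value', ttl=60) |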
| """ |
| blob_key = self.generate_key(hash_key, sort_key) |
| partition_hash = self.table.get_blob_hash(blob_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| op = RrdbPutOperator(peer_gpid, update_request(blob_key, blob(value), get_ttl(ttl)), partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def remove(self, hash_key, sort_key, timeout=0): |
| """ |
|         Remove the entire <hash_key, sort_key>-value from pegasus. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sort_key: (bytes) the sort key used for this operation. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, None>) (code, ign) |
|                  code: error_types.ERR_OK.value when the data was removed successfully. |
|                  ign: useless, should be ignored. |
| """ |
| blob_key = self.generate_key(hash_key, sort_key) |
| partition_hash = self.table.get_blob_hash(blob_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| op = RrdbRemoveOperator(peer_gpid, blob_key, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def sort_key_count(self, hash_key, timeout=0): |
| """ |
|         Get the total sort key count under the hash_key. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, int>) (code, count) |
|                  code: error_types.ERR_OK.value when the count was fetched successfully, error_types.ERR_OBJECT_NOT_FOUND.value when the hash_key was not found. |
|                  count: the total sort key count under the hash_key. |
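| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             code, count = yield client.sort_key_count(b'hkey') |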
| """ |
| partition_hash = self.table.get_hashkey_hash(hash_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| op = RrdbSortkeyCountOperator(peer_gpid, blob(hash_key), partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def multi_set(self, hash_key, sortkey_value_dict, ttl=0, timeout=0): |
| """ |
|         Set multiple sort_key-value pairs under hash_key. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sortkey_value_dict: (dict) <sort_key, value> pairs in a dict. |
|         :param ttl: (int) TTL (time to live) in seconds of these data. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, None>) (code, ign) |
|                  code: error_types.ERR_OK.value when the data was stored successfully. |
|                  ign: useless, should be ignored. |
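| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             kvs = {b'skey1': b'value1', b'skey2': b'value2'} |
|             code, _ = yield client.multi_set(b'hkey', kvs, ttl=0) |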
| """ |
| partition_hash = self.table.get_hashkey_hash(hash_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| kvs = [key_value(blob(k), blob(v)) for k, v in sortkey_value_dict.items()] |
| ttl = get_ttl(ttl) |
| req = multi_put_request(blob(hash_key), kvs, ttl) |
| op = RrdbMultiPutOperator(peer_gpid, req, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def multi_get(self, hash_key, |
| sortkey_set, |
| max_kv_count=100, |
| max_kv_size=1000000, |
| no_value=False, |
| timeout=0): |
| """ |
|         Get multiple values stored at the <hash_key, sort_key> pairs. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sortkey_set: (set) sort keys in a set. |
|         :param max_kv_count: (int) max count of k-v pairs to fetch. max_kv_count <= 0 means no limit. |
|         :param max_kv_size: (int) max total data size of k-v pairs to fetch. max_kv_size <= 0 means no limit. |
|         :param no_value: (bool) whether to fetch only the keys, without values. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, dict>) (code, kvs) |
|                  code: error_types.ERR_OK.value when the data was fetched successfully. |
|                  kvs: <sort_key, value> pairs in a dict. |
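| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             code, kvs = yield client.multi_get(b'hkey', {b'skey1', b'skey2'}) |
|             if code == error_types.ERR_OK.value: |
|                 for sort_key, value in kvs.items(): |
|                     print(sort_key, value) |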
| """ |
| partition_hash = self.table.get_hashkey_hash(hash_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| ks = [] |
| if sortkey_set is None: |
| pass |
| elif isinstance(sortkey_set, set): |
| ks = [blob(k) for k in sortkey_set] |
| else: |
| return error_types.ERR_INVALID_PARAMETERS.value, 0 |
| |
| req = multi_get_request(blob(hash_key), ks, |
| max_kv_count, max_kv_size, |
| no_value) |
| op = RrdbMultiGetOperator(peer_gpid, req, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def multi_get_opt(self, hash_key, |
| start_sort_key, stop_sort_key, |
| multi_get_options, |
| max_kv_count=100, |
| max_kv_size=1000000, |
| timeout=0): |
| """ |
|         Get multiple values stored under hash_key, with sort keys in the range [start_sort_key, stop_sort_key) by default. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param start_sort_key: (bytes) the returned k-v pairs start from start_sort_key. |
|         :param stop_sort_key: (bytes) the returned k-v pairs stop at stop_sort_key. |
|         :param multi_get_options: (MultiGetOptions) configurable multi_get options. |
|         :param max_kv_count: (int) max count of k-v pairs to fetch. max_kv_count <= 0 means no limit. |
|         :param max_kv_size: (int) max total data size of k-v pairs to fetch. max_kv_size <= 0 means no limit. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, dict>) (code, kvs) |
|                  code: error_types.ERR_OK.value when the data was fetched successfully. |
|                  kvs: <sort_key, value> pairs in a dict. |
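| |
|         A minimal usage sketch (illustrative; assumes MultiGetOptions is |
|         exported by pypegasus.utils.tools), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             options = MultiGetOptions() |
|             code, kvs = yield client.multi_get_opt(b'hkey', b'sk1', b'sk9', options) |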
| """ |
| partition_hash = self.table.get_hashkey_hash(hash_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| req = multi_get_request(blob(hash_key), |
| None, |
| max_kv_count, |
| max_kv_size, |
| multi_get_options.no_value, |
| blob(start_sort_key), |
| blob(stop_sort_key), |
| multi_get_options.start_inclusive, |
| multi_get_options.stop_inclusive, |
| multi_get_options.sortkey_filter_type, |
| blob(multi_get_options.sortkey_filter_pattern), |
| multi_get_options.reverse) |
| op = RrdbMultiGetOperator(peer_gpid, req, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def get_sort_keys(self, hash_key, |
| max_kv_count=100, |
| max_kv_size=1000000, |
| timeout=0): |
| """ |
|         Get sort keys under hash_key. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param max_kv_count: (int) max count of sort keys to fetch. max_kv_count <= 0 means no limit. |
|         :param max_kv_size: (int) max total data size of k-v pairs to fetch. max_kv_size <= 0 means no limit. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, dict>) (code, ks) |
|                  code: error_types.ERR_OK.value when the data was fetched successfully. |
|                  ks: <sort_key, ign> pairs in a dict, where ign is always an empty value. |
| """ |
| return self.multi_get(hash_key, None, |
| max_kv_count, max_kv_size, |
| True, timeout) |
| |
| def multi_del(self, hash_key, sortkey_set, timeout=0): |
| """ |
|         Remove multiple <hash_key, sort_key>-values from pegasus. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param sortkey_set: (set) sort keys in a set. |
|         :param timeout: (int) operation timeout in milliseconds. |
|                         if timeout > 0 it overrides the default for this operation, |
|                         else the timeout specified when creating the instance is used. |
|         :return: (tuple<error_types.code.value, int>) (code, count). |
|                  code: error_types.ERR_OK.value when the data was removed successfully. |
|                  count: the number of deleted k-v pairs. |
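| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             code, count = yield client.multi_del(b'hkey', {b'skey1', b'skey2'}) |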
| """ |
| partition_hash = self.table.get_hashkey_hash(hash_key) |
| peer_gpid = self.table.get_gpid_by_hash(partition_hash) |
| session = self.table.get_session(peer_gpid) |
| ks = [] |
| if isinstance(sortkey_set, set): |
| ks = [blob(k) for k in sortkey_set] |
| else: |
| return error_types.ERR_INVALID_PARAMETERS.value, 0 |
| |
| req = multi_remove_request(blob(hash_key), ks) # 100 limit? |
| op = RrdbMultiRemoveOperator(peer_gpid, req, partition_hash) |
| if not session or not op: |
| return error_types.ERR_INVALID_STATE.value, 0 |
| |
| return session.operate(op, timeout) |
| |
| def get_scanner(self, hash_key, |
| start_sort_key, stop_sort_key, |
| scan_options): |
| """ |
|         Get a scanner for hash_key that starts from start_sort_key and stops at stop_sort_key. |
|         Whether the scanner includes start_sort_key and stop_sort_key is configurable via scan_options. |
| |
|         :param hash_key: (bytes) the hash key used for this operation. |
|         :param start_sort_key: (bytes) the returned scanner starts from start_sort_key. |
|         :param stop_sort_key: (bytes) the returned scanner stops at stop_sort_key. |
|         :param scan_options: (ScanOptions) configurable scan options. |
|         :return: (PegasusScanner) a scanner, instance of PegasusScanner. |
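| |
|         A minimal usage sketch (illustrative), with an initialized client; |
|         see PegasusScanner.get_next for the iteration loop: |
| |
|             scanner = client.get_scanner(b'hkey', b'sk1', b'sk9', ScanOptions()) |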
| """ |
| start_key = self.generate_key(hash_key, start_sort_key) |
| stop_key, stop_inclusive = self.generate_stop_key(hash_key, stop_sort_key) |
| if not stop_inclusive: |
| scan_options.stop_inclusive = stop_inclusive |
| gpid_list = [] |
| hash_list = [] |
| r = bytes_cmp(start_key.data, stop_key.data) |
| if r < 0 or \ |
| (r == 0 and scan_options.start_inclusive and scan_options.stop_inclusive): |
| partition_hash = self.table.get_blob_hash(start_key) |
| gpid_list.append(self.table.get_gpid_by_hash(partition_hash)) |
| hash_list.append(partition_hash) |
| |
| return PegasusScanner(self.table, gpid_list, scan_options, hash_list, False, start_key, stop_key) |
| |
| def get_unordered_scanners(self, max_split_count, scan_options): |
| """ |
|         Get scanners covering the whole pegasus table. |
| |
|         :param max_split_count: (int) max count of scanners to return. |
|         :param scan_options: (ScanOptions) configurable scan options. |
|         :return: (list) a list of PegasusScanner instances. |
|                  each scanner in the list scans a separate part of the whole pegasus table. |
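| |
|         A minimal usage sketch (illustrative), inside an inlineCallbacks |
|         function with an initialized client: |
| |
|             scanners = client.get_unordered_scanners(3, ScanOptions()) |
|             for scanner in scanners: |
|                 ret = yield scanner.get_next() |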
| """ |
| if max_split_count <= 0: |
| return None |
| |
| all_gpid_list = self.table.get_all_gpid() |
| split = min(len(all_gpid_list), max_split_count) |
| count = len(all_gpid_list) |
| size = count // split |
| more = count % split |
| |
| opt = ScanOptions() |
| opt.timeout_millis = scan_options.timeout_millis |
| opt.batch_size = scan_options.batch_size |
| opt.snapshot = scan_options.snapshot |
| scanner_list = [] |
| for i in range(split): |
| gpid_list = [] |
| hash_list = [] |
|             s = size + 1 if i < more else size |
| for j in range(s): |
| if all_gpid_list: |
| count -= 1 |
| gpid_list.append(all_gpid_list[count]) |
| hash_list.append(int(count)) |
| |
| scanner_list.append(PegasusScanner(self.table, gpid_list, opt, hash_list, True)) |
| |
| return scanner_list |