blob: c5be5cad5ef6972fbbf12af024edc954ede52260 [file] [log] [blame]
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import logging
from bookkeeper.common import util
from bookkeeper.common.exceptions import from_table_rpc_response
from bookkeeper.common.method import wrap_method
from bookkeeper.common.retry import Retry
from bookkeeper.common.timeout import ExponentialTimeout
from bookkeeper.proto import kv_rpc_pb2
from bookkeeper.proto.kv_rpc_pb2_grpc import TableServiceStub
__logger__ = logging.getLogger("bookkeeper.kv.Table")
class Table(object):
def __init__(self, channel, stream_props):
self.__table_service__ = TableServiceStub(channel=channel)
self.__stream_props__ = stream_props
self.__default_retry__ = Retry(deadline=60)
self.__default_timeout__ = ExponentialTimeout()
self.__put_with_retries__ =\
wrap_method(self.__do_put__, self.__default_retry__)
self.__get_with_retries__ =\
wrap_method(self.__do_get__, self.__default_retry__)
self.__del_with_retries__ =\
wrap_method(self.__do_del__, self.__default_retry__)
self.__incr_with_retries__ =\
wrap_method(self.__do_incr__, self.__default_retry__)
__logger__.info("initialized table instance with properties : %s",
stream_props)
def __make_routing_metadata__(self, key):
return [
('bk-rt-sid-bin',
util.to_bytes(self.__stream_props__.stream_id, 8, "big")),
('bk-rt-key-bin',
key)
]
def __make_routing_header__(self, key):
return kv_rpc_pb2.RoutingHeader(
stream_id=self.__stream_props__.stream_id,
r_key=key
)
def put_str(self, key_str, val_str):
key = key_str.encode('utf-8')
value = val_str.encode('utf-8')
return self.put(key, value)
def put(self, key, value):
metadata = self.__make_routing_metadata__(key)
header = self.__make_routing_header__(key)
return self.__put_with_retries__(key, value, header, metadata)
def __do_put__(self, key, value, routing_header, grpc_metadata):
put_req = kv_rpc_pb2.PutRequest(
key=key,
value=value,
header=routing_header
)
put_resp = self.__table_service__.Put(
request=put_req,
metadata=grpc_metadata
)
from_table_rpc_response(put_resp)
def incr_str(self, key_str, amount):
key = key_str.encode('utf-8')
return self.incr(key, amount)
def incr(self, key, amount):
metadata = self.__make_routing_metadata__(key)
header = self.__make_routing_header__(key)
return self.__incr_with_retries__(key, amount, header, metadata)
def __do_incr__(self, key, amount, routing_header, grpc_metadata):
incr_req = kv_rpc_pb2.IncrementRequest(
key=key,
amount=amount,
header=routing_header
)
incr_resp = self.__table_service__.Increment(
request=incr_req,
metadata=grpc_metadata
)
from_table_rpc_response(incr_resp)
def get_str(self, key_str):
key = key_str.encode('utf-8')
return self.get(key)
def get(self, key):
metadata = self.__make_routing_metadata__(key)
header = self.__make_routing_header__(key)
return self.__get_with_retries__(key, header, metadata)
def __do_get__(self, key, routing_header, grpc_metadata):
get_req = kv_rpc_pb2.RangeRequest(
key=key,
header=routing_header
)
get_resp = self.__table_service__.Range(
request=get_req,
metadata=grpc_metadata
)
get_resp = from_table_rpc_response(get_resp)
if get_resp.count == 0:
return None
else:
return get_resp.kvs[0]
def delete_str(self, key_str):
key = key_str.encode('utf-8')
return self.delete(key)
def delete(self, key):
metadata = self.__make_routing_metadata__(key)
header = self.__make_routing_header__(key)
return self.__del_with_retries__(key, header, metadata)
def __do_del__(self, key, routing_header, grpc_metadata):
del_req = kv_rpc_pb2.DeleteRangeRequest(
key=key,
header=routing_header
)
del_resp = self.__table_service__.Delete(
request=del_req,
metadata=grpc_metadata
)
del_resp = from_table_rpc_response(del_resp)
return del_resp.deleted