# blob: 153c7a8ece3e058d1da194bf0a8b39f74b407fb5 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# distutils: language = c++
# cython: embedsignature = True
from libcpp.string cimport string
from libcpp cimport bool as c_bool
cimport cpython
from cython.operator cimport dereference as deref
from libkudu_client cimport *
from kudu.compat import tobytes, frombytes
from kudu.schema cimport Schema, ColumnSchema
from kudu.errors cimport check_status
from errors import KuduException
import six
cdef class TimeDelta:
    """
    Wrapper interface for kudu MonoDelta class, which is used to specify
    timedeltas for timeouts and other uses.
    """

    cdef:
        MonoDelta delta

    def __cinit__(self):
        pass

    @staticmethod
    def from_seconds(seconds):
        """
        Construct a new TimeDelta from fractional seconds.

        Parameters
        ----------
        seconds : double

        Returns
        -------
        delta : TimeDelta
        """
        cdef TimeDelta result = TimeDelta()
        result.init(MonoDelta.FromSeconds(seconds))
        return result

    @staticmethod
    def from_millis(int64_t ms):
        """
        Construct a new TimeDelta from integer milliseconds.

        Parameters
        ----------
        ms : int

        Returns
        -------
        delta : TimeDelta
        """
        cdef TimeDelta result = TimeDelta()
        result.init(MonoDelta.FromMilliseconds(ms))
        return result

    @staticmethod
    def from_micros(int64_t us):
        """
        Construct a new TimeDelta from integer microseconds.

        Parameters
        ----------
        us : int

        Returns
        -------
        delta : TimeDelta
        """
        cdef TimeDelta result = TimeDelta()
        result.init(MonoDelta.FromMicroseconds(us))
        return result

    @staticmethod
    def from_nanos(seconds):
        """
        Construct a new TimeDelta from integer nanoseconds.

        Parameters
        ----------
        seconds : int
            Number of nanoseconds. The parameter keeps the historical name
            ``seconds`` so existing keyword callers keep working; the value
            is always interpreted as nanoseconds.

        Returns
        -------
        delta : TimeDelta
        """
        cdef TimeDelta result = TimeDelta()
        result.init(MonoDelta.FromNanoseconds(seconds))
        return result

    cpdef double to_seconds(self):
        """
        Return timedelta as fractional seconds.
        """
        return self.delta.ToSeconds()

    cpdef int64_t to_millis(self):
        """
        Return timedelta as exact milliseconds.
        """
        return self.delta.ToMilliseconds()

    cpdef int64_t to_micros(self):
        """
        Return timedelta as exact microseconds.
        """
        return self.delta.ToMicroseconds()

    cpdef int64_t to_nanos(self):
        """
        Return timedelta as exact nanoseconds.
        """
        return self.delta.ToNanoseconds()

    cdef init(self, const MonoDelta& val):
        self.delta = val

    def __repr__(self):
        cdef object as_string
        if self.delta.Initialized():
            # ToString() yields a C++ std::string, which Cython converts to
            # bytes; decode it so Python 3 does not render a b'...' literal.
            as_string = frombytes(self.delta.ToString())
            return 'kudu.TimeDelta({0})'.format(as_string)
        else:
            return 'kudu.TimeDelta()'

    def __richcmp__(self, other, int op):
        """
        Compare two TimeDelta objects.

        Returns NotImplemented when either operand is not a TimeDelta. A
        typed ``TimeDelta other`` argument would still admit None, and
        reading ``other.delta`` from None is undefined behavior.
        """
        cdef TimeDelta lhs, rhs
        if not isinstance(self, TimeDelta) or not isinstance(other, TimeDelta):
            return NotImplemented
        lhs = self
        rhs = other

        if op == cpython.Py_EQ:
            return lhs.delta.Equals(rhs.delta)
        elif op == cpython.Py_NE:
            return not lhs.delta.Equals(rhs.delta)
        elif op == cpython.Py_LT:
            return lhs.delta.LessThan(rhs.delta)
        elif op == cpython.Py_LE:
            return not lhs.delta.MoreThan(rhs.delta)
        elif op == cpython.Py_GT:
            return lhs.delta.MoreThan(rhs.delta)
        elif op == cpython.Py_GE:
            return not lhs.delta.LessThan(rhs.delta)
        else:
            raise ValueError('invalid operation: {0}'.format(op))
cdef class Client:
    """
    The primary class for interacting with a Kudu cluster. Can connect to one
    or more Kudu master servers. Do not instantiate this class directly; use
    kudu.connect instead.
    """

    cdef:
        shared_ptr[KuduClient] client
        # Raw pointer to the object managed by 'client', kept as a convenience
        KuduClient* cp

    cdef readonly:
        list master_addrs

    def __cinit__(self, addr_or_addrs, admin_timeout_ms=None,
                  rpc_timeout_ms=None):
        cdef:
            vector[string] c_addrs
            KuduClientBuilder builder
            TimeDelta timeout

        # Accept a single address (text or bytes) or any iterable of
        # addresses. bytes must be checked explicitly: on Python 3 it is not
        # in six.string_types, and list(b'host') would yield a list of ints.
        if isinstance(addr_or_addrs, six.string_types + (bytes,)):
            addr_or_addrs = [addr_or_addrs]
        elif not isinstance(addr_or_addrs, list):
            addr_or_addrs = list(addr_or_addrs)

        self.master_addrs = addr_or_addrs
        for addr in addr_or_addrs:
            c_addrs.push_back(tobytes(addr))
        builder.master_server_addrs(c_addrs)

        if admin_timeout_ms is not None:
            timeout = TimeDelta.from_millis(admin_timeout_ms)
            builder.default_admin_operation_timeout(timeout.delta)

        if rpc_timeout_ms is not None:
            timeout = TimeDelta.from_millis(rpc_timeout_ms)
            builder.default_rpc_timeout(timeout.delta)

        check_status(builder.Build(&self.client))

        # A convenience
        self.cp = self.client.get()

    def __dealloc__(self):
        self.close()

    property is_multimaster:

        def __get__(self):
            return self.cp.IsMultiMaster()

    cpdef close(self):
        # Nothing yet to clean up here
        pass

    def create_table(self, table_name, Schema schema not None):
        """
        Creates a new Kudu table from the passed Schema and options.

        Parameters
        ----------
        table_name : string
        schema : kudu.Schema
          Create using kudu.schema_builder
        """
        cdef:
            KuduTableCreator* c
            Status s
        c = self.cp.NewTableCreator()
        try:
            s = (c.table_name(tobytes(table_name))
                 .schema(schema.schema)
                 .Create())
            check_status(s)
        finally:
            del c

    def delete_table(self, table_name):
        """
        Delete a Kudu table. Raises KuduNotFound if the table does not exist.

        Parameters
        ----------
        table_name : string
        """
        check_status(self.cp.DeleteTable(tobytes(table_name)))

    def table_exists(self, table_name):
        """Return True if the indicated table exists in the Kudu cluster.

        Parameters
        ----------
        table_name : string

        Returns
        -------
        exists : bool

        """
        cdef:
            string c_name = tobytes(table_name)
            c_bool exists

        check_status(self.cp.TableExists(c_name, &exists))
        return exists

    def table(self, table_name):
        """
        Construct a kudu.Table and retrieve its schema from the cluster.

        Raises KuduNotFound if the table does not exist.

        Parameters
        ----------
        table_name : string

        Returns
        -------
        table : kudu.Table
        """
        table_name = tobytes(table_name)
        cdef Table table = Table(table_name, self)

        check_status(self.cp.OpenTable(table_name, &table.table))
        table.init()
        return table

    def list_tables(self, match_substring=None):
        """
        Retrieve a list of table names in the Kudu cluster with an optional
        substring filter.

        Parameters
        ----------
        match_substring : string, optional
          If passed, the string must be exactly contained in the table names

        Returns
        -------
        tables : list[string]
          Table names returned from Kudu
        """
        cdef:
            vector[string] tables
            string c_match
            size_t i

        if match_substring is not None:
            c_match = tobytes(match_substring)
            check_status(self.cp.ListTables(&tables, c_match))
        else:
            check_status(self.cp.ListTables(&tables))

        result = []
        for i in range(tables.size()):
            result.append(frombytes(tables[i]))
        return result

    def new_session(self, flush_mode='manual', timeout_ms=5000):
        """
        Create a new KuduSession for applying write operations.

        Parameters
        ----------
        flush_mode : {'manual', 'sync', 'background'}, default 'manual'
          See Session.set_flush_mode
        timeout_ms : int, default 5000
          Timeout in milliseconds

        Returns
        -------
        session : kudu.Session
        """
        cdef Session result = Session()
        result.s = self.cp.NewSession()

        result.set_flush_mode(flush_mode)
        result.set_timeout_ms(timeout_ms)

        return result
#----------------------------------------------------------------------
# Handle marshalling Python values to raw values. Since range predicates
# require a const void*, this is one valid (though a bit verbose)
# approach. Note that later versions of Cython handle many Python -> C type
# casting problems (and integer overflows), but these should all be tested
# rigorously in our test suite
cdef class RawValue:
    """
    Base holder for a single C value plus an untyped pointer ('data') to it.
    Subclasses store the converted value in their own 'val' field and point
    'data' at that field.
    """
    cdef:
        void* data

    def __cinit__(self):
        self.data = NULL


cdef class Int8Val(RawValue):
    # Holds an int8_t converted from a Python object.
    cdef:
        int8_t val

    def __cinit__(self, obj):
        self.val = <int8_t> obj
        self.data = <void*> &self.val


cdef class Int16Val(RawValue):
    # Holds an int16_t converted from a Python object.
    cdef:
        int16_t val

    def __cinit__(self, obj):
        self.val = <int16_t> obj
        self.data = <void*> &self.val


cdef class Int32Val(RawValue):
    # Holds an int32_t converted from a Python object.
    cdef:
        int32_t val

    def __cinit__(self, obj):
        self.val = <int32_t> obj
        self.data = <void*> &self.val


cdef class Int64Val(RawValue):
    # Holds an int64_t converted from a Python object.
    cdef:
        int64_t val

    def __cinit__(self, obj):
        self.val = <int64_t> obj
        self.data = <void*> &self.val


cdef class DoubleVal(RawValue):
    # Holds a double converted from a Python object.
    cdef:
        double val

    def __cinit__(self, obj):
        self.val = <double> obj
        self.data = <void*> &self.val


cdef class FloatVal(RawValue):
    # Holds a float converted from a Python object.
    cdef:
        float val

    def __cinit__(self, obj):
        self.val = <float> obj
        self.data = <void*> &self.val


cdef class BoolVal(RawValue):
    # Holds a C++ bool converted from a Python object.
    cdef:
        c_bool val

    def __cinit__(self, obj):
        self.val = <c_bool> obj
        self.data = <void*> &self.val
cdef class StringVal(RawValue):
    """
    Raw string value for range predicates: 'data' points at a heap-allocated
    Slice over the bytes of the wrapped object.
    """
    cdef:
        # Bytes object (the constructor argument, UTF-8 encoded if it was
        # text). We hold a reference to it so that the underlying data
        # doesn't go out of scope.
        object py_str

        # Heap-allocated Slice object, owned by this instance,
        # which points to the data in 'py_str'
        Slice* val

    def __cinit__(self, obj):
        # A unicode object cannot be viewed as char*, and its len() counts
        # code points rather than bytes, so encode text to UTF-8 first.
        if isinstance(obj, unicode):
            obj = obj.encode('utf8')
        self.py_str = obj
        self.val = new Slice(<char*>self.py_str, len(self.py_str))

        # The C++ API expects a Slice* to be passed to the range predicate
        # constructor.
        self.data = self.val

    def __dealloc__(self):
        del self.val
#----------------------------------------------------------------------
cdef class Table:
    """
    Represents a Kudu table, containing the schema and other tools. Create by
    using the kudu.Client.table method after connecting to a cluster.
    """

    cdef:
        shared_ptr[KuduTable] table

    cdef readonly:
        object _name
        Schema schema
        Client parent

    def __cinit__(self, name, Client client):
        self._name = name
        self.parent = client

        # Users should not instantiate directly
        self.schema = Schema()

    cdef init(self):
        # Called after the refptr has been populated
        self.schema.schema = &self.ptr().schema()
        # The schema pointer is borrowed from the KuduTable: own_schema = 0
        # marks it as not owned by the wrapper, and 'parent' keeps this Table
        # referenced from the Schema.
        self.schema.own_schema = 0
        self.schema.parent = self

    def __len__(self):
        # TODO: is this cheaply knowable?
        raise NotImplementedError

    def __getitem__(self, key):
        spec = self.schema[key]
        return Column(self, key, spec)

    property name:

        def __get__(self):
            return frombytes(self.ptr().name())

    # XXX: don't love this name
    property num_columns:

        def __get__(self):
            return len(self.schema)

    def rename(self, new_name):
        raise NotImplementedError

    def drop(self):
        raise NotImplementedError

    def new_insert(self):
        """
        Create a new Insert operation. Pass the completed Insert to a Session.

        Returns
        -------
        insert : Insert
        """
        return Insert(self)

    def new_update(self):
        """
        Create a new Update operation. Pass the completed Update to a Session.

        Returns
        -------
        update : Update
        """
        return Update(self)

    def new_delete(self):
        """
        Create a new Delete operation. Pass the completed Delete to a Session.

        Returns
        -------
        delete : Delete
        """
        return Delete(self)

    def scanner(self):
        """
        Create a new scanner for this table for retrieving a selection of table
        rows.

        Column predicates currently support the <=, == and >= operators.

        Examples
        --------
        scanner = table.scanner()
        scanner.add_predicate(table['key'] >= 10)
        scanner.open()
        batch = scanner.read_all()
        tuples = batch.as_tuples()

        Returns
        -------
        scanner : kudu.Scanner
        """
        cdef Scanner result = Scanner(self)
        result.scanner = new KuduScanner(self.ptr())
        return result

    cdef inline KuduTable* ptr(self):
        return self.table.get()
cdef class Column:
    """
    A reference to a Kudu table column intended to simplify creating predicates
    and other column-specific operations.

    Write arithmetic comparisons to create new Predicate objects that can be
    passed to a Scanner. Only the <=, == and >= operators are currently
    supported; other comparison operators raise NotImplementedError.

    Examples
    --------
    scanner.add_predicate(table[col_name] <= 10)
    """
    cdef readonly:
        # Column name, stored as bytes (converted with tobytes)
        object name
        Table parent
        ColumnSchema spec

    def __cinit__(self, Table parent, object name, ColumnSchema spec):
        self.name = tobytes(name)
        self.parent = parent
        self.spec = spec

    def __repr__(self):
        result = ('Column({0}, parent={1}, type={2})'
                  .format(frombytes(self.name),
                          self.parent.name,
                          self.spec.type.name))
        return result

    cdef KuduValue* box_value(self, object obj) except NULL:
        # Convert a Python value into a heap-allocated KuduValue:
        #   unicode -> UTF-8 bytes -> KuduValue.CopyString
        #   bytes   -> KuduValue.CopyString
        #   int     -> KuduValue.FromInt
        #   float   -> KuduValue.FromDouble
        # Any other type raises TypeError.
        cdef:
            KuduValue* val
            Slice* slc

        if isinstance(obj, unicode):
            obj = obj.encode('utf8')

        if isinstance(obj, bytes):
            slc = new Slice(<char*> obj, len(obj))
            # The temporary Slice is freed immediately after CopyString;
            # presumably CopyString copies the data (per its name) — confirm
            # against the Kudu client headers.
            val = KuduValue.CopyString(deref(slc))
            del slc
        elif isinstance(obj, int):
            val = KuduValue.FromInt(obj)
        elif isinstance(obj, float):
            val = KuduValue.FromDouble(obj)
        else:
            raise TypeError(obj)

        return val

    def __richcmp__(Column self, value, int op):
        # Build a Predicate comparing this column against 'value'.
        # op is the CPython rich-comparison code; only <= (1), == (2) and
        # >= (5) are mapped, everything else raises NotImplementedError.
        cdef:
            KuduPredicate* pred
            KuduValue* val
            Slice* col_name_slice
            ComparisonOp cmp_op
            Predicate result

        # Slice over the bytes in self.name; freed in the finally clause.
        col_name_slice = new Slice(<char*> self.name,
                                   len(self.name))

        try:
            if op == 1: # <=
                cmp_op = KUDU_LESS_EQUAL
            elif op == 2: # ==
                cmp_op = KUDU_EQUAL
            elif op == 5: # >=
                cmp_op = KUDU_GREATER_EQUAL
            else:
                raise NotImplementedError

            # 'val' is handed to NewComparisonPredicate and not freed here;
            # NOTE(review): assumes the predicate takes ownership — confirm.
            val = self.box_value(value)
            pred = (self.parent.ptr()
                    .NewComparisonPredicate(deref(col_name_slice),
                                            cmp_op, val))
        finally:
            del col_name_slice

        # The Predicate wrapper takes the raw pointer and deletes it in its
        # __dealloc__.
        result = Predicate()
        result.init(pred)

        return result
cdef class Predicate:
    """
    Owning wrapper around a C++ KuduPredicate; build one by comparing a
    table column against a value and hand it to Scanner.add_predicates.
    """
    cdef:
        # Owned predicate pointer; NULL until init() is called.
        KuduPredicate* pred

    def __cinit__(self):
        self.pred = NULL

    def __dealloc__(self):
        cdef KuduPredicate* owned = self.pred
        if owned is not NULL:
            del owned

    cdef init(self, KuduPredicate* pred):
        # Take ownership of 'pred'; it is deleted in __dealloc__.
        self.pred = pred
# Public flush-mode constants accepted by Session.set_flush_mode.
FLUSH_AUTO_SYNC = FlushMode_AutoSync
FLUSH_AUTO_BACKGROUND = FlushMode_AutoBackground
FLUSH_MANUAL = FlushMode_Manual

# Maps the lower-cased string names accepted by Session.set_flush_mode to
# their FlushMode values.
cdef dict _flush_modes = dict(
    manual=FLUSH_MANUAL,
    sync=FLUSH_AUTO_SYNC,
    background=FLUSH_AUTO_BACKGROUND,
)
cdef class Session:
    """
    Wrapper for a client KuduSession to build up write operations to interact
    with the cluster.
    """

    cdef:
        shared_ptr[KuduSession] s

    def __cinit__(self):
        pass

    def set_flush_mode(self, flush_mode='manual'):
        """
        Set the session operation flush mode

        Parameters
        ----------
        flush_mode : {'manual', 'sync', 'background'}, default 'manual'
          You can also use the constants FLUSH_MANUAL, FLUSH_AUTO_SYNC,
          and FLUSH_AUTO_BACKGROUND

        Raises
        ------
        ValueError
          If flush_mode is not one of the recognized names or constants.
        """
        cdef Status status
        cdef FlushMode fmode

        if isinstance(flush_mode, int):
            # Only accept the known FlushMode values instead of casting an
            # arbitrary integer into the C++ enum.
            if flush_mode not in _flush_modes.values():
                raise ValueError('Invalid flush mode: {0}'
                                 .format(flush_mode))
            fmode = <FlushMode> flush_mode
        else:
            try:
                fmode = _flush_modes[flush_mode.lower()]
            except KeyError:
                raise ValueError('Invalid flush mode: {0}'
                                 .format(flush_mode))

        status = self.s.get().SetFlushMode(fmode)

        check_status(status)

    def set_timeout_ms(self, int64_t ms):
        """
        Set the session timeout in milliseconds
        """
        self.s.get().SetTimeoutMillis(ms)

    def apply(self, WriteOperation op):
        """
        Apply the indicated write operation

        Examples
        --------
        # Executes a single Insert operation
        session = client.new_session()
        op = table.new_insert()
        op['key'] = 0
        op['value1'] = 5
        op['value2'] = 3.5
        session.apply(op)
        session.flush()
        """
        return op.add_to_session(self)

    def flush(self):
        """
        Flush pending operations
        """
        check_status(self.s.get().Flush())

    def get_pending_errors(self):
        """
        Returns a list of buffered Kudu errors. A second value is returned
        indicating if there were more errors than could be stored in the
        session's error buffer (i.e. False means there was no error overflow)

        Returns
        -------
        errors, overflowed : list, bool
        """
        cdef:
            KuduError error
            vector[C_KuduError*] v_errors
            c_bool overflowed
            size_t i

        self.s.get().GetPendingErrors(&v_errors, &overflowed)

        result = []
        for i in range(v_errors.size()):
            # Each KuduError wrapper deletes its pointer in __dealloc__.
            error = KuduError()
            error.error = v_errors[i]
            result.append(error)

        return result, overflowed
cdef class Row:
    """
    A single row from a row batch
    """
    cdef:
        # Strong reference to the batch that holds this row's data
        RowBatch parent

        # This object is owned by the parent RowBatch
        KuduRowPtr row

    def __cinit__(self, batch):
        # Holding 'parent' keeps the RowBatch (and so the row data) alive for
        # as long as this Row exists.
        self.parent = batch

    def __dealloc__(self):
        pass

    cpdef tuple as_tuple(self):
        """
        Return the row as a Python tuple
        """
        cdef:
            int i, k
            tuple tup

        # One tuple slot per column in the batch's projection schema
        k = self.parent.batch.projection_schema().num_columns()

        tup = cpython.PyTuple_New(k)
        for i in range(k):
            val = None

            # NULL cells stay as None
            if not self.is_null(i):
                val = self.get_slot(i)

            # PyTuple_SET_ITEM steals a reference, so take one first
            cpython.Py_INCREF(val)
            cpython.PyTuple_SET_ITEM(tup, i, val)

        return tup

    cdef inline get_bool(self, int i):
        cdef c_bool val
        check_status(self.row.GetBool(i, &val))
        # The built-in bool is masked by the libcpp typedef
        return bool(val)

    cdef inline get_int8(self, int i):
        cdef int8_t val
        check_status(self.row.GetInt8(i, &val))
        return val

    cdef inline get_int16(self, int i):
        cdef int16_t val
        check_status(self.row.GetInt16(i, &val))
        return val

    cdef inline get_int32(self, int i):
        cdef int32_t val
        check_status(self.row.GetInt32(i, &val))
        return val

    cdef inline get_int64(self, int i):
        cdef int64_t val
        check_status(self.row.GetInt64(i, &val))
        return val

    cdef inline get_double(self, int i):
        cdef double val
        check_status(self.row.GetDouble(i, &val))
        return val

    cdef inline get_float(self, int i):
        cdef float val
        check_status(self.row.GetFloat(i, &val))
        return val

    cdef inline get_string(self, int i):
        # Returns a new bytes object copied from the Slice's data/size.
        cdef Slice val
        check_status(self.row.GetString(i, &val))
        return cpython.PyBytes_FromStringAndSize(<char*> val.mutable_data(),
                                                 val.size())

    cdef inline get_slot(self, int i):
        # Dispatch on the projected column type; strings are decoded with
        # frombytes. Unknown types raise TypeError.
        cdef:
            Status s  # NOTE(review): unused
            DataType t = self.parent.batch.projection_schema().Column(i).type()

        if t == KUDU_BOOL:
            return self.get_bool(i)
        elif t == KUDU_INT8:
            return self.get_int8(i)
        elif t == KUDU_INT16:
            return self.get_int16(i)
        elif t == KUDU_INT32:
            return self.get_int32(i)
        elif t == KUDU_INT64:
            return self.get_int64(i)
        elif t == KUDU_DOUBLE:
            return self.get_double(i)
        elif t == KUDU_FLOAT:
            return self.get_float(i)
        elif t == KUDU_STRING:
            return frombytes(self.get_string(i))
        else:
            raise TypeError(t)

    cdef inline bint is_null(self, int i):
        return self.row.IsNull(i)
cdef class RowBatch:
    """
    Class holding a batch of rows from a Scanner
    """
    # This class owns the KuduScanBatch data
    cdef:
        KuduScanBatch batch

    def __len__(self):
        return self.batch.NumRows()

    def __getitem__(self, i):
        """
        Return row i as a tuple. Negative indices count from the end of the
        batch. Raises IndexError when i is out of range.
        """
        if i < 0:
            i += len(self)
        return self.get_row(i).as_tuple()

    def __iter__(self):
        cdef int i = 0
        for i in range(len(self)):
            yield self.get_row(i).as_tuple()

    def as_tuples(self):
        """
        Return RowBatch as a list of Python tuples

        To simplify testing for the moment.
        """
        cdef list tuples = []
        for i in range(self.batch.NumRows()):
            tuples.append(self.get_row(i).as_tuple())
        return tuples

    cdef Row get_row(self, i):
        # The Row holds a reference to this RowBatch so that if the RowBatch
        # goes out of scope we won't end up with orphaned Row objects. This
        # isn't the best, but an intermediate solution until we can do
        # something better.
        cdef:
            int n = self.batch.NumRows()
            Row row

        # KuduScanBatch::Row does not validate its argument, so check here to
        # turn an out-of-range index into an IndexError instead of an
        # invalid memory read.
        if i < 0 or i >= n:
            raise IndexError('RowBatch index out of range: {0}'.format(i))

        row = Row(self)
        row.row = self.batch.Row(i)

        return row
cdef class Scanner:
    """
    A class for defining a selection of data we wish to scan out of a Kudu
    table. Create a scanner using Table.scanner.
    """

    cdef:
        Table table
        # Owned by this object (allocated in Table.scanner, freed in
        # __dealloc__)
        KuduScanner* scanner
        bint is_open

    def __cinit__(self, Table table):
        self.table = table
        self.scanner = NULL
        self.is_open = 0

    def __dealloc__(self):
        # We own this one
        if self.scanner != NULL:
            del self.scanner

    cdef inline ensure_open(self):
        if not self.is_open:
            self.open()

    def add_predicates(self, preds):
        """
        Add a list of scan predicates to the scanner. Select columns from the
        parent table and make comparisons to create predicates.

        Examples
        --------
        c = table[col_name]
        preds = [c >= 0, c <= 10]
        scanner.add_predicates(preds)

        Parameters
        ----------
        preds : list of Predicate
        """
        for pred in preds:
            self.add_predicate(pred)

    cpdef add_predicate(self, Predicate pred):
        """
        Add a scan predicate to the scanner. Select columns from the
        parent table and make comparisons to create predicates.

        Examples
        --------
        pred = table[col_name] <= 10
        scanner.add_predicate(pred)

        Parameters
        ----------
        pred : kudu.Predicate

        Raises
        ------
        TypeError
          If pred is None.
        ValueError
          If pred does not wrap a KuduPredicate.
        """
        cdef KuduPredicate* clone

        # A typed Predicate argument still admits None, and an uninitialized
        # wrapper holds NULL; dereferencing either would crash the process.
        if pred is None:
            raise TypeError('pred must be a kudu.Predicate, not None')
        if pred.pred == NULL:
            raise ValueError('Predicate is not initialized')

        # We clone the KuduPredicate so that the Predicate wrapper class can be
        # reused
        clone = pred.pred.Clone()
        check_status(self.scanner.AddConjunctPredicate(clone))

    def set_projected_column_names(self, names):
        """
        Sets the columns to be scanned.

        Parameters
        ----------
        names : list of string

        Returns
        -------
        self : Scanner
        """
        cdef vector[string] v_names
        for name in names:
            v_names.push_back(tobytes(name))
        check_status(self.scanner.SetProjectedColumnNames(v_names))
        return self

    def set_fault_tolerant(self):
        """
        Makes the underlying KuduScanner fault tolerant.
        Returns a reference to itself to facilitate chaining.

        Returns
        -------
        self : Scanner
        """
        check_status(self.scanner.SetFaultTolerant())
        return self

    def new_bound(self):
        """
        Returns a new instance of a ScanBound (subclass of PartialRow) to be
        later set with add_lower_bound()/add_exclusive_upper_bound().

        Returns
        -------
        bound : ScanBound
        """
        return ScanBound(self.table)

    def add_lower_bound(self, ScanBound bound not None):
        """
        Sets the (inclusive) lower bound of the scan.
        Returns a reference to itself to facilitate chaining.

        Returns
        -------
        self : Scanner
        """
        check_status(self.scanner.AddLowerBound(deref(bound.row)))
        return self

    def add_exclusive_upper_bound(self, ScanBound bound not None):
        """
        Sets the (exclusive) upper bound of the scan.
        Returns a reference to itself to facilitate chaining.

        Returns
        -------
        self : Scanner
        """
        check_status(self.scanner.AddExclusiveUpperBound(deref(bound.row)))
        return self

    def open(self):
        """
        Open the scanner; calling it again after a successful open does
        nothing. Returns a reference to itself to facilitate chaining.

        Returns
        -------
        self : Scanner
        """
        if not self.is_open:
            check_status(self.scanner.Open())
            self.is_open = 1
        return self

    def has_more_rows(self):
        """
        Returns True if there are more rows to be read.
        """
        return self.scanner.HasMoreRows()

    def read_all_tuples(self):
        """
        Compute a RowBatch containing all rows from the scan operation (which
        hopefully fit into memory, probably not handled gracefully at the
        moment).
        """
        cdef list tuples = []
        cdef RowBatch batch

        self.ensure_open()

        while self.has_more_rows():
            batch = self.next_batch()
            tuples.extend(batch.as_tuples())

        return tuples

    def read_next_batch_tuples(self):
        return self.next_batch().as_tuples()

    cpdef RowBatch next_batch(self):
        """
        Retrieve the next batch of rows from the scanner.

        Raises StopIteration when there are no more rows.

        Returns
        -------
        batch : RowBatch
        """
        cdef RowBatch batch

        if not self.has_more_rows():
            raise StopIteration

        batch = RowBatch()
        check_status(self.scanner.NextBatch(&batch.batch))
        return batch
cdef class KuduError:
    """
    Wrapper for a C++ KuduError indicating a client error resulting from
    applying operations in a session.
    """

    cdef:
        C_KuduError* error

    def __cinit__(self):
        self.error = NULL

    def __dealloc__(self):
        # We own this object
        if self.error != NULL:
            del self.error

    def failed_op(self):
        raise NotImplementedError

    def __repr__(self):
        # Guard against a wrapper that was never populated.
        if self.error == NULL:
            return 'KuduError()'
        # Status::ToString() returns a std::string, which Cython converts to
        # bytes; decode it so Python 3 does not render a b'...' literal.
        return "KuduError('%s')" % (frombytes(self.error.status().ToString()),)
cdef class PartialRow:
    """
    Base wrapper around a KuduPartialRow. Subclasses (ScanBound and the
    write operations) set 'row'. Values can be assigned by column name or by
    integer column index: row['key'] = 1 or row[0] = 1.
    """
    cdef:
        Table table
        KuduPartialRow* row

    def __cinit__(self, Table table):
        # This gets called before any subclass cinit methods
        self.table = table

    def __setitem__(self, key, value):
        if isinstance(key, basestring):
            self.set_field(key, value)
        else:
            self.set_loc(key, value)

    cpdef set_field(self, key, value):
        """
        Set the value of the column named 'key'. None sets the column to NULL.
        """
        self.set_loc(self.table.schema.get_loc(key), value)

    cpdef set_loc(self, int i, value):
        """
        Set the value of the column at index i. None sets the column to NULL.

        Raises
        ------
        IndexError
          If i is not a valid column index.
        TypeError
          If the column has an unsupported type.
        """
        cdef:
            DataType t
            Slice* slc
            Status s

        self._check_loc(i)

        if value is None:
            check_status(self.row.SetNull(i))
            return

        t = self.table.schema.loc_type(i)

        # Leave it to Cython to do the coercion and complain if it doesn't
        # work. Cython will catch many casting problems but we should verify
        # with unit tests.
        if t == KUDU_BOOL:
            check_status(self.row.SetBool(i, <c_bool> value))
        elif t == KUDU_INT8:
            check_status(self.row.SetInt8(i, <int8_t> value))
        elif t == KUDU_INT16:
            check_status(self.row.SetInt16(i, <int16_t> value))
        elif t == KUDU_INT32:
            check_status(self.row.SetInt32(i, <int32_t> value))
        elif t == KUDU_INT64:
            check_status(self.row.SetInt64(i, <int64_t> value))
        elif t == KUDU_FLOAT:
            check_status(self.row.SetFloat(i, <float> value))
        elif t == KUDU_DOUBLE:
            check_status(self.row.SetDouble(i, <double> value))
        elif t == KUDU_STRING:
            if not cpython.PyBytes_Check(value):
                value = value.encode('utf8')

            # TODO: It would be much better not to heap-allocate a Slice object
            # Pass the explicit length: the single-argument constructor uses
            # strlen() and would truncate at an embedded NUL byte.
            slc = new Slice(cpython.PyBytes_AsString(value), len(value))
            try:
                # Not safe to take a reference to PyBytes data for now
                s = self.row.SetStringCopy(i, deref(slc))
            finally:
                del slc
            check_status(s)
        else:
            raise TypeError(t)

    cpdef set_field_null(self, key):
        """
        Set the column named 'key' to NULL.
        """
        self.set_loc_null(self.table.schema.get_loc(key))

    cpdef set_loc_null(self, int i):
        """
        Set the column at index i to NULL.
        """
        self._check_loc(i)
        check_status(self.row.SetNull(i))

    cdef _check_loc(self, int i):
        # Reject column indexes outside the table schema before they reach
        # the C++ row.
        if i < 0 or i >= len(self.table.schema):
            raise IndexError('Column index out of range: {0}'.format(i))

    cdef add_to_session(self, Session s):
        # Overridden by write operations; plain rows cannot be applied.
        pass
cdef class ScanBound(PartialRow):
    """
    Partial row used as a key bound for a Scanner; obtain one from
    Scanner.new_bound(). This object owns its underlying row.
    """

    def __cinit__(self, Table table):
        # PartialRow.__cinit__ has already stored 'table'; allocate a fresh
        # row from its schema.
        self.row = self.table.schema.new_row()

    def __dealloc__(self):
        cdef KuduPartialRow* owned = self.row
        self.row = NULL
        del owned
cdef class WriteOperation(PartialRow):
    """
    Common base of Insert, Update and Delete: a partial row that can be
    handed to Session.apply.
    """
    cdef:
        # Becomes true once the operation has been handed to a session;
        # subclasses update it in add_to_session.
        bint applied

    def __cinit__(self, Table table):
        self.applied = False
cdef class Insert(WriteOperation):
    """
    Insert operation. Create with Table.new_insert, assign column values with
    item assignment (op['key'] = 0), then pass it to Session.apply.
    """
    cdef:
        # Owned until applied; reset to NULL after Session.Apply succeeds.
        # NOTE(review): presumes the session takes ownership on success.
        KuduInsert* op

    def __cinit__(self, Table table):
        self.op = self.table.ptr().NewInsert()
        self.row = self.op.mutable_row()

    def __dealloc__(self):
        # Deleting NULL is a no-op, so an applied operation is not freed here.
        del self.op

    cdef add_to_session(self, Session s):
        if self.applied:
            raise Exception('WriteOperation has already been applied')

        check_status(s.s.get().Apply(self.op))
        self.op = NULL
        self.applied = 1
cdef class Update(WriteOperation):
    """
    Update operation. Create with Table.new_update, assign column values with
    item assignment, then pass it to Session.apply.
    """
    cdef:
        # Owned until applied; reset to NULL after Session.Apply succeeds.
        # NOTE(review): presumes the session takes ownership on success.
        KuduUpdate* op

    def __cinit__(self, Table table):
        # PartialRow.__cinit__ has already stored 'table' on self.
        self.op = self.table.ptr().NewUpdate()
        self.row = self.op.mutable_row()

    def __dealloc__(self):
        # Deleting NULL is a no-op, so an applied operation is not freed here.
        del self.op

    cdef add_to_session(self, Session s):
        # Previously a no-op, which made Session.apply(update) silently
        # discard the update. Apply it like Insert and Delete do.
        if self.applied:
            raise Exception('WriteOperation has already been applied')

        check_status(s.s.get().Apply(self.op))
        self.op = NULL
        self.applied = 1
cdef class Delete(WriteOperation):
    """
    Delete operation. Create with Table.new_delete, set the key column(s)
    with item assignment, then pass it to Session.apply.
    """
    cdef:
        # Owned until applied; reset to NULL after Session.Apply succeeds.
        # NOTE(review): presumes the session takes ownership on success.
        KuduDelete* op

    def __cinit__(self, Table table):
        # PartialRow.__cinit__ has already stored 'table' on self.
        self.op = self.table.ptr().NewDelete()
        self.row = self.op.mutable_row()

    def __dealloc__(self):
        # Deleting NULL is a no-op, so an applied operation is not freed here.
        del self.op

    cdef add_to_session(self, Session s):
        if self.applied:
            raise Exception('WriteOperation has already been applied')

        check_status(s.s.get().Apply(self.op))
        self.applied = 1
        self.op = NULL
cdef inline cast_pyvalue(DataType t, object o):
    # Wrap the Python object 'o' in the RawValue subclass that matches the
    # Kudu column type 't'; unsupported types raise TypeError(t).
    wrappers = {
        KUDU_BOOL: BoolVal,
        KUDU_INT8: Int8Val,
        KUDU_INT16: Int16Val,
        KUDU_INT32: Int32Val,
        KUDU_INT64: Int64Val,
        KUDU_DOUBLE: DoubleVal,
        KUDU_FLOAT: FloatVal,
        KUDU_STRING: StringVal,
    }
    wrapper = wrappers.get(t)
    if wrapper is None:
        raise TypeError(t)
    return wrapper(o)