# blob: a910a496a9c6feed1a7112b210c76c3d70012272 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#cython: language_level=3
from libc.string cimport strcpy
from libc.stdlib cimport malloc, free
import pandas as pd
from cpython.bytes cimport PyBytes_AsString
cimport numpy as cnp
import numpy as np
from .tsfile cimport *
# Name of the timestamp column in every DataFrame read from / written to a TsFile.
TIMESTAMP_STR = "Time"

# Column type codes: one distinct bit each, bits 8 through 13.
(
    TS_TYPE_INT32,
    TS_TYPE_BOOLEAN,
    TS_TYPE_FLOAT,
    TS_TYPE_DOUBLE,
    TS_TYPE_INT64,
    TS_TYPE_TEXT,
) = (1 << shift for shift in range(8, 14))

# dtype name (as reported by ``dtype.name``) -> column type code.
# TS_TYPE_TEXT has no dtype entry here.
type_mapping = dict(
    int32=TS_TYPE_INT32,
    bool=TS_TYPE_BOOLEAN,
    float32=TS_TYPE_FLOAT,
    float64=TS_TYPE_DOUBLE,
    int64=TS_TYPE_INT64,
)
cdef class tsfile_reader:
    """Query one table of a TsFile and expose the rows as pandas DataFrames.

    The constructor opens the file and starts the query.  Rows are then
    pulled either all at once with ``read_tsfile()`` or batch by batch by
    iterating the reader (each item is ``(DataFrame, not_null_map)``).
    The object is also a context manager; leaving the ``with`` block
    releases the native handles.
    """
    # Native handles; both are compared against NULL to track liveness.
    cdef CTsFileReader reader
    cdef QueryDataRet ret
    # Number of rows requested from the native layer per ts_next() call.
    cdef int batch_size
    # True when no batch_size was given: read_tsfile() then drains the query.
    cdef bint read_all_at_once

    def __init__(self, pathname, table_name, columns, start_time=None, end_time=None, batch_size=None):
        """Open ``pathname`` and start a query on ``table_name``.

        :param pathname: path of the TsFile (str).
        :param table_name: table to query (str).
        :param columns: one column name or a sequence of column names.
        :param start_time: optional lower time bound; see query_data_ret().
        :param end_time: optional upper time bound; see query_data_ret().
        :param batch_size: rows per batch.  When omitted, batches of 1024
            rows are used internally and read_tsfile() reads everything.
        """
        self.open_reader(pathname)
        self.query_data_ret(table_name, columns, start_time, end_time)
        if batch_size is not None:
            self.batch_size = batch_size
            self.read_all_at_once = False
        else:
            self.batch_size = 1024
            self.read_all_at_once = True

    cdef open_reader(self, pathname):
        """Open the native reader; raise Exception on a non-zero error code."""
        cdef ErrorCode err_code = 0
        cdef bytes encoded_path = pathname.encode('utf-8')
        self.reader = ts_reader_open(encoded_path, &err_code)
        if err_code != 0:
            raise Exception("Failed to open tsfile: %s, %s" % (pathname, err_code))

    cdef query_data_ret(self, table_name, columns, start_time = None, end_time=None):
        """Start the query and store its handle in ``self.ret``.

        When either time bound is given, the missing one defaults to
        LLONG_MIN / LLONG_MAX and the time-range query is used; otherwise
        the whole table is queried.

        :raises MemoryError: if the temporary C string array cannot be
            allocated.
        """
        cdef bytes py_table_name = table_name.encode('utf-8')
        cdef char* c_table_name = PyBytes_AsString(py_table_name)
        cdef char** c_columns
        cdef Py_ssize_t idx, column_count
        cdef bytes encoded_name

        if isinstance(columns, str):
            columns = [columns]
        # Encode first: buffers must be sized by UTF-8 byte length, not by
        # the number of characters, or non-ASCII names overflow in strcpy.
        encoded_columns = [name.encode('utf-8') for name in columns]
        column_count = len(encoded_columns)

        c_columns = <char**>malloc(column_count * sizeof(char*))
        if not c_columns:
            raise MemoryError("Failed to allocate memory for columns")
        # NULL-fill so the cleanup below can free every slot unconditionally.
        for idx in range(column_count):
            c_columns[idx] = NULL
        try:
            for idx in range(column_count):
                encoded_name = encoded_columns[idx]
                c_columns[idx] = <char*>malloc(len(encoded_name) + 1)
                if not c_columns[idx]:
                    raise MemoryError("Failed to allocate memory for columns")
                strcpy(c_columns[idx], PyBytes_AsString(encoded_name))
            # query data from tsfile
            if start_time is not None or end_time is not None:
                if start_time is None:
                    start_time = LLONG_MIN
                if end_time is None:
                    end_time = LLONG_MAX
                self.ret = ts_reader_begin_end(self.reader, c_table_name, c_columns,
                                               column_count, start_time, end_time)
            else:
                self.ret = ts_reader_read(self.reader, c_table_name, c_columns, column_count)
        finally:
            # Runs on success and on any exception, so nothing leaks.
            for idx in range(column_count):
                free(c_columns[idx])
            free(c_columns)

    def read_tsfile(self):
        """Read from the query, then release the native resources.

        Without a ``batch_size`` every batch is read and concatenated; with
        a ``batch_size`` only the next batch is read.

        :return: ``(DataFrame, not_null_map)``.  ``not_null_map`` is the
            per-batch maps joined along the row axis (one row per mapped
            column), or None when nothing was read.  In batch mode the
            DataFrame is None when no data is left.
        """
        chunks = []
        not_null_maps = []
        try:
            if self.read_all_at_once:
                while True:
                    chunk, not_null_map = self.get_next_dataframe()
                    if chunk is None:
                        break
                    chunks.append(chunk)
                    not_null_maps.append(not_null_map)
                # Concatenate once instead of re-copying on every batch.
                res = pd.concat(chunks) if chunks else pd.DataFrame()
            else:
                res, not_null_map = self.get_next_dataframe()
                if res is not None:
                    not_null_maps.append(not_null_map)
        finally:
            self.free_resources()
        # Batches may differ in row count (the last one is usually shorter),
        # so they are joined along the row axis.
        not_null_map_all = np.hstack(not_null_maps) if not_null_maps else None
        return res, not_null_map_all

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next ``(DataFrame, not_null_map)`` batch."""
        res, not_null_map = self.get_next_dataframe()
        if res is None:
            raise StopIteration
        return res, not_null_map

    def get_next_dataframe(self):
        """Fetch the next batch of at most ``batch_size`` rows.

        :return: ``(DataFrame, not_null_map)``, or ``(None, None)`` when the
            query is exhausted or the reader has been released.  The
            DataFrame has the ``Time`` column first.  ``not_null_map`` is a
            list with one boolean array per int32/int64/boolean column
            (True where a value is present); float and double columns are
            not listed because their nulls are returned as NaN.  Integer
            and boolean columns containing nulls are returned with pandas
            nullable dtypes.
        :raises Exception: on an unsupported column type or when the native
            tablet cannot be destroyed.
        """
        cdef:
            DataResult* result
            ColumnSchema* schema = NULL
            cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] np_times
            cnp.ndarray[cnp.int64_t, ndim=1, mode='c'] np_array_i64
            cnp.ndarray[cnp.int32_t, ndim=1, mode='c'] np_array_i32
            cnp.ndarray[cnp.float32_t, ndim=1, mode='c'] np_array_float
            cnp.ndarray[cnp.float64_t, ndim=1, mode='c'] np_array_double
            cnp.npy_intp length
            cnp.npy_intp row
            char* flags
            char* value_bytes
            bint has_null
            bytes pystr
            str py_string

        # Check the handle before touching it: after free_resources() it is
        # NULL and dereferencing it would crash the interpreter.
        if self.ret == NULL or self.ret.data == NULL:
            return None, None

        res = {}
        not_null_map = []
        # Time column will be the first column
        column_order = [TIMESTAMP_STR]
        for i in range(self.ret.column_num):
            pystr = self.ret.column_names[i]
            py_string = pystr.decode('utf-8', 'ignore')
            column_order.append(py_string)
            res[py_string] = []
        res[TIMESTAMP_STR] = []

        result = ts_next(self.ret, self.batch_size)
        if result == NULL:
            return None, None
        # there is no data meet our requirement
        if result.column_schema == NULL:
            # free memory
            if destory_tablet(result) != 0:
                raise Exception("Failed to destroy tablet")
            return None, None

        try:
            # Row count as computed by the original code; presumably cur_num
            # is the index of the last filled row -- confirm in the C wrapper.
            length = result.cur_num + 1
            # np.array(...) copies, so the data outlives the native tablet.
            np_times = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT64, result.times)
            res[TIMESTAMP_STR] = np.array(np_times, dtype = np.int64)

            for i in range(result.column_num):
                # column name
                schema = result.column_schema[i]
                pystr = schema.name
                column_name = pystr.decode('utf-8')

                # column bitmap: read as one byte per row, non-zero = present
                is_not_null = np.empty(length, dtype = bool)
                flags = <char*> result.bitmap[i]
                has_null = False
                for row in range(length):
                    is_not_null[row] = flags[row] != 0
                    if flags[row] == 0:
                        has_null = True

                if schema.column_def == TS_TYPE_INT32:
                    np_array_i32 = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT32, result.value[i])
                    arr = np.array(np_array_i32, dtype = np.int32)
                elif schema.column_def == TS_TYPE_BOOLEAN:
                    arr = np.empty(length, dtype = np.bool_)
                    value_bytes = <char*> result.value[i]
                    for row in range(length):
                        arr[row] = value_bytes[row] != 0
                elif schema.column_def == TS_TYPE_FLOAT:
                    np_array_float = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_FLOAT32, result.value[i])
                    arr = np.array(np_array_float, dtype = np.float32)
                    # Floating-point nulls become NaN; no map entry is added.
                    res[column_name] = np.where(is_not_null, arr, np.nan)
                    continue
                elif schema.column_def == TS_TYPE_DOUBLE:
                    np_array_double = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_FLOAT64, result.value[i])
                    arr = np.array(np_array_double, dtype = np.float64)
                    res[column_name] = np.where(is_not_null, arr, np.nan)
                    continue
                elif schema.column_def == TS_TYPE_INT64:
                    np_array_i64 = cnp.PyArray_SimpleNewFromData(1, &length, cnp.NPY_INT64, result.value[i])
                    arr = np.array(np_array_i64, dtype = np.int64)
                else:
                    raise Exception("UnSupport column type")

                if has_null:
                    # Build masked arrays directly from the typed values.
                    # Going through float64 would lose precision on large
                    # int64 values, and NaN -> bool would turn nulls into True.
                    null_mask = ~is_not_null
                    if schema.column_def == TS_TYPE_BOOLEAN:
                        arr = pd.Series(pd.arrays.BooleanArray(arr, null_mask))
                    else:
                        arr = pd.Series(pd.arrays.IntegerArray(arr, null_mask))
                res[column_name] = arr
                not_null_map.append(is_not_null)
        except BaseException:
            # Do not leak the tablet when building the batch fails.
            destory_tablet(result)
            raise

        if destory_tablet(result) != 0:
            raise Exception("Failed to destroy tablet")
        return pd.DataFrame(res, columns = column_order), not_null_map

    def __dealloc__(self):
        self.free_resources()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.free_resources()

    cdef free_resources(self):
        """Close the reader and free the query handle; safe to call repeatedly.

        :raises Exception: if the native close or free call reports failure.
        """
        cdef CTsFileReader reader = self.reader
        cdef QueryDataRet ret = self.ret
        cdef bint close_failed = False
        cdef bint free_failed = False
        # Clear the attributes first so that a failure below can never lead
        # to the same handle being closed or freed a second time.
        self.reader = NULL
        self.ret = NULL
        if reader != NULL:
            close_failed = ts_reader_close(reader) != 0
        if ret != NULL:
            free_failed = destory_query_dataret(ret) != 0
        if close_failed:
            raise Exception("Failed to close tsfile")
        if free_failed:
            raise Exception("Failed to free query data ret")
cdef class tsfile_writer:
    """Write a pandas DataFrame into a TsFile, one row at a time."""
    # Native writer handle; NULL once closed.
    cdef CTsFileWriter writer
    # Row currently being filled by write_into_row_data().
    cdef TsFileRowData row_data

    def __init__(self, pathname):
        """Open ``pathname`` for writing."""
        self.open_writer(pathname)

    cdef open_writer(self, pathname):
        """Open the native writer; raise Exception on a non-zero error code."""
        cdef ErrorCode err_code = 0
        cdef bytes encoded_path = pathname.encode('utf-8')
        self.writer = ts_writer_open(encoded_path, &err_code)
        if err_code != 0:
            raise Exception("Failed to open tsfile: %s, %s" % (pathname, err_code))

    def resister_timeseries(self, table_name, column_name, data_type):
        """Register column ``column_name`` of ``table_name`` with the writer.

        :param data_type: one of the ``TS_TYPE_*`` codes.
        :raises Exception: if the native registration call fails.
        """
        cdef ColumnSchema schema
        # Named bytes objects keep the char* buffers alive during the call.
        cdef bytes encoded_column_name = column_name.encode('utf-8')
        cdef bytes py_table_name = table_name.encode('utf-8')
        # NOTE(review): only name and column_def are set; any other
        # ColumnSchema fields stay uninitialised -- confirm that is fine.
        schema.name = encoded_column_name
        schema.column_def = data_type
        if tsfile_register_table_column(self.writer, PyBytes_AsString(py_table_name), &schema) != 0:
            raise Exception("Failed to register timeseries")

    cdef create_row_data(self, table_name, time, column_length):
        """Start a new native row for ``table_name`` at timestamp ``time``."""
        # NOTE(review): the encoded name only lives for this call; assumes
        # create_tsfile_row copies it -- confirm against the C wrapper.
        self.row_data = create_tsfile_row(table_name.encode('utf-8'), time, column_length)

    def write_into_row_data(self, column_name, value, type):
        """Store ``value`` for ``column_name`` in the current row.

        :param type: the ``TS_TYPE_*`` code selecting the native setter.
        :raises TypeError: for a code without a native setter.
        """
        # Hold the encoded name in a variable: a char* taken from a
        # temporary bytes object dangles as soon as the temporary is freed.
        cdef bytes encoded_name = column_name.encode('utf-8')
        cdef char* c_column_name = PyBytes_AsString(encoded_name)
        if type == TS_TYPE_INT32:
            insert_data_into_tsfile_row_int32(self.row_data, c_column_name, value)
        elif type == TS_TYPE_BOOLEAN:
            insert_data_into_tsfile_row_boolean(self.row_data, c_column_name, value)
        elif type == TS_TYPE_FLOAT:
            insert_data_into_tsfile_row_float(self.row_data, c_column_name, value)
        elif type == TS_TYPE_DOUBLE:
            insert_data_into_tsfile_row_double(self.row_data, c_column_name, value)
        elif type == TS_TYPE_INT64:
            insert_data_into_tsfile_row_int64(self.row_data, c_column_name, value)
        else:
            raise TypeError("Unknown column type")

    def write_tsfile(self, table_name, df):
        """Write every row of ``df`` to ``table_name`` and close the writer.

        ``df`` must contain a ``Time`` column; every other column is
        registered and written.  Column dtypes must be keys of
        ``type_mapping``.

        :raises TypeError: if a column dtype is not in ``type_mapping``.
        :raises Exception: if a native write, flush or close call fails.
        """
        column_names = df.columns.tolist()
        column_ctypes = []
        # Iterate the dtypes by value; positional ``dtypes[i]`` on a
        # label-indexed Series is deprecated in pandas.
        for column_name, dtype in zip(column_names, df.dtypes):
            if dtype.name not in type_mapping:
                raise TypeError("Unknown column type")
            column_ctypes.append(type_mapping[dtype.name])
            if column_name != TIMESTAMP_STR:
                self.resister_timeseries(table_name, column_name, column_ctypes[-1])

        row_count = len(df)
        # Read column-wise: a row Series from ``df.iloc[i]`` upcasts mixed
        # dtypes (e.g. int64 -> float64) and is slow.  tolist() yields
        # native Python scalars.
        times = df[TIMESTAMP_STR].tolist() if row_count else []
        # Skip the Time column by name, matching the registration above,
        # instead of assuming it is the first column.
        data_columns = [
            (name, ctype, df.iloc[:, position].tolist())
            for position, (name, ctype) in enumerate(zip(column_names, column_ctypes))
            if name != TIMESTAMP_STR
        ]

        for row in range(row_count):
            self.create_row_data(table_name, times[row], len(column_names))
            for name, ctype, values in data_columns:
                self.write_into_row_data(name, values[row], ctype)
            if tsfile_write_row_data(self.writer, self.row_data) != 0:
                raise Exception("Failed to write row data")
            if tsfile_flush_data(self.writer) != 0:
                raise Exception("Failed to flush data")
            self.row_data = NULL
        self.free_resources()

    def free_resources(self):
        """Close the native writer; safe to call repeatedly.

        :raises Exception: if the native close call reports failure.
        """
        cdef CTsFileWriter writer = self.writer
        # Clear the attribute first so a failed close is never retried on
        # the same handle (e.g. from __dealloc__).
        self.writer = NULL
        if writer != NULL and ts_writer_close(writer) != 0:
            raise Exception("Failed to close tsfile")

    def __dealloc__(self):
        self.free_resources()