# blob: 6cc6b0042d19cace61dd993e3d57c5f17c1ef798 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#cython: language_level=3
import weakref
from typing import List
import pandas as pd
from libc.stdint cimport INT64_MIN, INT64_MAX
from libc.stdlib cimport free
from tsfile.schema import TSDataType as TSDataTypePy
from .tsfile_cpp cimport *
from .tsfile_py_cpp cimport *
cdef class ResultSetPy:
    """
    Row cursor over the result of a TsFile query.

    Instances are created by the query methods of ``TsFileReaderPy``. The result
    set only keeps a weak reference to its reader. Once the result set has been
    closed or invalidated (the reader invalidates its open result sets when it is
    closed), every data accessor raises.
    """
    # A tag to enable weakref support in Cython.
    __pyx_allow_weakref__ = True
    cdef object __weakref__
    # ResultSet handle from the C interface; NULL after close().
    cdef ResultSet result
    # Python-side metadata (column names / data types), filled in by init_c().
    cdef object metadata
    # False once the result set has been closed or invalidated.
    cdef object valid
    # weakref.ref to the reader that produced this result set.
    cdef object tsfile_reader
    # Forwarded to metadata.get_column_name_index() when resolving names.
    cdef object is_tree

    def __init__(self, tsfile_reader : TsFileReaderPy, is_tree: bint = False):
        """
        :param tsfile_reader: reader that produced this result set; only a weak reference is kept.
        :param is_tree: flag passed to the metadata when a column name is resolved to an index.
        """
        self.metadata = None
        self.valid = True
        self.tsfile_reader = weakref.ref(tsfile_reader)
        self.is_tree = is_tree

    cdef init_c(self, ResultSet result, object device_name):
        """
        Attach the C result handle and build the Python metadata from it.

        :param result: ResultSet handle returned by the C query call.
        :param device_name: name stored on the metadata via set_table_name().
        """
        cdef ResultSetMetaData metadata_c
        self.result = result
        metadata_c = tsfile_result_set_get_metadata(self.result)
        self.metadata = from_c_result_set_meta_data(metadata_c)
        self.metadata.set_table_name(device_name)
        # The C metadata has been converted to a Python object; release the C copy.
        free_result_set_meta_data(metadata_c)

    def next(self):
        """
        Advance the cursor to the next row of the query result.

        :return: the value reported by the C call; truthy when a next row is available.
        :raises Exception: if the result set is no longer valid; check_error() handles
            the error code reported by the C call.
        """
        cdef ErrorCode code = 0
        self.check_result_set_invalid()
        has_next = tsfile_result_set_next(self.result, &code)
        check_error(code)
        return has_next

    def get_result_column_info(self):
        """
        Get the result set's column info.

        :return: a dict mapping each column name to its data type.
        """
        return dict(zip(self.metadata.column_list, self.metadata.data_types))

    def read_data_frame(self, max_row_num : int = 1024):
        """
        Collect rows from the result set into a pandas DataFrame.

        The row the cursor currently points at is read first, then next() is called
        until ``max_row_num`` rows have been collected or the result set is exhausted.

        :param max_row_num: row limit for the returned frame, default 1024.
        :return: a DataFrame with one column per result column, cast to the dtype given
            by each column type's to_pandas_dtype(); DATE columns are then parsed
            with the '%Y%m%d' format.
        """
        self.check_result_set_invalid()
        column_names = self.metadata.get_column_list()
        column_num = self.metadata.get_column_num()
        # Column indices are 1-based on the metadata and on the C accessors.
        column_types = [self.metadata.get_data_type(i + 1) for i in range(column_num)]
        date_columns = [
            column_names[i]
            for i in range(column_num)
            if column_types[i] == TSDataTypePy.DATE
        ]
        data_type_dict = {
            col: col_type.to_pandas_dtype()
            for col, col_type in zip(column_names, column_types)
        }
        data_container = {column_name: [] for column_name in column_names}
        cur_line = 0

        # The caller may or may not have called next() already, so read whatever the
        # cursor currently points at. A row whose values are all None is treated as
        # "no current row" and skipped.
        row_data = [self.get_value_by_index(i + 1) for i in range(column_num)]
        if not all(value is None for value in row_data):
            for column_name, value in zip(column_names, row_data):
                data_container[column_name].append(value)
            cur_line += 1

        while cur_line < max_row_num and self.next():
            row_data = [self.get_value_by_index(i + 1) for i in range(column_num)]
            for column_name, value in zip(column_names, row_data):
                data_container[column_name].append(value)
            cur_line += 1

        df = pd.DataFrame(data_container)
        df = df.astype(data_type_dict)
        for col in date_columns:
            try:
                df[col] = pd.to_datetime(
                    df[col].astype(str),
                    format='%Y%m%d',
                    errors='coerce'
                ).dt.normalize()
            except KeyError:
                raise ValueError(f"DATE column '{col}' not found in DataFrame")
        return df

    def get_value_by_index(self, index : int):
        """
        Get the value at a column index of the current row.

        NOTE: index starts from 1.

        :param index: 1-based column index.
        :return: the Python value of the field, or None when the field is null, when
            the column's data type has no branch below, or when the C string getter
            returns NULL.
        """
        cdef char * string = NULL
        self.check_result_set_invalid()
        # The same 1-based index is used for the null check, the metadata lookup
        # and the C getters.
        if tsfile_result_set_is_null_by_index(self.result, index):
            return None
        data_type = self.metadata.get_data_type(index)
        if data_type == TSDataTypePy.INT32:
            return tsfile_result_set_get_value_by_index_int32_t(self.result, index)
        elif data_type == TSDataTypePy.INT64:
            return tsfile_result_set_get_value_by_index_int64_t(self.result, index)
        elif data_type == TSDataTypePy.FLOAT:
            return tsfile_result_set_get_value_by_index_float(self.result, index)
        elif data_type == TSDataTypePy.DOUBLE:
            return tsfile_result_set_get_value_by_index_double(self.result, index)
        elif data_type == TSDataTypePy.BOOLEAN:
            return tsfile_result_set_get_value_by_index_bool(self.result, index)
        elif data_type == TSDataTypePy.STRING or data_type == TSDataTypePy.TEXT:
            string = tsfile_result_set_get_value_by_index_string(self.result, index)
            # Decoding a NULL char* would dereference it; report "no value" instead.
            if string == NULL:
                return None
            try:
                return string.decode('utf-8')
            finally:
                # The returned buffer is released here once it has been decoded.
                free(string)
        # NOTE(review): read_data_frame() post-processes DATE columns, but no branch
        # here returns DATE values, so they come back as None. Confirm which C getter
        # DATE (and any other unhandled type) should use.
        return None

    def get_value_by_name(self, column_name : str):
        """
        Get the value of a named column of the current row.

        :param column_name: column name; it is lower-cased before lookup.
        :return: the field value, or None when the field is null.
        """
        self.check_result_set_invalid()
        lowered_name = column_name.lower()
        if tsfile_result_set_is_null_by_name_c(self.result, lowered_name):
            return None
        # Resolve the name to the index expected by get_value_by_index().
        ind = self.metadata.get_column_name_index(lowered_name, self.is_tree)
        return self.get_value_by_index(ind)

    def get_metadata(self):
        """
        :return: the metadata object built in init_c() (None before init_c() runs).
        """
        return self.metadata

    def is_null_by_index(self, index : int):
        """
        Check whether the field at the given column index of the current row is null.

        Index starts from 1.

        :param index: 1-based column index.
        :raises IndexError: if index is smaller than 1 or larger than the column count.
        """
        self.check_result_set_invalid()
        column_count = len(self.metadata.column_list)
        # Valid indices are 1..column_count inclusive. The previous bound
        # (column_count + 1) let an out-of-range index reach the C call.
        if index < 1 or index > column_count:
            raise IndexError(
                f"Column index {index} out of range (column count: {column_count})"
            )
        return tsfile_result_set_is_null_by_index(self.result, index)

    def is_null_by_name(self, name : str):
        """
        Check whether the field with the given column name in the current row is null.

        :param name: column name, resolved through the metadata (it is not lower-cased here).
        """
        self.check_result_set_invalid()
        ind = self.metadata.get_column_name_index(name, self.is_tree)
        return self.is_null_by_index(ind)

    def check_result_set_invalid(self):
        """
        :raises Exception: if the result set has been closed or invalidated.
        """
        if not self.valid:
            raise Exception("Invalid result set. TsFile Reader not exists")

    def get_result_set_valid(self):
        """
        :return: True while the result set can still be read.
        """
        return self.valid

    def close(self):
        """
        Close the result set and free the C resource.

        The handle is reset to NULL afterwards, so repeated calls do not free it
        twice. If the reader is still alive it is told to drop this result set.
        :return:
        """
        if self.result != NULL:
            free_tsfile_result_set(&self.result)
        if self.tsfile_reader is not None:
            # tsfile_reader is a weakref; calling it yields None once the reader is gone.
            reader = self.tsfile_reader()
            if reader is not None:
                reader.notify_result_set_discard(self)
        self.result = NULL
        self.valid = False

    def set_invalid_result_set(self):
        """
        Mark the result set invalid and close it; the reader calls this when it is closed.
        """
        self.valid = False
        self.close()

    def __dealloc__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
cdef class TsFileReaderPy:
    """
    Python-facing wrapper around the C TsFileReader handle.

    Offers table, tree and timeseries queries plus schema lookups, and keeps track
    of the result sets it has handed out so they can be invalidated when the
    reader is closed.
    """
    __pyx_allow_weakref__ = True
    cdef object __weakref__
    # Reader handle from the C interface; reset to NULL by close().
    cdef TsFileReader reader
    # WeakSet holding the result sets that are still open.
    cdef object activate_result_set_list

    def __init__(self, pathname):
        """
        Create a reader for the TsFile located at ``pathname``.
        """
        self.init_reader(pathname)
        self.activate_result_set_list = weakref.WeakSet()

    cdef init_reader(self, pathname):
        """
        Create the underlying C reader for ``pathname``.
        """
        self.reader = tsfile_reader_new_c(pathname)

    def query_table(self, table_name : str, column_names : List[str],
                    start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy:
        """
        Run a time range query against one table.

        :param table_name: table to query; lower-cased before it is sent to the C layer.
        :param column_names: columns to fetch; each name is lower-cased.
        :param start_time: start of the time range.
        :param end_time: end of the time range.
        :return: query result handler.
        """
        cdef ResultSet c_result
        lowered_table = table_name.lower()
        lowered_columns = [name.lower() for name in column_names]
        c_result = tsfile_reader_query_table_c(self.reader, lowered_table, lowered_columns,
                                               start_time, end_time)
        result_set = ResultSetPy(self)
        # The metadata keeps the table name exactly as the caller passed it.
        result_set.init_c(c_result, table_name)
        self.activate_result_set_list.add(result_set)
        return result_set

    def query_table_on_tree(self, column_names : List[str],
                            start_time : int = INT64_MIN, end_time : int = INT64_MAX) -> ResultSetPy:
        """
        Run a time range query for the given columns on the tree structure.

        :param column_names: columns to fetch; each name is lower-cased.
        :param start_time: start of the time range.
        :param end_time: end of the time range.
        :return: query result handler.
        """
        cdef ResultSet c_result
        lowered_columns = [name.lower() for name in column_names]
        c_result = tsfile_reader_query_table_on_tree_c(self.reader, lowered_columns,
                                                       start_time, end_time)
        result_set = ResultSetPy(self, True)
        result_set.init_c(c_result, "root")
        self.activate_result_set_list.add(result_set)
        return result_set

    def query_timeseries(self, device_name : str, sensor_list : List[str], start_time : int = 0,
                         end_time : int = 0) -> ResultSetPy:
        """
        Run a time range query on one device.

        :param device_name: device to query, passed to the C layer unchanged.
        :param sensor_list: sensors to fetch, passed to the C layer unchanged.
        :param start_time: start of the time range.
        :param end_time: end of the time range.
        :return: query result handler.
        """
        # NOTE(review): both bounds default to 0 here, whereas the table queries
        # default to INT64_MIN / INT64_MAX. Confirm this is intended.
        cdef ResultSet c_result
        c_result = tsfile_reader_query_paths_c(self.reader, device_name, sensor_list, start_time, end_time)
        result_set = ResultSetPy(self, True)
        result_set.init_c(c_result, device_name)
        self.activate_result_set_list.add(result_set)
        return result_set

    def notify_result_set_discard(self, result_set: ResultSetPy):
        """
        Drop a result set from the set of open result sets; called when a result set closes.

        :param result_set: the result set being closed.
        :return:
        """
        self.activate_result_set_list.discard(result_set)

    def get_table_schema(self, table_name : str):
        """
        Look up the schema of the table called ``table_name``.
        """
        return get_table_schema(self.reader, table_name)

    def get_all_table_schemas(self):
        """
        Look up the schemas of all tables.
        """
        return get_all_table_schema(self.reader)

    def get_all_timeseries_schemas(self):
        """
        Look up the schemas of all timeseries.
        """
        return get_all_timeseries_schema(self.reader)

    def close(self):
        """
        Close the reader; result sets that are still open are invalidated first.

        Does nothing when the reader handle is already NULL.
        """
        cdef ErrorCode err_code
        if self.reader == NULL:
            return
        # Iterate over a snapshot: each result set removes itself from the WeakSet
        # while it is being closed.
        for open_result_set in list(self.activate_result_set_list):
            open_result_set.set_invalid_result_set()
        err_code = tsfile_reader_close(self.reader)
        check_error(err_code)
        self.reader = NULL

    def get_active_query_result(self):
        """
        :return: the WeakSet of result sets that are still open.
        """
        return self.activate_result_set_list

    def __dealloc__(self):
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()