Add batch write support for TsFile.
diff --git a/cpp/src/common/device_id.h b/cpp/src/common/device_id.h
index 021cb6a..071d6f2 100644
--- a/cpp/src/common/device_id.h
+++ b/cpp/src/common/device_id.h
@@ -67,6 +67,10 @@
public:
explicit StringArrayDeviceID(const std::vector<std::string>& segments)
: segments_(formalize(segments)) {}
+ // Fast path: stores `segments` as given, skipping formalize(). The bool is
+ // only an overload tag; callers must ensure formalize() would not change them.
+ StringArrayDeviceID(const std::vector<std::string>& segments, bool /*fast*/)
+ : segments_(segments) {}
explicit StringArrayDeviceID(const std::string& device_id_string)
: segments_(split_device_id_string(device_id_string)) {}
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index c216e13..94f22db 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -350,6 +350,7 @@
std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
std::vector<std::string> id_array;
+ id_array.reserve(id_column_indexes_.size() + 1);  // table name + one slot per ID column
id_array.push_back(insert_target_name_);
for (auto id_column_idx : id_column_indexes_) {
common::TSDataType data_type = INVALID_DATATYPE;
@@ -364,6 +365,9 @@
break;
}
}
+ if (id_array.size() == id_column_indexes_.size() + 1) {  // every ID column contributed a segment
+ return std::make_shared<StringArrayDeviceID>(id_array, true);  // NOTE(review): skips formalize(); assumes it is a no-op for a full id_array — confirm
+ }
return std::make_shared<StringArrayDeviceID>(id_array);
}
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index 31e82bf..c7c5db4 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -277,6 +277,9 @@
void set_batch_timestamp(int64_t* timestamp) {
- memcpy(timestamps_, timestamp, max_row_num_);
+ // `timestamp` must hold max_row_num_ values. memcpy takes a byte count, so
+ // scale by the element size instead of copying only max_row_num_ bytes.
+ memcpy(timestamps_, timestamp, max_row_num_ * sizeof(int64_t));
+ cur_row_size_ = max_row_num_;
}
int set_null_value(uint32_t col_index, uint32_t row_index);
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index 4df05f6..b0088e9 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -666,28 +666,36 @@
const int ret = w->write_record(*record);
return ret;
}
+void _tablet_set_target_name(Tablet tablet, char *target_name) {
+ auto tab = static_cast<storage::Tablet *>(tablet);
+ if (target_name == nullptr) {
+ return;  // Nothing to set; never hand a null C string to set_table_name().
+ }
+ tab->set_table_name(target_name);
+}
-ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index,
- const void *data, char* mask) {
+ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void *data,
+ char *mask) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = 0;
ret = tab->set_batch_data(col_index, data, mask);
return ret;
}
-ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index,
- const char **data) {
+ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char **data) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = 0;
- ret = tab->set_batch_data(col_index, data, nullptr);
+ ret = tab->set_batch_data_char(col_index, data);
return ret;
};
-ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num) {
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t *timestamp) {
auto tab = static_cast<storage::Tablet *>(tablet);
- if (max_row_num != tab->get_max_row_size()) {
- return common::E_INVALID_ARG;
- }
+ // The row count is no longer passed in: `timestamp` must point to at least
+ // max_row_num values. Only a null buffer can be rejected here.
+ if (timestamp == nullptr) {
+ return common::E_INVALID_ARG;
+ }
tab->set_batch_timestamp(timestamp);
return common::E_OK;
}
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h
index ffa1356..b01cc6a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -548,10 +548,10 @@
INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(float);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(double);
-
-ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask);
-ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data);
-ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num);
+void _tablet_set_target_name(Tablet tablet, char* target_name);  // Sets the tablet's table name.
+ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void* data, char* mask);  // Fills one column; NOTE(review): mask presumably flags non-null rows (the Python binding passes notna()) — confirm.
+ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char** data);  // Fills one string column; the Python binding passes NULL entries for missing values.
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp);  // `timestamp` must hold the tablet's max_row_num values.
ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index);
// Write a tablet into a device.
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index df51bcf..d15adf4 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -32,4 +32,5 @@
from .exceptions import *
from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as ResultSet
from .tsfile_writer import TsFileWriterPy as TsFileWriter
-from .tsfile_table_writer import TsFileTableWriter
\ No newline at end of file
+from .tsfile_writer import CTablet
+from .tsfile_table_writer import TsFileTableWriter
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index b65585c..5acad98 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -158,9 +158,11 @@
void _free_tsfile_ts_record(TsRecord * record);
+ void _tablet_set_target_name(Tablet tablet, char * target_name);
+ # Parameter types mirror tsfile_cwrapper.h (non-const data pointers).
- ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask);
- ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data);
+ ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void* data, char* mask);
+ ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char** data);
- ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp, uint32_t max_row_num);
+ ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp);
# resulSet : query data from tsfile reader
ResultSet tsfile_query_table(TsFileReader reader,
diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py
index c312a0e..9b49b17 100644
--- a/python/tsfile/tsfile_table_writer.py
+++ b/python/tsfile/tsfile_table_writer.py
@@ -17,7 +17,7 @@
#
from tsfile import TableSchema, Tablet, TableNotExistError
-from tsfile import TsFileWriter
+from tsfile import TsFileWriter, CTablet
class TsFileTableWriter:
@@ -53,6 +53,18 @@
raise TableNotExistError
self.writer.write_table(tablet)
+ def write_ctablet(self, tablet: CTablet):
+ """
+ Write a CTablet into this writer's exclusive table.
+ The tablet's target name is overwritten with that table name first,
+ so the CTablet passed in is modified in place.
+ Raises TableNotExistError if no exclusive table name is set.
+ """
+ if self.exclusive_table_name_ is None:
+ raise TableNotExistError
+ tablet.set_target_name(self.exclusive_table_name_)
+ self.writer.write_ctablet(tablet)
+
def close(self):
"""
Close TsFileTableWriter and will flush data automatically.
diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx
index 64cab0a..f3562af 100644
--- a/python/tsfile/tsfile_writer.pyx
+++ b/python/tsfile/tsfile_writer.pyx
@@ -20,11 +20,17 @@
import pandas as pd
import numpy as np
+from cpython.unicode cimport PyUnicode_AsUTF8String
+
from libc.stdlib cimport free
from libc.stdlib cimport malloc
+from libc.string cimport strdup
from .tsfile_cpp cimport *
from .tsfile_py_cpp cimport *
+import numpy as np
+import pandas as pd
+cimport numpy as cnp
from tsfile.row_record import RowRecord
from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy
@@ -41,7 +47,7 @@
"string": TSDataTypePy.STRING
}
-cdef bint is_compatible(TSDataTypePy expected, TSDataTypePy actual):
+cdef bint is_compatible(object expected, object actual):  # True if a column of type `actual` may be stored in a column declared `expected`
if expected == actual:
return True
if expected == TSDataTypePy.INT64 and actual == TSDataTypePy.INT32:
@@ -50,7 +56,7 @@
return True
return False
-def convert_series(pd.Series series, TSDataTypePy target) -> np.ndarray:
+cdef object convert_series(object series, object target):  # Returns `series` as a numpy array of the dtype mapped from `target`
dtype_map = {
TSDataTypePy.INT64: "int64",
TSDataTypePy.INT32: "int32",
@@ -64,7 +70,7 @@
if str(series.dtype) == target_str:
return series.to_numpy()
return series.astype(target_str).to_numpy()
-def encode_or_null(x):
+cdef encode_or_null(x):  # UTF-8 bytes of str(x), or None for NA; NOTE(review): now cdef, so not a Python-level callable — confirm no Python callers
if pd.isna(x):
return None
return str(x).encode('utf-8')
@@ -83,30 +89,52 @@
self.column_name = column_name
self.data_type = data_types
self.max_row_num = max_row_num
- column_num = len(column_name)
- if len(data_types) != column_num:
+ self.column_num = len(column_name)
+ if len(data_types) != self.column_num:
raise ValueError("Length of column_name and data_types must be equal")
- column_names = <char**> malloc(sizeof(char*) * column_num)
- column_data_types = <TSDataType*> malloc(sizeof(TSDataType) * column_num)
+ self.column_names = <char**> malloc(sizeof(char*) * self.column_num)
+ if self.column_names == NULL and self.column_num > 0:
+ raise MemoryError()
+ # __dealloc__ frees every slot, so NULL them all before filling: if anything
+ # below raises midway (e.g. a non-str name), the remaining slots stay safe.
+ for ind in range(self.column_num):
+ self.column_names[ind] = NULL
+ self.column_data_types = <TSDataType*> malloc(sizeof(TSDataType) * self.column_num)
+ if self.column_data_types == NULL and self.column_num > 0:
+ raise MemoryError()
ind = 0
for name, dtype in zip(column_name, data_types):
- column_names[ind] = strdup(name.encode('utf-8'))
- column_data_types[ind] = to_c_data_type(dtype)
+ self.column_names[ind] = strdup(name.encode('utf-8'))
+ if self.column_names[ind] == NULL:
+ raise MemoryError()
+ self.column_data_types[ind] = to_c_data_type(dtype)
+ ind = ind + 1
-
+ cpdef set_target_name(self, object target_name):
+ cdef bytes device_id_bytes
+ cdef char * device_id_c
+ if self.tablet == NULL:
+ raise ValueError("CTablet holds no data yet; call from_data_frame() first")
+ device_id_bytes = PyUnicode_AsUTF8String(target_name)
+ device_id_c = device_id_bytes
+ _tablet_set_target_name(self.tablet, device_id_c)
+ cdef Tablet get_tablet(self):
+ return self.tablet
cdef init_c_tablet(self):
if self.tablet != NULL:
- free_tablet(self.tablet)
- tablet_new(self.column_names, self.column_data_types, self.column_num, self.max_row_num)
+ free_tablet(&self.tablet)
+ self.tablet = tablet_new(self.column_names, self.column_data_types, self.column_num, self.max_row_num)
cpdef from_data_frame(self, data_frame: pd.DataFrame):
cdef void * data_ptr
- cdef const uint32_t * mask_ptr
+ cdef char * mask_ptr
cdef size_t length
cdef char** str_ptr
cdef bytes item
+ cdef int64_t* time_ptr
+ cdef cnp.ndarray[cnp.int64_t, ndim=1] time_array
if not isinstance(data_frame, pd.DataFrame):
raise TypeError("Input must be a pandas DataFrame")
if data_frame.shape[1] != len(self.column_name) + 1:
@@ -119,12 +135,17 @@
if data_frame["time"].dtype != np.int64:
raise TypeError(f"Column 'time' must be int64, but got {data_frame['time'].dtype}")
+ if self.max_row_num != len(data_frame["time"]):
+ raise ValueError(f"Time column length {len(data_frame['time'])} doesn't match expected {self.max_row_num}")
+
self.init_c_tablet()
+ # The C side reads max_row_num values straight from this buffer, so it
+ # must be a C-contiguous int64 array.
+ time_array = np.ascontiguousarray(data_frame["time"].to_numpy(), dtype=np.int64)
+ time_ptr = <int64_t *>time_array.data
+ check_error(_tablet_set_batch_timestamp(self.tablet, time_ptr))
- data_ptr = <void*> data_frame["time"].to_numpy().data
- _tablet_set_batch_timestamp(self.tablet, data_ptr)
-
- for i, col_name in enumerate(self.column_name):
+ for ind, col_name in enumerate(self.column_name):
if col_name not in data_frame.columns:
raise KeyError(f"Column '{col_name}' missing from DataFrame")
series = data_frame[col_name]
@@ -132,7 +151,7 @@
if dtype_str not in _pandas_dtype_to_ts:
raise TypeError(f"Unsupported pandas dtype {dtype_str} for column {col_name}")
actual_ts_type = _pandas_dtype_to_ts[dtype_str]
- expected_ts_type = self.data_type[i]
+ expected_ts_type = self.data_type[ind]
if not is_compatible(expected_ts_type, actual_ts_type):
raise TypeError(
f"Column '{col_name}' type mismatch: expected {expected_ts_type.name}, got {actual_ts_type.name}")
@@ -144,23 +163,37 @@
if array[i] is None:
str_ptr[i] = NULL
else:
- str_ptr[i] = <const char*>array[i]
- _tablet_set_batch_str(self.tablet, i, str_ptr)
+ str_ptr[i] = strdup(<char*>array[i])
+ # Keep the status and check it only after the copies are freed, so a
+ # failure does not leak them.
+ ret = _tablet_set_batch_str(self.tablet, ind, str_ptr)
for i in range(self.max_row_num):
if str_ptr[i] != NULL:
free(str_ptr[i])
free(str_ptr)
+ check_error(ret)
else:
- array = convert_series(series, expected_ts_type)
+ # The C side reads max_row_num values from the raw buffer: make it contiguous.
+ array = np.ascontiguousarray(convert_series(series, expected_ts_type))
- mask = series.notna.to_numpy().astype(np.byte)
- data_ptr = <void*>array.data
- mask_ptr = <char*> mask.data
+ mask = series.notna().to_numpy().astype(np.byte)
+ data_ptr = <void*>cnp.PyArray_DATA(array)
+ mask_ptr = <char*> cnp.PyArray_DATA(mask)
+ check_error(_tablet_set_batch_data(self.tablet, ind, data_ptr, mask_ptr))
- _tablet_set_batch_data(self.tablet, i, data_ptr, mask_ptr)
+ def __dealloc__(self):
+ if self.tablet != NULL:
+ free_tablet(&self.tablet)
+ self.tablet = NULL
+ if self.column_names != NULL:
+ for i in range(self.column_num):
+ free(self.column_names[i])
+ free(self.column_names)
+ self.column_names = NULL
-
-
+ if self.column_data_types != NULL:
+ free(self.column_data_types)
+ self.column_data_types = NULL
cdef class TsFileWriterPy:
cdef TsFileWriter writer
@@ -249,6 +278,13 @@
finally:
free_c_tablet(ctablet)
+
+ def write_ctablet(self, tablet: CTablet):
+ cdef ErrorCode errno
+ errno = _tsfile_writer_write_table(self.writer, tablet.get_tablet())
+ check_error(errno)
+
+
cpdef close(self):
"""
Flush data and Close tsfile writer.