blob: 0346fd5226e9c1ba9d1250f3472f40890b973508 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
from datetime import date, datetime
import pandas as pd
from tsfile import TableSchema, Tablet, TableNotExistError, ColumnCategory
from tsfile import TsFileWriter, ColumnSchema
from tsfile.constants import TSDataType
from tsfile.exceptions import TypeMismatchError, ColumnNotExistError
def validate_dataframe_for_tsfile(df: pd.DataFrame) -> None:
if df is None or df.empty:
raise ValueError("DataFrame cannot be None or empty")
columns = list(df.columns)
seen = set()
duplicates = []
for c in columns:
lower = c.lower()
if lower in seen:
duplicates.append(c)
seen.add(lower)
if duplicates:
raise ValueError(
f"Column names must be unique (case-insensitive). Duplicate columns: {duplicates}"
)
unsupported = []
for col in columns:
dtype = df[col].dtype
try:
TSDataType.from_pandas_datatype(dtype)
except (ValueError, TypeError) as e:
unsupported.append((col, str(dtype), str(e)))
if unsupported:
msg_parts = [f" - {col}: dtype={dtype}" for col, dtype in unsupported]
raise ValueError(
"Data types not supported by tsfile:\n" + "\n".join(msg_parts)
)
def infer_object_column_type(column_series: pd.Series) -> TSDataType:
first_valid_idx = column_series.first_valid_index()
if first_valid_idx is None:
return TSDataType.STRING
value = column_series[first_valid_idx]
if isinstance(value, (bytes, bytearray)):
return TSDataType.BLOB
if isinstance(value, (date, datetime)):
return TSDataType.DATE
if isinstance(value, str):
return TSDataType.STRING
raise TypeError(
f"Cannot infer type from object column: expected str/bytes/date, got {type(value).__name__}: {value!r}"
)
class TsFileTableWriter:
"""
Facilitates writing structured table data into a TsFile with a specified schema.
The TsFileTableWriter class is designed to write structured data,
particularly suitable for time-series data, into a file optimized for
efficient storage and retrieval (referred to as TsFile here). It allows users
to define the schema of the tables they want to write, add rows of data
according to that schema, and serialize this data into a TsFile.
"""
def __init__(self, path: str, table_schema: TableSchema, memory_threshold = 128 * 1024 * 1024):
"""
:param path: The path of tsfile, will create if it doesn't exist.
:param table_schema: describes the schema of the tables they want to write.
:param memory_threshold(Byte): memory usage threshold for flushing data.
"""
self.writer = TsFileWriter(path, memory_threshold)
self.writer.register_table(table_schema)
self.tableSchema = table_schema
def write_table(self, tablet: Tablet):
"""
Write a tablet into table in tsfile.
:param tablet: stored batch data of a table.
:return: no return value.
:raise: TableNotExistError if table does not exist or tablet's table_name does not match tableschema.
"""
if tablet.get_target_name() is None:
tablet.set_table_name(self.tableSchema.get_table_name())
elif (self.tableSchema.get_table_name() is not None
and tablet.get_target_name() != self.tableSchema.get_table_name()):
raise TableNotExistError
self.writer.write_table(tablet)
def write_dataframe(self, dataframe: pd.DataFrame):
"""
Write a pandas DataFrame into table in tsfile.
:param dataframe: pandas DataFrame with 'time' column and data columns matching schema.
:return: no return value.
:raise: ValueError if dataframe is None or is empty.
:raise: ColumnNotExistError if DataFrame columns don't match schema.
:raise: TypeMismatchError if DataFrame column types are incompatible with schema.
"""
validate_dataframe_for_tsfile(dataframe)
# rename columns to lowercase
dataframe = dataframe.rename(columns=str.lower)
time_column = self.tableSchema.get_time_column()
# tag columns used for sorting
tag_columns = self.tableSchema.get_tag_columns()
if time_column is None:
if 'time' in dataframe.columns:
dtype = TSDataType.from_pandas_datatype(dataframe['time'].dtype)
if not TSDataType.TIMESTAMP.is_compatible_with(dtype):
raise TypeMismatchError(
code=27,
context=f"time column require INT/Timestamp"
)
self.tableSchema.add_column(ColumnSchema("time",
TSDataType.TIMESTAMP,
ColumnCategory.TIME))
time_column = self.tableSchema.get_time_column()
type_mismatches = []
for col_name in dataframe.columns:
if time_column is not None and col_name == time_column.get_column_name():
continue
schema_col = self.tableSchema.get_column(col_name)
if schema_col is None:
raise ColumnNotExistError(context=f"{col_name} is not define in table schema")
# Object dtype can represent STRING, DATE, TEXT, BLOB; validation will be performed during insert, skip here
if schema_col.get_data_type() in [TSDataType.INT64, TSDataType.INT32, TSDataType.DOUBLE, TSDataType.FLOAT,
TSDataType.BOOLEAN, TSDataType.TIMESTAMP]:
df_dtype = dataframe[col_name].dtype
df_ts_type = TSDataType.from_pandas_datatype(df_dtype)
expected_ts_type = schema_col.get_data_type()
if not expected_ts_type.is_compatible_with(df_ts_type):
type_mismatches.append(
f"Column '{col_name}': expected {expected_ts_type.name}, got {df_ts_type.name}"
)
if type_mismatches:
raise TypeMismatchError(
code=27,
context=f"Type mismatches: {'; '.join(type_mismatches)}"
)
if time_column:
time_column_name = time_column.get_column_name()
time_series = dataframe[time_column_name]
if time_series.isna().any():
raise ValueError(
f"Time column '{time_column}' must not contain null/NaN values"
)
sort_by = [column.get_column_name() for column in tag_columns]
sort_by.append(time_column_name)
dataframe = dataframe.sort_values(by=sort_by)
self.writer.write_dataframe(self.tableSchema.get_table_name(), dataframe, self.tableSchema)
def close(self):
"""
Close TsFileTableWriter and will flush data automatically.
:return: no return value.
"""
self.writer.close()
def flush(self):
"""
Flush current data to tsfile.
:return: no return value.
"""
self.writer.flush()
def __dealloc__(self):
self.close()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()