# blob: e376add7a2d655f93b5e5682f8dfd721de08dd30 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
from datetime import date
import numpy as np
import pandas as pd
import pytest
from tsfile import ColumnSchema, TableSchema, TSDataType, TIME_COLUMN
from tsfile import TsFileTableWriter, ColumnCategory
from tsfile import to_dataframe
from tsfile.exceptions import ColumnNotExistError, TypeMismatchError
from tsfile.tsfile_table_writer import (
validate_dataframe_for_tsfile,
infer_object_column_type,
)
def convert_to_nullable_types(df):
    """
    Return a copy of ``df`` with plain numeric/bool columns cast to nullable dtypes.

    DataFrames returned by ``to_dataframe`` use pandas nullable dtypes
    (Int64, Float64, ...) so that they can carry Null values. Expected frames
    are passed through this helper so both sides of a comparison share dtypes.
    Columns of any other dtype are left unchanged; the input is not modified.
    """
    # (plain dtype, nullable counterpart), checked in this order.
    nullable_by_plain = (
        ("int64", "Int64"),
        ("int32", "Int32"),
        ("float64", "Float64"),
        ("float32", "Float32"),
        ("bool", "boolean"),
    )
    converted = df.copy()
    for name in converted.columns:
        current = converted[name].dtype
        target = next(
            (nullable for plain, nullable in nullable_by_plain if current == plain),
            None,
        )
        if target is not None:
            converted[name] = converted[name].astype(target)
    return converted
def test_infer_object_column_type_bool():
    """Object-dtype Series that hold only bool values are inferred as BOOLEAN."""
    for values in ([True, False], [False]):
        series = pd.Series(values, dtype=object)
        assert infer_object_column_type(series) == TSDataType.BOOLEAN
def test_write_dataframe_basic():
    """Write a 100-row frame (one tag, two fields), read it back, compare each column."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
            ColumnSchema("value2", TSDataType.INT64, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_basic.tsfile"
    try:
        # Drop any file left behind by a previous run before writing.
        if os.path.exists(path):
            os.remove(path)

        row_ids = range(100)
        frame = pd.DataFrame(
            {
                "time": list(row_ids),
                "device": [f"device{n}" for n in row_ids],
                "value": [n * 1.5 for n in row_ids],
                "value2": [n * 10 for n in row_ids],
            }
        )
        with TsFileTableWriter(path, schema) as writer:
            writer.write_dataframe(frame)

        actual = to_dataframe(path, table_name="test_table")
        actual = actual.sort_values(TIME_COLUMN).reset_index(drop=True)
        expected = convert_to_nullable_types(
            frame.sort_values("time").reset_index(drop=True)
        )

        assert actual.shape == (100, 4)
        assert actual[TIME_COLUMN].equals(expected["time"])
        for column in ("device", "value", "value2"):
            assert actual[column].equals(expected[column])
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_with_index():
    """Write a frame that has no time column and check the index comes back as time."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_index.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        frame = pd.DataFrame(
            {
                "device": [f"device{n}" for n in range(50)],
                "value": [n * 2.5 for n in range(50)],
            }
        )
        # The index, not a column, carries the timestamps here.
        frame.index = [n * 10 for n in range(50)]
        with TsFileTableWriter(path, schema) as writer:
            writer.write_dataframe(frame)

        actual = to_dataframe(path, table_name="test_table")
        actual = actual.sort_values(TIME_COLUMN).reset_index(drop=True)

        by_index = frame.sort_index()
        expected = convert_to_nullable_types(by_index.reset_index(drop=True))
        expected_time = pd.Series(by_index.index.values, dtype="Int64")

        assert actual.shape == (50, 3)
        assert actual[TIME_COLUMN].equals(expected_time)
        assert actual["device"].equals(expected["device"])
        assert actual["value"].equals(expected["value"])
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_case_insensitive():
    """Frame columns spelled 'Time', 'Device', 'VALUE' round-trip against a lower-case schema."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_case.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        # Column names deliberately differ from the schema in case only.
        frame = pd.DataFrame(
            {
                "Time": list(range(30)),
                "Device": [f"device{n}" for n in range(30)],
                "VALUE": [n * 3.0 for n in range(30)],
            }
        )
        with TsFileTableWriter(path, schema) as writer:
            writer.write_dataframe(frame)

        actual = to_dataframe(path, table_name="test_table")
        actual = actual.sort_values(TIME_COLUMN).reset_index(drop=True)
        expected = convert_to_nullable_types(
            frame.sort_values("Time").reset_index(drop=True)
        )

        assert actual.shape == (30, 3)
        # Read-back columns are looked up by lower-case name, written ones by their original case.
        column_pairs = (
            (TIME_COLUMN, "Time"),
            ("device", "Device"),
            ("value", "VALUE"),
        )
        for read_name, written_name in column_pairs:
            assert actual[read_name].equals(expected[written_name])
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_column_not_in_schema():
    """A frame carrying a column the schema does not declare raises ColumnNotExistError."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_extra_col.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        frame = pd.DataFrame(
            {
                "time": list(range(10)),
                "device": [f"device{n}" for n in range(10)],
                "value": [n * 1.0 for n in range(10)],
                # Not declared in the schema above.
                "extra_column": list(range(10)),
            }
        )
        with TsFileTableWriter(path, schema) as writer:
            with pytest.raises(ColumnNotExistError):
                writer.write_dataframe(frame)
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_type_mismatch():
    """Writing an integer column into a STRING field raises TypeMismatchError."""
    table = TableSchema(
        "test_table", [ColumnSchema("value", TSDataType.STRING, ColumnCategory.FIELD)]
    )
    tsfile_path = "test_write_dataframe_type_mismatch.tsfile"
    try:
        # Drop any file left behind by a previous run before writing.
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)
        with TsFileTableWriter(tsfile_path, table) as writer:
            # "value" holds ints while the schema declares it as STRING.
            df = pd.DataFrame({"time": list(range(10)), "value": list(range(10))})
            # The exception object is never inspected, so it is not bound to a name.
            with pytest.raises(TypeMismatchError):
                writer.write_dataframe(df)
    finally:
        if os.path.exists(tsfile_path):
            os.remove(tsfile_path)
def test_write_dataframe_all_datatypes():
    """Round-trip one field column per TSDataType listed in the schema and compare each."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("bool_col", TSDataType.BOOLEAN, ColumnCategory.FIELD),
            ColumnSchema("int32_col", TSDataType.INT32, ColumnCategory.FIELD),
            ColumnSchema("int64_col", TSDataType.INT64, ColumnCategory.FIELD),
            ColumnSchema("float_col", TSDataType.FLOAT, ColumnCategory.FIELD),
            ColumnSchema("double_col", TSDataType.DOUBLE, ColumnCategory.FIELD),
            ColumnSchema("string_col", TSDataType.STRING, ColumnCategory.FIELD),
            ColumnSchema("blob_col", TSDataType.BLOB, ColumnCategory.FIELD),
            ColumnSchema("text_col", TSDataType.TEXT, ColumnCategory.FIELD),
            ColumnSchema("date_col", TSDataType.DATE, ColumnCategory.FIELD),
            ColumnSchema("timestamp_col", TSDataType.TIMESTAMP, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_all_types.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        rows = range(50)
        frame = pd.DataFrame(
            {
                "time": list(rows),
                "bool_col": [n % 2 == 0 for n in rows],
                "int32_col": pd.Series(list(rows), dtype="int32"),
                "int64_col": [n * 10 for n in rows],
                "float_col": pd.Series([n * 1.5 for n in rows], dtype="float32"),
                "double_col": [n * 2.5 for n in rows],
                "string_col": [f"str{n}" for n in rows],
                "blob_col": [f"blob{n}".encode("utf-8") for n in rows],
                "text_col": [f"text{n}" for n in rows],
                # Months cycle through 1..11 and days through 1..20.
                "date_col": [date(2025, n % 11 + 1, n % 20 + 1) for n in rows],
                "timestamp_col": list(rows),
            }
        )
        with TsFileTableWriter(path, schema) as writer:
            writer.write_dataframe(frame)

        actual = to_dataframe(path, table_name="test_table")
        actual = actual.sort_values(TIME_COLUMN).reset_index(drop=True)
        expected = convert_to_nullable_types(
            frame.sort_values("time").reset_index(drop=True)
        )

        assert actual.shape == (50, 11)
        for column in ("bool_col", "int32_col", "int64_col"):
            assert actual[column].equals(expected[column])
        # Floating-point columns are compared with a tolerance rather than exactly.
        for column in ("float_col", "double_col"):
            assert np.allclose(actual[column], expected[column])
        for column in (
            "string_col",
            "blob_col",
            "text_col",
            "date_col",
            "timestamp_col",
        ):
            assert actual[column].equals(expected[column])
        # Additionally compare the blob payloads element by element.
        for got, want in zip(actual["blob_col"], expected["blob_col"]):
            assert got == want
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_schema_time_column():
    """Round-trip a frame against a schema that explicitly declares a TIME column."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("time", TSDataType.TIMESTAMP, ColumnCategory.TIME),
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_schema_time.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        frame = pd.DataFrame(
            {
                "time": [n * 100 for n in range(50)],
                "device": [f"device{n}" for n in range(50)],
                "value": [n * 1.5 for n in range(50)],
            }
        )
        with TsFileTableWriter(path, schema) as writer:
            writer.write_dataframe(frame)

        actual = to_dataframe(path, table_name="test_table")
        actual = actual.sort_values(TIME_COLUMN).reset_index(drop=True)
        expected = convert_to_nullable_types(
            frame.sort_values("time").reset_index(drop=True)
        )

        assert actual.shape == (50, 3)
        for column in (TIME_COLUMN, "device", "value"):
            assert actual[column].equals(expected[column])
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_schema_time_and_dataframe_time():
    """Round-trip a frame whose time column is spelled 'Time' against a tag+field schema."""
    schema = TableSchema(
        "test_table",
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    path = "test_write_dataframe_schema_and_df_time.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        frame = pd.DataFrame(
            {
                "Time": list(range(30)),
                "device": [f"dev{n}" for n in range(30)],
                "value": [float(n) for n in range(30)],
            }
        )
        with TsFileTableWriter(path, schema) as writer:
            writer.write_dataframe(frame)

        actual = to_dataframe(path, table_name="test_table")
        actual = actual.sort_values(TIME_COLUMN).reset_index(drop=True)

        # Lower-case the expected column names so they line up with the read-back frame.
        ordered = frame.sort_values("Time")
        lowered = ordered.rename(columns=str.lower)
        expected = convert_to_nullable_types(lowered.reset_index(drop=True))

        assert actual.shape == (30, 3)
        for column in ("time", "device", "value"):
            assert actual[column].equals(expected[column])
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_write_dataframe_empty():
    """Writing a frame with zero rows raises ValueError."""
    schema = TableSchema(
        "test_table", [ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD)]
    )
    path = "test_write_dataframe_empty.tsfile"
    try:
        if os.path.exists(path):
            os.remove(path)

        empty_frame = pd.DataFrame({"time": [], "value": []})
        with TsFileTableWriter(path, schema) as writer:
            with pytest.raises(ValueError):
                writer.write_dataframe(empty_frame)
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_validate_dataframe_none_column_name():
    """Validation rejects a frame with a ``None`` column label via ValueError."""
    frame = pd.DataFrame([[1, 2]], columns=[None, "value"])
    expected_message = "Column name cannot be None or empty"
    with pytest.raises(ValueError, match=expected_message):
        validate_dataframe_for_tsfile(frame)