| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| import os |
| from datetime import date |
| |
| import numpy as np |
| import pandas as pd |
| import pytest |
| |
| from tsfile import to_dataframe, TsFileReader, ColumnCategory, TIME_COLUMN |
| from tsfile.utils import dataframe_to_tsfile |
| |
| |
| def convert_to_nullable_types(df): |
| df = df.copy() |
| for col in df.columns: |
| dtype = df[col].dtype |
| if dtype == "int64": |
| df[col] = df[col].astype("Int64") |
| elif dtype == "int32": |
| df[col] = df[col].astype("Int32") |
| elif dtype == "float64": |
| df[col] = df[col].astype("Float64") |
| elif dtype == "float32": |
| df[col] = df[col].astype("Float32") |
| elif dtype == "bool": |
| df[col] = df[col].astype("boolean") |
| elif pd.api.types.is_object_dtype(df[col]): |
| non_null = df[col].dropna() |
| if len(non_null) and non_null.map(lambda x: isinstance(x, str)).all(): |
| df[col] = df[col].astype("string") |
| elif pd.api.types.is_string_dtype(df[col]): |
| df[col] = df[col].astype("string") |
| return df |
| |
| |
| def test_dataframe_to_tsfile_basic(): |
| tsfile_path = "test_dataframe_to_tsfile_basic.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "time": [i for i in range(100)], |
| "device": [f"device{i}" for i in range(100)], |
| "value": [i * 1.5 for i in range(100)], |
| "value2": [i * 10 for i in range(100)], |
| } |
| ) |
| |
| dataframe_to_tsfile(df, tsfile_path, table_name="test_table") |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_read = df_read.sort_values("time").reset_index(drop=True) |
| df_sorted = convert_to_nullable_types( |
| df.sort_values("time").reset_index(drop=True) |
| ) |
| df_read = convert_to_nullable_types(df_read) |
| |
| assert df_read.shape == (100, 4) |
| assert df_read["time"].equals(df_sorted["time"]) |
| assert df_read["device"].equals(df_sorted["device"]) |
| assert df_read["value"].equals(df_sorted["value"]) |
| assert df_read["value2"].equals(df_sorted["value2"]) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_default_table_name(): |
| tsfile_path = "test_dataframe_to_tsfile_default.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame({"time": [0, 1], "value": [1.0, 2.0]}) |
| dataframe_to_tsfile(df, tsfile_path) |
| |
| df_read = to_dataframe(tsfile_path, table_name="default_table") |
| assert len(df_read) == 2 |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_with_index(): |
| tsfile_path = "test_dataframe_to_tsfile_index.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "device": [f"device{i}" for i in range(30)], |
| "value": [i * 2.0 for i in range(30)], |
| } |
| ) |
| df.index = [i * 100 for i in range(30)] |
| dataframe_to_tsfile(df, tsfile_path, table_name="test_table") |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_read = df_read.sort_values("time").reset_index(drop=True) |
| df_read = convert_to_nullable_types(df_read) |
| time_expected = pd.Series(df.index.values, dtype="Int64") |
| assert df_read.shape == (30, 3) |
| assert df_read["time"].equals(time_expected) |
| |
| with TsFileReader(tsfile_path) as reader: |
| table_schema = reader.get_table_schema("test_table") |
| device_col = table_schema.get_column("device") |
| assert device_col is not None |
| assert device_col.get_category() == ColumnCategory.FIELD |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_custom_time_column(): |
| tsfile_path = "test_dataframe_to_tsfile_custom_time.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "timestamp": [i for i in range(30)], |
| "device": [f"device{i}" for i in range(30)], |
| "value": [i * 3.0 for i in range(30)], |
| } |
| ) |
| |
| dataframe_to_tsfile( |
| df, tsfile_path, table_name="test_table", time_column="timestamp" |
| ) |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_read = df_read.sort_values("timestamp").reset_index(drop=True) |
| df_sorted = convert_to_nullable_types( |
| df.sort_values("timestamp").reset_index(drop=True) |
| ) |
| df_read = convert_to_nullable_types(df_read) |
| |
| assert df_read.shape == (30, 3) |
| assert df_read["timestamp"].equals(df_sorted["timestamp"]) |
| assert df_read["device"].equals(df_sorted["device"]) |
| assert df_read["value"].equals(df_sorted["value"]) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_case_insensitive_time(): |
| tsfile_path = "test_dataframe_to_tsfile_case_time.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| {"Time": [i for i in range(20)], "value": [i * 2.0 for i in range(20)]} |
| ) |
| |
| dataframe_to_tsfile(df, tsfile_path, table_name="test_table") |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_read = convert_to_nullable_types(df_read) |
| assert df_read.shape == (20, 2) |
| assert df_read["time"].equals(pd.Series([i for i in range(20)], dtype="Int64")) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_with_tag_columns(): |
| tsfile_path = "test_dataframe_to_tsfile_tags.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "time": [i for i in range(20)], |
| "device": [f"device{i}" for i in range(20)], |
| "location": [f"loc{i % 5}" for i in range(20)], |
| "value": [i * 1.5 for i in range(20)], |
| } |
| ) |
| |
| dataframe_to_tsfile( |
| df, tsfile_path, table_name="test_table", tag_column=["device", "location"] |
| ) |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_read = df_read.sort_values(TIME_COLUMN).reset_index(drop=True) |
| df_sorted = convert_to_nullable_types( |
| df.sort_values("time").reset_index(drop=True) |
| ) |
| df_read = convert_to_nullable_types(df_read) |
| |
| assert df_read.shape == (20, 4) |
| assert df_read["device"].equals(df_sorted["device"]) |
| assert df_read["location"].equals(df_sorted["location"]) |
| assert df_read["value"].equals(df_sorted["value"]) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_tag_time_unsorted(): |
| tsfile_path = "test_dataframe_to_tsfile_tag_time_unsorted.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "time": [30, 10, 20, 50, 40, 15, 25, 35, 5, 45], |
| "device": [ |
| "device1", |
| "device1", |
| "device1", |
| "device2", |
| "device2", |
| "device1", |
| "device1", |
| "device2", |
| "device1", |
| "device2", |
| ], |
| "value": [i * 1.5 for i in range(10)], |
| } |
| ) |
| |
| dataframe_to_tsfile( |
| df, tsfile_path, table_name="test_table", tag_column=["device"] |
| ) |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_expected = df.sort_values(by=["device", "time"]).reset_index(drop=True) |
| df_expected = convert_to_nullable_types(df_expected) |
| df_read = convert_to_nullable_types(df_read) |
| |
| assert df_read.shape == (10, 3) |
| assert df_read["device"].equals(df_expected["device"]) |
| assert df_read[TIME_COLUMN].equals(df_expected["time"]) |
| assert df_read["value"].equals(df_expected["value"]) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_all_datatypes(): |
| tsfile_path = "test_dataframe_to_tsfile_all_types.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "time": [i for i in range(50)], |
| "bool_col": [i % 2 == 0 for i in range(50)], |
| "int32_col": pd.Series([i for i in range(50)], dtype="int32"), |
| "int64_col": [i * 10 for i in range(50)], |
| "float_col": pd.Series([i * 1.5 for i in range(50)], dtype="float32"), |
| "double_col": [i * 2.5 for i in range(50)], |
| "string_col": [f"str{i}" for i in range(50)], |
| "blob_col": [f"blob{i}".encode("utf-8") for i in range(50)], |
| "text_col": [f"text{i}" for i in range(50)], |
| "date_col": [date(2025, i % 11 + 1, i % 20 + 1) for i in range(50)], |
| "timestamp_col": [i for i in range(50)], |
| } |
| ) |
| |
| dataframe_to_tsfile(df, tsfile_path, table_name="test_table") |
| |
| df_read = to_dataframe(tsfile_path, table_name="test_table") |
| df_read = df_read.sort_values(TIME_COLUMN).reset_index(drop=True) |
| df_sorted = convert_to_nullable_types( |
| df.sort_values("time").reset_index(drop=True) |
| ) |
| df_read = convert_to_nullable_types(df_read) |
| |
| assert df_read.shape == (50, 11) |
| assert df_read["bool_col"].equals(df_sorted["bool_col"]) |
| assert df_read["int32_col"].equals(df_sorted["int32_col"]) |
| assert df_read["int64_col"].equals(df_sorted["int64_col"]) |
| assert np.allclose(df_read["float_col"], df_sorted["float_col"]) |
| assert np.allclose(df_read["double_col"], df_sorted["double_col"]) |
| assert df_read["string_col"].equals(df_sorted["string_col"]) |
| assert df_read["text_col"].equals(df_sorted["text_col"]) |
| assert df_read["date_col"].equals(df_sorted["date_col"]) |
| assert df_read["timestamp_col"].equals(df_sorted["timestamp_col"]) |
| for i in range(50): |
| assert df_read["blob_col"].iloc[i] == df_sorted["blob_col"].iloc[i] |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_empty_dataframe(): |
| tsfile_path = "test_dataframe_to_tsfile_empty.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame() |
| |
| with pytest.raises(ValueError, match="DataFrame cannot be None or empty"): |
| dataframe_to_tsfile(df, tsfile_path) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_no_data_columns(): |
| tsfile_path = "test_dataframe_to_tsfile_no_data.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame({"time": [i for i in range(10)]}) |
| |
| with pytest.raises( |
| ValueError, match="DataFrame must have at least one data column" |
| ): |
| dataframe_to_tsfile(df, tsfile_path) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_only_time_column_raises(): |
| """Only time column (no FIELD/TAG columns) must raise ValueError.""" |
| tsfile_path = "test_only_time_column.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| df = pd.DataFrame({"time": [1, 2, 3]}) |
| with pytest.raises( |
| ValueError, match="at least one data column besides the time column" |
| ): |
| dataframe_to_tsfile(df, tsfile_path) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_time_column_not_found(): |
| tsfile_path = "test_dataframe_to_tsfile_time_err.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame({"time": [0, 1], "value": [1.0, 2.0]}) |
| with pytest.raises(ValueError, match="Time column 'timestamp' not found"): |
| dataframe_to_tsfile(df, tsfile_path, time_column="timestamp") |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_invalid_time_column(): |
| tsfile_path = "test_dataframe_to_tsfile_invalid_time.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| {"timestamp": [i for i in range(10)], "value": [i * 1.0 for i in range(10)]} |
| ) |
| |
| with pytest.raises(ValueError, match="Time column 'time' not found"): |
| dataframe_to_tsfile(df, tsfile_path, time_column="time") |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_non_integer_time_column(): |
| tsfile_path = "test_dataframe_to_tsfile_non_int_time.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| { |
| "time": [f"time{i}" for i in range(10)], |
| "value": [i * 1.0 for i in range(10)], |
| } |
| ) |
| |
| with pytest.raises(TypeError, match="must be integer type"): |
| dataframe_to_tsfile(df, tsfile_path) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_tag_column_not_found(): |
| tsfile_path = "test_dataframe_to_tsfile_tag_err.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame({"time": [0, 1], "device": ["a", "b"], "value": [1.0, 2.0]}) |
| with pytest.raises(ValueError, match="Tag column 'invalid' not found"): |
| dataframe_to_tsfile(df, tsfile_path, tag_column=["invalid"]) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| |
| def test_dataframe_to_tsfile_invalid_tag_column(): |
| tsfile_path = "test_dataframe_to_tsfile_invalid_tag.tsfile" |
| try: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |
| |
| df = pd.DataFrame( |
| {"time": [i for i in range(10)], "value": [i * 1.0 for i in range(10)]} |
| ) |
| |
| with pytest.raises(ValueError, match="Tag column 'invalid' not found"): |
| dataframe_to_tsfile(df, tsfile_path, tag_column=["invalid"]) |
| finally: |
| if os.path.exists(tsfile_path): |
| os.remove(tsfile_path) |