blob: 75bd6c4c1d1993a8c5eadc18d0d5291fe7581f1a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
from datetime import date
import numpy as np
import pandas as pd
import pytest
from tsfile import ColumnSchema, TableSchema, TSDataType, ColumnCategory
from tsfile import Tablet
from tsfile import TsFileTableWriter, TsFileReader
def test_batch_read_arrow_basic():
file_path = "test_batch_arrow_basic.tsfile"
table = TableSchema(
"test_table",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD),
ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD),
],
)
try:
if os.path.exists(file_path):
os.remove(file_path)
print("t1")
with TsFileTableWriter(file_path, table) as writer:
tablet = Tablet(
["device", "value1", "value2"],
[TSDataType.STRING, TSDataType.INT64, TSDataType.DOUBLE],
1000,
)
for i in range(1000):
tablet.add_timestamp(i, i)
tablet.add_value_by_name("device", i, f"device_{i}")
tablet.add_value_by_name("value1", i, i * 10)
tablet.add_value_by_name("value2", i, i * 1.5)
writer.write_table(tablet)
try:
import pyarrow as pa
except ImportError:
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
result_set = reader.query_table_batch(
table_name="test_table",
column_names=["device", "value1", "value2"],
start_time=0,
end_time=1000,
batch_size=256,
)
total_rows = 0
batch_count = 0
while True:
table = result_set.read_arrow_batch()
if table is None:
break
batch_count += 1
assert isinstance(table, pa.Table)
assert len(table) > 0
total_rows += len(table)
column_names = table.column_names
assert "time" in column_names
assert "device" in column_names
assert "value1" in column_names
assert "value2" in column_names
assert total_rows == 1000
assert batch_count > 0
result_set.close()
reader.close()
finally:
if os.path.exists(file_path):
os.remove(file_path)
def test_batch_read_arrow_compare_with_dataframe():
file_path = "test_batch_arrow_compare.tsfile"
table = TableSchema(
"test_table",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("value1", TSDataType.INT32, ColumnCategory.FIELD),
ColumnSchema("value2", TSDataType.FLOAT, ColumnCategory.FIELD),
ColumnSchema("value3", TSDataType.BOOLEAN, ColumnCategory.FIELD),
],
)
try:
if os.path.exists(file_path):
os.remove(file_path)
with TsFileTableWriter(file_path, table) as writer:
tablet = Tablet(
["device", "value1", "value2", "value3"],
[TSDataType.STRING, TSDataType.INT32, TSDataType.FLOAT, TSDataType.BOOLEAN],
500,
)
for i in range(500):
tablet.add_timestamp(i, i)
tablet.add_value_by_name("device", i, f"device_{i}")
tablet.add_value_by_name("value1", i, i * 2)
tablet.add_value_by_name("value2", i, i * 1.1)
tablet.add_value_by_name("value3", i, i % 2 == 0)
writer.write_table(tablet)
try:
import pyarrow as pa
except ImportError:
pytest.skip("pyarrow is not installed")
reader1 = TsFileReader(file_path)
result_set1 = reader1.query_table_batch(
table_name="test_table",
column_names=["device", "value1", "value2", "value3"],
start_time=0,
end_time=500,
batch_size=100,
)
arrow_tables = []
while True:
table = result_set1.read_arrow_batch()
if table is None:
break
arrow_tables.append(table)
if arrow_tables:
combined_arrow_table = pa.concat_tables(arrow_tables)
df_arrow = combined_arrow_table.to_pandas()
else:
df_arrow = pd.DataFrame()
result_set1.close()
reader1.close()
reader2 = TsFileReader(file_path)
result_set2 = reader2.query_table(
table_name="test_table",
column_names=["device", "value1", "value2", "value3"],
start_time=0,
end_time=500,
)
df_traditional = result_set2.read_data_frame(max_row_num=1000)
result_set2.close()
reader2.close()
assert len(df_arrow) == len(df_traditional)
assert len(df_arrow) == 500
for col in ["time", "device", "value1", "value2", "value3"]:
assert col in df_arrow.columns
assert col in df_traditional.columns
df_arrow_sorted = df_arrow.sort_values("time").reset_index(drop=True)
df_traditional_sorted = df_traditional.sort_values("time").reset_index(drop=True)
for i in range(len(df_arrow_sorted)):
assert df_arrow_sorted.iloc[i]["time"] == df_traditional_sorted.iloc[i]["time"]
assert df_arrow_sorted.iloc[i]["device"] == df_traditional_sorted.iloc[i]["device"]
assert df_arrow_sorted.iloc[i]["value1"] == df_traditional_sorted.iloc[i]["value1"]
assert abs(df_arrow_sorted.iloc[i]["value2"] - df_traditional_sorted.iloc[i]["value2"]) < 1e-5
assert df_arrow_sorted.iloc[i]["value3"] == df_traditional_sorted.iloc[i]["value3"]
finally:
if os.path.exists(file_path):
os.remove(file_path)
def test_batch_read_arrow_empty_result():
file_path = "test_batch_arrow_empty.tsfile"
table = TableSchema(
"test_table",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD),
],
)
try:
if os.path.exists(file_path):
os.remove(file_path)
with TsFileTableWriter(file_path, table) as writer:
tablet = Tablet(
["device", "value"],
[TSDataType.STRING, TSDataType.INT64],
10,
)
for i in range(10):
tablet.add_timestamp(i, i)
tablet.add_value_by_name("device", i, f"device_{i}")
tablet.add_value_by_name("value", i, i)
writer.write_table(tablet)
try:
import pyarrow as pa
except ImportError:
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
result_set = reader.query_table_batch(
table_name="test_table",
column_names=["device", "value"],
start_time=1000,
end_time=2000,
batch_size=100,
)
table = result_set.read_arrow_batch()
assert table is None
result_set.close()
reader.close()
finally:
if os.path.exists(file_path):
os.remove(file_path)
def test_batch_read_arrow_time_range():
file_path = "test_batch_arrow_time_range.tsfile"
table = TableSchema(
"test_table",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD),
],
)
try:
if os.path.exists(file_path):
os.remove(file_path)
with TsFileTableWriter(file_path, table) as writer:
tablet = Tablet(
["device", "value"],
[TSDataType.STRING, TSDataType.INT64],
1000,
)
for i in range(1000):
tablet.add_timestamp(i, i)
tablet.add_value_by_name("device", i, f"device_{i}")
tablet.add_value_by_name("value", i, i)
writer.write_table(tablet)
try:
import pyarrow as pa
except ImportError:
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
result_set = reader.query_table_batch(
table_name="test_table",
column_names=["device", "value"],
start_time=100,
end_time=199,
batch_size=50,
)
total_rows = 0
while True:
table = result_set.read_arrow_batch()
if table is None:
break
total_rows += len(table)
df = table.to_pandas()
assert df["time"].min() >= 100
assert df["time"].max() <= 199
assert total_rows == 100
result_set.close()
reader.close()
finally:
if os.path.exists(file_path):
os.remove(file_path)
def test_batch_read_arrow_all_datatypes():
file_path = "test_batch_arrow_all_datatypes.tsfile"
table = TableSchema(
"test_table",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("bool_val", TSDataType.BOOLEAN, ColumnCategory.FIELD),
ColumnSchema("int32_val", TSDataType.INT32, ColumnCategory.FIELD),
ColumnSchema("int64_val", TSDataType.INT64, ColumnCategory.FIELD),
ColumnSchema("float_val", TSDataType.FLOAT, ColumnCategory.FIELD),
ColumnSchema("double_val", TSDataType.DOUBLE, ColumnCategory.FIELD),
ColumnSchema("string_val", TSDataType.STRING, ColumnCategory.FIELD),
ColumnSchema("date_val", TSDataType.DATE, ColumnCategory.FIELD),
],
)
try:
if os.path.exists(file_path):
os.remove(file_path)
with TsFileTableWriter(file_path, table) as writer:
tablet = Tablet(
["device", "bool_val", "int32_val", "int64_val", "float_val", "double_val", "string_val", "date_val"],
[
TSDataType.STRING,
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.STRING,
TSDataType.DATE,
],
200,
)
for i in range(200):
tablet.add_timestamp(i, i)
tablet.add_value_by_name("device", i, f"device_{i}")
tablet.add_value_by_name("bool_val", i, i % 2 == 0)
tablet.add_value_by_name("int32_val", i, i * 2)
tablet.add_value_by_name("int64_val", i, i * 3)
tablet.add_value_by_name("float_val", i, i * 1.1)
tablet.add_value_by_name("double_val", i, i * 2.2)
tablet.add_value_by_name("string_val", i, f"string_{i}")
tablet.add_value_by_name("date_val", i, date(2025, 1, (i % 28) + 1))
writer.write_table(tablet)
try:
import pyarrow as pa
except ImportError:
pytest.skip("pyarrow is not installed")
reader = TsFileReader(file_path)
result_set = reader.query_table_batch(
table_name="test_table",
column_names=["device", "bool_val", "int32_val", "int64_val", "float_val", "double_val", "string_val", "date_val"],
start_time=0,
end_time=200,
batch_size=50,
)
total_rows = 0
while True:
table = result_set.read_arrow_batch()
if table is None:
break
total_rows += len(table)
df = table.to_pandas()
assert "time" in df.columns
assert "device" in df.columns
assert "bool_val" in df.columns
assert "int32_val" in df.columns
assert "int64_val" in df.columns
assert "float_val" in df.columns
assert "double_val" in df.columns
assert "string_val" in df.columns
assert "date_val" in df.columns
assert total_rows == 200
result_set.close()
reader.close()
finally:
if os.path.exists(file_path):
os.remove(file_path)
def test_batch_read_arrow_no_pyarrow():
file_path = "test_batch_arrow_no_pyarrow.tsfile"
table = TableSchema(
"test_table",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("value", TSDataType.INT64, ColumnCategory.FIELD),
],
)
try:
if os.path.exists(file_path):
os.remove(file_path)
with TsFileTableWriter(file_path, table) as writer:
tablet = Tablet(
["device", "value"],
[TSDataType.STRING, TSDataType.INT64],
10,
)
for i in range(10):
tablet.add_timestamp(i, i)
tablet.add_value_by_name("device", i, f"device_{i}")
tablet.add_value_by_name("value", i, i)
writer.write_table(tablet)
reader = TsFileReader(file_path)
result_set = reader.query_table_batch(
table_name="test_table",
column_names=["device", "value"],
start_time=0,
end_time=10,
batch_size=5,
)
result_set.close()
reader.close()
finally:
if os.path.exists(file_path):
os.remove(file_path)
if __name__ == "__main__":
os.chdir(os.path.dirname(os.path.abspath(__file__)))
pytest.main([
"test_batch_arrow.py",
"-s", "-v"
])