blob: e4e7d0f24d00c320ca21e8b7de34cd15aaead67b [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import tempfile
import pandas as pd
import pytest
from tsfile import (
ColumnCategory,
ColumnSchema,
Field,
RowRecord,
TableSchema,
TimeseriesSchema,
TsFileReader,
TsFileTableWriter,
TsFileWriter,
)
from tsfile import TSDataType
from tsfile.schema import (
BoolTimeseriesStatistic,
DeviceID,
IntTimeseriesStatistic,
StringTimeseriesStatistic,
)
def test_get_all_devices_segments():
path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_details.tsfile")
try:
os.unlink(path)
except OSError:
pass
device = "root.sg.py_details"
writer = TsFileWriter(path)
writer.register_timeseries(device, TimeseriesSchema("m", TSDataType.INT32))
writer.write_row_record(RowRecord(device, 1, [Field("m", 1, TSDataType.INT32)]))
writer.close()
reader = TsFileReader(path)
try:
details = reader.get_all_devices()
assert len(details) == 1
d0 = details[0]
assert d0.path == device
assert d0.table_name == "root.sg"
assert d0.segments == ("root.sg", "py_details")
grp = reader.get_timeseries_metadata(None)[device]
assert grp.table_name == "root.sg"
assert grp.segments == ("root.sg", "py_details")
assert len(grp.timeseries) == 1
finally:
reader.close()
try:
os.unlink(path)
except OSError:
pass
def test_get_all_devices_and_timeseries_metadata_statistic():
path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_stat.tsfile")
try:
os.unlink(path)
except OSError:
pass
device = "root.sg.py_meta"
writer = TsFileWriter(path)
writer.register_timeseries(device, TimeseriesSchema("m_int", TSDataType.INT32))
for row in range(3):
v = (row + 1) * 10
writer.write_row_record(
RowRecord(
device,
row + 1,
[Field("m_int", v, TSDataType.INT32)],
)
)
writer.close()
reader = TsFileReader(path)
try:
devices = reader.get_all_devices()
assert len(devices) == 1
assert devices[0].path == device
meta_all = reader.get_timeseries_metadata(None)
assert list(meta_all.keys()) == [device]
grp = meta_all[device]
assert grp.table_name == "root.sg"
assert grp.segments == ("root.sg", "py_meta")
series = grp.timeseries
assert len(series) == 1
m = series[0]
assert m.measurement_name == "m_int"
assert m.data_type == TSDataType.INT32
st = m.statistic
assert isinstance(st, IntTimeseriesStatistic)
assert st.has_statistic
assert st.row_count == 3
assert st.start_time == 1
assert st.end_time == 3
assert st.sum == pytest.approx(60.0)
assert st.min_int64 == 10
assert st.max_int64 == 30
assert st.first_int64 == 10
assert st.last_int64 == 30
assert reader.get_timeseries_metadata([]) == {}
sub = reader.get_timeseries_metadata([DeviceID(device, None, ())])
assert device in sub
assert len(sub[device].timeseries) == 1
sub_str = reader.get_timeseries_metadata([device])
assert device in sub_str
finally:
reader.close()
try:
os.unlink(path)
except OSError:
pass
def test_get_timeseries_metadata_boolean_statistic():
path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_bool.tsfile")
try:
os.unlink(path)
except OSError:
pass
device = "root.sg.py_bool"
writer = TsFileWriter(path)
writer.register_timeseries(device, TimeseriesSchema("m_b", TSDataType.BOOLEAN))
for row, b in enumerate([True, False, True]):
writer.write_row_record(
RowRecord(
device,
row + 1,
[Field("m_b", b, TSDataType.BOOLEAN)],
)
)
writer.close()
reader = TsFileReader(path)
try:
meta_all = reader.get_timeseries_metadata(None)
st = meta_all[device].timeseries[0].statistic
assert isinstance(st, BoolTimeseriesStatistic)
assert st.has_statistic
assert st.sum == pytest.approx(2.0)
assert st.first_bool is True
assert st.last_bool is True
finally:
reader.close()
try:
os.unlink(path)
except OSError:
pass
def test_get_timeseries_metadata_string_statistic():
path = os.path.join(tempfile.gettempdir(), "py_reader_metadata_str.tsfile")
try:
os.unlink(path)
except OSError:
pass
device = "root.sg.py_str"
writer = TsFileWriter(path)
writer.register_timeseries(device, TimeseriesSchema("m_str", TSDataType.STRING))
for row, s in enumerate(["aa", "cc", "bb"]):
writer.write_row_record(
RowRecord(
device,
row + 1,
[Field("m_str", s, TSDataType.STRING)],
)
)
writer.close()
reader = TsFileReader(path)
try:
meta_all = reader.get_timeseries_metadata(None)
m = meta_all[device].timeseries[0]
assert m.measurement_name == "m_str"
assert m.data_type == TSDataType.STRING
st = m.statistic
assert isinstance(st, StringTimeseriesStatistic)
assert st.has_statistic
assert st.str_min == "aa"
assert st.str_max == "cc"
assert st.str_first == "aa"
assert st.str_last == "bb"
finally:
reader.close()
try:
os.unlink(path)
except OSError:
pass
def test_get_timeseries_metadata_table_timeline_statistic_keeps_null_rows():
path = os.path.join(
tempfile.gettempdir(), "py_reader_metadata_table_timeline.tsfile"
)
try:
os.unlink(path)
except OSError:
pass
schema = TableSchema(
"weather",
[
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD),
ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD),
],
)
df = pd.DataFrame(
{
"time": [0, 1],
"device": ["device_a", "device_a"],
"temperature": [float("nan"), 21.0],
"humidity": [50.0, 51.0],
}
)
with TsFileTableWriter(path, schema) as writer:
writer.write_dataframe(df)
reader = TsFileReader(path)
try:
meta_all = reader.get_timeseries_metadata(None)
group = meta_all[next(iter(meta_all))]
by_name = {item.measurement_name: item for item in group.timeseries}
temperature = by_name["temperature"]
assert temperature.statistic.row_count == 1
assert temperature.statistic.start_time == 1
assert temperature.statistic.end_time == 1
assert temperature.timeline_statistic.row_count == 2
assert temperature.timeline_statistic.start_time == 0
assert temperature.timeline_statistic.end_time == 1
humidity = by_name["humidity"]
assert humidity.statistic.row_count == 2
assert humidity.timeline_statistic.row_count == 2
finally:
reader.close()
try:
os.unlink(path)
except OSError:
pass