# blob: ff86db7f5508fa3a99986a31857f31e985a35430 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import pytest
import os
from tsfile import TsFileWriter, TsFileReader, ColumnCategory
from tsfile import TimeseriesSchema, DeviceSchema, ResultSetMetaData
from tsfile import ColumnSchema, TableSchema
from tsfile import TSDataType
from tsfile import Tablet, RowRecord, Field
from tsfile import TsFileTableWriter
def test_row_record_write_and_read():
    """Write 1000 row records for one device, then query a time range back.

    Registers three timeseries under ``root.device1`` (``level1`` INT64,
    ``level2`` DOUBLE, ``level3`` INT32), writes one ``RowRecord`` per
    timestamp in ``range(1000)`` and then queries ``level1``/``level2`` with
    the time bounds 10 and 100.

    This is a smoke test: it passes as long as no call raises.  The queried
    values and the reader's active-query list are printed, not asserted.

    The data file is deleted in ``finally`` so that a failing run does not
    leave it behind for the next one.
    """
    file_path = "record_write_and_read.tsfile"
    device = "root.device1"
    try:
        # --- write phase -------------------------------------------------
        writer = TsFileWriter(file_path)
        timeseries = TimeseriesSchema("level1", TSDataType.INT64)
        writer.register_timeseries(device, timeseries)
        writer.register_timeseries(device, TimeseriesSchema("level2", TSDataType.DOUBLE))
        writer.register_timeseries(device, TimeseriesSchema("level3", TSDataType.INT32))

        max_row_num = 1000
        for i in range(max_row_num):
            # One record per timestamp ``i`` carrying a value for each series.
            row = RowRecord(device, i,
                            [Field("level1", i + 1, TSDataType.INT64),
                             Field("level2", i * 1.1, TSDataType.DOUBLE),
                             Field("level3", i * 2, TSDataType.INT32)])
            writer.write_row_record(row)
        writer.close()

        # --- read phase --------------------------------------------------
        reader = TsFileReader(file_path)
        result = reader.query_timeseries(device, ["level1", "level2"], 10, 100)
        while result.next():
            print(result.get_value_by_index(1))

        # Show the reader's active results before and after closing the
        # result set, for manual inspection.
        print(reader.get_active_query_result())
        result.close()
        print(reader.get_active_query_result())
        reader.close()
    finally:
        if os.path.exists(file_path):
            os.remove(file_path)
def test_tablet_write_and_read():
    """Write 10 tablets of 1000 rows x 30 INT64 columns and read ``level0`` back.

    Every cell, and every timestamp, holds the global row index
    (``row + tablet_index * rows_per_tablet``).  The read side queries only
    ``level0`` and checks that each returned row is non-null and equal to a
    running counter, both by index and by name, and that exactly 10000 rows
    come back.

    After ``reader.close()`` a further ``result.next()`` is expected to raise.
    The data file is removed in ``finally`` regardless of the outcome.
    """
    path = "tablet_write_and_read.tsfile"
    device = "root.device1"
    try:
        writer = TsFileWriter(path)

        column_count = 30
        column_names = [f"level{k}" for k in range(column_count)]
        for name in column_names:
            writer.register_timeseries(device, TimeseriesSchema(name, TSDataType.INT64))

        total_rows = 10000
        rows_per_tablet = 1000
        for tablet_idx in range(total_rows // rows_per_tablet):
            # Fresh name/type lists per tablet, as each Tablet gets its own.
            tablet = Tablet(list(column_names),
                            [TSDataType.INT64] * column_count,
                            rows_per_tablet)
            tablet.set_table_name(device)

            first_stamp = tablet_idx * rows_per_tablet
            for row in range(rows_per_tablet):
                stamp = row + first_stamp
                tablet.add_timestamp(row, stamp)
                for col in range(column_count):
                    tablet.add_value_by_index(col, row, stamp)

            writer.write_tablet(tablet)
        writer.close()

        reader = TsFileReader(path)
        result = reader.query_timeseries(device, ["level0"], 0, 1000000)
        print(result.get_result_column_info())

        seen = 0
        while result.next():
            # Explicit comparison kept on purpose: only a value equal to
            # False passes, not any falsy value.
            assert result.is_null_by_index(1) == False
            assert result.get_value_by_index(1) == seen
            assert result.get_value_by_name("level0") == seen
            seen += 1
        assert seen == total_rows

        reader.close()
        # The result set must not be usable once its reader is closed.
        with pytest.raises(Exception):
            result.next()
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_table_writer():
    """Write one 100-row tablet to a table-model file via ``TsFileTableWriter``.

    The table ``test_table`` has a STRING tag column ``device`` and a DOUBLE
    field column ``value``.  Row ``i`` gets timestamp ``i``, device
    ``"device<i>"`` and value ``i * 100.0``.  The test only checks that the
    write path runs without raising; nothing is read back.

    The data file is removed in ``finally`` regardless of the outcome.
    """
    path = "table_write.tsfile"
    row_count = 100

    columns = [
        ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
        ColumnSchema("value", TSDataType.DOUBLE, ColumnCategory.FIELD),
    ]
    table = TableSchema("test_table", columns)

    try:
        with TsFileTableWriter(path, table) as writer:
            tablet = Tablet(["device", "value"],
                            [TSDataType.STRING, TSDataType.DOUBLE],
                            row_count)
            for idx in range(row_count):
                tablet.add_timestamp(idx, idx)
                tablet.add_value_by_name("device", idx, f"device{idx}")
                tablet.add_value_by_index(1, idx, idx * 100.0)
            writer.write_table(tablet)
        # NOTE(review): a read-back step (TsFileReader used as a context
        # manager, then reader.query_table("test_table", ["device", "value"],
        # 10, 50) printing both columns) was left commented out in this test.
        # TODO: confirm whether it can be re-enabled.
    finally:
        if os.path.exists(path):
            os.remove(path)
def test_query_result_detach_from_reader():
    """Check the reader's bookkeeping of open result sets.

    Opens two queries on the same reader and asserts that
    ``get_active_query_result()`` has length 1, then 2, and drops back to 1
    once the first result set is closed.  After ``reader.close()`` calling
    ``next()`` on either result set (the explicitly closed one and the one
    still open) is expected to raise.

    The data file is removed in ``finally`` regardless of the outcome.
    """
    path = "query_result_detach_from_reader.tsfile"
    device = "root.device1"
    try:
        # Prepare data: one INT64 series whose value equals its timestamp.
        writer = TsFileWriter(path)
        writer.register_timeseries(device, TimeseriesSchema("level1", TSDataType.INT64))
        for stamp in range(1000):
            record = RowRecord(device, stamp,
                               [Field("level1", stamp, TSDataType.INT64)])
            writer.write_row_record(record)
        writer.close()

        reader = TsFileReader(path)

        first = reader.query_timeseries(device, ["level1"], 0, 100)
        assert len(reader.get_active_query_result()) == 1

        second = reader.query_timeseries(device, ["level1"], 20, 100)
        assert len(reader.get_active_query_result()) == 2

        first.close()
        assert len(reader.get_active_query_result()) == 1

        reader.close()
        # Neither result set may be iterated once the reader is gone.
        for stale in (first, second):
            with pytest.raises(Exception):
                stale.next()
    finally:
        if os.path.exists(path):
            os.remove(path)