blob: e1f6c421ad89721fbccc5e07747752e62656f12d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Benchmark: query_table_batch + read_arrow_batch vs query_table + read_data_frame.
Compares throughput and elapsed time when reading the same table data via
- Arrow path: query_table_batch(batch_size=N) then read_arrow_batch() in a loop
- DataFrame path: query_table() then result.next() + read_data_frame(N) in a loop
Run from project root or python/tests, e.g.:
python -m pytest tests/bench_batch_arrow_vs_dataframe.py -v -s
python tests/bench_batch_arrow_vs_dataframe.py # if run as script
"""
import os
import sys
import time
from os import remove
import pandas as pd
import pyarrow as pa
import pytest
from tsfile import (
ColumnSchema,
ColumnCategory,
TSDataType,
TableSchema,
TsFileReader,
TsFileTableWriter,
)
# Benchmark defaults; run_benchmark() parameters and the CLI arguments of the
# __main__ block can override row count and batch size.
DEFAULT_ROW_COUNT = 50_000  # rows written to the benchmark table
DEFAULT_BATCH_SIZE = 4096  # rows requested per batch on both read paths
DEFAULT_TIMED_ROUNDS = 3  # timed repetitions averaged for each read path

# Benchmark file and table layout shared by the writer and both readers.
BENCH_FILE = "bench_arrow_vs_dataframe.tsfile"
TABLE_NAME = "bench_table"
COLUMNS = ["device", "value1", "value2"]
def _ensure_bench_tsfile(file_path: str, row_count: int) -> None:
    """(Re)create the benchmark tsfile at *file_path* with *row_count* rows.

    Any existing file at *file_path* is deleted first, so every call writes
    fresh data. Rows are generated column-wise with numpy/pandas and written
    with a single ``write_dataframe`` call, which is much faster than building
    a Tablet row by row.

    Generated columns:
      - ``time``:   0, 1, ..., row_count - 1 (int64)
      - ``device``: the constant tag value ``"device"`` for every row
      - ``value1``: 0, 10, 20, ... (int64)
      - ``value2``: 0.0, 1.5, 3.0, ... (float64)

    Args:
        file_path: Destination path of the tsfile.
        row_count: Number of rows to generate.
    """
    import numpy as np  # local import: only needed for data generation

    if os.path.exists(file_path):
        remove(file_path)

    df = pd.DataFrame({
        "time": np.arange(row_count, dtype=np.int64),
        # Every row carries the same tag value; repeat the literal instead of
        # formatting a placeholder-free f-string once per row.
        "device": pd.Series(["device"] * row_count),
        "value1": np.arange(0, row_count * 10, 10, dtype=np.int64),
        "value2": np.arange(row_count, dtype=np.float64) * 1.5,
    })
    table = TableSchema(
        TABLE_NAME,
        [
            ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
            ColumnSchema("value1", TSDataType.INT64, ColumnCategory.FIELD),
            ColumnSchema("value2", TSDataType.DOUBLE, ColumnCategory.FIELD),
        ],
    )
    with TsFileTableWriter(file_path, table) as writer:
        writer.write_dataframe(df)
def _read_via_arrow(file_path: str, batch_size: int, end_time: int) -> int:
    """Read every row via ``query_table_batch`` + ``read_arrow_batch``.

    Args:
        file_path: Path of the tsfile to read.
        batch_size: ``batch_size`` forwarded to ``query_table_batch``.
        end_time: Upper time bound forwarded to ``query_table_batch``
            (``start_time`` is always 0).

    Returns:
        Total number of rows returned across all batches.
    """
    reader = TsFileReader(file_path)
    try:
        result_set = reader.query_table_batch(
            table_name=TABLE_NAME,
            column_names=COLUMNS,
            start_time=0,
            end_time=end_time,
            batch_size=batch_size,
        )
        try:
            total_rows = 0
            while True:
                batch = result_set.read_arrow_batch()
                # None signals that the result set is exhausted.
                if batch is None:
                    break
                total_rows += len(batch)
        finally:
            result_set.close()
    finally:
        # Close the reader even if the query itself or result_set.close()
        # raised; previously the reader leaked in both cases.
        reader.close()
    return total_rows
def _read_via_dataframe(file_path: str, batch_size: int, end_time: int) -> int:
    """Read every row via ``query_table`` + ``next`` + ``read_data_frame``.

    Args:
        file_path: Path of the tsfile to read.
        batch_size: ``max_row_num`` forwarded to each ``read_data_frame`` call.
        end_time: Upper time bound forwarded to ``query_table``
            (``start_time`` is always 0).

    Returns:
        Total number of rows returned across all DataFrame chunks.
    """
    reader = TsFileReader(file_path)
    try:
        result_set = reader.query_table(
            TABLE_NAME,
            COLUMNS,
            start_time=0,
            end_time=end_time,
        )
        try:
            total_rows = 0
            while result_set.next():
                df = result_set.read_data_frame(max_row_num=batch_size)
                # Treat a missing or empty chunk as end of data.
                if df is None or len(df) == 0:
                    break
                total_rows += len(df)
        finally:
            result_set.close()
    finally:
        # Close the reader even if the query itself or result_set.close()
        # raised; previously the reader leaked in both cases.
        reader.close()
    return total_rows
def _run_timed(name: str, func, *args, rounds: int = DEFAULT_TIMED_ROUNDS):
    """Call ``func(*args)`` *rounds* times and print timing statistics.

    Args:
        name: Label printed at the start of the summary line.
        func: Callable whose return value is the number of rows it processed.
        *args: Positional arguments forwarded to *func* on every call.
        rounds: Number of timed calls; must be at least 1.

    Returns:
        ``(avg_seconds, total_rows)``, where ``total_rows`` is the value
        returned by the last call of *func*.

    Raises:
        ValueError: If *rounds* is less than 1.
    """
    if rounds < 1:
        # With zero rounds there is no row count and the average would
        # divide by zero; fail with a clear message instead.
        raise ValueError(f"rounds must be >= 1, got {rounds}")
    times = []
    total_rows = 0
    for _ in range(rounds):
        start = time.perf_counter()
        total_rows = func(*args)
        times.append(time.perf_counter() - start)
    avg = sum(times) / len(times)
    rows_per_sec = total_rows / avg if avg > 0 else 0
    print(f" {name}: {avg:.3f}s avg ({min(times):.3f}s min) rows={total_rows} {rows_per_sec:.0f} rows/s")
    return avg, total_rows
def run_benchmark(
    row_count: int = DEFAULT_ROW_COUNT,
    batch_size: int = DEFAULT_BATCH_SIZE,
    timed_rounds: int = DEFAULT_TIMED_ROUNDS,
    file_path: str = BENCH_FILE,
):
    """Write a benchmark tsfile, then time both read paths over it.

    Args:
        row_count: Number of rows written to the benchmark table.
        batch_size: Rows per batch for both the Arrow and DataFrame paths.
        timed_rounds: Timed repetitions averaged for each path.
        file_path: Path of the (re)created benchmark tsfile.

    Returns:
        ``(df_avg, arrow_avg)``: average seconds per full read for the
        DataFrame path and the Arrow path.

    Raises:
        AssertionError: If either path returns a row count different from
            *row_count*.
    """
    _ensure_bench_tsfile(file_path, row_count)
    # Timestamps written are 0 .. row_count - 1; ending the query one past
    # row_count covers every row whether end_time is inclusive or exclusive.
    end_time = row_count + 1
    print(f"Benchmark: {row_count} rows, batch_size={batch_size}, timed_rounds={timed_rounds}")
    df_avg, df_rows = _run_timed(
        "query_table + read_data_frame",
        _read_via_dataframe,
        file_path,
        batch_size,
        end_time,
        rounds=timed_rounds,
    )
    arrow_avg, arrow_rows = _run_timed(
        "query_table_batch + read_arrow_batch",
        _read_via_arrow,
        file_path,
        batch_size,
        end_time,
        rounds=timed_rounds,
    )
    print()
    if df_avg > 0:
        # Ratio of elapsed times (Arrow / DataFrame): < 1 means Arrow is faster.
        time_ratio = arrow_avg / df_avg
        if time_ratio < 1:
            verdict = "Arrow faster"
        elif time_ratio > 1:
            verdict = "DataFrame faster"
        else:
            verdict = "tie"
        print(f" Arrow vs DataFrame time ratio: {time_ratio:.2f}x ({verdict})")
    # Raise explicitly rather than `assert` so the row-count check still runs
    # when the script is executed with `python -O`.
    if df_rows != row_count:
        raise AssertionError(f"DataFrame path row count {df_rows} != {row_count}")
    if arrow_rows != row_count:
        raise AssertionError(f"Arrow path row count {arrow_rows} != {row_count}")
    print()
    return df_avg, arrow_avg
def test_bench_arrow_vs_dataframe_default():
    """Quick sanity run: 5,000 rows, batch size 1,024, two timed rounds.

    This is deliberately smaller than DEFAULT_ROW_COUNT so it finishes fast.
    """
    rows, batch, rounds = 5_000, 1024, 2
    run_benchmark(row_count=rows, batch_size=batch, timed_rounds=rounds)
def test_bench_arrow_vs_dataframe_medium():
    """Benchmark at the module defaults (row count, batch size, timed rounds)."""
    run_benchmark(DEFAULT_ROW_COUNT, DEFAULT_BATCH_SIZE, DEFAULT_TIMED_ROUNDS)
def test_bench_arrow_vs_dataframe_large():
    """Stress run: 2,000,000 rows, batch size 8,192, three timed rounds.

    Slow: the table is written once, then read three times per path.
    """
    large_rows = 2_000_000
    large_batch = 8192
    run_benchmark(row_count=large_rows, batch_size=large_batch, timed_rounds=3)
if __name__ == "__main__":
    # Usage: python bench_batch_arrow_vs_dataframe.py [row_count [batch_size]]
    row_count = int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_ROW_COUNT
    batch_size = int(sys.argv[2]) if len(sys.argv) > 2 else DEFAULT_BATCH_SIZE
    try:
        run_benchmark(row_count=row_count, batch_size=batch_size)
    finally:
        # Remove the generated bench file even if the benchmark failed;
        # previously a failure left the file behind.
        if os.path.exists(BENCH_FILE):
            os.remove(BENCH_FILE)