blob: c2f9bedcd560ba76a084e67cf32816bc74c29d7f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""
Benchmark: write_arrow_batch vs write_dataframe.
Compares write throughput (rows/s) for:
- Arrow path : write_arrow_batch(pa.RecordBatch)
- DataFrame path: write_dataframe(pd.DataFrame)
Run:
python -m pytest tests/bench_write_arrow_vs_dataframe.py -v -s
python tests/bench_write_arrow_vs_dataframe.py [row_count [batch_size]]
"""
import os
import sys
import time
import numpy as np
import pandas as pd
import pyarrow as pa
import pytest
from tsfile import (
ColumnCategory,
ColumnSchema,
TableSchema,
TSDataType,
TsFileTableWriter,
)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Number of rows generated when the caller does not supply a row count.
DEFAULT_ROW_COUNT = 100_000
# Default upper bound on the rows placed in each RecordBatch / DataFrame chunk.
DEFAULT_BATCH_SIZE = 8_192
# Default number of timed repetitions per write path.
DEFAULT_ROUNDS = 3
# Table name used by every schema built in this file.
TABLE_NAME = "bench_table"
# Output path handed to the writers. It is relative, so it resolves against the
# current working directory; it is removed before each timed round and again
# once the benchmark has finished.
BENCH_FILE = "bench_write_arrow.tsfile"
# Table layout: one TAG column ("device") followed by four FIELD columns.
# NOTE(review): nothing in the visible code reads SCHEMA; _write_arrow and
# _write_dataframe each build an equivalent schema themselves.
# NOTE(review): the generated data also carries a "time" column that has no
# entry here - presumably the writer treats it as the row timestamp; confirm.
SCHEMA = TableSchema(TABLE_NAME, [
ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG),
ColumnSchema("v_i64", TSDataType.INT64, ColumnCategory.FIELD),
ColumnSchema("v_f64", TSDataType.DOUBLE, ColumnCategory.FIELD),
ColumnSchema("v_bool", TSDataType.BOOLEAN, ColumnCategory.FIELD),
ColumnSchema("v_str", TSDataType.STRING, ColumnCategory.FIELD),
])
# ---------------------------------------------------------------------------
# Data generation
# ---------------------------------------------------------------------------
def _make_numpy_data(row_count: int):
ts = np.arange(row_count, dtype="int64")
v_i64 = np.arange(row_count, dtype="int64")
v_f64 = np.arange(row_count, dtype="float64") * 1.5
v_bool = (np.arange(row_count) % 2 == 0)
v_str = [f"s{i}" for i in range(row_count)]
device = ["device0"] * row_count
return ts, device, v_i64, v_f64, v_bool, v_str
def _make_arrow_batches(row_count: int, batch_size: int):
    """Slice the synthetic data into a list of Arrow record batches.

    Each batch covers at most ``batch_size`` consecutive rows; the last batch
    is shorter when ``row_count`` is not a multiple of ``batch_size``.
    """
    ts, device, v_i64, v_f64, v_bool, v_str = _make_numpy_data(row_count)

    def _batch(lo: int, hi: int):
        # Column order matters: it is the order of the resulting RecordBatch.
        return pa.record_batch({
            "time": pa.array(ts[lo:hi], type=pa.timestamp("ns")),
            "device": pa.array(device[lo:hi], type=pa.string()),
            "v_i64": pa.array(v_i64[lo:hi], type=pa.int64()),
            "v_f64": pa.array(v_f64[lo:hi], type=pa.float64()),
            "v_bool": pa.array(v_bool[lo:hi], type=pa.bool_()),
            "v_str": pa.array(v_str[lo:hi], type=pa.string()),
        })

    return [
        _batch(lo, min(lo + batch_size, row_count))
        for lo in range(0, row_count, batch_size)
    ]
def _make_dataframe_chunks(row_count: int, batch_size: int):
    """Slice the synthetic data into a list of pandas DataFrames.

    Each DataFrame covers at most ``batch_size`` consecutive rows; the last
    one is shorter when ``row_count`` is not a multiple of ``batch_size``.
    """
    ts, device, v_i64, v_f64, v_bool, v_str = _make_numpy_data(row_count)
    frames = []
    for lo in range(0, row_count, batch_size):
        window = slice(lo, min(lo + batch_size, row_count))
        # Numeric columns are wrapped in Series with explicit dtypes; the
        # string columns are passed through as plain list slices.
        frame = pd.DataFrame({
            "time": pd.Series(ts[window], dtype="int64"),
            "device": device[window],
            "v_i64": pd.Series(v_i64[window], dtype="int64"),
            "v_f64": pd.Series(v_f64[window], dtype="float64"),
            "v_bool": pd.Series(v_bool[window], dtype="bool"),
            "v_str": v_str[window],
        })
        frames.append(frame)
    return frames
# ---------------------------------------------------------------------------
# Benchmark runners
# ---------------------------------------------------------------------------
def _write_arrow(file_path: str, batches):
    """Write every record batch in ``batches`` to ``file_path``.

    A fresh table schema is built on each call and each batch is handed to
    ``write_arrow_batch`` in iteration order.
    """
    field_specs = (
        ("v_i64", TSDataType.INT64),
        ("v_f64", TSDataType.DOUBLE),
        ("v_bool", TSDataType.BOOLEAN),
        ("v_str", TSDataType.STRING),
    )
    columns = [ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG)]
    columns.extend(
        ColumnSchema(name, data_type, ColumnCategory.FIELD)
        for name, data_type in field_specs
    )
    table_schema = TableSchema(TABLE_NAME, columns)
    with TsFileTableWriter(file_path, table_schema) as writer:
        for record_batch in batches:
            writer.write_arrow_batch(record_batch)
def _write_dataframe(file_path: str, chunks):
    """Write every DataFrame in ``chunks`` to ``file_path``.

    A fresh table schema is built on each call and each chunk is handed to
    ``write_dataframe`` in iteration order.
    """
    tag_column = ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG)
    field_columns = [
        ColumnSchema(name, data_type, ColumnCategory.FIELD)
        for name, data_type in (
            ("v_i64", TSDataType.INT64),
            ("v_f64", TSDataType.DOUBLE),
            ("v_bool", TSDataType.BOOLEAN),
            ("v_str", TSDataType.STRING),
        )
    ]
    table_schema = TableSchema(TABLE_NAME, [tag_column, *field_columns])
    with TsFileTableWriter(file_path, table_schema) as writer:
        for frame in chunks:
            writer.write_dataframe(frame)
def _run_timed(label: str, func, *args, rounds: int = DEFAULT_ROUNDS, row_count: int = 0):
    """Time ``func(BENCH_FILE, *args)`` over several rounds and print a summary.

    Args:
        label: Text shown at the start of the printed result line.
        func: Callable invoked as ``func(BENCH_FILE, *args)`` once per round.
        *args: Extra positional arguments forwarded to ``func``.
        rounds: Number of timed repetitions; must be at least 1.
        row_count: Rows written per round, used only for the rows/s figure.

    Returns:
        The average wall-clock time per round, in seconds.

    Raises:
        ValueError: If ``rounds`` is smaller than 1.
    """
    # Without at least one round there is nothing to average; fail with a
    # clear message instead of a ZeroDivisionError further down.
    if rounds < 1:
        raise ValueError(f"rounds must be >= 1, got {rounds}")
    times = []
    for _ in range(rounds):
        # Start every round from a clean slate; the removal is deliberately
        # kept outside the timed region.
        try:
            os.remove(BENCH_FILE)
        except FileNotFoundError:
            pass
        t0 = time.perf_counter()
        func(BENCH_FILE, *args)
        times.append(time.perf_counter() - t0)
    avg = sum(times) / len(times)
    best = min(times)
    # Guard against a zero average on very coarse clocks.
    rps = row_count / avg if avg > 0 else 0
    print(f" {label:42s} avg={avg:.3f}s best={best:.3f}s {rps:>10.0f} rows/s")
    return avg
# ---------------------------------------------------------------------------
# Main benchmark
# ---------------------------------------------------------------------------
def run_benchmark(
    row_count: int = DEFAULT_ROW_COUNT,
    batch_size: int = DEFAULT_BATCH_SIZE,
    rounds: int = DEFAULT_ROUNDS,
):
    """Benchmark ``write_dataframe`` against ``write_arrow_batch``.

    The input data is built once up front so that data preparation is not
    included in the timings.  The DataFrame path is timed first, then the
    Arrow path, and a comparison line is printed.

    Args:
        row_count: Total number of rows written per round.
        batch_size: Maximum rows per RecordBatch / DataFrame chunk; must be
            at least 1.
        rounds: Number of timed repetitions per write path.

    Returns:
        ``(df_avg, arrow_avg)``: average seconds per round for the DataFrame
        path and the Arrow path respectively.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    # A negative batch size would yield no batches at all and a meaningless
    # throughput figure, so reject it (and zero) up front.
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    print()
    print(f"=== write benchmark: {row_count:,} rows, batch_size={batch_size}, rounds={rounds} ===")
    # Pre-build data once (exclude data-preparation time from timing)
    arrow_batches = _make_arrow_batches(row_count, batch_size)
    df_chunks = _make_dataframe_chunks(row_count, batch_size)
    try:
        df_avg = _run_timed(
            "write_dataframe",
            _write_dataframe, df_chunks,
            rounds=rounds, row_count=row_count,
        )
        arrow_avg = _run_timed(
            "write_arrow_batch",
            _write_arrow, arrow_batches,
            rounds=rounds, row_count=row_count,
        )
    finally:
        # Remove the output file even when a writer raises, so a failed run
        # does not leave a stray file in the working directory.
        if os.path.exists(BENCH_FILE):
            os.remove(BENCH_FILE)
    print()
    if arrow_avg > 0 and df_avg > 0:
        ratio = df_avg / arrow_avg
        if ratio >= 1.0:
            print(f" Arrow is {ratio:.2f}x faster than DataFrame")
        else:
            print(f" DataFrame is {1/ratio:.2f}x faster than Arrow")
    print()
    return df_avg, arrow_avg
# ---------------------------------------------------------------------------
# Pytest entry points
# ---------------------------------------------------------------------------
def test_bench_write_arrow_small():
    """Smoke-run the benchmark on a small data set (5 k rows, 2 rounds)."""
    small_config = {"row_count": 5_000, "batch_size": 1_024, "rounds": 2}
    run_benchmark(**small_config)
def test_bench_write_arrow_default():
    """Run the benchmark with the module defaults (100 k rows)."""
    # Positional order follows run_benchmark: row_count, batch_size, rounds.
    run_benchmark(DEFAULT_ROW_COUNT, DEFAULT_BATCH_SIZE, DEFAULT_ROUNDS)
def test_bench_write_arrow_large():
    """Large benchmark (1 M rows)."""
    # Row count matches the documented 1 M: ten times the default run.
    # NOTE(review): batch_size 32_384 is not a power of two (cf. 32_768);
    # kept as found - confirm it is intentional.
    run_benchmark(row_count=1_000_000, batch_size=32_384, rounds=3)
# ---------------------------------------------------------------------------
# Script entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Optional positional CLI arguments: [row_count [batch_size]].
    cli_args = sys.argv[1:]
    row_count = int(cli_args[0]) if cli_args else DEFAULT_ROW_COUNT
    batch_size = int(cli_args[1]) if len(cli_args) > 1 else DEFAULT_BATCH_SIZE
    run_benchmark(row_count=row_count, batch_size=batch_size)