blob: 5b6b12dd18de2c20252537b4dd5d7737ee6564c3 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HDFS FileIO benchmark: native (hdfs-native) vs pyarrow (libhdfs/JVM).
Compares throughput of common FileIO operations between the two backends
against the same HDFS cluster. Each backend is exercised via the FileIO
factory by toggling the `hdfs.client.impl` option.
Usage:
python -m pypaimon.benchmark.hdfs_io_bench \\
--warehouse hdfs://localhost:8020/bench \\
[--backend native|pyarrow|both] \\
[--write-size-mb 256] \\
[--list-files 1000] \\
[--read-iters 3]
Notes:
- `pyarrow` backend requires HADOOP_HOME + HADOOP_CONF_DIR + libhdfs.
- `native` backend requires `pip install pypaimon[hdfs]`.
- The benchmark writes/reads scratch files under <warehouse>/bench_<uuid>/
and removes them on exit.
"""
import argparse
import os
import sys
import time
import uuid
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from pypaimon.common.file_io import FileIO # noqa: E402
from pypaimon.common.options import Options # noqa: E402
def _build_file_io(warehouse: str, backend: str) -> FileIO:
opts = Options({"hdfs.client.impl": backend})
return FileIO.get(warehouse, opts)
def _human(seconds: float) -> str:
if seconds < 1e-3:
return f"{seconds * 1e6:.0f}us"
if seconds < 1:
return f"{seconds * 1e3:.1f}ms"
return f"{seconds:.2f}s"
def _bench_write(file_io: FileIO, root: str, size_mb: int) -> float:
payload = os.urandom(min(size_mb, 16) * 1024 * 1024)
path = f"{root}/write-{uuid.uuid4().hex[:8]}.bin"
t0 = time.perf_counter()
with file_io.new_output_stream(path) as stream:
written = 0
target = size_mb * 1024 * 1024
while written < target:
chunk = payload[: min(len(payload), target - written)]
n = stream.write(chunk)
written += n if isinstance(n, int) and n > 0 else len(chunk)
return time.perf_counter() - t0
def _bench_read(file_io: FileIO, path: str) -> float:
t0 = time.perf_counter()
with file_io.new_input_stream(path) as stream:
while True:
chunk = stream.read(8 * 1024 * 1024)
if not chunk:
break
return time.perf_counter() - t0
def _bench_list(file_io: FileIO, root: str, num_files: int) -> float:
scratch = f"{root}/list-{uuid.uuid4().hex[:8]}"
file_io.mkdirs(scratch)
try:
for i in range(num_files):
with file_io.new_output_stream(f"{scratch}/f-{i:06d}.txt") as s:
s.write(b"x")
t0 = time.perf_counter()
results = file_io.list_status(scratch)
_ = list(results)
return time.perf_counter() - t0
finally:
file_io.delete(scratch, recursive=True)
def run_one(backend: str, args) -> None:
print(f"\n=== backend={backend} ===")
try:
file_io = _build_file_io(args.warehouse, backend)
except Exception as e:
print(f" init failed: {e}")
return
bench_root = f"{args.warehouse.rstrip('/')}/bench_{uuid.uuid4().hex[:8]}"
file_io.mkdirs(bench_root)
try:
# Write
sample_path = f"{bench_root}/write-sample.bin"
with file_io.new_output_stream(sample_path) as stream:
payload = os.urandom(min(args.write_size_mb, 16) * 1024 * 1024)
written = 0
target = args.write_size_mb * 1024 * 1024
t0 = time.perf_counter()
while written < target:
chunk = payload[: min(len(payload), target - written)]
n = stream.write(chunk)
written += n if isinstance(n, int) and n > 0 else len(chunk)
write_elapsed = time.perf_counter() - t0
mb_per_s = args.write_size_mb / write_elapsed if write_elapsed else 0
print(f" write {args.write_size_mb}MB: "
f"{_human(write_elapsed)} ({mb_per_s:.1f} MB/s)")
# Read (warm)
read_times = []
for _ in range(args.read_iters):
read_times.append(_bench_read(file_io, sample_path))
avg_read = sum(read_times) / len(read_times)
rmb_per_s = args.write_size_mb / avg_read if avg_read else 0
print(f" read {args.write_size_mb}MB (avg of {args.read_iters}): "
f"{_human(avg_read)} ({rmb_per_s:.1f} MB/s)")
# List
list_elapsed = _bench_list(file_io, bench_root, args.list_files)
print(f" list {args.list_files} files: {_human(list_elapsed)}")
finally:
try:
file_io.delete(bench_root, recursive=True)
except Exception as e:
print(f" cleanup failed: {e}")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--warehouse", required=True,
help="HDFS URI under which scratch files are created")
parser.add_argument("--backend", default="both",
choices=["native", "pyarrow", "both"])
parser.add_argument("--write-size-mb", type=int, default=128)
parser.add_argument("--list-files", type=int, default=1000)
parser.add_argument("--read-iters", type=int, default=3)
args = parser.parse_args()
backends = ["native", "pyarrow"] if args.backend == "both" else [args.backend]
for backend in backends:
run_one(backend, args)
if __name__ == "__main__":
main()