blob: cdce16baab9b0a0fb79a5a9ff609d13931601905 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Streaming read tests using v8_trips_8i3u1d (MOR, partitioned by city)."""
import pyarrow as pa
import pytest
from hudi import (
HudiFileGroupReader,
HudiQueryType,
HudiReadOptions,
HudiRecordBatchStream,
HudiTable,
)
def test_read_options_default():
    """A default-constructed HudiReadOptions has no filters, projection, or configs."""
    defaults = HudiReadOptions()

    assert defaults.projection is None
    assert defaults.filters == []
    assert defaults.hudi_options == {}
def test_read_options_typed_accessors_round_trip():
    """Every builder that writes into hudi_options must round-trip via its typed accessor.

    The three timestamps are deliberately distinct so that a builder or
    accessor wired to the wrong config key cannot pass by coincidence.
    """
    as_of_ts = "20240201000000000"
    start_ts = "20240101000000000"
    end_ts = "20240301000000000"

    options = (
        HudiReadOptions(filters=[("city", "=", "san_francisco")])
        .with_projection(["uuid", "rider"])
        .with_query_type(HudiQueryType.Incremental)
        .with_batch_size(2048)
        .with_as_of_timestamp(as_of_ts)
        .with_start_timestamp(start_ts)
        .with_end_timestamp(end_ts)
    )

    assert options.filters == [("city", "=", "san_francisco")]
    assert options.projection == ["uuid", "rider"]

    # Builders insert into hudi_options under the matching HudiReadConfig keys.
    assert options.hudi_options["hoodie.read.stream.batch_size"] == "2048"
    assert options.hudi_options["hoodie.read.as.of.timestamp"] == as_of_ts
    assert options.hudi_options["hoodie.read.start.timestamp"] == start_ts
    assert options.hudi_options["hoodie.read.end.timestamp"] == end_ts

    # Typed accessors round-trip.
    assert options.query_type() == HudiQueryType.Incremental
    assert options.batch_size() == 2048
    assert options.as_of_timestamp() == as_of_ts
    assert options.start_timestamp() == start_ts
    assert options.end_timestamp() == end_ts

    assert "HudiReadOptions" in repr(options)
def test_read_options_query_type_bad_value_raises():
    """An unrecognized query type stored in hudi_options makes ``query_type()`` raise."""
    raw_configs = {"hoodie.read.query.type": "bogus"}
    invalid = HudiReadOptions(hudi_options=raw_configs)

    with pytest.raises(Exception):
        invalid.query_type()
def test_read_options_batch_size_bad_value_raises():
    """Invalid batch sizes are rejected at two different points.

    A non-integer string placed directly in hudi_options is accepted at
    construction and only fails when the typed ``batch_size()`` accessor is
    called, whereas ``with_batch_size(0)`` fails in the builder itself, so the
    misuse surfaces synchronously rather than at read time.
    """
    batch_size_key = "hoodie.read.stream.batch_size"
    malformed = HudiReadOptions(hudi_options={batch_size_key: "not_a_number"})
    with pytest.raises(Exception):
        malformed.batch_size()

    fresh = HudiReadOptions()
    with pytest.raises(Exception, match="must be > 0"):
        fresh.with_batch_size(0)
def test_read_options_with_filters_validates_at_build_time():
    """``with_filters`` rejects malformed filters when called, not at read time.

    Covers an unknown operator and an ``IN`` filter whose value string is empty.
    """
    unknown_operator = [("col", "BAD_OP", "x")]
    empty_in_values = [("col", "IN", "")]

    with pytest.raises(Exception):
        HudiReadOptions().with_filters(unknown_operator)

    with pytest.raises(Exception, match="at least one value"):
        HudiReadOptions().with_filters(empty_in_values)
def test_read_options_bulk_setters():
    """The plural builders (with_filters, with_hudi_options) chain like the singular ones."""
    blank = HudiReadOptions()
    with_filters = blank.with_filters(
        [("city", "=", "x"), ("rider", "=", "rider-A")]
    )
    options = with_filters.with_hudi_options({"k1": "v1", "k2": "v2"})

    assert options.filters == [("city", "=", "x"), ("rider", "=", "rider-A")]
    for key, expected in (("k1", "v1"), ("k2", "v2")):
        assert options.hudi_options[key] == expected
def test_read_snapshot_stream_yields_all_rows(v8_trips_table):
    """``read_stream()`` with no options returns a HudiRecordBatchStream totalling 6 rows."""
    stream = HudiTable(v8_trips_table).read_stream()
    assert isinstance(stream, HudiRecordBatchStream)

    collected = list(stream)
    assert collected  # at least one batch must be produced

    combined = pa.Table.from_batches(collected)
    assert combined.num_rows == 6
def test_read_snapshot_stream_with_partition_filter(v8_trips_table):
    """Filtering the stream on ``city = san_francisco`` leaves 4 rows."""
    sf_only = HudiReadOptions(filters=[("city", "=", "san_francisco")])
    hudi_table = HudiTable(v8_trips_table)

    collected = list(hudi_table.read_stream(sf_only))

    assert pa.Table.from_batches(collected).num_rows == 4
def test_read_snapshot_stream_is_single_use(v8_trips_table):
    """A stream can be consumed once; a second pass over it produces no batches."""
    stream = HudiTable(v8_trips_table).read_stream()

    first_pass = list(stream)
    assert first_pass  # the initial pass must yield at least one batch

    # The stream is now exhausted, so iterating it again yields nothing.
    second_pass = list(stream)
    assert second_pass == []
def test_read_file_slice_stream_file_group_reader(v8_trips_table):
    """Stream the first ``san_francisco`` file slice via HudiFileGroupReader; expect 4 rows."""
    sf_only = HudiReadOptions(filters=[("city", "=", "san_francisco")])
    slices = HudiTable(v8_trips_table).get_file_slices(sf_only)
    first_slice = slices[0]

    file_group_reader = HudiFileGroupReader(v8_trips_table)
    collected = list(file_group_reader.read_file_slice_stream(first_slice))

    assert pa.Table.from_batches(collected).num_rows == 4
def test_read_file_slice_from_paths_stream(v8_trips_table):
    """Stream a file slice addressed by its base-file relative path; expect 4 rows.

    The path comes from the first ``san_francisco`` file slice. The second
    argument is an empty list (presumably "no log files" -- confirm against
    the reader API).
    """
    sf_only = HudiReadOptions(filters=[("city", "=", "san_francisco")])
    slices = HudiTable(v8_trips_table).get_file_slices(sf_only)

    file_group_reader = HudiFileGroupReader(v8_trips_table)
    relative_base_path = slices[0].base_file_relative_path()
    record_stream = file_group_reader.read_file_slice_from_paths_stream(
        relative_base_path, []
    )
    collected = list(record_stream)

    assert pa.Table.from_batches(collected).num_rows == 4
def test_read_snapshot_stream_with_batch_size(v8_trips_table):
    """Streaming with a batch size of 1 still returns all 6 rows in total."""
    single_row_batches = HudiReadOptions().with_batch_size(1)
    hudi_table = HudiTable(v8_trips_table)

    collected = list(hudi_table.read_stream(single_row_batches))

    combined = pa.Table.from_batches(collected)
    assert combined.num_rows == 6