blob: 6700325b6101d4399d59cbfb5cf4d7ded556bab5 [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Tests for the in-memory SystemReadBuilder / Scan / Read pipeline.

These tests exercise the duck-typed surface so that callers can use
``rb.with_filter`` / ``with_projection`` / ``with_limit`` /
``new_scan`` / ``new_read`` exactly as they would on a regular data
table.
"""
import types
import unittest
import pyarrow as pa
from pypaimon.common.identifier import Identifier
from pypaimon.schema.data_types import AtomicType, DataField, RowType
from pypaimon.table.system.system_table import SystemReadBuilder, SystemTable
from pypaimon.table.system.system_table_scan import SystemSplit, SystemTableScan
from pypaimon.table.system.system_table_read import SystemTableRead
# Schema shared by the dummy system table: a non-null STRING column ``c1``
# followed by a non-null BIGINT column ``c2``.
_DUMMY_ROW_TYPE = RowType(
    False,
    [
        DataField(0, "c1", AtomicType("STRING", nullable=False)),
        DataField(1, "c2", AtomicType("BIGINT", nullable=False)),
    ],
)
class _DummySystemTable(SystemTable):
    """Minimal system table backed by a fixed 3-row in-memory table.

    The contents never change, so the pipeline tests can assert exact
    values after projection / limit have been applied.
    """

    def system_table_name(self) -> str:
        """Return the fixed name of this test-only system table."""
        return "dummy"

    def row_type(self) -> RowType:
        """Return the shared two-column schema (``c1`` STRING, ``c2`` BIGINT)."""
        return _DUMMY_ROW_TYPE

    def _build_arrow_table(self):
        """Materialise the table contents as a ``pyarrow.Table``."""
        columns = {
            "c1": ["a", "b", "c"],
            "c2": [1, 2, 3],
        }
        return pa.table(columns)
def _fake_base():
    """Build a lightweight stand-in for the data table a system table wraps.

    Returns a ``SimpleNamespace`` exposing ``identifier``, ``file_io`` and
    ``table_path``. ``file_io`` is an opaque placeholder object (presumably
    never used by the in-memory pipeline -- confirm against SystemTable).
    """
    attrs = {
        "identifier": Identifier.create("db", "t"),
        "file_io": object(),
        "table_path": "/tmp/db/t",
    }
    return types.SimpleNamespace(**attrs)
class SystemReadPipelineTest(unittest.TestCase):
    """End-to-end checks of read builder -> scan -> read on a system table."""

    def setUp(self):
        self.table = _DummySystemTable(_fake_base())

    def _read_and_splits(self, rb):
        """Return ``(read, splits)`` for *rb*.

        ``read`` is a fresh table read and ``splits`` are the planned splits
        of a fresh scan. The name matches the tuple order callers unpack.
        """
        return rb.new_read(), rb.new_scan().plan().splits()

    def test_new_read_builder_returns_system_read_builder(self):
        self.assertIsInstance(self.table.new_read_builder(), SystemReadBuilder)

    def test_scan_produces_single_in_memory_split(self):
        # The whole in-memory table is planned as one split with no backing
        # data files, no partition, and the bucket sentinel -1.
        rb = self.table.new_read_builder()
        splits = rb.new_scan().plan().splits()
        self.assertEqual(1, len(splits))
        self.assertIsInstance(splits[0], SystemSplit)
        self.assertEqual(3, splits[0].row_count)
        self.assertEqual([], splits[0].files)
        self.assertIsNone(splits[0].partition)
        self.assertEqual(-1, splits[0].bucket)

    def test_default_read_returns_full_arrow_table(self):
        rb = self.table.new_read_builder()
        read, splits = self._read_and_splits(rb)
        result = read.to_arrow(splits)
        self.assertEqual(["c1", "c2"], result.schema.names)
        self.assertEqual(3, result.num_rows)
        self.assertEqual(["a", "b", "c"], result.column("c1").to_pylist())
        self.assertEqual([1, 2, 3], result.column("c2").to_pylist())

    def test_with_projection_drops_columns(self):
        rb = self.table.new_read_builder().with_projection(["c1"])
        read, splits = self._read_and_splits(rb)
        result = read.to_arrow(splits)
        self.assertEqual(["c1"], result.schema.names)
        self.assertEqual(3, result.num_rows)
        # Projection must drop columns without disturbing the kept values.
        self.assertEqual(["a", "b", "c"], result.column("c1").to_pylist())

    def test_projection_silently_skips_unknown_columns(self):
        # Mirrors ReadBuilder.with_projection contract: unknown names
        # are dropped rather than failing eagerly.
        rb = self.table.new_read_builder().with_projection(["c2", "no_such"])
        read, splits = self._read_and_splits(rb)
        result = read.to_arrow(splits)
        self.assertEqual(["c2"], result.schema.names)
        self.assertEqual([1, 2, 3], result.column("c2").to_pylist())

    def test_with_limit_truncates_rows(self):
        rb = self.table.new_read_builder().with_limit(1)
        read, splits = self._read_and_splits(rb)
        result = read.to_arrow(splits)
        self.assertEqual(1, result.num_rows)
        # The surviving row is the first one, across every column.
        self.assertEqual("a", result.column("c1")[0].as_py())
        self.assertEqual(1, result.column("c2")[0].as_py())

    def test_filter_is_not_supported_yet(self):
        # Attaching a filter and planning succeed; the unsupported-filter
        # error only surfaces when rows are actually read.
        rb = self.table.new_read_builder().with_filter(object())
        read, splits = self._read_and_splits(rb)
        with self.assertRaises(NotImplementedError):
            read.to_arrow(splits)

    def test_to_pandas_returns_dataframe(self):
        rb = self.table.new_read_builder()
        read, splits = self._read_and_splits(rb)
        df = read.to_pandas(splits)
        self.assertEqual(3, len(df))
        self.assertEqual(["c1", "c2"], list(df.columns))
        self.assertEqual(["a", "b", "c"], df["c1"].tolist())

    def test_to_record_batch_iterator_yields_batches(self):
        rb = self.table.new_read_builder()
        read, splits = self._read_and_splits(rb)
        batches = list(read.to_record_batch_iterator(splits))
        self.assertEqual(3, sum(b.num_rows for b in batches))
        # Every batch should carry the full (unprojected) schema.
        for batch in batches:
            self.assertEqual(["c1", "c2"], batch.schema.names)

    def test_to_iterator_yields_one_dict_per_row(self):
        rb = self.table.new_read_builder()
        read, splits = self._read_and_splits(rb)
        rows = list(read.to_iterator(splits))
        self.assertEqual(3, len(rows))
        self.assertEqual({"c1", "c2"}, set(rows[0].keys()))
        # Rows come back in table order with their values intact.
        self.assertEqual(["a", "b", "c"], [row["c1"] for row in rows])
        self.assertEqual([1, 2, 3], [row["c2"] for row in rows])

    def test_read_type_reflects_current_projection(self):
        rb = self.table.new_read_builder()
        self.assertEqual(["c1", "c2"], [f.name for f in rb.read_type()])
        # Rebind the result so the test holds whether with_projection
        # mutates in place (and returns self) or returns a new builder.
        rb = rb.with_projection(["c2"])
        self.assertEqual(["c2"], [f.name for f in rb.read_type()])

    def test_new_scan_returns_system_table_scan(self):
        rb = self.table.new_read_builder()
        self.assertIsInstance(rb.new_scan(), SystemTableScan)

    def test_new_read_returns_system_table_read(self):
        rb = self.table.new_read_builder()
        self.assertIsInstance(rb.new_read(), SystemTableRead)
if __name__ == "__main__":
    # Allow running this test module directly with the unittest runner.
    unittest.main()