| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| |
| from typing import List, Optional |
| |
| from pypaimon.api import ReadBuilder, PredicateBuilder, TableRead, TableScan, Predicate |
| from pypaimon.pynative.common.data_field import DataField |
| from pypaimon.pynative.common.predicate import PredicateImpl |
| from pypaimon.pynative.common.predicate_builder import PredicateBuilderImpl |
| from pypaimon.pynative.read.table_scan_impl import TableScanImpl |
| from pypaimon.pynative.read.table_read_impl import TableReadImpl |
| |
| |
class ReadBuilderImpl(ReadBuilder):
    """Native-Python ReadBuilder that collects read options for one table.

    The builder records an optional filter predicate, an optional list of
    projected column names and an optional row limit, and hands them to the
    ``TableScanImpl`` / ``TableReadImpl`` objects it creates.
    """

    def __init__(self, table):
        """Create a builder bound to ``table`` with no filter, projection or limit."""
        # Imported at call time rather than at module import time.
        # NOTE(review): presumably this avoids a circular import - confirm.
        from pypaimon.pynative.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self._predicate: Optional[Predicate] = None
        self._projection: Optional[List[str]] = None
        self._limit: Optional[int] = None

    def with_filter(self, predicate: PredicateImpl) -> 'ReadBuilder':
        """Remember ``predicate`` for later scans/reads and return this builder."""
        self._predicate = predicate
        return self

    def with_projection(self, projection: List[str]) -> 'ReadBuilder':
        """Remember the column names to read and return this builder."""
        self._projection = projection
        return self

    def with_limit(self, limit: int) -> 'ReadBuilder':
        """Remember the row limit and return this builder.

        The limit is forwarded by ``new_scan`` only; ``new_read`` does not
        pass it on.
        """
        self._limit = limit
        return self

    def new_scan(self) -> TableScan:
        """Build a ``TableScanImpl`` from the table, predicate, limit and read type."""
        fields_to_read = self.read_type()
        scan = TableScanImpl(
            table=self.table,
            predicate=self._predicate,
            limit=self._limit,
            read_type=fields_to_read,
        )
        return scan

    def new_read(self) -> TableRead:
        """Build a ``TableReadImpl`` from the table, predicate and read type."""
        fields_to_read = self.read_type()
        reader = TableReadImpl(
            table=self.table,
            predicate=self._predicate,
            read_type=fields_to_read,
        )
        return reader

    def new_predicate_builder(self) -> PredicateBuilder:
        """Build a ``PredicateBuilderImpl`` over the current read type."""
        fields_to_read = self.read_type()
        return PredicateBuilderImpl(fields_to_read)

    def read_type(self) -> List[DataField]:
        """Return the fields that will be read.

        With no projection (``None`` or an empty list) the table's own field
        list is returned as-is. Otherwise the result follows the order of the
        projection; projected names that are not fields of the table are
        skipped without raising.
        """
        all_fields = self.table.fields

        # Guard clause: nothing projected means "read every column".
        if not self._projection:
            return all_fields

        # Index the table fields by name; a later duplicate name wins.
        by_name = {}
        for field in all_fields:
            by_name[field.name] = field

        selected = []
        for column in self._projection:
            if column in by_name:
                selected.append(by_name[column])
        return selected