blob: b8d7c623b188d59c149de4690232bf297d85eced [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import List, Optional
from pypaimon.api import ReadBuilder, PredicateBuilder, TableRead, TableScan, Predicate
from pypaimon.pynative.common.data_field import DataField
from pypaimon.pynative.common.predicate import PredicateImpl
from pypaimon.pynative.common.predicate_builder import PredicateBuilderImpl
from pypaimon.pynative.read.table_scan_impl import TableScanImpl
from pypaimon.pynative.read.table_read_impl import TableReadImpl
class ReadBuilderImpl(ReadBuilder):
"""Implementation of ReadBuilder for native Python reading."""
def __init__(self, table):
from pypaimon.pynative.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self._predicate: Optional[Predicate] = None
self._projection: Optional[List[str]] = None
self._limit: Optional[int] = None
def with_filter(self, predicate: PredicateImpl) -> 'ReadBuilder':
self._predicate = predicate
return self
def with_projection(self, projection: List[str]) -> 'ReadBuilder':
self._projection = projection
return self
def with_limit(self, limit: int) -> 'ReadBuilder':
self._limit = limit
return self
def new_scan(self) -> TableScan:
return TableScanImpl(
table=self.table,
predicate=self._predicate,
limit=self._limit,
read_type=self.read_type()
)
def new_read(self) -> TableRead:
return TableReadImpl(
table=self.table,
predicate=self._predicate,
read_type=self.read_type()
)
def new_predicate_builder(self) -> PredicateBuilder:
return PredicateBuilderImpl(self.read_type())
def read_type(self) -> List[DataField]:
table_fields = self.table.fields
if not self._projection:
return table_fields
else:
field_map = {field.name: field for field in self.table.fields}
return [field_map[name] for name in self._projection if name in field_map]