blob: 22994d4bb28ad7566c8b67bd8fef98cb8311a00e [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
from pypaimon.read.plan import Plan
from pypaimon.read.scanner.empty_starting_scanner import EmptyStartingScanner
from pypaimon.read.scanner.full_starting_scanner import FullStartingScanner
from pypaimon.read.scanner.incremental_starting_scanner import \
IncrementalStartingScanner
from pypaimon.read.scanner.starting_scanner import StartingScanner
from pypaimon.snapshot.snapshot_manager import SnapshotManager
class TableScan:
    """Implementation of TableScan for native Python reading.

    The concrete scanning strategy is selected once, at construction time,
    from the table options (see ``_create_starting_scanner``) and is reused
    by every later call to ``plan`` and ``with_shard``.
    """

    def __init__(self, table, predicate: Optional[Predicate], limit: Optional[int]):
        """Create a scan over ``table``.

        Args:
            table: The table to scan; its ``options`` mapping decides which
                starting scanner is used.
            predicate: Optional filter handed through to the starting scanner.
            limit: Optional limit handed through to the starting scanner.

        Raises:
            ValueError: If the incremental-between-timestamp option is present
                but malformed (see ``_create_starting_scanner``).
        """
        # Imported locally rather than at module level -- presumably to avoid
        # a circular import with the table module; confirm before moving it.
        from pypaimon.table.file_store_table import FileStoreTable

        self.table: FileStoreTable = table
        self.predicate = predicate
        self.limit = limit
        self.starting_scanner = self._create_starting_scanner()

    def plan(self) -> Plan:
        """Return the plan produced by the selected starting scanner."""
        return self.starting_scanner.scan()

    def _create_starting_scanner(self) -> Optional[StartingScanner]:
        """Pick the starting scanner that matches the table options.

        Without ``CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP`` a
        ``FullStartingScanner`` is used. With it, the option value must be two
        comma-separated integer timestamps ``start,end``; an
        ``IncrementalStartingScanner`` is built for that range unless the
        range cannot match anything, in which case an ``EmptyStartingScanner``
        is returned.

        Returns:
            The scanner to delegate ``plan`` and ``with_shard`` to.

        Raises:
            ValueError: If the option does not contain exactly two
                comma-separated values, if either value is not an integer,
                or if the start timestamp is greater than the end timestamp.
        """
        options = self.table.options
        if CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP not in options:
            return FullStartingScanner(self.table, self.predicate, self.limit)

        ts = options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP].split(",")
        if len(ts) != 2:
            raise ValueError(
                "The incremental-between-timestamp must specific start(exclusive) and end timestamp. But is: " +
                options[CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP])

        # One manager serves both lookups; the original built it twice.
        snapshot_manager = SnapshotManager(self.table)
        earliest_snapshot = snapshot_manager.try_get_earliest_snapshot()
        latest_snapshot = snapshot_manager.get_latest_snapshot()
        if earliest_snapshot is None or latest_snapshot is None:
            # No snapshots to compare against, so nothing can be read.
            return EmptyStartingScanner()

        start_timestamp = int(ts[0])
        end_timestamp = int(ts[1])
        # Only a reversed range is an error. The previous ``>=`` check also
        # rejected start == end, which contradicted the error message and made
        # the equality test below unreachable.
        if start_timestamp > end_timestamp:
            raise ValueError(
                "Ending timestamp %s should be >= starting timestamp %s." % (end_timestamp, start_timestamp))

        # An empty range, or one lying entirely after the latest snapshot or
        # entirely before the earliest one, yields an empty scan.
        if (start_timestamp == end_timestamp or start_timestamp > latest_snapshot.time_millis
                or end_timestamp < earliest_snapshot.time_millis):
            return EmptyStartingScanner()

        return IncrementalStartingScanner.between_timestamps(self.table, self.predicate, self.limit,
                                                             start_timestamp, end_timestamp)

    def with_shard(self, idx_of_this_subtask, number_of_para_subtasks) -> 'TableScan':
        """Forward the shard settings to the starting scanner.

        Args:
            idx_of_this_subtask: Passed unchanged to the scanner's
                ``with_shard``.
            number_of_para_subtasks: Passed unchanged to the scanner's
                ``with_shard``.

        Returns:
            This ``TableScan`` instance, so calls can be chained.
        """
        self.starting_scanner.with_shard(idx_of_this_subtask, number_of_para_subtasks)
        return self