# blob: 980e7e5dd9c9e0daf504fe3d1a0d478b32b6800e [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Optional
from pypaimon.pynative.common.row.internal_row import InternalRow
from pypaimon.pynative.common.row.key_value import KeyValue
from pypaimon.pynative.common.row.offset_row import OffsetRow
from pypaimon.pynative.common.row.row_kind import RowKind
from pypaimon.pynative.reader.core.file_record_iterator import FileRecordIterator
from pypaimon.pynative.reader.core.file_record_reader import FileRecordReader
class KeyValueWrapReader(FileRecordReader[KeyValue]):
    """
    RecordReader that presents the batches of a wrapped InternalRow reader
    as batches of KeyValue records.

    Corresponds to the KeyValueDataFileRecordReader in Java version.
    """

    def __init__(self, wrapped_reader: FileRecordReader[InternalRow],
                 level, key_arity, value_arity):
        """
        Remember the wrapped reader together with the settings that are
        handed to every KeyValueWrapIterator created by read_batch().

        Args:
            wrapped_reader: reader whose batches are wrapped.
            level: level passed on to each KeyValueWrapIterator.
            key_arity: key arity passed on to each KeyValueWrapIterator.
            value_arity: value arity passed on to each KeyValueWrapIterator.
        """
        self.wrapped_reader = wrapped_reader
        self.key_arity = key_arity
        self.value_arity = value_arity
        self.level = level

    def read_batch(self) -> Optional[FileRecordIterator[KeyValue]]:
        """
        Fetch the next batch from the wrapped reader.

        Returns:
            None when the wrapped reader returns None, otherwise a
            KeyValueWrapIterator around the batch it returned.
        """
        batch = self.wrapped_reader.read_batch()
        if batch is not None:
            return KeyValueWrapIterator(batch, self.key_arity, self.value_arity, self.level)
        return None

    def close(self):
        """Close the wrapped reader."""
        self.wrapped_reader.close()


class KeyValueWrapIterator(FileRecordIterator[KeyValue]):
    """
    An iterator that converts each primary-key InternalRow of a wrapped
    iterator into a KeyValue.

    For every row, next() reads the sequence number from field index
    ``key_arity`` and the value kind from field index ``key_arity + 1``.
    """

    def __init__(
            self,
            iterator: FileRecordIterator,
            key_arity: int,
            value_arity: int,
            level: int
    ):
        """
        Store the wrapped iterator and create the two OffsetRow instances
        that are passed to every KeyValue built by next().

        Args:
            iterator: the iterator whose rows are converted.
            key_arity: number used as the key OffsetRow's last argument and
                as the index of the sequence-number field.
            value_arity: number used as the value OffsetRow's last argument.
            level: level set on every KeyValue returned by next().
        """
        self.iterator = iterator
        self.key_arity = key_arity
        self.value_arity = value_arity
        self.level = level

        # NOTE(review): presumably OffsetRow takes (row, offset, arity), so
        # the value view starts two fields after the key, i.e. past the
        # sequence-number and value-kind fields read in next() -- confirm
        # against OffsetRow.
        value_offset = key_arity + 2
        self.reused_key = OffsetRow(None, 0, key_arity)
        self.reused_value = OffsetRow(None, value_offset, value_arity)

    def next(self) -> Optional[KeyValue]:
        """
        Convert the next row of the wrapped iterator into a KeyValue.

        Returns:
            None when the wrapped iterator returns None; otherwise the result
            of ``set_level(self.level)`` on a newly built KeyValue.

        The same two OffsetRow instances are handed to every KeyValue, so a
        previously returned KeyValue presumably must be consumed before the
        next call -- verify against OffsetRow.replace.
        """
        row = self.iterator.next()
        if row is None:
            return None

        key_row = self.reused_key
        value_row = self.reused_value
        key_row.replace(row)
        value_row.replace(row)

        sequence_index = self.key_arity
        kind_index = sequence_index + 1
        key_value = KeyValue(
            key=key_row,
            sequence_number=row.get_field(sequence_index),
            value_kind=RowKind(row.get_field(kind_index)),
            value=value_row
        )
        return key_value.set_level(self.level)

    def returned_position(self) -> int:
        """Return the wrapped iterator's returned position."""
        return self.iterator.returned_position()

    def file_path(self) -> str:
        """Return the wrapped iterator's file path."""
        return self.iterator.file_path()

    def release_batch(self):
        """Release the batch held by the wrapped iterator."""
        self.iterator.release_batch()