blob: 1c71e27fffc069fa7b53a92d9073675d6e0c47f0 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import collections
from typing import Callable, List, Optional
from pypaimon.read.reader.iface.record_iterator import RecordIterator
from pypaimon.read.reader.iface.record_reader import RecordReader
class ConcatRecordReader(RecordReader):
def __init__(self, reader_suppliers: List[Callable]):
self.queue: collections.deque[Callable] = collections.deque(reader_suppliers)
self.current_reader: Optional[RecordReader] = None
def read_batch(self) -> Optional[RecordIterator]:
while True:
if self.current_reader is not None:
iterator = self.current_reader.read_batch()
if iterator is not None:
return iterator
self.current_reader.close()
self.current_reader = None
elif self.queue:
supplier = self.queue.popleft()
self.current_reader = supplier()
else:
return None
def close(self) -> None:
if self.current_reader:
self.current_reader.close()
self.current_reader = None
self.queue.clear()