| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| from typing import List, Optional |
| |
| import pyarrow as pa |
| from pyarrow import RecordBatch |
| |
| from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader |
| |
| |
| class DataEvolutionMergeReader(RecordBatchReader): |
| """ |
| This is a union reader which contains multiple inner readers, Each reader is responsible for reading one file. |
| |
| This reader, assembling multiple reader into one big and great reader, will merge the batches from all readers. |
| |
| For example, if rowOffsets is {0, 2, 0, 1, 2, 1} and fieldOffsets is {0, 0, 1, 1, 1, 0}, it means: |
| - The first field comes from batch0, and it is at offset 0 in batch0. |
| - The second field comes from batch2, and it is at offset 0 in batch2. |
| - The third field comes from batch0, and it is at offset 1 in batch0. |
| - The fourth field comes from batch1, and it is at offset 1 in batch1. |
| - The fifth field comes from batch2, and it is at offset 1 in batch2. |
| - The sixth field comes from batch1, and it is at offset 0 in batch1. |
| """ |
| |
| def __init__(self, row_offsets: List[int], field_offsets: List[int], readers: List[Optional[RecordBatchReader]]): |
| if row_offsets is None: |
| raise ValueError("Row offsets must not be null") |
| if field_offsets is None: |
| raise ValueError("Field offsets must not be null") |
| if len(row_offsets) != len(field_offsets): |
| raise ValueError("Row offsets and field offsets must have the same length") |
| if not row_offsets: |
| raise ValueError("Row offsets must not be empty") |
| if not readers or len(readers) < 1: |
| raise ValueError("Readers should be more than 0") |
| self.row_offsets = row_offsets |
| self.field_offsets = field_offsets |
| self.readers = readers |
| |
| def read_arrow_batch(self) -> Optional[RecordBatch]: |
| batches: List[Optional[RecordBatch]] = [None] * len(self.readers) |
| for i, reader in enumerate(self.readers): |
| if reader is not None: |
| batch = reader.read_arrow_batch() |
| if batch is None: |
| # all readers are aligned, as long as one returns null, the others will also have no data |
| return None |
| batches[i] = batch |
| # Assemble record batches from batches based on row_offsets and field_offsets |
| columns = [] |
| names = [] |
| for i in range(len(self.row_offsets)): |
| batch_index = self.row_offsets[i] |
| field_index = self.field_offsets[i] |
| if batches[batch_index] is not None: |
| column = batches[batch_index].column(field_index) |
| columns.append(column) |
| names.append(batches[batch_index].schema.names[field_index]) |
| if columns: |
| return pa.RecordBatch.from_arrays(columns, names) |
| return None |
| |
| def close(self) -> None: |
| try: |
| for reader in self.readers: |
| if reader is not None: |
| reader.close() |
| except Exception as e: |
| raise IOError("Failed to close inner readers") from e |