################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional

import pandas
import pyarrow

from pypaimon.common.predicate import Predicate
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.split import Split
from pypaimon.read.split_read import (DataEvolutionSplitRead,
                                      MergeFileSplitRead, RawFileSplitRead,
                                      SplitRead)
from pypaimon.schema.data_types import DataField, PyarrowFieldParser
from pypaimon.table.row.offset_row import OffsetRow

if TYPE_CHECKING:
    import ray
    from duckdb import DuckDBPyConnection
class TableRead:
"""Implementation of TableRead for native Python reading."""
def __init__(self, table, predicate: Optional[Predicate], read_type: List[DataField]):
from pypaimon.table.file_store_table import FileStoreTable
self.table: FileStoreTable = table
self.predicate = predicate
self.read_type = read_type
def to_iterator(self, splits: List[Split]) -> Iterator:
def _record_generator():
for split in splits:
reader = self._create_split_read(split).create_reader()
try:
for batch in iter(reader.read_batch, None):
yield from iter(batch.next, None)
finally:
reader.close()
return _record_generator()
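    # Sketch of row-wise consumption (hedged; `table_read` and `splits` as in the
    # class-level example above). Each yielded element is a row object from the
    # underlying reader, e.g. an OffsetRow on the non-vectorized path:
    #
    #     for row in table_read.to_iterator(splits):
    #         ...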
def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
batch_iterator = self._arrow_batch_generator(splits, schema)
return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)
@staticmethod
def _try_to_pad_batch_by_schema(batch: pyarrow.RecordBatch, target_schema):
if batch.schema.names == target_schema.names:
return batch
columns = []
num_rows = batch.num_rows
for field in target_schema:
if field.name in batch.column_names:
col = batch.column(field.name)
else:
col = pyarrow.nulls(num_rows, type=field.type)
columns.append(col)
return pyarrow.RecordBatch.from_arrays(columns, schema=target_schema)
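    # Illustration of the padding above (hypothetical columns): if `target_schema`
    # is (id, name) but a batch only carries `id`, the returned batch gains a
    # null-filled `name` column of the target type, so that all batches can later
    # be combined under a single schema in `to_arrow`.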
    def to_arrow(self, splits: List[Split]) -> pyarrow.Table:
        batch_reader = self.to_arrow_batch_reader(splits)
        schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
        batch_list = []
        for batch in iter(batch_reader.read_next_batch, None):
            if batch.num_rows == 0:
                continue
            batch_list.append(self._try_to_pad_batch_by_schema(batch, schema))
        if not batch_list:
            # No rows: return an empty table that still carries the requested schema.
            return pyarrow.Table.from_arrays(
                [pyarrow.array([], type=field.type) for field in schema], schema=schema)
        return pyarrow.Table.from_batches(batch_list)
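    # Usage sketch (hedged; `table_read` / `splits` as above). `to_arrow` fully
    # materializes the result, while `to_arrow_batch_reader` streams batches:
    #
    #     arrow_table = table_read.to_arrow(splits)
    #     for batch in table_read.to_arrow_batch_reader(splits):
    #         ...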
def _arrow_batch_generator(self, splits: List[Split], schema: pyarrow.Schema) -> Iterator[pyarrow.RecordBatch]:
        chunk_size = 65536  # rows buffered per Arrow batch on the row-by-row fallback path
for split in splits:
reader = self._create_split_read(split).create_reader()
try:
if isinstance(reader, RecordBatchReader):
yield from iter(reader.read_arrow_batch, None)
else:
row_tuple_chunk = []
for row_iterator in iter(reader.read_batch, None):
for row in iter(row_iterator.next, None):
if not isinstance(row, OffsetRow):
raise TypeError(f"Expected OffsetRow, but got {type(row).__name__}")
row_tuple_chunk.append(row.row_tuple[row.offset: row.offset + row.arity])
if len(row_tuple_chunk) >= chunk_size:
batch = self.convert_rows_to_arrow_batch(row_tuple_chunk, schema)
yield batch
row_tuple_chunk = []
if row_tuple_chunk:
batch = self.convert_rows_to_arrow_batch(row_tuple_chunk, schema)
yield batch
finally:
reader.close()
def to_pandas(self, splits: List[Split]) -> pandas.DataFrame:
arrow_table = self.to_arrow(splits)
return arrow_table.to_pandas()
def to_duckdb(self, splits: List[Split], table_name: str,
connection: Optional["DuckDBPyConnection"] = None) -> "DuckDBPyConnection":
import duckdb
con = connection or duckdb.connect(database=":memory:")
con.register(table_name, self.to_arrow(splits))
return con
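    # Usage sketch (hedged; the table name and SQL are illustrative): the Arrow
    # table is registered on an in-memory DuckDB connection and queried with SQL.
    #
    #     con = table_read.to_duckdb(splits, "orders")
    #     count = con.execute("SELECT COUNT(*) FROM orders").fetchall()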
def to_ray(
self,
splits: List[Split],
*,
ray_remote_args: Optional[Dict[str, Any]] = None,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
**read_args,
) -> "ray.data.dataset.Dataset":
"""Convert Paimon table data to Ray Dataset.
Args:
splits: List of splits to read from the Paimon table.
ray_remote_args: Optional kwargs passed to :func:`ray.remote` in read tasks.
For example, ``{"num_cpus": 2, "max_retries": 3}``.
concurrency: Optional max number of Ray tasks to run concurrently.
By default, dynamically decided based on available resources.
override_num_blocks: Optional override for the number of output blocks.
You needn't manually set this in most cases.
**read_args: Additional kwargs passed to the datasource.
For example, ``per_task_row_limit`` (Ray 2.52.0+).
See `Ray Data API <https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html>`_
for details.
"""
import ray
if not splits:
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
empty_table = pyarrow.Table.from_arrays(
[pyarrow.array([], type=field.type) for field in schema],
schema=schema
)
return ray.data.from_arrow(empty_table)
if override_num_blocks is not None and override_num_blocks < 1:
raise ValueError(f"override_num_blocks must be at least 1, got {override_num_blocks}")
from pypaimon.read.ray_datasource import PaimonDatasource
datasource = PaimonDatasource(self, splits)
return ray.data.read_datasource(
datasource,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
override_num_blocks=override_num_blocks,
**read_args
)
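    # Usage sketch (assumes Ray is installed; the resource numbers are illustrative):
    #
    #     ds = table_read.to_ray(splits, ray_remote_args={"num_cpus": 1}, concurrency=4)
    #     ds.show(5)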
    def _create_split_read(self, split: Split) -> SplitRead:
        if self.table.is_primary_key_table and not split.raw_convertible:
            # Primary-key data that is not raw-convertible must go through the merge read path.
            return MergeFileSplitRead(
                table=self.table,
                predicate=self.predicate,
                read_type=self.read_type,
                split=split,
                row_tracking_enabled=False
            )
        elif self.table.options.data_evolution_enabled():
            # Tables with data evolution enabled use the dedicated evolution read path.
            return DataEvolutionSplitRead(
                table=self.table,
                predicate=self.predicate,
                read_type=self.read_type,
                split=split,
                row_tracking_enabled=True
            )
        else:
            # Otherwise the data files can be read directly.
            return RawFileSplitRead(
                table=self.table,
                predicate=self.predicate,
                read_type=self.read_type,
                split=split,
                row_tracking_enabled=self.table.options.row_tracking_enabled()
            )
@staticmethod
def convert_rows_to_arrow_batch(row_tuples: List[tuple], schema: pyarrow.Schema) -> pyarrow.RecordBatch:
columns_data = zip(*row_tuples)
pydict = {name: list(column) for name, column in zip(schema.names, columns_data)}
return pyarrow.RecordBatch.from_pydict(pydict, schema=schema)
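    # Illustration (hypothetical values): `zip(*row_tuples)` transposes rows into
    # columns, e.g. rows [(1, "a"), (2, "b")] with schema (id, name) become
    # {"id": [1, 2], "name": ["a", "b"]} before building the RecordBatch.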