blob: 9ce2d9ea26ad654aef5892541b0a7fb2deb4ac7f [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from typing import Dict, List, Optional

import pyarrow.fs as pafs

from pypaimon.common.file_io import FileIO
from pypaimon.read.plan import Plan
from pypaimon.table.format.format_data_split import FormatDataSplit
from pypaimon.table.format.format_table import FormatTable
def _is_data_file_name(name: str) -> bool:
if name is None:
return False
return not name.startswith(".") and not name.startswith("_")
def _is_reserved_dir_name(name: str) -> bool:
if not name:
return True
if name.startswith(".") or name.startswith("_"):
return True
if name.lower() in ("schema", "_schema"):
return True
return False
def _list_data_files_recursive(
    file_io: FileIO,
    path: str,
    partition_keys: List[str],
    partition_only_value: bool,
    rel_path_parts: Optional[List[str]] = None,
) -> List[FormatDataSplit]:
    """Collect one ``FormatDataSplit`` per data file found under ``path``.

    Sub-directories are interpreted as partition levels and are descended
    into only while the number of collected partition values does not exceed
    ``len(partition_keys)``; deeper directories are ignored. Directories
    rejected by ``_is_reserved_dir_name`` and files rejected by
    ``_is_data_file_name`` are skipped.

    Args:
        file_io: Provides ``list_status(path)``; every returned entry is read
            for its ``path`` and ``type`` attributes and, when present, its
            ``size`` attribute.
        path: Directory to list.
        partition_keys: Partition column names, outermost level first.
        partition_only_value: When true, a directory name is used verbatim as
            the partition value. When false, a name containing ``=`` yields
            the text after the first ``=``; a name without ``=`` is still
            used verbatim.
        rel_path_parts: Partition values gathered on the way down to
            ``path``; ``None`` (the default) means the scan starts here.

    Returns:
        The splits found under ``path``. A split carries a partition dict
        only when ``partition_keys`` is non-empty and at least that many
        partition values were collected above the file; otherwise its
        partition is ``None``. If listing ``path`` fails, a warning is logged
        and an empty list is returned.
    """
    splits: List[FormatDataSplit] = []
    rel_path_parts = rel_path_parts or []
    try:
        infos = file_io.list_status(path)
    except Exception:
        # Listing stays best-effort (an unreadable directory contributes no
        # splits), but the failure is logged so that permission or network
        # errors are not mistaken for an empty table.
        logging.getLogger(__name__).warning(
            "Failed to list '%s' during format table scan; skipping it",
            path,
            exc_info=True,
        )
        return splits
    if not infos:
        return splits

    path_rstrip = path.rstrip("/")
    partition_depth = len(partition_keys)
    for info in infos:
        # Last path segment; rsplit returns the whole string when no "/" exists.
        name = info.path.rsplit("/", 1)[-1]
        if info.path.startswith(("/", "file:")):
            # The entry path is already absolute / scheme-qualified: use as-is.
            full_path = info.path
        else:
            full_path = f"{path_rstrip}/{name}" if path_rstrip else name

        if info.type == pafs.FileType.Directory:
            if _is_reserved_dir_name(name):
                continue
            part_value = name
            if not partition_only_value and "=" in name:
                part_value = name.split("=", 1)[1]
            child_parts = rel_path_parts + [part_value]
            # Never descend below the deepest declared partition level.
            if len(child_parts) <= partition_depth:
                splits.extend(
                    _list_data_files_recursive(
                        file_io,
                        full_path,
                        partition_keys,
                        partition_only_value,
                        child_parts,
                    )
                )
        elif info.type == pafs.FileType.File and _is_data_file_name(name):
            # A missing or None size is reported as 0.
            size = getattr(info, "size", None) or 0
            part_spec: Optional[Dict[str, str]] = None
            if partition_keys and len(rel_path_parts) >= partition_depth:
                part_spec = dict(
                    zip(partition_keys, rel_path_parts[:partition_depth])
                )
            splits.append(
                FormatDataSplit(
                    file_path=full_path,
                    file_size=size,
                    partition=part_spec,
                )
            )
    return splits
class FormatTableScan:
    """Builds a read ``Plan`` for a format table by listing its data files.

    An optional ``partition_filter`` restricts the plan to splits whose
    partition values equal the requested ones.
    """

    def __init__(
        self,
        table: FormatTable,
        partition_filter: Optional[Dict[str, str]] = None,
    ):
        """Remember the table to scan and the optional equality filter."""
        self.table = table
        # Partition key -> required value; None or empty disables filtering.
        self.partition_filter = partition_filter

    def plan(self) -> Plan:
        """List the table's data files and wrap them in a ``Plan``.

        The ``format-table.partition-path-only-value`` table option (default
        ``"false"``, compared case-insensitively with ``"true"``) is passed
        to the listing as its ``partition_only_value`` argument. When a
        partition filter is set, only splits that have a partition and whose
        values match every filter entry (compared via ``str``) are kept.
        """
        raw_option = self.table.options().get(
            "format-table.partition-path-only-value", "false"
        )
        only_value = raw_option.lower() == "true"

        found = _list_data_files_recursive(
            self.table.file_io,
            self.table.location(),
            self.table.partition_keys,
            only_value,
        )
        if not self.partition_filter:
            return Plan(_splits=found)

        kept = [
            split
            for split in found
            # Splits without a partition can never satisfy a filter.
            if split.partition
            and all(
                str(split.partition.get(key)) == str(expected)
                for key, expected in self.partition_filter.items()
            )
        ]
        return Plan(_splits=kept)