| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| import logging |
| import os |
| import shutil |
| import threading |
| import uuid |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Dict, Optional |
| from urllib.parse import urlparse |
| |
| import pyarrow |
| import pyarrow.fs as pafs |
| |
| from pypaimon.common.file_io import FileIO |
| from pypaimon.common.options import Options |
| from pypaimon.common.uri_reader import UriReaderFactory |
| from pypaimon.filesystem.local import PaimonLocalFileSystem |
| from pypaimon.schema.data_types import DataField, AtomicType, PyarrowFieldParser |
| from pypaimon.table.row.blob import BlobData, BlobDescriptor, Blob |
| from pypaimon.table.row.generic_row import GenericRow |
| from pypaimon.table.row.row_kind import RowKind |
| from pypaimon.write.blob_format_writer import BlobFormatWriter |
| |
| |
| class LocalFileIO(FileIO): |
| """ |
| Local file system implementation of FileIO. |
| |
| Resolves plain paths and file:// URIs to pathlib.Path objects and provides |
| stream access, file status/listing, rename and atomic-write helpers, and |
| format-specific writers (Parquet, ORC, Avro, Lance, blob) on the local file system. |
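| |
| Example (a minimal sketch; assumes a writable local directory such as /tmp): |
| |
| io = LocalFileIO.create() |
| io.mkdirs("/tmp/paimon-demo") |
| with io.new_output_stream("/tmp/paimon-demo/hello.txt") as f: |
| f.write(b"hello") |
| assert io.exists("/tmp/paimon-demo/hello.txt") |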
| """ |
| |
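| # Lock shared by all LocalFileIO instances so that rename()'s existence check |
| # and the subsequent move happen atomically within this process. |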
| RENAME_LOCK = threading.Lock() |
| |
| def __init__(self, path: Optional[str] = None, catalog_options: Optional[Options] = None): |
| self.logger = logging.getLogger(__name__) |
| self.path = path |
| self.properties = catalog_options or Options({}) |
| self.filesystem = PaimonLocalFileSystem() |
| self.uri_reader_factory = UriReaderFactory(self.properties) |
| |
| @staticmethod |
| def create() -> "LocalFileIO": |
| return LocalFileIO() |
| |
| def _to_file(self, path: str) -> Path: |
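| """ |
| Convert a path string or file:// URI into a pathlib.Path. |
| |
| Handles Windows paths whose drive letter urlparse mistakes for a scheme |
| (e.g. C:/data), file:// URIs whose netloc carries a drive letter |
| (e.g. file://C:/data), and ordinary POSIX paths or file:// URIs. |
| """ |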
| parsed = urlparse(path) |
| |
| if parsed.scheme and len(parsed.scheme) == 1 and not parsed.netloc: |
| return Path(path) |
| |
| if parsed.scheme == 'file' and parsed.netloc and parsed.netloc.endswith(':'): |
| drive_letter = parsed.netloc.rstrip(':') |
| path_part = parsed.path.lstrip('/') if parsed.path else '' |
| if path_part: |
| return Path(f"{drive_letter}:/{path_part}") |
| else: |
| return Path(f"{drive_letter}:") |
| |
| local_path = parsed.path if parsed.scheme else path |
| |
| if not local_path: |
| return Path(".") |
| |
| return Path(local_path) |
| |
| def new_input_stream(self, path: str): |
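| """Open the file at the given path for reading in binary mode; raises FileNotFoundError if it is missing.""" |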
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking new_input_stream for {path}") |
| |
| file_path = self._to_file(path) |
| if not file_path.exists(): |
| raise FileNotFoundError(f"File {path} does not exist") |
| |
| return open(file_path, 'rb') |
| |
| def new_output_stream(self, path: str): |
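| """Open the file at the given path for writing in binary mode, creating parent directories as needed.""" |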
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking new_output_stream for {path}") |
| |
| file_path = self._to_file(path) |
| # Create parent directories if needed |
| parent = file_path.parent |
| if parent and not parent.exists(): |
| parent.mkdir(parents=True, exist_ok=True) |
| |
| return open(file_path, 'wb') |
| |
| def get_file_status(self, path: str): |
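| """Return a lightweight status object (path, size, type, mtime) for the path, or raise FileNotFoundError.""" |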
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking get_file_status for {path}") |
| |
| file_path = self._to_file(path) |
| if not file_path.exists(): |
| import getpass |
| user = getpass.getuser() |
| raise FileNotFoundError( |
| f"File {path} does not exist or the user running " |
| f"Paimon ('{user}') has insufficient permissions to access it." |
| ) |
| |
| class LocalFileStatus: |
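| """Simple status holder: absolute path, original path, base name, size, file type and mtime.""" |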
| def __init__(self, file_path: Path, original_path: str): |
| stat_info = file_path.stat() |
| self.path = str(file_path.absolute()) |
| self.original_path = original_path |
| self.base_name = os.path.basename(original_path) |
| self.size = stat_info.st_size if file_path.is_file() else None |
| self.type = ( |
| pafs.FileType.Directory if file_path.is_dir() |
| else pafs.FileType.File if file_path.is_file() |
| else pafs.FileType.NotFound |
| ) |
| self.mtime = stat_info.st_mtime |
| |
| return LocalFileStatus(file_path, path) |
| |
| def list_status(self, path: str): |
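| """List statuses of a directory's entries (or of the file itself); a missing path yields an empty list.""" |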
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking list_status for {path}") |
| |
| file_path = self._to_file(path) |
| results = [] |
| |
| if not file_path.exists(): |
| return results |
| |
| if file_path.is_file(): |
| results.append(self.get_file_status(path)) |
| elif file_path.is_dir(): |
| try: |
| for item in file_path.iterdir(): |
| try: |
| if path.startswith('file://'): |
| item_path = f"file://{item}" |
| else: |
| item_path = str(item) |
| results.append(self.get_file_status(item_path)) |
| except FileNotFoundError: |
| pass |
| except PermissionError: |
| pass |
| |
| return results |
| |
| def exists(self, path: str) -> bool: |
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking exists for {path}") |
| |
| file_path = self._to_file(path) |
| return file_path.exists() |
| |
| def delete(self, path: str, recursive: bool = False) -> bool: |
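| """Delete a file or directory. Returns False if the path is missing; a non-empty directory requires recursive=True.""" |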
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking delete for {path}") |
| |
| file_path = self._to_file(path) |
| |
| if not file_path.exists(): |
| return False |
| |
| if file_path.is_file(): |
| file_path.unlink() |
| return True |
| elif file_path.is_dir(): |
| if not recursive: |
| try: |
| items = list(file_path.iterdir()) |
| if items: |
| raise OSError(f"Directory {path} is not empty") |
| except PermissionError: |
| raise OSError( |
| f"Cannot list directory {path}: permission denied or an I/O error occurred" |
| ) |
| file_path.rmdir() |
| else: |
| shutil.rmtree(file_path) |
| return True |
| |
| return False |
| |
| def mkdirs(self, path: str) -> bool: |
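| """Create the directory and any missing parents; raise FileExistsError if the path is an existing non-directory.""" |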
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking mkdirs for {path}") |
| |
| file_path = self._to_file(path) |
| |
| if file_path.is_dir(): |
| return True |
| elif file_path.exists() and not file_path.is_dir(): |
| raise FileExistsError(f"Path exists but is not a directory: {path}") |
| |
| file_path.mkdir(parents=True, exist_ok=True) |
| return True |
| |
| def rename(self, src: str, dst: str) -> bool: |
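| """ |
| Move src to dst under the class-wide rename lock. |
| |
| Mirrors HadoopFileIO semantics: if dst is an existing directory, src is moved |
| into it; returns False instead of raising when the target already exists or |
| the move fails. |
| """ |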
| if self.logger.isEnabledFor(logging.DEBUG): |
| self.logger.debug(f"Invoking rename for {src} to {dst}") |
| |
| src_file = self._to_file(src) |
| dst_file = self._to_file(dst) |
| |
| dst_parent = dst_file.parent |
| if dst_parent and not dst_parent.exists(): |
| dst_parent.mkdir(parents=True, exist_ok=True) |
| |
| try: |
| with LocalFileIO.RENAME_LOCK: |
| if dst_file.exists(): |
| if dst_file.is_file(): |
| return False |
| # Make it compatible with HadoopFileIO: if dst is an existing directory, |
| # dst=dst/srcFileName |
| dst_file = dst_file / src_file.name |
| if dst_file.exists(): |
| return False |
| |
| # Perform atomic move |
| src_file.rename(dst_file) |
| return True |
| except FileNotFoundError: |
| return False |
| except (PermissionError, OSError): |
| return False |
| |
| def try_to_write_atomic(self, path: str, content: str) -> bool: |
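| """Write content to a temporary sibling file and rename it into place; returns True only if the rename succeeds.""" |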
| file_path = self._to_file(path) |
| if file_path.exists() and file_path.is_dir(): |
| return False |
| |
| parent = file_path.parent |
| if parent and not parent.exists(): |
| parent.mkdir(parents=True, exist_ok=True) |
| |
| temp_path = file_path.parent / f"{file_path.name}.{uuid.uuid4()}.tmp" |
| success = False |
| try: |
| with open(temp_path, 'w', encoding='utf-8') as f: |
| f.write(content) |
| success = self.rename(str(temp_path), path) |
| finally: |
| if not success and temp_path.exists(): |
| self.delete_quietly(str(temp_path)) |
| return success |
| |
| def copy_file(self, source_path: str, target_path: str, overwrite: bool = False): |
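| """Copy a local file, creating the target's parent directories; raises FileExistsError unless overwrite=True.""" |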
| if not overwrite and self.exists(target_path): |
| raise FileExistsError(f"Target file {target_path} already exists and overwrite=False") |
| |
| source_file = self._to_file(source_path) |
| target_file = self._to_file(target_path) |
| |
| target_parent = target_file.parent |
| if target_parent and not target_parent.exists(): |
| target_parent.mkdir(parents=True, exist_ok=True) |
| |
| shutil.copy2(source_file, target_file) |
| |
| def to_filesystem_path(self, path: str) -> str: |
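| """Convert a path or file:// URI to a plain local path string, preserving an explicit ./ prefix.""" |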
| file_path = self._to_file(path) |
| result = str(file_path) |
| parsed = urlparse(path) |
| original_path = parsed.path if parsed.scheme else path |
| if original_path.startswith('./') and not result.startswith('./'): |
| result = './' + result |
| return result |
| |
| @staticmethod |
| def parse_location(location: str): |
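| """Split a location into (scheme, authority, path); only the file scheme, or no scheme at all, is accepted.""" |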
| uri = urlparse(location) |
| if not uri.scheme: |
| return "file", uri.netloc, os.path.abspath(location) |
| elif uri.scheme == "file": |
| return "file", uri.netloc, uri.path |
| else: |
| raise ValueError(f"LocalFileIO only supports file:// scheme, got {uri.scheme}") |
| |
| def write_parquet(self, path: str, data: pyarrow.Table, compression: str = 'zstd', |
| zstd_level: int = 1, **kwargs): |
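| """Write a pyarrow Table as Parquet; on failure the partial file is removed quietly and a RuntimeError is raised.""" |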
| try: |
| import pyarrow.parquet as pq |
| |
| file_path = self._to_file(path) |
| parent = file_path.parent |
| if parent and not parent.exists(): |
| parent.mkdir(parents=True, exist_ok=True) |
| |
| with open(file_path, 'wb') as f: |
| if compression.lower() == 'zstd': |
| kwargs['compression_level'] = zstd_level |
| pq.write_table(data, f, compression=compression, **kwargs) |
| except Exception as e: |
| self.delete_quietly(path) |
| raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e |
| |
| def write_orc(self, path: str, data: pyarrow.Table, compression: str = 'zstd', |
| zstd_level: int = 1, **kwargs): |
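| """Write a pyarrow Table as ORC; on failure the partial file is removed quietly and a RuntimeError is raised.""" |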
| try: |
| import sys |
| import pyarrow.orc as orc |
| |
| file_path = self._to_file(path) |
| parent = file_path.parent |
| if parent and not parent.exists(): |
| parent.mkdir(parents=True, exist_ok=True) |
| |
| with open(file_path, 'wb') as f: |
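| # Older pyarrow builds on Python 3.6 may not accept the compression |
| # argument, so fall back to the library default there. |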
| if sys.version_info[:2] == (3, 6): |
| orc.write_table(data, f, **kwargs) |
| else: |
| orc.write_table(data, f, compression=compression, **kwargs) |
| except Exception as e: |
| self.delete_quietly(path) |
| raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e |
| |
| def write_avro(self, path: str, data: pyarrow.Table, |
| avro_schema: Optional[Dict[str, Any]] = None, |
| compression: str = 'zstd', zstd_level: int = 1, **kwargs): |
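| """ |
| Write a pyarrow Table as Avro using fastavro. |
| |
| The Avro schema is derived from the table unless avro_schema is given; naive |
| datetimes are tagged as UTC, and 'zstd' maps to fastavro's 'zstandard' codec. |
| """ |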
| import fastavro |
| if avro_schema is None: |
| avro_schema = PyarrowFieldParser.to_avro_schema(data.schema) |
| |
| records_dict = data.to_pydict() |
| |
| def record_generator(): |
| num_rows = data.num_rows |
| for i in range(num_rows): |
| record = {} |
| for col in records_dict.keys(): |
| value = records_dict[col][i] |
| if isinstance(value, datetime) and value.tzinfo is None: |
| value = value.replace(tzinfo=timezone.utc) |
| record[col] = value |
| yield record |
| |
| records = record_generator() |
| |
| codec_map = { |
| 'null': 'null', |
| 'deflate': 'deflate', |
| 'snappy': 'snappy', |
| 'bzip2': 'bzip2', |
| 'xz': 'xz', |
| 'zstandard': 'zstandard', |
| 'zstd': 'zstandard', |
| } |
| compression_lower = compression.lower() |
| |
| codec = codec_map.get(compression_lower) |
| if codec is None: |
| raise ValueError( |
| f"Unsupported compression '{compression}' for Avro format. " |
| f"Supported compressions: {', '.join(sorted(codec_map.keys()))}." |
| ) |
| |
| file_path = self._to_file(path) |
| parent = file_path.parent |
| if parent and not parent.exists(): |
| parent.mkdir(parents=True, exist_ok=True) |
| |
| with open(file_path, 'wb') as output_stream: |
| if codec == 'zstandard': |
| kwargs['codec_compression_level'] = zstd_level |
| fastavro.writer(output_stream, avro_schema, records, codec=codec, **kwargs) |
| |
| def write_lance(self, path: str, data: pyarrow.Table, **kwargs): |
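| """Write a pyarrow Table batch by batch using the Lance file writer; failures remove the partial file.""" |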
| try: |
| import lance |
| from pypaimon.read.reader.lance_utils import to_lance_specified |
| file_path_for_lance, storage_options = to_lance_specified(self, path) |
| |
| writer = lance.file.LanceFileWriter( |
| file_path_for_lance, data.schema, storage_options=storage_options, **kwargs) |
| try: |
| for batch in data.to_batches(): |
| writer.write_batch(batch) |
| finally: |
| writer.close() |
| except Exception as e: |
| self.delete_quietly(path) |
| raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e |
| |
| def write_blob(self, path: str, data: pyarrow.Table, blob_as_descriptor: bool, **kwargs): |
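| """ |
| Write a single-column, non-null pyarrow Table in Paimon's blob format. |
| |
| When blob_as_descriptor is True, each value is deserialized as a BlobDescriptor |
| and its data is resolved through a URI reader; otherwise raw bytes (or UTF-8 |
| encoded strings) are wrapped as BlobData. |
| """ |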
| try: |
| if data.num_columns != 1: |
| raise RuntimeError(f"Blob format only supports a single column, got {data.num_columns} columns") |
| |
| column = data.column(0) |
| if column.null_count > 0: |
| raise RuntimeError("Blob format does not support null values") |
| |
| field = data.schema[0] |
| if pyarrow.types.is_large_binary(field.type): |
| fields = [DataField(0, field.name, AtomicType("BLOB"))] |
| else: |
| paimon_type = PyarrowFieldParser.to_paimon_type(field.type, field.nullable) |
| fields = [DataField(0, field.name, paimon_type)] |
| |
| records_dict = data.to_pydict() |
| num_rows = data.num_rows |
| field_name = fields[0].name |
| |
| file_path = self._to_file(path) |
| parent = file_path.parent |
| if parent and not parent.exists(): |
| parent.mkdir(parents=True, exist_ok=True) |
| |
| with open(file_path, 'wb') as output_stream: |
| writer = BlobFormatWriter(output_stream) |
| for i in range(num_rows): |
| col_data = records_dict[field_name][i] |
| if hasattr(fields[0].type, 'type') and fields[0].type.type == "BLOB": |
| if blob_as_descriptor: |
| blob_descriptor = BlobDescriptor.deserialize(col_data) |
| uri_reader = self.uri_reader_factory.create(blob_descriptor.uri) |
| blob_data = Blob.from_descriptor(uri_reader, blob_descriptor) |
| elif isinstance(col_data, bytes): |
| blob_data = BlobData(col_data) |
| else: |
| if hasattr(col_data, 'as_py'): |
| col_data = col_data.as_py() |
| if isinstance(col_data, str): |
| col_data = col_data.encode('utf-8') |
| blob_data = BlobData(col_data) |
| row_values = [blob_data] |
| else: |
| row_values = [col_data] |
| row = GenericRow(row_values, fields, RowKind.INSERT) |
| writer.add_element(row) |
| writer.close() |
| except Exception as e: |
| self.delete_quietly(path) |
| raise RuntimeError(f"Failed to write blob file {path}: {e}") from e |