blob: aa68e3b747b6bc28272a0d1a2fee0360bf7eaa4f [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""HDFS FileIO backed by the hdfs-native protocol client (no JVM, no libhdfs)."""
import logging
import os
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import PurePosixPath
from typing import Dict, Optional
from urllib.parse import urlparse
import pyarrow
import pyarrow.fs as pafs
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import HdfsOptions, SecurityOptions
from pypaimon.common.uri_reader import UriReaderFactory
from pypaimon.filesystem import _kerberos
from pypaimon.schema.data_types import AtomicType, DataField, PyarrowFieldParser
from pypaimon.write.blob_format_writer import BlobFormatWriter
class _HdfsFileInfo:
"""pafs.FileInfo-shaped adapter built from hdfs_native.FileStatus."""
__slots__ = ('path', 'size', 'type', 'mtime', 'base_name')
def __init__(self, path: str, size: Optional[int], file_type, mtime):
self.path = path
self.size = size
self.type = file_type
self.mtime = mtime
self.base_name = path.rsplit('/', 1)[-1] if path else ''
class _HdfsWriterAdapter:
"""File-like wrapper over hdfs_native.FileWriter."""
def __init__(self, fw):
self._fw = fw
self._pos = 0
self._closed = False
def write(self, buf) -> int:
n = self._fw.write(buf)
if n is None:
n = len(buf) if hasattr(buf, '__len__') else 0
self._pos += n
return n
def tell(self) -> int:
return self._pos
def flush(self):
pass
def close(self):
if not self._closed:
try:
self._fw.close()
finally:
self._closed = True
@property
def closed(self) -> bool:
return self._closed
def writable(self) -> bool:
return True
def readable(self) -> bool:
return False
def seekable(self) -> bool:
return False
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
return False
class _HdfsReaderAdapter:
"""File-like wrapper over hdfs_native.FileReader.
Delegates read/seek/tell straight to the underlying reader (which is an
io.RawIOBase subclass with full seek/tell support). The wrapper only
exists so that exiting a `with` block guarantees the underlying handle
is closed — hdfs-native's own FileReader.__exit__ is a no-op.
"""
def __init__(self, fr):
self._fr = fr
self._closed = False
def read(self, size: int = -1) -> bytes:
return self._fr.read(-1 if size is None else size)
def read1(self, size: int = -1) -> bytes:
return self.read(size)
def seek(self, pos: int, whence: int = 0) -> int:
self._fr.seek(pos, whence)
return self._fr.tell()
def tell(self) -> int:
return self._fr.tell()
def close(self):
if self._closed:
return
try:
close = getattr(self._fr, 'close', None)
if close is not None:
close()
finally:
self._closed = True
@property
def closed(self) -> bool:
return self._closed
def readable(self) -> bool:
return True
def writable(self) -> bool:
return False
def seekable(self) -> bool:
return True
def __enter__(self):
return self
def __exit__(self, *exc):
self.close()
return False
class HdfsNativeFileIO(FileIO):
    """HDFS FileIO that speaks the HDFS RPC protocol directly.

    No JVM, no libhdfs, no Hadoop install required. Hadoop xml is still
    consumed if present (HADOOP_CONF_DIR or `hdfs.conf-dir` option) for
    viewfs mount tables and HA NameNode lists; alternatively the same
    key/values can be delivered via the catalog options channel (a REST
    catalog can therefore push the cluster wiring with the response).

    Only `hdfs://` and `viewfs://` locations are accepted by the constructor.
    """
    # Catalog-option key prefixes that `_build_config_dict` copies into the
    # client config with the key left unchanged.
    NATIVE_KEY_PREFIXES = HdfsOptions.HDFS_NATIVE_CONFIG_KEY_PREFIXES
    # Catalog-option key prefix that `_build_config_dict` strips from the key
    # before copying the entry into the client config.
    NS_PREFIX = HdfsOptions.HDFS_CONFIG_PREFIX
def __init__(self, path: str, catalog_options: Options):
    """Bind a native HDFS client to the cluster named by *path*.

    Args:
        path: Location whose scheme must be ``hdfs`` or ``viewfs``; its
            authority part selects the cluster.
        catalog_options: Catalog options. A falsy value is replaced by an
            empty ``Options``.

    Raises:
        ValueError: If the scheme is neither ``hdfs`` nor ``viewfs``
            (``_setup_kerberos`` may also raise it for a half-configured
            principal/keytab pair).
        ImportError: If the ``hdfs_native`` package cannot be imported.
    """
    self.properties = catalog_options or Options({})
    self.logger = logging.getLogger(__name__)
    self.uri_reader_factory = UriReaderFactory(self.properties)

    scheme, netloc, _ = self.parse_location(path)
    if scheme not in ("hdfs", "viewfs"):
        raise ValueError(
            f"HdfsNativeFileIO does not support scheme '{scheme}'"
        )
    self._scheme, self._netloc = scheme, netloc

    try:
        from hdfs_native import Client, WriteOptions
    except ImportError as e:
        raise ImportError(
            "hdfs-native is not installed. "
            "Install with: pip install 'pypaimon[hdfs]'"
        ) from e
    self._WriteOptions = WriteOptions

    # The Kerberos login runs before any Client is created.
    self._setup_kerberos()

    # An explicit option wins over the HADOOP_CONF_DIR environment variable.
    config_dir = self.properties.get(HdfsOptions.HDFS_CONF_DIR)
    if not config_dir:
        config_dir = os.environ.get("HADOOP_CONF_DIR")

    hadoop_xml = self._load_hadoop_xml(config_dir)
    config = self._build_config_dict()
    # May add a linkFallback entry to `config` in place.
    self._maybe_inject_viewfs_fallback(scheme, netloc, config, hadoop_xml)

    # Kept around for the lazily built `filesystem` facade and for pickling.
    self._config = config
    self._hadoop_xml = hadoop_xml
    self._config_dir = config_dir
    self._filesystem = None

    # Only pass the keyword arguments that actually carry a value.
    candidates = (
        ("url", self._build_url(scheme, netloc)),
        ("config", config),
        ("config_dir", config_dir),
    )
    client_kwargs = {name: value for name, value in candidates if value}
    self._client = Client(**client_kwargs)
def __reduce__(self):
    """Pickle support for Ray / multiprocessing.

    hdfs_native.Client is a Rust binding that cannot be pickled, so instead
    of serialising live handles this returns the constructor inputs and
    lets each worker build its own Client (the same pattern
    pyarrow.fs.HadoopFileSystem uses).

    The config dir resolved on the driver is pinned into the carried
    options when the options do not already name one. Otherwise a worker
    whose $HADOOP_CONF_DIR differs from the driver's would pick up its own
    value and silently talk to a different cluster.

    Returns:
        ``(type(self), (location, options))`` where ``location`` is
        ``scheme://netloc`` and ``options`` is a fresh ``Options``.
    """
    carried = dict(self.properties.to_map())
    if self._config_dir:
        conf_dir_key = HdfsOptions.HDFS_CONF_DIR.key()
        if not carried.get(conf_dir_key):
            carried[conf_dir_key] = self._config_dir
    authority = self._netloc or ""
    location = f"{self._scheme}://{authority}"
    return (type(self), (location, Options(carried)))
@property
def filesystem(self):
    """pyarrow.fs.FileSystem facade backed by hdfs_native.fsspec.

    Built on first access and cached afterwards, so FileIO-only call paths
    (exists/list_status/new_input_stream/...) never pay the fsspec init
    cost; only ds.dataset / open_input_file style callers do.

    Raises:
        RuntimeError: If ``hdfs_native.fsspec`` cannot be imported.
    """
    if self._filesystem is not None:
        return self._filesystem

    import pyarrow.fs as pafs
    try:
        from hdfs_native.fsspec import (
            HdfsFileSystem,
            ViewfsFileSystem,
        )
    except ImportError as e:
        raise RuntimeError(
            "hdfs-native fsspec adapter is required to bridge "
            "HdfsNativeFileIO to a pyarrow.fs filesystem; upgrade "
            "hdfs-native (>=0.13)."
        ) from e

    if self._scheme == "viewfs":
        fs_cls = ViewfsFileSystem
    else:
        fs_cls = HdfsFileSystem

    # Hand the fsspec instance the parsed xml plus the option overrides
    # (overrides win) so it can connect without HADOOP_CONF_DIR:
    # BaseFileSystem.__init__ only forwards storage_options to Client,
    # not config_dir.
    storage_options = dict(self._hadoop_xml)
    storage_options.update(self._config)
    fsspec_fs = fs_cls(host=self._netloc, **storage_options)

    self._filesystem = pafs.PyFileSystem(pafs.FSSpecHandler(fsspec_fs))
    return self._filesystem
@staticmethod
def parse_location(location: str):
uri = urlparse(location)
if not uri.scheme:
return "file", uri.netloc, os.path.abspath(location)
return uri.scheme, uri.netloc, uri.path
@staticmethod
def _build_url(scheme: str, netloc: Optional[str]) -> Optional[str]:
if not netloc:
return None
return f"{scheme}://{netloc}"
@staticmethod
def _load_hadoop_xml(config_dir: Optional[str]) -> Dict[str, str]:
"""Parse core-site.xml + hdfs-site.xml from a Hadoop config dir into a
flat {name: value} dict. Returns empty dict if the dir is missing or
unreadable.
Used only to discover viewfs mount-table state so we can polyfill the
linkFallback mount that hdfs-native requires but libhdfs tolerates.
The final config dir is still handed to hdfs-native for its own
(more complete) xml parsing.
"""
result: Dict[str, str] = {}
if not config_dir or not os.path.isdir(config_dir):
return result
for fname in ("core-site.xml", "hdfs-site.xml"):
path = os.path.join(config_dir, fname)
if not os.path.isfile(path):
continue
try:
tree = ET.parse(path)
except (ET.ParseError, OSError):
continue
for prop in tree.getroot().findall("property"):
name_el = prop.find("name")
value_el = prop.find("value")
if name_el is None or name_el.text is None:
continue
value = (
value_el.text.strip()
if value_el is not None and value_el.text
else ""
)
result[name_el.text.strip()] = value
return result
@staticmethod
def _maybe_inject_viewfs_fallback(
scheme: str,
netloc: Optional[str],
overrides: Dict[str, str],
hadoop_xml: Dict[str, str],
) -> None:
"""If we're opening a viewfs URI and no linkFallback is configured for
the cluster, pick a usable nameservice URI from existing link.*
targets or dfs.nameservices and inject one into `overrides`.
hdfs-native rejects viewfs init without a fallback mount; libhdfs
tolerates it. This bridges the gap without touching cluster xml.
The mount-table state is read from the merged view of hadoop xml and
catalog-option overrides, so a zero-file viewfs setup (link.* /
dfs.nameservices pushed purely through catalog options) gets a
fallback too; the injected key is still only written back to
`overrides`.
"""
if scheme != "viewfs" or not netloc:
return
cluster = netloc
fallback_key = f"fs.viewfs.mounttable.{cluster}.linkFallback"
if fallback_key in overrides or fallback_key in hadoop_xml:
return
merged = {**hadoop_xml, **overrides}
link_prefix = f"fs.viewfs.mounttable.{cluster}.link."
for key, value in merged.items():
if key.startswith(link_prefix) and value:
parsed = urlparse(value)
if parsed.scheme == "hdfs" and parsed.netloc:
overrides[fallback_key] = f"hdfs://{parsed.netloc}/"
return
nameservices = [
ns.strip()
for ns in merged.get("dfs.nameservices", "").split(",")
if ns.strip()
]
if nameservices:
overrides[fallback_key] = f"hdfs://{nameservices[0]}/"
def _setup_kerberos(self):
    """Log in from a keytab when Kerberos options are present.

    Principal and keytab are read from the ``SecurityOptions`` keys first
    and from the plain ``security.principal`` / ``security.keytab`` map
    entries second. With neither set the method does nothing.

    On a successful login ``KRB5CCNAME`` is pointed at the ticket cache
    reported by the ``_kerberos`` helper.

    Raises:
        ValueError: If only one of principal / keytab is configured.
        RuntimeError: If no ticket cache path is available after login.
    """
    principal = self.properties.get(SecurityOptions.KERBEROS_PRINCIPAL)
    if not principal:
        principal = self.properties.to_map().get("security.principal")
    keytab = self.properties.get(SecurityOptions.KERBEROS_KEYTAB)
    if not keytab:
        keytab = self.properties.to_map().get("security.keytab")

    if bool(principal) != bool(keytab):
        raise ValueError(
            "security.kerberos.login.principal and "
            "security.kerberos.login.keytab "
            "must be both set or both unset"
        )
    if not (principal and keytab):
        return

    _kerberos.kerberos_login_from_keytab(principal, keytab)
    cache_path = _kerberos.get_ticket_cache_path()
    if not cache_path:
        raise RuntimeError(
            "kinit succeeded but no ticket cache path could be "
            "determined. Set the KRB5CCNAME environment variable "
            "to specify the cache location."
        )

    # hdfs-native's GSSAPI layer reads KRB5CCNAME from the process env,
    # i.e. from global state. When a different cache is already configured
    # (typically another HdfsNativeFileIO with a different principal in
    # the same process) the last writer wins and earlier instances start
    # using the new ticket -- almost certainly unintended, hence the
    # warning below.
    existing = os.environ.get("KRB5CCNAME")
    had_file_prefix = bool(existing) and existing.startswith("FILE:")
    existing_stripped = existing[5:] if had_file_prefix else existing
    if existing_stripped and existing_stripped != cache_path:
        self.logger.warning(
            "Overwriting process-global KRB5CCNAME from %r to %r; "
            "concurrent HdfsNativeFileIO instances with different "
            "Kerberos principals share env state and will clobber "
            "each other's ticket caches.",
            existing, cache_path,
        )

    # Keep the `FILE:` qualifier when the previous value carried it --
    # some GSSAPI tooling distinguishes cache types by prefix.
    if had_file_prefix:
        os.environ["KRB5CCNAME"] = f"FILE:{cache_path}"
    else:
        os.environ["KRB5CCNAME"] = cache_path
def _build_config_dict(self) -> Dict[str, str]:
config: Dict[str, str] = {}
for key, value in self.properties.to_map().items():
if value is None:
continue
if any(key.startswith(p) for p in self.NATIVE_KEY_PREFIXES):
config[key] = str(value)
elif key.startswith(self.NS_PREFIX):
config[key[len(self.NS_PREFIX):]] = str(value)
return config
def to_filesystem_path(self, path: str) -> str:
# hdfs-native expects an absolute path within the cluster the Client is
# bound to; passing a full URI makes its Rust-side MountTable::resolve
# treat the string as a relative path (since it doesn't start with '/')
# and prepend the user's home dir, producing nonsense like
# `/user/foo/viewfs://cluster/...`. Strip the matching scheme+authority
# so a plain absolute path reaches the client.
parsed = urlparse(path)
if parsed.scheme in ("hdfs", "viewfs"):
if parsed.scheme == self._scheme and (
not parsed.netloc or parsed.netloc == self._netloc
):
return parsed.path or "/"
return path
def _adapt_status(self, status, fallback_path: str = '') -> _HdfsFileInfo:
    """Convert an hdfs_native status object into an ``_HdfsFileInfo``.

    Attributes are read defensively with ``getattr`` so a status lacking
    one of them still converts.

    Args:
        status: Object exposing (some of) ``path``, ``isdir``, ``length``
            and ``modification_time``.
        fallback_path: Used when the status carries no (or an empty) path.

    Returns:
        An ``_HdfsFileInfo`` whose size is ``None`` for directories and
        whose mtime is a UTC ``datetime`` derived from a millisecond
        timestamp, or ``None`` when the timestamp is missing or zero.
    """
    resolved_path = getattr(status, 'path', None) or fallback_path
    is_dir = bool(getattr(status, 'isdir', False))
    length = getattr(status, 'length', 0)
    mtime_ms = getattr(status, 'modification_time', None)

    mtime = None
    if mtime_ms:
        # modification_time is treated as milliseconds since the epoch.
        mtime = datetime.fromtimestamp(mtime_ms / 1000.0, tz=timezone.utc)

    if is_dir:
        size, ftype = None, pafs.FileType.Directory
    else:
        size, ftype = int(length or 0), pafs.FileType.File
    return _HdfsFileInfo(resolved_path, size, ftype, mtime)
def new_input_stream(self, path: str):
    """Open *path* for reading and return it wrapped in a closing adapter."""
    fs_path = self.to_filesystem_path(path)
    return _HdfsReaderAdapter(self._client.read(fs_path))
def new_output_stream(self, path: str):
    """Create *path* for writing and return a file-like writer adapter.

    The file is created with ``create_parent=True`` and ``overwrite=True``.
    """
    fs_path = self.to_filesystem_path(path)
    write_options = self._WriteOptions(create_parent=True, overwrite=True)
    raw_writer = self._client.create(fs_path, write_options)
    return _HdfsWriterAdapter(raw_writer)
def get_file_status(self, path: str):
path_str = self.to_filesystem_path(path)
try:
status = self._client.get_file_info(path_str)
except FileNotFoundError:
raise FileNotFoundError(f"File {path} does not exist")
return self._adapt_status(status, path_str)
def list_status(self, path: str):
path_str = self.to_filesystem_path(path)
return [self._adapt_status(s) for s in self._client.list_status(path_str)]
def exists(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
try:
self._client.get_file_info(path_str)
return True
except FileNotFoundError:
return False
def delete(self, path: str, recursive: bool = False) -> bool:
path_str = self.to_filesystem_path(path)
try:
status = self._client.get_file_info(path_str)
except FileNotFoundError:
return False
if bool(getattr(status, 'isdir', False)) and not recursive:
if next(iter(self._client.list_status(path_str)), None) is not None:
raise OSError(f"Directory {path} is not empty")
return bool(self._client.delete(path_str, recursive))
def mkdirs(self, path: str) -> bool:
path_str = self.to_filesystem_path(path)
try:
status = self._client.get_file_info(path_str)
except FileNotFoundError:
self._client.mkdirs(path_str, create_parent=True)
return True
if bool(getattr(status, 'isdir', False)):
return True
raise FileExistsError(f"Path exists but is not a directory: {path}")
def rename(self, src: str, dst: str) -> bool:
src_str = self.to_filesystem_path(src)
dst_str = self.to_filesystem_path(dst)
dst_parent = str(PurePosixPath(dst_str).parent)
if dst_parent and dst_parent != '.':
try:
self._client.get_file_info(dst_parent)
except FileNotFoundError:
self._client.mkdirs(dst_parent, create_parent=True)
try:
dst_status = self._client.get_file_info(dst_str)
if not getattr(dst_status, 'isdir', False):
return False
src_name = PurePosixPath(src_str).name
dst_str = str(PurePosixPath(dst_str) / src_name)
try:
self._client.get_file_info(dst_str)
return False
except FileNotFoundError:
pass
except FileNotFoundError:
pass
try:
self._client.rename(src_str, dst_str)
return True
except FileNotFoundError:
return False
except (PermissionError, OSError):
return False
def write_parquet(self, path: str, data: pyarrow.Table,
                  compression: str = 'zstd', zstd_level: int = 1, **kwargs):
    """Write *data* to *path* as a Parquet file.

    Args:
        path: Destination file.
        data: Table to write.
        compression: Codec name passed to ``pq.write_table``.
        zstd_level: Passed as ``compression_level`` when the codec is zstd.
        **kwargs: Forwarded to ``pq.write_table``.

    Raises:
        RuntimeError: On any failure, after a best-effort delete of *path*.
    """
    try:
        import pyarrow.parquet as pq

        writer_kwargs = kwargs
        if compression.lower() == 'zstd':
            writer_kwargs['compression_level'] = zstd_level

        with self.new_output_stream(path) as raw_stream:
            arrow_sink = pyarrow.PythonFile(raw_stream, mode='wb')
            try:
                pq.write_table(
                    data, arrow_sink, compression=compression,
                    **writer_kwargs)
            finally:
                arrow_sink.close()
    except Exception as e:
        # Do not leave a partial file behind.
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e
def write_orc(self, path: str, data: pyarrow.Table,
              compression: str = 'zstd', zstd_level: int = 1, **kwargs):
    """Write *data* to *path* as an ORC file.

    Time columns are first passed through ``_cast_time_columns_for_orc``.
    On Python 3.6 the *compression* argument is not forwarded to
    ``orc.write_table``. *zstd_level* is accepted but not used here.

    Raises:
        RuntimeError: On any failure, after a best-effort delete of *path*.
    """
    try:
        import sys
        import pyarrow.orc as orc

        data = self._cast_time_columns_for_orc(data)

        orc_kwargs = dict(kwargs)
        if sys.version_info[:2] != (3, 6):
            orc_kwargs['compression'] = compression

        with self.new_output_stream(path) as raw_stream:
            arrow_sink = pyarrow.PythonFile(raw_stream, mode='wb')
            try:
                orc.write_table(data, arrow_sink, **orc_kwargs)
            finally:
                arrow_sink.close()
    except Exception as e:
        # Do not leave a partial file behind.
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e
def write_avro(self, path: str, data: pyarrow.Table,
               avro_schema=None, compression: str = 'zstd',
               zstd_level: int = 1, **kwargs):
    """Write *data* to *path* as an Avro file using fastavro.

    Args:
        path: Destination file.
        data: Table to write.
        avro_schema: Avro schema; derived from ``data.schema`` when ``None``.
        compression: Codec name (case-insensitive); ``zstd`` is accepted
            as an alias of ``zstandard``.
        zstd_level: Passed as ``codec_compression_level`` for zstandard.
        **kwargs: Forwarded to ``fastavro.writer``.

    Raises:
        ValueError: If *compression* is not a supported codec. This is
            raised before the destination is touched.
        Exception: Any error raised while writing is re-raised unchanged
            after a best-effort delete of the partial file.
    """
    import fastavro

    if avro_schema is None:
        avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)

    codec_map = {
        'null': 'null', 'deflate': 'deflate', 'snappy': 'snappy',
        'bzip2': 'bzip2', 'xz': 'xz', 'zstandard': 'zstandard',
        'zstd': 'zstandard',
    }
    codec = codec_map.get(compression.lower())
    if codec is None:
        raise ValueError(
            f"Unsupported compression '{compression}' for Avro format. "
            f"Supported compressions: {', '.join(sorted(codec_map.keys()))}."
        )
    if codec == 'zstandard':
        kwargs['codec_compression_level'] = zstd_level

    records_dict = data.to_pydict()
    # Take the row count from the table itself: indexing the first column
    # of the dict fails on a table that has no columns.
    num_rows = data.num_rows

    def record_generator():
        for i in range(num_rows):
            record = {}
            for col, values in records_dict.items():
                value = values[i]
                # Naive datetimes are written as UTC.
                if isinstance(value, datetime) and value.tzinfo is None:
                    value = value.replace(tzinfo=timezone.utc)
                record[col] = value
            yield record

    try:
        with self.new_output_stream(path) as output_stream:
            fastavro.writer(output_stream, avro_schema,
                            record_generator(), codec=codec, **kwargs)
    except Exception:
        # Same cleanup the other writers do, but the original exception
        # type is kept so existing callers keep catching what they did.
        self.delete_quietly(path)
        raise
def write_blob(self, path: str, data: pyarrow.Table, **kwargs):
    """Write the single column of *data* to *path* in blob format.

    A ``large_binary`` column is written as a ``BLOB`` field; any other
    column type is mapped through ``PyarrowFieldParser.to_paimon_type``.
    *kwargs* are accepted but not used.

    Raises:
        RuntimeError: If *data* does not have exactly one column or on any
            write failure; *path* is deleted on a best-effort basis first.
    """
    try:
        if data.num_columns != 1:
            raise RuntimeError(
                f"Blob format only supports a single column, "
                f"got {data.num_columns} columns")

        column = data.schema[0]
        if pyarrow.types.is_large_binary(column.type):
            paimon_type = AtomicType("BLOB")
        else:
            paimon_type = PyarrowFieldParser.to_paimon_type(
                column.type, column.nullable)
        fields = [DataField(0, column.name, paimon_type)]

        records_dict = data.to_pydict()
        row_count = data.num_rows
        field_name = fields[0].name

        with self.new_output_stream(path) as output_stream:
            writer = BlobFormatWriter(output_stream)
            for row_index in range(row_count):
                writer.write_value(records_dict[field_name][row_index],
                                   fields, self.uri_reader_factory)
            writer.close()
    except Exception as e:
        # Do not leave a partial file behind.
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write blob file {path}: {e}") from e
def write_lance(self, path: str, data: pyarrow.Table, **kwargs):
    """Write *data* to *path* as a Lance file.

    Like the remote-scheme writer, this does not go through the native
    client: lance talks to the backend through its own object_store, so it
    is handed the URI plus whatever storage options ``to_lance_specified``
    derives from this FileIO. Having this method (together with
    ``write_vortex``) keeps an HDFS table configured with
    file.format=lance/vortex from hitting FileIO's NotImplementedError now
    that this class is the default hdfs:// backend.

    Raises:
        RuntimeError: On any failure, after a best-effort delete of *path*.
    """
    try:
        import lance
        from pypaimon.read.reader.lance_utils import to_lance_specified

        lance_path, storage_options = to_lance_specified(self, path)
        lance_writer = lance.file.LanceFileWriter(
            lance_path, data.schema,
            storage_options=storage_options, **kwargs)
        try:
            for record_batch in data.to_batches():
                lance_writer.write_batch(record_batch)
        finally:
            lance_writer.close()
    except Exception as e:
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write Lance file {path}: {e}") from e
def write_mosaic(self, path: str, data: pyarrow.Table, **kwargs):
    """Write *data* to *path* with ``mosaic.write_table``.

    *kwargs* are accepted but not forwarded.

    Raises:
        RuntimeError: On any failure, after a best-effort delete of *path*.
    """
    try:
        import mosaic

        with self.new_output_stream(path) as sink:
            mosaic.write_table(data, sink)
    except Exception as e:
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write Mosaic file {path}: {e}") from e
def write_vortex(self, path: str, data: pyarrow.Table, **kwargs):
    """Write *data* to *path* as a Vortex file.

    The target and store arguments come from ``to_vortex_specified``. With
    store arguments the write goes through ``vortex.store.from_url``;
    without them it falls back to ``vortex._lib.io.write``. *kwargs* are
    accepted but not forwarded.

    Raises:
        RuntimeError: On any failure, after a best-effort delete of *path*.
    """
    try:
        import vortex
        from vortex import store
        from pypaimon.read.reader.vortex_utils import to_vortex_specified

        vortex_path, store_kwargs = to_vortex_specified(self, path)
        if not store_kwargs:
            from vortex._lib.io import write as vortex_write
            vortex_write(vortex.array(data), vortex_path)
        else:
            vortex_store = store.from_url(vortex_path, **store_kwargs)
            vortex_store.write(vortex.array(data))
    except Exception as e:
        self.delete_quietly(path)
        raise RuntimeError(f"Failed to write Vortex file {path}: {e}") from e
def close(self):
self._client = None