blob: 957ee8f123dd575a6996fb2def49c09b8195b2fc [file]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import sys
import tempfile
import types
import unittest
from unittest.mock import MagicMock, patch
import pyarrow.fs as pafs
from pypaimon.common.file_io import FileIO
from pypaimon.common.options import Options
from pypaimon.common.options.config import HdfsOptions
def _install_fake_hdfs_native():
"""Install a fake hdfs_native module (with .fsspec submodule) into
sys.modules.
Returns (fake_module, mock_client_cls, mock_write_options_cls).
"""
fake = types.ModuleType("hdfs_native")
fake.Client = MagicMock(name="Client")
fake.WriteOptions = MagicMock(name="WriteOptions")
fsspec_mod = types.ModuleType("hdfs_native.fsspec")
fsspec_mod.HdfsFileSystem = MagicMock(name="HdfsFileSystem")
fsspec_mod.ViewfsFileSystem = MagicMock(name="ViewfsFileSystem")
fake.fsspec = fsspec_mod
sys.modules["hdfs_native"] = fake
sys.modules["hdfs_native.fsspec"] = fsspec_mod
return fake, fake.Client, fake.WriteOptions
def _uninstall_fake_hdfs_native():
sys.modules.pop("hdfs_native", None)
sys.modules.pop("hdfs_native.fsspec", None)
# Also drop the cached HdfsNativeFileIO so a re-import sees the new fake
sys.modules.pop(
"pypaimon.filesystem.hdfs_native_file_io", None)
class HdfsOptionsTest(unittest.TestCase):
def test_defaults(self):
opts = Options({})
self.assertEqual(opts.get(HdfsOptions.HDFS_CLIENT_IMPL), "native")
self.assertTrue(opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW))
self.assertIsNone(opts.get(HdfsOptions.HDFS_CONF_DIR))
def test_explicit_pyarrow(self):
opts = Options({"hdfs.client.impl": "pyarrow"})
self.assertEqual(opts.get(HdfsOptions.HDFS_CLIENT_IMPL), "pyarrow")
def test_explicit_fallback_false(self):
opts = Options({"hdfs.client.fallback-to-pyarrow": "false"})
self.assertFalse(opts.get(HdfsOptions.HDFS_CLIENT_FALLBACK_TO_PYARROW))
class HdfsNativeFileIORoutingTest(unittest.TestCase):
def setUp(self):
_uninstall_fake_hdfs_native()
def tearDown(self):
_uninstall_fake_hdfs_native()
def test_local_paths_unaffected(self):
fio = FileIO.get("file:///tmp/foo")
self.assertEqual(type(fio).__name__, "LocalFileIO")
def test_default_hdfs_routes_to_native(self):
_install_fake_hdfs_native()
fio = FileIO.get("hdfs://ns/foo", Options({}))
self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
def test_explicit_pyarrow_routes_to_pyarrow(self):
# No hdfs-native module needed; should go straight to pyarrow.
with patch(
"pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
return_value=None,
):
fio = FileIO.get(
"hdfs://ns/foo",
Options({"hdfs.client.impl": "pyarrow"}),
)
self.assertEqual(type(fio).__name__, "PyArrowFileIO")
def test_native_init_failure_falls_back_to_pyarrow(self):
# hdfs_native not installed; default fallback enabled.
_uninstall_fake_hdfs_native()
with patch(
"pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
return_value=None,
):
fio = FileIO.get("hdfs://ns/foo", Options({}))
self.assertEqual(type(fio).__name__, "PyArrowFileIO")
def test_native_init_failure_no_fallback_raises(self):
_uninstall_fake_hdfs_native()
with self.assertRaises(ImportError):
FileIO.get(
"hdfs://ns/foo",
Options({"hdfs.client.fallback-to-pyarrow": "false"}),
)
def test_unsupported_impl_raises(self):
with self.assertRaises(ValueError) as ctx:
FileIO.get(
"hdfs://ns/foo",
Options({"hdfs.client.impl": "bogus"}),
)
self.assertIn("Unsupported hdfs.client.impl", str(ctx.exception))
def test_env_var_override_when_option_absent(self):
_install_fake_hdfs_native()
with patch(
"pypaimon.filesystem.pyarrow_file_io.PyArrowFileIO.__init__",
return_value=None,
):
with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}):
fio = FileIO.get("hdfs://ns/foo", Options({}))
self.assertEqual(type(fio).__name__, "PyArrowFileIO")
def test_option_wins_over_env_var(self):
_install_fake_hdfs_native()
with patch.dict(os.environ, {"PYPAIMON_HDFS_IMPL": "pyarrow"}):
fio = FileIO.get(
"hdfs://ns/foo",
Options({"hdfs.client.impl": "native"}),
)
self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
def test_viewfs_scheme_routes_to_native(self):
_install_fake_hdfs_native()
fio = FileIO.get("viewfs://cluster/foo", Options({}))
self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
def test_empty_impl_option_treated_as_unset(self):
# Templated configs sometimes blank the option to opt out — that
# should fall through to the default ("native"), not raise
# "Unsupported hdfs.client.impl ''".
_install_fake_hdfs_native()
fio = FileIO.get("hdfs://ns/foo", Options({"hdfs.client.impl": ""}))
self.assertEqual(type(fio).__name__, "HdfsNativeFileIO")
class HdfsNativeFileIOInitTest(unittest.TestCase):
def setUp(self):
self._fake, self._client_cls, self._wo_cls = _install_fake_hdfs_native()
def tearDown(self):
_uninstall_fake_hdfs_native()
def _make(self, path, props=None):
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
return HdfsNativeFileIO(path, Options(props or {}))
def test_constructs_client_with_url(self):
self._make("hdfs://ns1/warehouse")
self._client_cls.assert_called_once()
_, kwargs = self._client_cls.call_args
self.assertEqual(kwargs.get("url"), "hdfs://ns1")
self.assertNotIn("config", kwargs)
def test_viewfs_scheme_passes_through(self):
self._make("viewfs://cluster1/")
_, kwargs = self._client_cls.call_args
self.assertEqual(kwargs.get("url"), "viewfs://cluster1")
def test_native_hadoop_keys_forwarded_as_config(self):
self._make("hdfs://ns1/foo", {
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "host1:8020",
"fs.viewfs.mounttable.cluster.link./prod": "hdfs://ns1/prod",
"warehouse": "hdfs://ns1/warehouse", # should NOT be forwarded
})
_, kwargs = self._client_cls.call_args
config = kwargs.get("config", {})
self.assertEqual(config.get("dfs.nameservices"), "ns1")
self.assertEqual(config.get("dfs.ha.namenodes.ns1"), "nn1,nn2")
self.assertEqual(
config.get("dfs.namenode.rpc-address.ns1.nn1"), "host1:8020")
self.assertEqual(
config.get("fs.viewfs.mounttable.cluster.link./prod"),
"hdfs://ns1/prod")
self.assertNotIn("warehouse", config)
def test_namespaced_overrides_forwarded(self):
self._make("hdfs://ns1/foo", {
"hdfs.config.dfs.client.read.shortcircuit": "true",
})
_, kwargs = self._client_cls.call_args
config = kwargs.get("config", {})
self.assertEqual(
config.get("dfs.client.read.shortcircuit"), "true")
def test_conf_dir_from_option(self):
self._make("hdfs://ns1/foo", {
"hdfs.conf-dir": "/tmp/conf",
})
_, kwargs = self._client_cls.call_args
self.assertEqual(kwargs.get("config_dir"), "/tmp/conf")
def test_conf_dir_from_env(self):
env = dict(os.environ)
env["HADOOP_CONF_DIR"] = "/env/conf"
with patch.dict(os.environ, env, clear=True):
self._make("hdfs://ns1/foo")
_, kwargs = self._client_cls.call_args
self.assertEqual(kwargs.get("config_dir"), "/env/conf")
def test_option_conf_dir_overrides_env(self):
env = dict(os.environ)
env["HADOOP_CONF_DIR"] = "/env/conf"
with patch.dict(os.environ, env, clear=True):
self._make("hdfs://ns1/foo", {"hdfs.conf-dir": "/opt/conf"})
_, kwargs = self._client_cls.call_args
self.assertEqual(kwargs.get("config_dir"), "/opt/conf")
@patch("pypaimon.filesystem._kerberos.subprocess.run")
def test_kerberos_principal_keytab_triggers_kinit(self, mock_kinit):
mock_kinit.return_value = MagicMock()
with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
with patch.dict(os.environ, {"KRB5CCNAME": "/tmp/kc_test"}):
self._make("hdfs://ns1/foo", {
"security.kerberos.login.principal": "user@REALM",
"security.kerberos.login.keytab": keytab_file.name,
})
kinit_calls = [
c for c in mock_kinit.call_args_list
if c[0][0][0] == "kinit"
]
self.assertEqual(len(kinit_calls), 1)
self.assertEqual(
kinit_calls[0][0][0],
["kinit", "-kt", keytab_file.name, "user@REALM"],
)
def test_principal_without_keytab_raises(self):
with self.assertRaises(ValueError) as ctx:
self._make("hdfs://ns1/foo", {
"security.kerberos.login.principal": "user@REALM",
})
self.assertIn("must be both set or both unset", str(ctx.exception))
@patch("pypaimon.filesystem._kerberos.subprocess.run")
def test_kerberos_preserves_FILE_prefix_on_krb5ccname(self, mock_kinit):
# If KRB5CCNAME had a `FILE:` qualifier, the rewrite after kinit
# must keep it so GSSAPI cache-type detection isn't perturbed.
mock_kinit.return_value = MagicMock()
with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
with patch.dict(os.environ,
{"KRB5CCNAME": "FILE:/tmp/kc_test"},
clear=True):
self._make("hdfs://ns1/foo", {
"security.kerberos.login.principal": "user@REALM",
"security.kerberos.login.keytab": keytab_file.name,
})
self.assertEqual(
os.environ["KRB5CCNAME"], "FILE:/tmp/kc_test")
@patch("pypaimon.filesystem._kerberos.get_ticket_cache_path",
return_value="/tmp/freshly_kinit_cache")
@patch("pypaimon.filesystem._kerberos.subprocess.run")
def test_kerberos_warns_when_overwriting_different_cache(
self, mock_kinit, _mock_cache,
):
# Multi-principal in the same process clobbers KRB5CCNAME; we warn
# so the operator sees the race instead of silently mis-routing
# earlier instances' RPCs.
mock_kinit.return_value = MagicMock()
with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
with patch.dict(os.environ,
{"KRB5CCNAME": "/tmp/some_other_cache"},
clear=True):
with self.assertLogs(
"pypaimon.filesystem.hdfs_native_file_io",
level="WARNING",
) as log_ctx:
self._make("hdfs://ns1/foo", {
"security.kerberos.login.principal": "user@REALM",
"security.kerberos.login.keytab": keytab_file.name,
})
self.assertTrue(
any("Overwriting process-global KRB5CCNAME" in m
for m in log_ctx.output),
log_ctx.output,
)
@patch("pypaimon.filesystem._kerberos.get_ticket_cache_path",
return_value="/tmp/kc_test")
@patch("pypaimon.filesystem._kerberos.subprocess.run")
def test_kerberos_no_warning_when_cache_unchanged(
self, mock_kinit, _mock_cache,
):
import logging as _logging
mock_kinit.return_value = MagicMock()
with tempfile.NamedTemporaryFile(suffix=".keytab") as keytab_file:
# Pre-existing value matches the kinit-resolved cache → no warn.
with patch.dict(os.environ,
{"KRB5CCNAME": "/tmp/kc_test"},
clear=True):
# assertNoLogs is 3.10+; patch warning explicitly so older
# interpreters keep working too.
logger = _logging.getLogger(
"pypaimon.filesystem.hdfs_native_file_io")
with patch.object(logger, "warning") as warn:
self._make("hdfs://ns1/foo", {
"security.kerberos.login.principal": "user@REALM",
"security.kerberos.login.keytab": keytab_file.name,
})
warn.assert_not_called()
def test_unsupported_scheme_raises(self):
with self.assertRaises(ValueError):
self._make("s3://bucket/key")
class HdfsNativeFileIOOpsTest(unittest.TestCase):
def setUp(self):
self._fake, self._client_cls, self._wo_cls = _install_fake_hdfs_native()
self._mock_client = MagicMock(name="ClientInstance")
self._client_cls.return_value = self._mock_client
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
self.fio = HdfsNativeFileIO("hdfs://ns/", Options({}))
def tearDown(self):
_uninstall_fake_hdfs_native()
def _file_status(self, path, isdir=False, length=0, mtime=0):
s = MagicMock()
s.path = path
s.isdir = isdir
s.length = length
s.modification_time = mtime
return s
def test_exists_true(self):
self._mock_client.get_file_info.return_value = self._file_status("/x")
self.assertTrue(self.fio.exists("/x"))
def test_exists_false(self):
self._mock_client.get_file_info.side_effect = FileNotFoundError("nope")
self.assertFalse(self.fio.exists("/missing"))
def test_get_file_status_adapts_to_pafs_filetype(self):
self._mock_client.get_file_info.return_value = self._file_status(
"/x", isdir=False, length=42, mtime=1700000000000,
)
info = self.fio.get_file_status("/x")
self.assertEqual(info.path, "/x")
self.assertEqual(info.size, 42)
self.assertEqual(info.type, pafs.FileType.File)
self.assertIsNotNone(info.mtime)
def test_list_status(self):
self._mock_client.list_status.return_value = iter([
self._file_status("/x/a", isdir=False, length=10),
self._file_status("/x/b", isdir=True),
])
infos = self.fio.list_status("/x")
self.assertEqual(len(infos), 2)
self.assertEqual(infos[0].type, pafs.FileType.File)
self.assertEqual(infos[1].type, pafs.FileType.Directory)
self.assertIsNone(infos[1].size)
def test_delete_missing_returns_false(self):
self._mock_client.get_file_info.side_effect = FileNotFoundError("nope")
self.assertFalse(self.fio.delete("/missing"))
self._mock_client.delete.assert_not_called()
def test_delete_file(self):
self._mock_client.get_file_info.return_value = self._file_status("/x")
self._mock_client.delete.return_value = True
self.assertTrue(self.fio.delete("/x"))
self._mock_client.delete.assert_called_once_with("/x", False)
def test_delete_nonempty_dir_without_recursive_raises(self):
self._mock_client.get_file_info.return_value = self._file_status(
"/x", isdir=True)
self._mock_client.list_status.return_value = iter([
self._file_status("/x/a")])
with self.assertRaises(OSError):
self.fio.delete("/x", recursive=False)
def test_mkdirs_creates_when_missing(self):
self._mock_client.get_file_info.side_effect = FileNotFoundError("nope")
self.assertTrue(self.fio.mkdirs("/new"))
self._mock_client.mkdirs.assert_called_once_with(
"/new", create_parent=True)
def test_mkdirs_idempotent_on_existing_dir(self):
self._mock_client.get_file_info.return_value = self._file_status(
"/x", isdir=True)
self.assertTrue(self.fio.mkdirs("/x"))
self._mock_client.mkdirs.assert_not_called()
def test_mkdirs_existing_file_raises(self):
self._mock_client.get_file_info.return_value = self._file_status(
"/x", isdir=False)
with self.assertRaises(FileExistsError):
self.fio.mkdirs("/x")
class HdfsNativeAdaptersTest(unittest.TestCase):
def setUp(self):
_install_fake_hdfs_native()
def tearDown(self):
_uninstall_fake_hdfs_native()
def test_writer_adapter_tracks_position_and_closes_once(self):
from pypaimon.filesystem.hdfs_native_file_io import _HdfsWriterAdapter
fw = MagicMock()
fw.write.side_effect = lambda buf: len(buf)
adapter = _HdfsWriterAdapter(fw)
adapter.write(b"abc")
adapter.write(b"defg")
self.assertEqual(adapter.tell(), 7)
adapter.close()
adapter.close() # idempotent
fw.close.assert_called_once()
def test_reader_adapter_seek_and_read(self):
from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
fr = MagicMock()
fr.tell.side_effect = [20, 30]
fr.read.return_value = b"x" * 10
adapter = _HdfsReaderAdapter(fr)
self.assertEqual(adapter.seek(20), 20)
fr.seek.assert_called_once_with(20, 0)
data = adapter.read(10)
self.assertEqual(data, b"x" * 10)
fr.read.assert_called_once_with(10)
self.assertEqual(adapter.tell(), 30)
def test_reader_adapter_read_negative_reads_all(self):
from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
fr = MagicMock()
fr.read.return_value = b"all-content"
adapter = _HdfsReaderAdapter(fr)
self.assertEqual(adapter.read(), b"all-content")
fr.read.assert_called_once_with(-1)
def test_reader_adapter_close_releases_underlying(self):
from pypaimon.filesystem.hdfs_native_file_io import _HdfsReaderAdapter
fr = MagicMock()
adapter = _HdfsReaderAdapter(fr)
adapter.close()
adapter.close() # idempotent
fr.close.assert_called_once()
self.assertTrue(adapter.closed)
def _write_hadoop_xml(path, entries):
"""Write a minimal Hadoop-style xml file at `path`."""
body = ['<?xml version="1.0"?>', "<configuration>"]
for name, value in entries.items():
body.append(
f" <property><name>{name}</name><value>{value}</value></property>"
)
body.append("</configuration>")
with open(path, "w") as f:
f.write("\n".join(body))
class ViewFsFallbackTest(unittest.TestCase):
"""Cover _load_hadoop_xml + _maybe_inject_viewfs_fallback polyfill."""
def setUp(self):
_install_fake_hdfs_native()
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
self.Fio = HdfsNativeFileIO
def tearDown(self):
_uninstall_fake_hdfs_native()
def test_load_hadoop_xml_merges_two_files(self):
with tempfile.TemporaryDirectory() as d:
_write_hadoop_xml(os.path.join(d, "core-site.xml"),
{"fs.defaultFS": "viewfs://c1"})
_write_hadoop_xml(os.path.join(d, "hdfs-site.xml"),
{"dfs.nameservices": "ns1"})
cfg = self.Fio._load_hadoop_xml(d)
self.assertEqual(cfg.get("fs.defaultFS"), "viewfs://c1")
self.assertEqual(cfg.get("dfs.nameservices"), "ns1")
def test_load_hadoop_xml_missing_dir_returns_empty(self):
self.assertEqual(self.Fio._load_hadoop_xml(None), {})
self.assertEqual(self.Fio._load_hadoop_xml("/no/such/dir/xyz"), {})
def test_load_hadoop_xml_malformed_file_skipped(self):
with tempfile.TemporaryDirectory() as d:
with open(os.path.join(d, "core-site.xml"), "w") as f:
f.write("<not really xml")
_write_hadoop_xml(os.path.join(d, "hdfs-site.xml"),
{"dfs.nameservices": "ns1"})
cfg = self.Fio._load_hadoop_xml(d)
self.assertEqual(cfg, {"dfs.nameservices": "ns1"})
def test_fallback_from_existing_link_target(self):
overrides = {}
xml = {
"fs.viewfs.mounttable.c1.link./home": "hdfs://ns-prod/home",
"fs.viewfs.mounttable.c1.link./tmp": "hdfs://ns-tmp/tmp",
}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertEqual(
overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
"hdfs://ns-prod/",
)
def test_fallback_from_nameservices_when_no_links(self):
overrides = {}
xml = {"dfs.nameservices": "nsA,nsB"}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertEqual(
overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
"hdfs://nsA/",
)
def test_fallback_from_link_in_overrides_only(self):
# Zero-file viewfs setup: link.* arrives via catalog options
# (overrides), no hadoop xml present. The fallback must still be
# derived from the merged view.
overrides = {
"fs.viewfs.mounttable.c1.link./home": "hdfs://ns-prod/home",
}
xml = {}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertEqual(
overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
"hdfs://ns-prod/",
)
def test_fallback_from_nameservices_in_overrides_only(self):
overrides = {"dfs.nameservices": "nsA,nsB"}
xml = {}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertEqual(
overrides.get("fs.viewfs.mounttable.c1.linkFallback"),
"hdfs://nsA/",
)
def test_fallback_already_in_xml_not_overridden(self):
overrides = {}
xml = {
"fs.viewfs.mounttable.c1.linkFallback": "hdfs://ns-existing/",
"dfs.nameservices": "nsA",
}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertNotIn("fs.viewfs.mounttable.c1.linkFallback", overrides)
def test_fallback_already_in_overrides_kept(self):
overrides = {"fs.viewfs.mounttable.c1.linkFallback": "hdfs://manual/"}
xml = {"dfs.nameservices": "nsA"}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertEqual(
overrides["fs.viewfs.mounttable.c1.linkFallback"],
"hdfs://manual/",
)
def test_hdfs_scheme_does_not_inject(self):
overrides = {}
xml = {"dfs.nameservices": "nsA"}
self.Fio._maybe_inject_viewfs_fallback("hdfs", "ns1", overrides, xml)
self.assertEqual(overrides, {})
def test_no_signal_no_inject(self):
overrides = {}
xml = {"unrelated.key": "v"}
self.Fio._maybe_inject_viewfs_fallback("viewfs", "c1", overrides, xml)
self.assertEqual(overrides, {})
def test_init_auto_injects_for_viewfs_uri(self):
with tempfile.TemporaryDirectory() as d:
_write_hadoop_xml(
os.path.join(d, "hdfs-site.xml"),
{
"dfs.nameservices": "ns1",
"fs.viewfs.mounttable.hadoop-lt-cluster.link./home":
"hdfs://ns1/home",
},
)
opts = Options({"hdfs.conf-dir": d})
self.Fio("viewfs://hadoop-lt-cluster/home/x", opts)
client_cls = sys.modules["hdfs_native"].Client
_, kwargs = client_cls.call_args
config = kwargs.get("config", {})
self.assertEqual(
config.get("fs.viewfs.mounttable.hadoop-lt-cluster.linkFallback"),
"hdfs://ns1/",
)
class ToFilesystemPathTest(unittest.TestCase):
"""Cover URI -> absolute-path normalisation for hdfs-native."""
def setUp(self):
_install_fake_hdfs_native()
def tearDown(self):
_uninstall_fake_hdfs_native()
def _make(self, root):
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
return HdfsNativeFileIO(root, Options({}))
def test_viewfs_uri_same_cluster_returns_path(self):
fio = self._make("viewfs://cluster1/")
self.assertEqual(
fio.to_filesystem_path("viewfs://cluster1/home/hudi/x"),
"/home/hudi/x",
)
def test_viewfs_uri_no_path_returns_root(self):
fio = self._make("viewfs://cluster1/")
self.assertEqual(fio.to_filesystem_path("viewfs://cluster1"), "/")
def test_viewfs_absolute_path_unchanged(self):
fio = self._make("viewfs://cluster1/")
self.assertEqual(fio.to_filesystem_path("/foo/bar"), "/foo/bar")
def test_hdfs_uri_same_ns_returns_path(self):
fio = self._make("hdfs://ns1/")
self.assertEqual(
fio.to_filesystem_path("hdfs://ns1/foo/bar"),
"/foo/bar",
)
def test_hdfs_uri_different_ns_unchanged(self):
fio = self._make("hdfs://ns1/")
self.assertEqual(
fio.to_filesystem_path("hdfs://nsX/foo"),
"hdfs://nsX/foo",
)
def test_hdfs_client_with_viewfs_uri_unchanged(self):
fio = self._make("hdfs://ns1/")
# Different scheme; let hdfs-native error rather than silently rewrite.
self.assertEqual(
fio.to_filesystem_path("viewfs://cluster1/foo"),
"viewfs://cluster1/foo",
)
def test_exists_passes_path_only_to_client(self):
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
fio = HdfsNativeFileIO("viewfs://cluster1/", Options({}))
client = sys.modules["hdfs_native"].Client.return_value
client.get_file_info.return_value = MagicMock(
path="/home/hudi/x", isdir=False, length=0, modification_time=0)
fio.exists("viewfs://cluster1/home/hudi/x")
client.get_file_info.assert_called_once_with("/home/hudi/x")
class FilesystemPropertyTest(unittest.TestCase):
"""Cover the lazy pyarrow.fs facade backed by hdfs_native.fsspec."""
def setUp(self):
_install_fake_hdfs_native()
self._patcher = patch("pyarrow.fs.PyFileSystem")
self._handler_patcher = patch("pyarrow.fs.FSSpecHandler")
self.MockPyFs = self._patcher.start()
self.MockHandler = self._handler_patcher.start()
def tearDown(self):
self._patcher.stop()
self._handler_patcher.stop()
_uninstall_fake_hdfs_native()
def _make(self, root, props=None, xml_entries=None):
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
if xml_entries:
d = tempfile.mkdtemp()
self.addCleanup(lambda: __import__("shutil").rmtree(d, ignore_errors=True))
_write_hadoop_xml(os.path.join(d, "hdfs-site.xml"), xml_entries)
base_props = {"hdfs.conf-dir": d}
base_props.update(props or {})
props = base_props
return HdfsNativeFileIO(root, Options(props or {}))
def test_viewfs_uses_viewfs_fsspec_class(self):
fio = self._make("viewfs://cluster1/")
fs_instance = fio.filesystem # trigger lazy
VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
VFs.assert_called_once()
HFs.assert_not_called()
_, kwargs = VFs.call_args
self.assertEqual(kwargs.get("host"), "cluster1")
self.assertIs(fs_instance, self.MockPyFs.return_value)
def test_hdfs_uses_hdfs_fsspec_class(self):
self._make("hdfs://ns1/").filesystem
HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
VFs = sys.modules["hdfs_native.fsspec"].ViewfsFileSystem
HFs.assert_called_once()
VFs.assert_not_called()
_, kwargs = HFs.call_args
self.assertEqual(kwargs.get("host"), "ns1")
def test_lazy_caches_after_first_access(self):
fio = self._make("hdfs://ns1/")
first = fio.filesystem
second = fio.filesystem
self.assertIs(first, second)
HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
self.assertEqual(HFs.call_count, 1)
def test_xml_and_catalog_options_merged_into_fsspec_storage_options(self):
fio = self._make(
"hdfs://ns1/",
props={"dfs.client.read.shortcircuit": "true"},
xml_entries={"dfs.nameservices": "ns1"},
)
fio.filesystem # trigger
HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
_, kwargs = HFs.call_args
# Both xml and option keys should land in the fsspec kwargs.
self.assertEqual(kwargs.get("dfs.nameservices"), "ns1")
self.assertEqual(kwargs.get("dfs.client.read.shortcircuit"), "true")
def test_catalog_option_overrides_xml(self):
fio = self._make(
"hdfs://ns1/",
props={"dfs.foo": "v_user"},
xml_entries={"dfs.foo": "v_xml"},
)
fio.filesystem
HFs = sys.modules["hdfs_native.fsspec"].HdfsFileSystem
_, kwargs = HFs.call_args
self.assertEqual(kwargs.get("dfs.foo"), "v_user")
def test_missing_fsspec_raises_clear_error(self):
fio = self._make("hdfs://ns1/")
# Remove the fsspec submodule but keep hdfs_native itself, to
# simulate an old/partial install.
sys.modules.pop("hdfs_native.fsspec", None)
sys.modules["hdfs_native"].fsspec = None
with self.assertRaises(RuntimeError) as ctx:
fio.filesystem
self.assertIn("hdfs-native fsspec adapter", str(ctx.exception))
class PickleTest(unittest.TestCase):
"""Cover __reduce__ so Ray / multiprocessing can ship FileIO."""
def setUp(self):
_install_fake_hdfs_native()
# Isolate from any HADOOP_CONF_DIR on the host so __reduce__'s
# env-derived config_dir pinning is deterministic across machines.
self._env_patcher = patch.dict(os.environ, {}, clear=True)
self._env_patcher.start()
def tearDown(self):
self._env_patcher.stop()
_uninstall_fake_hdfs_native()
def _make(self, path, props=None):
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
return HdfsNativeFileIO(path, Options(props or {}))
def test_reduce_returns_class_and_args(self):
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
fio = self._make("viewfs://cluster1/some/sub/path",
{"dfs.nameservices": "ns1"})
cls, args = fio.__reduce__()
self.assertIs(cls, HdfsNativeFileIO)
path, options = args
# Path is rebuilt from scheme+netloc (path segment dropped) — that
# is intentional because __init__ ignores path beyond scheme+netloc.
self.assertEqual(path, "viewfs://cluster1")
self.assertEqual(options.to_map(), {"dfs.nameservices": "ns1"})
def test_reduce_for_empty_netloc(self):
fio = self._make("hdfs://")
_, (path, _) = fio.__reduce__()
self.assertEqual(path, "hdfs://")
def test_reduce_pins_env_resolved_config_dir_into_options(self):
# config_dir resolved from $HADOOP_CONF_DIR should be carried into
# the pickled options so a worker on a host with a different env
# value still uses the driver's resolved directory.
with tempfile.TemporaryDirectory() as d:
with patch.dict(os.environ, {"HADOOP_CONF_DIR": d}, clear=True):
fio = self._make("hdfs://ns1/foo")
_, (_, options) = fio.__reduce__()
self.assertEqual(options.to_map().get("hdfs.conf-dir"), d)
def test_reduce_does_not_override_explicit_conf_dir_option(self):
with tempfile.TemporaryDirectory() as opt_dir:
with patch.dict(os.environ,
{"HADOOP_CONF_DIR": "/env/dir"}, clear=True):
fio = self._make("hdfs://ns1/foo",
{"hdfs.conf-dir": opt_dir})
_, (_, options) = fio.__reduce__()
self.assertEqual(options.to_map().get("hdfs.conf-dir"), opt_dir)
def test_pickle_roundtrip_preserves_type_and_options(self):
import pickle
fio = self._make("hdfs://ns1/foo",
{"dfs.foo": "bar", "fs.viewfs.x": "y"})
client_cls = sys.modules["hdfs_native"].Client
client_cls.reset_mock()
# Roundtrip via the highest pickle protocol.
blob = pickle.dumps(fio, protocol=pickle.HIGHEST_PROTOCOL)
restored = pickle.loads(blob)
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
self.assertIsInstance(restored, HdfsNativeFileIO)
self.assertEqual(restored.properties.to_map(),
{"dfs.foo": "bar", "fs.viewfs.x": "y"})
# The original __init__ ran once; the unpickle ran __init__ again.
self.assertEqual(client_cls.call_count, 1)
def test_pickle_with_viewfs_scheme(self):
import pickle
fio = self._make("viewfs://cluster1/")
restored = pickle.loads(pickle.dumps(fio))
self.assertEqual(restored._scheme, "viewfs")
self.assertEqual(restored._netloc, "cluster1")
def test_pickle_does_not_serialise_live_client(self):
# If the live _client were pickled, the call would fail (MagicMocks
# are picklable but the real RawClient would not be). This test
# documents the contract: __reduce__ MUST sidestep _client.
import pickle
fio = self._make("hdfs://ns1/")
blob = pickle.dumps(fio)
# The pickled blob should reference the constructor inputs only;
# specifically it should not embed the literal mock _client.
self.assertNotIn(b"_client", blob)
class HdfsNativeWriteFormatTest(unittest.TestCase):
"""HdfsNativeFileIO is the default hdfs:// backend, so it must keep the
same write surface as the pyarrow backend it replaces — including
lance/vortex, which otherwise fall through to FileIO's NotImplementedError.
"""
def setUp(self):
self._fake, self._client_cls, _ = _install_fake_hdfs_native()
self._client_cls.return_value = MagicMock(name="ClientInstance")
from pypaimon.filesystem.hdfs_native_file_io import HdfsNativeFileIO
self.fio = HdfsNativeFileIO("hdfs://ns/", Options({}))
def tearDown(self):
_uninstall_fake_hdfs_native()
def test_lance_and_vortex_are_overridden(self):
# The regression this guards: with these unimplemented, an HDFS table
# using file.format=lance/vortex would hit FileIO's NotImplementedError.
from pypaimon.common.file_io import FileIO
self.assertIsNot(
type(self.fio).write_lance, FileIO.write_lance)
self.assertIsNot(
type(self.fio).write_vortex, FileIO.write_vortex)
def test_write_lance_delegates_to_lance_specified(self):
import pyarrow
table = pyarrow.table({"a": [1, 2]})
writer = MagicMock(name="LanceFileWriter")
fake_lance = types.ModuleType("lance")
fake_lance.file = types.SimpleNamespace(
LanceFileWriter=MagicMock(return_value=writer))
with patch.dict(sys.modules, {"lance": fake_lance}), \
patch("pypaimon.read.reader.lance_utils.to_lance_specified",
return_value=("hdfs://ns/x.lance", {"opt": "v"})) as spec:
self.fio.write_lance("hdfs://ns/x.lance", table)
spec.assert_called_once()
_, kwargs = fake_lance.file.LanceFileWriter.call_args
self.assertEqual(kwargs.get("storage_options"), {"opt": "v"})
writer.close.assert_called_once()
def test_write_vortex_delegates_to_vortex_specified(self):
import pyarrow
table = pyarrow.table({"a": [1, 2]})
fake_vortex = types.ModuleType("vortex")
fake_vortex.array = MagicMock(return_value="varr")
fake_vortex.store = types.SimpleNamespace(from_url=MagicMock())
fake_io = types.ModuleType("vortex._lib.io")
fake_io.write = MagicMock()
fake_lib = types.ModuleType("vortex._lib")
fake_lib.io = fake_io
fake_modules = {
"vortex": fake_vortex,
"vortex._lib": fake_lib,
"vortex._lib.io": fake_io,
}
with patch.dict(sys.modules, fake_modules), \
patch("pypaimon.read.reader.vortex_utils.to_vortex_specified",
return_value=("hdfs://ns/x.vortex", None)):
self.fio.write_vortex("hdfs://ns/x.vortex", table)
fake_io.write.assert_called_once_with("varr", "hdfs://ns/x.vortex")
if __name__ == "__main__":
unittest.main()