blob: f5cf12fa10991484a688c0400cf00e36748b3786 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
from pyarrow.lib cimport check_status
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_fs cimport *
from pyarrow._fs cimport FileSystem
from pyarrow.lib import frombytes, tobytes
from pyarrow.util import _stringify_path
cdef class HadoopFileSystem(FileSystem):
"""
HDFS backed FileSystem implementation
Parameters
----------
host : str
HDFS host to connect to.
port : int, default 8020
HDFS port to connect to.
replication : int, default 3
Number of copies each block will have.
buffer_size : int, default 0
If 0, no buffering will happen otherwise the size of the temporary read
and write buffer.
default_block_size : int, default None
None means the default configuration for HDFS, a typical block size is
128 MB.
kerb_ticket : string or path, default None
If not None, the path to the Kerberos ticket cache.
"""
cdef:
CHadoopFileSystem* hdfs
def __init__(self, str host, int port=8020, *, str user=None,
int replication=3, int buffer_size=0,
default_block_size=None, kerb_ticket=None,
extra_conf=None):
cdef:
CHdfsOptions options
shared_ptr[CHadoopFileSystem] wrapped
if not host.startswith(('hdfs://', 'viewfs://')):
# TODO(kszucs): do more sanitization
host = 'hdfs://{}'.format(host)
options.ConfigureEndPoint(tobytes(host), int(port))
options.ConfigureReplication(replication)
options.ConfigureBufferSize(buffer_size)
if user is not None:
options.ConfigureUser(tobytes(user))
if default_block_size is not None:
options.ConfigureBlockSize(default_block_size)
if kerb_ticket is not None:
options.ConfigureKerberosTicketCachePath(
tobytes(_stringify_path(kerb_ticket)))
if extra_conf is not None:
for k, v in extra_conf.items():
options.ConfigureExtraConf(tobytes(k), tobytes(v))
with nogil:
wrapped = GetResultValue(CHadoopFileSystem.Make(options))
self.init(<shared_ptr[CFileSystem]> wrapped)
cdef init(self, const shared_ptr[CFileSystem]& wrapped):
FileSystem.init(self, wrapped)
self.hdfs = <CHadoopFileSystem*> wrapped.get()
@staticmethod
def from_uri(uri):
"""
Instantiate HadoopFileSystem object from an URI string.
The following two calls are equivalent
* HadoopFileSystem.from_uri('hdfs://localhost:8020/?user=test'
'&replication=1')
* HadoopFileSystem('localhost', port=8020, user='test', replication=1)
Parameters
----------
uri : str
A string URI describing the connection to HDFS.
In order to change the user, replication, buffer_size or
default_block_size pass the values as query parts.
Returns
-------
HadoopFileSystem
"""
cdef:
HadoopFileSystem self = HadoopFileSystem.__new__(HadoopFileSystem)
shared_ptr[CHadoopFileSystem] wrapped
CHdfsOptions options
options = GetResultValue(CHdfsOptions.FromUriString(tobytes(uri)))
with nogil:
wrapped = GetResultValue(CHadoopFileSystem.Make(options))
self.init(<shared_ptr[CFileSystem]> wrapped)
return self
@classmethod
def _reconstruct(cls, kwargs):
return cls(**kwargs)
def __reduce__(self):
cdef CHdfsOptions opts = self.hdfs.options()
return (
HadoopFileSystem._reconstruct, (dict(
host=frombytes(opts.connection_config.host),
port=opts.connection_config.port,
user=frombytes(opts.connection_config.user),
replication=opts.replication,
buffer_size=opts.buffer_size,
default_block_size=opts.default_block_size,
kerb_ticket=frombytes(opts.connection_config.kerb_ticket),
extra_conf={frombytes(k): frombytes(v)
for k, v in opts.connection_config.extra_conf},
),)
)