| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # ---------------------------------------------------------------------- |
| # HDFS IO implementation |
| |
# Matches absolute names of the form "hdfs://HOST:PORT<rest>"; group 1 is the
# host, group 2 the port and group 3 the remainder. strip_hdfs_abspath() uses
# group 3 to trim the scheme/authority from names returned by ls().
# NOTE(review): the host group is greedy, so a later ":<digits>" inside the
# path would be taken as the port -- confirm HDFS paths cannot contain ':'.
_HDFS_PATH_RE = re.compile(r'hdfs://(.*):(\d+)(.*)')

# Queue types are not used in this section; presumably they are used
# elsewhere in the module -- verify before removing.
try:
    # Python 3
    from queue import Queue, Empty as QueueEmpty, Full as QueueFull
except ImportError:
    # Python 2 fallback: the module was named "Queue"
    from Queue import Queue, Empty as QueueEmpty, Full as QueueFull
| |
| |
def have_libhdfs():
    """
    Report whether the 'libhdfs' driver is usable.

    Returns
    -------
    available : bool
        True when the HaveLibHdfs() status check succeeds, False when it
        raises any exception.
    """
    try:
        with nogil:
            check_status(HaveLibHdfs())
    except Exception:
        return False
    else:
        return True
| |
| |
def have_libhdfs3():
    """
    Report whether the 'libhdfs3' driver is usable.

    Returns
    -------
    available : bool
        True when the HaveLibHdfs3() status check succeeds, False when it
        raises any exception.
    """
    try:
        with nogil:
            check_status(HaveLibHdfs3())
    except Exception:
        return False
    else:
        return True
| |
| |
def strip_hdfs_abspath(path):
    """
    Drop a leading ``hdfs://HOST:PORT`` prefix from *path*.

    Paths that do not match ``_HDFS_PATH_RE`` are returned unchanged.
    """
    found = _HDFS_PATH_RE.match(path)
    return path if found is None else found.group(3)
| |
| |
cdef class HadoopFileSystem:
    """
    Client connection to an HDFS cluster, backed by a C++ CHadoopFileSystem.

    The connection itself is made by ``_connect``; ``connect`` simply
    forwards its arguments to ``cls(...)``.
    NOTE(review): no ``__init__`` is defined here; presumably a subclass
    constructor calls ``_connect`` -- confirm against the Python wrapper.
    """

    cdef:
        # C++ client; stays null until CHadoopFileSystem.Connect() fills it
        # in _connect(). _ensure_client() guards against the null case.
        shared_ptr[CHadoopFileSystem] client

    cdef readonly:
        # True between a successful _connect() and close().
        bint is_open
        # Connection parameters as passed to _connect(), exposed read-only.
        object host
        object user
        object kerb_ticket
        object driver
        int port
        dict extra_conf

    def _connect(self, host, port, user, kerb_ticket, driver, extra_conf):
        """
        Build an HdfsConnectionConfig from the arguments and connect.

        Parameters
        ----------
        host : str or None
            Left unset in the connection config when None.
        port : int
        user : str or None
            Left unset in the connection config when None.
        kerb_ticket : str or None
            Kerberos ticket; left unset in the connection config when None.
        driver : {'libhdfs', 'libhdfs3'}
            Native driver; its availability is checked before connecting.
        extra_conf : dict or None
            Extra key/value settings, converted to bytes. Only forwarded to
            the connection config when it is a dict.

        Raises
        ------
        ValueError
            If ``driver`` is not 'libhdfs' or 'libhdfs3'.
        """
        cdef HdfsConnectionConfig conf

        if host is not None:
            conf.host = tobytes(host)
        self.host = host

        conf.port = port
        self.port = port

        if user is not None:
            conf.user = tobytes(user)
        self.user = user

        if kerb_ticket is not None:
            conf.kerb_ticket = tobytes(kerb_ticket)
        self.kerb_ticket = kerb_ticket

        # Surface the driver's own "library not found" error before trying
        # to connect with it.
        if driver == 'libhdfs':
            with nogil:
                check_status(HaveLibHdfs())
            conf.driver = HdfsDriver_LIBHDFS
        elif driver == 'libhdfs3':
            with nogil:
                check_status(HaveLibHdfs3())
            conf.driver = HdfsDriver_LIBHDFS3
        else:
            raise ValueError("unknown driver: %r" % driver)
        self.driver = driver

        if isinstance(extra_conf, dict):
            conf.extra_conf = {tobytes(k): tobytes(v)
                               for k, v in extra_conf.items()}
        self.extra_conf = extra_conf

        with nogil:
            check_status(CHadoopFileSystem.Connect(&conf, &self.client))
        self.is_open = True

    @classmethod
    def connect(cls, *args, **kwargs):
        """Alternate constructor, equivalent to ``cls(*args, **kwargs)``."""
        return cls(*args, **kwargs)

    def __dealloc__(self):
        # Disconnect if the user never called close(). Exceptions raised
        # inside __dealloc__ cannot propagate; Cython prints and ignores them.
        if self.is_open:
            self.close()

    def close(self):
        """
        Disconnect from the HDFS cluster

        Raises
        ------
        IOError
            If the client was never connected or is already closed.
        """
        self._ensure_client()
        with nogil:
            check_status(self.client.get().Disconnect())
        self.is_open = False

    cdef _ensure_client(self):
        # Every method that touches self.client must call this first:
        # dereferencing a null or disconnected client is undefined behavior.
        if self.client.get() == NULL:
            raise IOError('HDFS client improperly initialized')
        elif not self.is_open:
            raise IOError('HDFS client is closed')

    def exists(self, path):
        """
        Returns True if the path is known to the cluster, False if it does not
        (or there is an RPC error)
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        cdef c_bool result
        with nogil:
            result = self.client.get().Exists(c_path)
        return result

    def isdir(self, path):
        """
        Return True if path is a directory, False if it is not or if the
        path-info lookup fails with ArrowIOError.
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_DIRECTORY

    def isfile(self, path):
        """
        Return True if path is a regular file, False if it is not or if the
        path-info lookup fails with ArrowIOError.
        """
        cdef HdfsPathInfo info
        try:
            self._path_info(path, &info)
        except ArrowIOError:
            return False
        return info.kind == ObjectType_FILE

    def get_capacity(self):
        """
        Get reported total capacity of file system

        Returns
        -------
        capacity : int
        """
        self._ensure_client()
        cdef int64_t capacity = 0
        with nogil:
            check_status(self.client.get().GetCapacity(&capacity))
        return capacity

    def get_space_used(self):
        """
        Get space used on file system

        Returns
        -------
        space_used : int
        """
        self._ensure_client()
        cdef int64_t space_used = 0
        with nogil:
            check_status(self.client.get().GetUsed(&space_used))
        return space_used

    def df(self):
        """
        Return free space on disk, like the UNIX df command

        Computed as ``get_capacity() - get_space_used()``.

        Returns
        -------
        space : int
        """
        return self.get_capacity() - self.get_space_used()

    def rename(self, path, new_path):
        """
        Rename (move) path to new_path

        Parameters
        ----------
        path : string
            Existing file or directory
        new_path : string
            Destination path
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef c_string c_new_path = tobytes(new_path)
        with nogil:
            check_status(self.client.get().Rename(c_path, c_new_path))

    def info(self, path):
        """
        Return detailed HDFS information for path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        path_info : dict
        """
        cdef HdfsPathInfo info
        self._path_info(path, &info)
        return {
            'path': frombytes(info.name),
            'owner': frombytes(info.owner),
            'group': frombytes(info.group),
            'size': info.size,
            'block_size': info.block_size,
            'last_modified': info.last_modified_time,
            'last_accessed': info.last_access_time,
            'replication': info.replication,
            'permissions': info.permissions,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    def stat(self, path):
        """
        Return basic file system statistics about path

        Parameters
        ----------
        path : string
            Path to file or directory

        Returns
        -------
        stat : dict
        """
        self._ensure_client()
        cdef FileStatistics info
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Stat(c_path, &info))
        return {
            'size': info.size,
            'kind': ('directory' if info.kind == ObjectType_DIRECTORY
                     else 'file')
        }

    cdef _path_info(self, path, HdfsPathInfo* info):
        # Fill *info for path. The client check raises plain IOError, which
        # is not an ArrowIOError, so isdir()/isfile() do not report a closed
        # client as "False".
        self._ensure_client()
        cdef c_string c_path = tobytes(path)

        with nogil:
            check_status(self.client.get()
                         .GetPathInfo(c_path, info))

    def ls(self, path, bint full_info):
        """
        List the contents of a directory

        Parameters
        ----------
        path : string
            Directory to list
        full_info : bool
            If True, return one dict of metadata per entry; otherwise return
            only the entry names

        Returns
        -------
        results : list of str or list of dict
            Names have any leading ``hdfs://HOST:PORT`` prefix removed
        """
        cdef:
            c_string c_path = tobytes(path)
            vector[HdfsPathInfo] listing
            list results = []
            int i

        self._ensure_client()

        with nogil:
            check_status(self.client.get()
                         .ListDirectory(c_path, &listing))

        # Point into the vector rather than copying each HdfsPathInfo.
        cdef const HdfsPathInfo* info
        for i in range(<int> listing.size()):
            info = &listing[i]

            # Try to trim off the hdfs://HOST:PORT piece
            name = strip_hdfs_abspath(frombytes(info.name))

            if full_info:
                kind = ('file' if info.kind == ObjectType_FILE
                        else 'directory')

                results.append({
                    'kind': kind,
                    'name': name,
                    'owner': frombytes(info.owner),
                    'group': frombytes(info.group),
                    'last_modified_time': info.last_modified_time,
                    'last_access_time': info.last_access_time,
                    'size': info.size,
                    'replication': info.replication,
                    'block_size': info.block_size,
                    'permissions': info.permissions
                })
            else:
                results.append(name)

        return results

    def chmod(self, path, mode):
        """
        Change file permissions

        Parameters
        ----------
        path : string
            absolute path to file or directory
        mode : int
            POSIX-like bitmask
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        cdef int c_mode = mode
        with nogil:
            check_status(self.client.get()
                         .Chmod(c_path, c_mode))

    def chown(self, path, owner=None, group=None):
        """
        Change file owner and/or group

        Parameters
        ----------
        path : string
            absolute path to file or directory
        owner : string, default None
            New owner, None for no change
        group : string, default None
            New group, None for no change
        """
        cdef:
            c_string c_path
            c_string c_owner
            c_string c_group
            # NULL is passed through to Chown for "leave unchanged".
            const char* c_owner_ptr = NULL
            const char* c_group_ptr = NULL

        self._ensure_client()

        c_path = tobytes(path)
        if owner is not None:
            c_owner = tobytes(owner)
            c_owner_ptr = c_owner.c_str()

        if group is not None:
            c_group = tobytes(group)
            c_group_ptr = c_group.c_str()

        with nogil:
            check_status(self.client.get()
                         .Chown(c_path, c_owner_ptr, c_group_ptr))

    def mkdir(self, path):
        """
        Create indicated directory and any necessary parent directories
        """
        self._ensure_client()
        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .MakeDirectory(c_path))

    def delete(self, path, bint recursive=False):
        """
        Delete the indicated file or directory

        Parameters
        ----------
        path : string
        recursive : boolean, default False
            If True, also delete child paths for directories
        """
        self._ensure_client()

        cdef c_string c_path = tobytes(path)
        with nogil:
            check_status(self.client.get()
                         .Delete(c_path, recursive == 1))

    def open(self, path, mode='rb', buffer_size=None, replication=None,
             default_block_size=None):
        """
        Open HDFS file for reading or writing

        Parameters
        ----------
        path : string
            Path of the file to open
        mode : string
            Must be one of 'rb', 'wb', 'ab'
        buffer_size : int, optional
            Passed to the driver for writable files; None or 0 means the
            driver default. The returned file's ``buffer_size`` is 65536
            when none was given.
        replication : int, optional
            Replication factor for writable files; None or 0 means default
        default_block_size : int, optional
            Block size for writable files; None or 0 means default

        Returns
        -------
        handle : HdfsFile

        Raises
        ------
        ValueError
            If mode is not one of 'rb', 'wb', 'ab'
        """
        self._ensure_client()

        # Validate before allocating the file object.
        if mode not in ('rb', 'wb', 'ab'):
            raise ValueError("Mode must be 'rb' (read), "
                             "'wb' (write, new file), or 'ab' (append)")

        cdef HdfsFile out = HdfsFile()

        cdef c_string c_path = tobytes(path)
        cdef c_bool append = False

        # 0 in libhdfs means "use the default"
        cdef int32_t c_buffer_size = buffer_size or 0
        cdef int16_t c_replication = replication or 0
        cdef int64_t c_default_block_size = default_block_size or 0

        cdef shared_ptr[HdfsOutputStream] wr_handle
        cdef shared_ptr[HdfsReadableFile] rd_handle

        if mode in ('wb', 'ab'):
            if mode == 'ab':
                append = True

            with nogil:
                check_status(
                    self.client.get()
                    .OpenWritable(c_path, append, c_buffer_size,
                                  c_replication, c_default_block_size,
                                  &wr_handle))

            out.set_output_stream(<shared_ptr[OutputStream]> wr_handle)
            out.is_writable = True
        else:
            with nogil:
                check_status(self.client.get()
                             .OpenReadable(c_path, &rd_handle))

            out.set_random_access_file(
                <shared_ptr[RandomAccessFile]> rd_handle)
            out.is_readable = True

        assert not out.closed

        if c_buffer_size == 0:
            c_buffer_size = 2 ** 16

        out.mode = mode
        out.buffer_size = c_buffer_size
        # The nanny holds a strong reference to this client so the client
        # outlives the file (see ARROW-404 note on _HdfsFileNanny).
        out.parent = _HdfsFileNanny(self, out)
        out.own_file = True

        return out

    def download(self, path, stream, buffer_size=None):
        """
        Read the HDFS file at path and write its contents to stream
        """
        with self.open(path, 'rb') as f:
            f.download(stream, buffer_size=buffer_size)

    def upload(self, path, stream, buffer_size=None):
        """
        Upload file-like object to HDFS path
        """
        with self.open(path, 'wb') as f:
            f.upload(stream, buffer_size=buffer_size)
| |
| |
| # ARROW-404: Helper class to ensure that files are closed before the |
| # client. During deallocation of the extension class, the attributes are |
| # decref'd which can cause the client to get closed first if the file has the |
| # last remaining reference |
cdef class _HdfsFileNanny:
    """
    Ties an HdfsFile to its client so the file is closed before the client.

    Holds a strong reference to the client but only a weak reference to the
    file handle, so the nanny itself never keeps the file alive.
    """
    cdef:
        # Strong reference that keeps the owning client alive.
        object client
        # weakref.ref to the guarded file handle.
        object file_handle_ref

    def __cinit__(self, client, file_handle):
        import weakref
        self.client = client
        self.file_handle_ref = weakref.ref(file_handle)

    def __dealloc__(self):
        fh = self.file_handle_ref()
        # A dead weakref yields None. Test for that explicitly instead of
        # relying on the file object's truthiness.
        if fh is not None:
            fh.close()
        # avoid cyclic GC
        self.file_handle_ref = None
        self.client = None
| |
| |
cdef class HdfsFile(NativeFile):
    """
    NativeFile handle returned by HadoopFileSystem.open().

    ``mode``, ``buffer_size`` and ``parent`` are assigned by
    HadoopFileSystem.open() after the underlying stream is opened.
    """
    cdef readonly:
        # Buffer size recorded at open() time (65536 when none was requested).
        int32_t buffer_size
        # Mode string passed to open(): 'rb', 'wb' or 'ab'.
        object mode
        # _HdfsFileNanny that holds a strong reference to the client.
        object parent

    def __dealloc__(self):
        # Release the nanny (and with it the client reference) explicitly.
        # NOTE(review): _HdfsFileNanny calls weakref.ref() on this object;
        # confirm NativeFile (or this class) declares __weakref__.
        self.parent = None