| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Cython wrappers for IO interfaces defined in arrow::io and messaging in |
| # arrow::ipc |
| |
| from libc.stdlib cimport malloc, free |
| from pyarrow.compat import frombytes, tobytes, encode_file_path |
| |
| import re |
| import six |
| import sys |
| import threading |
| import time |
| import warnings |
| |
| |
# Default chunk size for download()/upload() streaming transfers: 64 KiB
DEFAULT_BUFFER_SIZE = 2 ** 16


# To let us get a PyObject* and avoid Cython auto-ref-counting
cdef extern from "Python.h":
    # Raw binding to PyBytes_FromStringAndSize: by declaring the return type
    # as PyObject* (not object), Cython performs no automatic refcounting,
    # which lets read() shrink the bytes object in place via _PyBytes_Resize
    PyObject* PyBytes_FromStringAndSizeNative" PyBytes_FromStringAndSize"(
        char *v, Py_ssize_t len) except NULL
| |
| |
def _stringify_path(path):
    """
    Convert *path* to a string or unicode path if possible.

    Accepts plain strings as-is and objects implementing the PEP 519
    ``__fspath__`` protocol; raises TypeError for anything else.
    """
    if not isinstance(path, six.string_types):
        try:
            return path.__fspath__()
        except AttributeError:
            raise TypeError("not a path-like object")
    return path
| |
| |
cdef class NativeFile:
    """
    Base class for Arrow-backed file objects.

    Wraps the underlying arrow::io handles (``rd_file`` for reading,
    ``wr_file`` for writing) in a Python file-like interface: read/write,
    seek/tell, close, and the context-manager protocol.
    """

    def __cinit__(self):
        self.closed = True
        self.own_file = False
        self.is_readable = False
        self.is_writable = False

    def __dealloc__(self):
        # Only close handles this object owns; borrowed handles are the
        # responsibility of whoever created them
        if self.own_file and not self.closed:
            self.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    property mode:
        """
        The file mode. Currently instances of NativeFile may support:

        * rb: binary read
        * wb: binary write
        * rb+: binary read and write
        """

        def __get__(self):
            # Emulate built-in file modes
            if self.is_readable and self.is_writable:
                return 'rb+'
            elif self.is_readable:
                return 'rb'
            elif self.is_writable:
                return 'wb'
            else:
                raise ValueError('File object is malformed, has no mode')

    def readable(self):
        """Return True if the file supports reading (IOBase interface)."""
        self._assert_open()
        return self.is_readable

    def writable(self):
        """Return True if the file supports writing (IOBase interface)."""
        self._assert_open()
        return self.is_writable

    def seekable(self):
        """Return True if the file supports seeking (IOBase interface)."""
        # Only readable (random access) files support seeking here
        self._assert_open()
        return self.is_readable

    def close(self):
        """
        Close the underlying Arrow stream. Idempotent.
        """
        if not self.closed:
            with nogil:
                if self.is_readable:
                    check_status(self.rd_file.get().Close())
                else:
                    check_status(self.wr_file.get().Close())
            self.closed = True

    def flush(self):
        """Flush the buffer stream, if applicable.

        No-op to match the IOBase interface."""
        self._assert_open()

    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
        # Export the underlying readable handle into *file*
        self._assert_readable()
        file[0] = <shared_ptr[RandomAccessFile]> self.rd_file

    cdef write_handle(self, shared_ptr[OutputStream]* file):
        # Export the underlying writable handle into *file*
        self._assert_writable()
        file[0] = <shared_ptr[OutputStream]> self.wr_file

    def _assert_open(self):
        if self.closed:
            raise ValueError("I/O operation on closed file")

    def _assert_readable(self):
        self._assert_open()
        if not self.is_readable:
            raise IOError("only valid on readonly files")

    def _assert_writable(self):
        self._assert_open()
        if not self.is_writable:
            raise IOError("only valid on writable files")

    def size(self):
        """
        Return file size
        """
        cdef int64_t size
        self._assert_readable()
        with nogil:
            check_status(self.rd_file.get().GetSize(&size))
        return size

    def tell(self):
        """
        Return current stream position
        """
        cdef int64_t position
        self._assert_open()
        with nogil:
            if self.is_readable:
                check_status(self.rd_file.get().Tell(&position))
            else:
                check_status(self.wr_file.get().Tell(&position))
        return position

    def seek(self, int64_t position, int whence=0):
        """
        Change current file stream position

        Parameters
        ----------
        position : int
            Byte offset, interpreted relative to value of whence argument
        whence : int, default 0
            Point of reference for seek offset

        Notes
        -----
        Values of whence:
        * 0 -- start of stream (the default); offset should be zero or positive
        * 1 -- current stream position; offset may be negative
        * 2 -- end of stream; offset is usually negative

        Returns
        -------
        new_position : the new absolute stream position
        """
        cdef int64_t offset
        self._assert_readable()

        with nogil:
            if whence == 0:
                offset = position
            elif whence == 1:
                check_status(self.rd_file.get().Tell(&offset))
                offset = offset + position
            elif whence == 2:
                check_status(self.rd_file.get().GetSize(&offset))
                offset = offset + position
            else:
                with gil:
                    raise ValueError("Invalid value of whence: {0}"
                                     .format(whence))
            check_status(self.rd_file.get().Seek(offset))

        return self.tell()

    def write(self, data):
        """
        Write byte from any object implementing buffer protocol (bytes,
        bytearray, ndarray, pyarrow.Buffer)
        """
        self._assert_writable()

        if isinstance(data, six.string_types):
            data = tobytes(data)

        cdef Buffer arrow_buffer = py_buffer(data)

        cdef const uint8_t* buf = arrow_buffer.buffer.get().data()
        cdef int64_t bufsize = len(arrow_buffer)
        with nogil:
            check_status(self.wr_file.get().Write(buf, bufsize))

    def read(self, nbytes=None):
        """
        Read indicated number of bytes from file, or read all remaining bytes
        if no argument passed

        Parameters
        ----------
        nbytes : int, default None

        Returns
        -------
        data : bytes
        """
        cdef:
            int64_t c_nbytes
            int64_t bytes_read = 0
            PyObject* obj

        if nbytes is None:
            c_nbytes = self.size() - self.tell()
        else:
            c_nbytes = nbytes

        self._assert_readable()

        # Allocate empty write space; unmanaged PyObject* so that the bytes
        # object can be shrunk in place below if fewer bytes were available
        obj = PyBytes_FromStringAndSizeNative(NULL, c_nbytes)

        cdef uint8_t* buf = <uint8_t*> cp.PyBytes_AS_STRING(<object> obj)
        with nogil:
            check_status(self.rd_file.get().Read(c_nbytes, &bytes_read, buf))

        if bytes_read < c_nbytes:
            cp._PyBytes_Resize(&obj, <Py_ssize_t> bytes_read)

        return PyObject_to_object(obj)

    def read1(self, nbytes=None):
        """Read and return up to n bytes.

        Alias for read, needed to match the IOBase interface."""
        # BUG FIX: previously forwarded nbytes=None unconditionally, so
        # read1(n) ignored its argument and read the whole remainder
        return self.read(nbytes=nbytes)

    def read_buffer(self, nbytes=None):
        """
        Read indicated number of bytes (all remaining bytes if nbytes is
        None) and return an Arrow Buffer rather than Python bytes.
        """
        cdef:
            int64_t c_nbytes
            int64_t bytes_read = 0
            shared_ptr[CBuffer] output
        self._assert_readable()

        if nbytes is None:
            c_nbytes = self.size() - self.tell()
        else:
            c_nbytes = nbytes

        with nogil:
            check_status(self.rd_file.get().ReadB(c_nbytes, &output))

        return pyarrow_wrap_buffer(output)

    def download(self, stream_or_path, buffer_size=None):
        """
        Read file completely to local path (rather than reading completely into
        memory). First seeks to the beginning of the file.
        """
        cdef:
            int64_t bytes_read = 0
            uint8_t* buf
        self._assert_readable()

        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE

        write_queue = Queue(50)

        if not hasattr(stream_or_path, 'read'):
            stream = open(stream_or_path, 'wb')

            def cleanup():
                stream.close()
        else:
            stream = stream_or_path

            def cleanup():
                pass

        done = False
        # One-element list used as a mutable cell: a bare assignment inside
        # bg_write would only rebind a closure-local name, so exceptions
        # raised in the writer thread were silently dropped (previous bug)
        exc_info = [None]

        def bg_write():
            # Drain chunks from the queue onto the output stream until the
            # reader signals completion and the queue is empty
            try:
                while not done or write_queue.qsize() > 0:
                    try:
                        chunk = write_queue.get(timeout=0.01)
                    except QueueEmpty:
                        continue
                    stream.write(chunk)
            except Exception:
                exc_info[0] = sys.exc_info()
            finally:
                cleanup()

        self.seek(0)

        writer_thread = threading.Thread(target=bg_write)

        # This isn't ideal -- PyBytes_FromStringAndSize copies the data from
        # the passed buffer, so it's hard for us to avoid doubling the memory
        buf = <uint8_t*> malloc(buffer_size)
        if buf == NULL:
            raise MemoryError("Failed to allocate {0} bytes"
                              .format(buffer_size))

        writer_thread.start()

        cdef int64_t total_bytes = 0
        cdef int32_t c_buffer_size = buffer_size

        try:
            while True:
                with nogil:
                    check_status(self.rd_file.get()
                                 .Read(c_buffer_size, &bytes_read, buf))

                total_bytes += bytes_read

                # EOF
                if bytes_read == 0:
                    break

                pybuf = cp.PyBytes_FromStringAndSize(<const char*>buf,
                                                     bytes_read)

                if writer_thread.is_alive():
                    while write_queue.full():
                        time.sleep(0.01)
                else:
                    # Writer thread died (its exception is in exc_info)
                    break

                write_queue.put_nowait(pybuf)
        finally:
            free(buf)
            done = True

        writer_thread.join()
        if exc_info[0] is not None:
            raise exc_info[0][0], exc_info[0][1], exc_info[0][2]

    def upload(self, stream, buffer_size=None):
        """
        Pipe file-like object to file
        """
        write_queue = Queue(50)
        self._assert_writable()

        buffer_size = buffer_size or DEFAULT_BUFFER_SIZE

        done = False
        # Mutable cell for cross-thread exception reporting; see download()
        exc_info = [None]

        def bg_write():
            try:
                while not done or write_queue.qsize() > 0:
                    try:
                        chunk = write_queue.get(timeout=0.01)
                    except QueueEmpty:
                        continue

                    self.write(chunk)

            except Exception:
                exc_info[0] = sys.exc_info()

        writer_thread = threading.Thread(target=bg_write)
        writer_thread.start()

        try:
            while True:
                chunk = stream.read(buffer_size)
                if not chunk:
                    break

                if writer_thread.is_alive():
                    while write_queue.full():
                        time.sleep(0.01)
                else:
                    # Writer thread died (its exception is in exc_info)
                    break

                write_queue.put_nowait(chunk)
        finally:
            done = True

        writer_thread.join()
        if exc_info[0] is not None:
            raise exc_info[0][0], exc_info[0][1], exc_info[0][2]
| |
| |
| # ---------------------------------------------------------------------- |
| # Python file-like objects |
| |
| |
cdef class PythonFile(NativeFile):
    """
    Adapt an arbitrary Python file-like object to the NativeFile interface.

    If *mode* is not given it is inferred from the wrapped object: first
    from its ``mode`` attribute, then from its ``writable()`` method.
    """
    cdef:
        object handle

    def __cinit__(self, handle, mode=None):
        self.handle = handle

        if mode is None:
            try:
                mode = handle.mode
            except AttributeError:
                # Not all file-like objects have a mode attribute
                # (e.g. BytesIO)
                try:
                    mode = 'w' if handle.writable() else 'r'
                except AttributeError:
                    raise ValueError("could not infer open mode for file-like "
                                     "object %r, please pass it explicitly"
                                     % (handle,))

        if mode.startswith('w'):
            self.is_writable = True
            self.wr_file.reset(new PyOutputStream(handle))
        elif mode.startswith('r'):
            self.is_readable = True
            self.rd_file.reset(new PyReadableFile(handle))
        else:
            raise ValueError('Invalid file mode: {0}'.format(mode))

        self.closed = False
| |
| |
cdef class MemoryMappedFile(NativeFile):
    """
    Supports 'r', 'r+w', 'w' modes
    """
    cdef:
        object path

    @staticmethod
    def create(path, size):
        """
        Create a new memory-mapped file of *size* bytes at *path* and
        return it opened for both reading and writing.
        """
        cdef:
            shared_ptr[CMemoryMappedFile] c_handle
            c_string c_path = encode_file_path(path)
            int64_t c_size = size

        with nogil:
            check_status(CMemoryMappedFile.Create(c_path, c_size, &c_handle))

        # The same underlying handle serves both directions
        cdef MemoryMappedFile result = MemoryMappedFile()
        result.path = path
        result.is_readable = True
        result.is_writable = True
        result.rd_file = <shared_ptr[RandomAccessFile]> c_handle
        result.wr_file = <shared_ptr[OutputStream]> c_handle
        result.closed = False

        return result

    def _open(self, path, mode='r'):
        self.path = path

        cdef:
            FileMode c_mode
            shared_ptr[CMemoryMappedFile] c_handle
            c_string c_path = encode_file_path(path)

        # Translate the Python mode string into the Arrow FileMode enum
        # plus the readable/writable flags used by NativeFile
        if mode in ('r', 'rb'):
            self.is_readable = True
            c_mode = FileMode_READ
        elif mode in ('r+', 'r+b', 'rb+'):
            self.is_readable = True
            self.is_writable = True
            c_mode = FileMode_READWRITE
        elif mode in ('w', 'wb'):
            self.is_writable = True
            c_mode = FileMode_WRITE
        else:
            raise ValueError('Invalid file mode: {0}'.format(mode))

        with nogil:
            check_status(CMemoryMappedFile.Open(c_path, c_mode, &c_handle))

        self.rd_file = <shared_ptr[RandomAccessFile]> c_handle
        self.wr_file = <shared_ptr[OutputStream]> c_handle
        self.closed = False
| |
| |
def memory_map(path, mode='r'):
    """
    Open memory map at file path. Size of the memory map cannot change

    Parameters
    ----------
    path : string
    mode : {'r', 'w'}, default 'r'

    Returns
    -------
    mmap : MemoryMappedFile
    """
    cdef MemoryMappedFile result = MemoryMappedFile()
    result._open(path, mode)
    return result
| |
| |
def create_memory_map(path, size):
    """
    Create memory map at indicated path of the given size, return open
    writable file object

    The returned file is opened for both reading and writing.

    Parameters
    ----------
    path : string
    size : int
        Size of the file in bytes

    Returns
    -------
    mmap : MemoryMappedFile
    """
    return MemoryMappedFile.create(path, size)
| |
| |
cdef class OSFile(NativeFile):
    """
    A stream backed by a regular operating system file.

    Supports 'r', 'w' modes
    """
    cdef:
        object path

    def __cinit__(self, path, mode='r', MemoryPool memory_pool=None):
        self.path = path

        # Removed unused locals (FileMode c_mode, shared_ptr[Readable]
        # handle) that were declared here but never used
        cdef:
            c_string c_path = encode_file_path(path)

        if mode in ('r', 'rb'):
            self._open_readable(c_path, maybe_unbox_memory_pool(memory_pool))
        elif mode in ('w', 'wb'):
            self._open_writable(c_path)
        else:
            raise ValueError('Invalid file mode: {0}'.format(mode))

        self.closed = False

    cdef _open_readable(self, c_string path, CMemoryPool* pool):
        # Open for reading via arrow::io::ReadableFile
        cdef shared_ptr[ReadableFile] handle

        with nogil:
            check_status(ReadableFile.Open(path, pool, &handle))

        self.is_readable = True
        self.rd_file = <shared_ptr[RandomAccessFile]> handle

    cdef _open_writable(self, c_string path):
        # Open for writing via arrow::io::FileOutputStream
        with nogil:
            check_status(FileOutputStream.Open(path, &self.wr_file))
        self.is_writable = True
| |
| |
cdef class FixedSizeBufferWriter(NativeFile):
    """
    Output stream that writes into a preallocated, fixed-size Arrow buffer.
    """

    def __cinit__(self, Buffer buffer):
        self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
        self.is_writable = True
        self.closed = False

    def set_memcopy_threads(self, int num_threads):
        """Set the number of threads used for memcpy-based writes."""
        (<CFixedSizeBufferWriter*> self.wr_file.get()) \
            .set_memcopy_threads(num_threads)

    def set_memcopy_blocksize(self, int64_t blocksize):
        """Set the block size used by memcpy-based writes."""
        (<CFixedSizeBufferWriter*> self.wr_file.get()) \
            .set_memcopy_blocksize(blocksize)

    def set_memcopy_threshold(self, int64_t threshold):
        """Set the minimum write size that triggers the memcpy path."""
        (<CFixedSizeBufferWriter*> self.wr_file.get()) \
            .set_memcopy_threshold(threshold)
| |
| |
| # ---------------------------------------------------------------------- |
| # Arrow buffers |
| |
| |
cdef class Buffer:
    """
    Python wrapper around a shared arrow::Buffer, exposing its memory
    through the Python buffer protocol.
    """

    def __cinit__(self):
        pass

    cdef void init(self, const shared_ptr[CBuffer]& buffer):
        # Attach the C++ buffer and precompute the 1-D shape/strides
        # arrays handed out by __getbuffer__
        self.buffer = buffer
        self.shape[0] = self.size
        self.strides[0] = <Py_ssize_t>(1)

    cdef int _check_nullptr(self) except -1:
        # Guard against use of a Buffer that was never init()-ed
        if self.buffer.get() == NULL:
            raise ReferenceError("operation on uninitialized Buffer object")
        return 0

    def __len__(self):
        return self.size

    property size:
        """Size of the buffer in bytes."""

        def __get__(self):
            self._check_nullptr()
            return self.buffer.get().size()

    property is_mutable:
        """True if the underlying memory may be written to."""

        def __get__(self):
            self._check_nullptr()
            return self.buffer.get().is_mutable()

    property parent:
        """The buffer this one is a view of, or None if unparented."""

        def __get__(self):
            self._check_nullptr()
            cdef shared_ptr[CBuffer] parent_buf = self.buffer.get().parent()

            if parent_buf.get() == NULL:
                return None
            else:
                return pyarrow_wrap_buffer(parent_buf)

    def __getitem__(self, key):
        # TODO(wesm): buffer slicing
        self._check_nullptr()
        raise NotImplementedError

    def equals(self, Buffer other):
        """
        Determine if two buffers contain exactly the same data

        Parameters
        ----------
        other : Buffer

        Returns
        -------
        are_equal : True if buffer contents and size are equal
        """
        self._check_nullptr()
        other._check_nullptr()
        cdef c_bool result = False
        with nogil:
            result = self.buffer.get().Equals(deref(other.buffer.get()))
        return result

    def __eq__(self, other):
        if isinstance(other, Buffer):
            return self.equals(other)
        else:
            return NotImplemented

    def to_pybytes(self):
        """Return the buffer contents copied into a Python bytes object."""
        self._check_nullptr()
        return cp.PyBytes_FromStringAndSize(
            <const char*>self.buffer.get().data(),
            self.buffer.get().size())

    def __getbuffer__(self, cp.Py_buffer* buffer, int flags):
        # PEP 3118 buffer protocol: expose the memory as a contiguous
        # 1-D array of signed bytes (format 'b', itemsize 1)
        self._check_nullptr()

        buffer.buf = <char *>self.buffer.get().data()
        buffer.format = 'b'
        buffer.internal = NULL
        buffer.itemsize = 1
        buffer.len = self.size
        buffer.ndim = 1
        buffer.obj = self
        if self.buffer.get().is_mutable():
            buffer.readonly = 0
        else:
            buffer.readonly = 1
        buffer.shape = self.shape
        buffer.strides = self.strides
        buffer.suboffsets = NULL

    def __getsegcount__(self, Py_ssize_t *len_out):
        # Legacy (Python 2) buffer interface: a single segment
        self._check_nullptr()
        if len_out != NULL:
            len_out[0] = <Py_ssize_t>self.size
        return 1

    def __getreadbuffer__(self, Py_ssize_t idx, void **p):
        # Legacy (Python 2) read-buffer interface
        self._check_nullptr()
        if idx != 0:
            raise SystemError("accessing non-existent buffer segment")
        if p != NULL:
            p[0] = <void*> self.buffer.get().data()
        return self.size

    def __getwritebuffer__(self, Py_ssize_t idx, void **p):
        # Legacy (Python 2) write-buffer interface; mutable buffers only
        self._check_nullptr()
        if not self.buffer.get().is_mutable():
            raise SystemError("trying to write an immutable buffer")
        if idx != 0:
            raise SystemError("accessing non-existent buffer segment")
        if p != NULL:
            p[0] = <void*> self.buffer.get().data()
        return self.size
| |
| |
cdef class ResizableBuffer(Buffer):
    """
    Buffer backed by an arrow::ResizableBuffer, which may be resized.
    """

    cdef void init_rz(self, const shared_ptr[CResizableBuffer]& buffer):
        # Delegate to the base-class init with an upcast handle
        self.init(<shared_ptr[CBuffer]> buffer)

    def resize(self, int64_t new_size, shrink_to_fit=False):
        """
        Resize buffer to indicated size

        Parameters
        ----------
        new_size : int64_t
            New size of buffer (padding may be added internally)
        shrink_to_fit : boolean, default False
            If new_size is less than the current size, shrink internal
            capacity, otherwise leave at current capacity
        """
        cdef:
            c_bool c_shrink_to_fit = shrink_to_fit
            CResizableBuffer* rz_buf = <CResizableBuffer*> self.buffer.get()
        with nogil:
            check_status(rz_buf.Resize(new_size, c_shrink_to_fit))
| |
| |
cdef shared_ptr[PoolBuffer] _allocate_buffer(CMemoryPool* pool):
    # Wrap a fresh, empty PoolBuffer that allocates from *pool*
    cdef shared_ptr[PoolBuffer] buf
    buf.reset(new PoolBuffer(pool))
    return buf
| |
| |
def allocate_buffer(int64_t size, MemoryPool memory_pool=None,
                    resizable=False):
    """
    Allocate mutable fixed-size buffer

    Parameters
    ----------
    size : int
        Number of bytes to allocate (plus internal padding)
    memory_pool : MemoryPool, optional
        Uses default memory pool if not provided
    resizable : boolean, default False

    Returns
    -------
    buffer : Buffer or ResizableBuffer
    """
    cdef:
        shared_ptr[CBuffer] c_buffer
        shared_ptr[CResizableBuffer] c_rz_buffer
        CMemoryPool* cpool = maybe_unbox_memory_pool(memory_pool)

    # Fixed-size path first; resizable buffers use a distinct allocator
    # and wrapper type
    if not resizable:
        with nogil:
            check_status(AllocateBuffer(cpool, size, &c_buffer))
        return pyarrow_wrap_buffer(c_buffer)

    with nogil:
        check_status(AllocateResizableBuffer(cpool, size, &c_rz_buffer))
    return pyarrow_wrap_resizable_buffer(c_rz_buffer)
| |
| |
cdef class BufferOutputStream(NativeFile):
    """
    Output stream that accumulates written data in a growable
    in-memory buffer.
    """

    cdef:
        shared_ptr[PoolBuffer] buffer

    def __cinit__(self, MemoryPool memory_pool=None):
        self.buffer = _allocate_buffer(maybe_unbox_memory_pool(memory_pool))
        self.wr_file.reset(new CBufferOutputStream(
            <shared_ptr[CResizableBuffer]> self.buffer))
        self.closed = False
        self.is_writable = True

    def get_result(self):
        """
        Close the stream and return the accumulated data as a Buffer.
        """
        with nogil:
            check_status(self.wr_file.get().Close())
        self.closed = True
        return pyarrow_wrap_buffer(<shared_ptr[CBuffer]> self.buffer)
| |
| |
cdef class MockOutputStream(NativeFile):
    """
    Write-only stream that discards the data while tracking how many
    bytes were written.
    """

    def __cinit__(self):
        self.wr_file.reset(new CMockOutputStream())
        self.closed = False
        self.is_writable = True

    def size(self):
        """Return the total number of bytes written so far."""
        cdef CMockOutputStream* stream = \
            <CMockOutputStream*> self.wr_file.get()
        return stream.GetExtentBytesWritten()
| |
| |
cdef class BufferReader(NativeFile):
    """
    Zero-copy reader from objects convertible to Arrow buffer

    Parameters
    ----------
    obj : Python bytes or pyarrow.Buffer
    """
    cdef:
        Buffer buffer

    def __cinit__(self, object obj):
        # Coerce to an Arrow Buffer unless one was passed directly
        self.buffer = obj if isinstance(obj, Buffer) else py_buffer(obj)
        self.rd_file.reset(new CBufferReader(self.buffer.buffer))
        self.is_readable = True
        self.closed = False
| |
| |
def py_buffer(object obj):
    """
    Construct an Arrow buffer from a Python bytes object
    """
    cdef shared_ptr[CBuffer] c_buf
    check_status(PyBuffer.FromPyObject(obj, &c_buf))
    return pyarrow_wrap_buffer(c_buf)
| |
| |
def foreign_buffer(address, size, base):
    """
    Construct an Arrow buffer with the given *address* and *size*,
    backed by the Python *base* object.
    """
    cdef:
        intptr_t c_addr = address
        int64_t c_size = size
        shared_ptr[CBuffer] c_buf

    check_status(PyForeignBuffer.Make(<uint8_t*> c_addr, c_size, base,
                                      &c_buf))
    return pyarrow_wrap_buffer(c_buf)
| |
| |
cdef get_reader(object source, shared_ptr[RandomAccessFile]* reader):
    # Coerce *source* (path, Buffer, NativeFile, or file-like object) into
    # a readable RandomAccessFile handle stored through *reader*.
    cdef NativeFile nf

    try:
        source_path = _stringify_path(source)
    except TypeError:
        # Not path-like: fall back to in-memory / file-like wrappers
        if isinstance(source, Buffer):
            source = BufferReader(source)
        elif not isinstance(source, NativeFile) and hasattr(source, 'read'):
            # Optimistically hope this is file-like
            source = PythonFile(source, mode='r')
    else:
        # Path-like: memory-map the file for reading
        source = memory_map(source_path, mode='r')

    if isinstance(source, NativeFile):
        nf = source

        # TODO: what about read-write sources (e.g. memory maps)
        if not nf.is_readable:
            raise IOError('Native file is not readable')

        nf.read_handle(reader)
    else:
        raise TypeError('Unable to read from object of type: {0}'
                        .format(type(source)))
| |
| |
cdef get_writer(object source, shared_ptr[OutputStream]* writer):
    # Coerce *source* (path, NativeFile, or file-like object) into a
    # writable OutputStream handle stored through *writer*.
    cdef NativeFile nf

    try:
        source_path = _stringify_path(source)
    except TypeError:
        if not isinstance(source, NativeFile) and hasattr(source, 'write'):
            # Optimistically hope this is file-like
            source = PythonFile(source, mode='w')
    else:
        source = OSFile(source_path, mode='w')

    if isinstance(source, NativeFile):
        nf = source

        if not nf.is_writable:
            raise IOError('Native file is not writable')

        nf.write_handle(writer)
    else:
        # Fixed copy-pasted error message that previously said "read from"
        raise TypeError('Unable to write to object of type: {0}'
                        .format(type(source)))
| |
| |
| # --------------------------------------------------------------------- |
| |
cdef CompressionType _get_compression_type(object name):
    # Map a user-facing codec name to the Arrow CompressionType enum;
    # None is treated the same as 'uncompressed'
    if name is None:
        return CompressionType_UNCOMPRESSED
    if name == 'uncompressed':
        return CompressionType_UNCOMPRESSED
    if name == 'snappy':
        return CompressionType_SNAPPY
    if name == 'gzip':
        return CompressionType_GZIP
    if name == 'brotli':
        return CompressionType_BROTLI
    if name == 'zstd':
        return CompressionType_ZSTD
    if name == 'lz4':
        return CompressionType_LZ4
    raise ValueError("Unrecognized compression type: {0}"
                     .format(str(name)))
| |
| |
def compress(object buf, codec='lz4', asbytes=False, memory_pool=None):
    """
    Compress pyarrow.Buffer or Python object supporting the buffer (memoryview)
    protocol

    Parameters
    ----------
    buf : pyarrow.Buffer, bytes, or other object supporting buffer protocol
    codec : string, default 'lz4'
        Compression codec.
        Supported types: {'brotli', 'gzip', 'lz4', 'snappy', 'zstd'}
    asbytes : boolean, default False
        Return result as Python bytes object, otherwise Buffer
    memory_pool : MemoryPool, default None
        Memory pool to use for buffer allocations, if any

    Returns
    -------
    compressed : pyarrow.Buffer or bytes (if asbytes=True)
    """
    cdef:
        CompressionType c_codec = _get_compression_type(codec)
        unique_ptr[CCodec] compressor
        cdef CBuffer* c_buf
        cdef PyObject* pyobj
        cdef ResizableBuffer out_buf

    with nogil:
        check_status(CCodec.Create(c_codec, &compressor))

    # View the input through an Arrow buffer wrapper
    if not isinstance(buf, Buffer):
        buf = py_buffer(buf)

    c_buf = (<Buffer> buf).buffer.get()

    # Codec-specific worst-case output size; trimmed to the actual
    # compressed length after Compress() below
    cdef int64_t max_output_size = (compressor.get()
                                    .MaxCompressedLen(c_buf.size(),
                                                      c_buf.data()))
    cdef uint8_t* output_buffer = NULL

    if asbytes:
        # Unmanaged bytes object so it can be shrunk in place afterwards
        pyobj = PyBytes_FromStringAndSizeNative(NULL, max_output_size)
        output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(<object> pyobj)
    else:
        out_buf = allocate_buffer(max_output_size, memory_pool=memory_pool,
                                  resizable=True)
        output_buffer = out_buf.buffer.get().mutable_data()

    cdef int64_t output_length = 0
    with nogil:
        check_status(compressor.get()
                     .Compress(c_buf.size(), c_buf.data(),
                               max_output_size, output_buffer,
                               &output_length))

    # Trim the over-allocated output down to the true compressed size
    if asbytes:
        cp._PyBytes_Resize(&pyobj, <Py_ssize_t> output_length)
        return PyObject_to_object(pyobj)
    else:
        out_buf.resize(output_length)
        return out_buf
| |
| |
def decompress(object buf, decompressed_size=None, codec='lz4',
               asbytes=False, memory_pool=None):
    """
    Decompress data from buffer-like object

    Parameters
    ----------
    buf : pyarrow.Buffer, bytes, or memoryview-compatible object
    decompressed_size : int64_t, default None
        Size of the decompressed result. Currently required: if None,
        a ValueError is raised (the codecs exposed here cannot determine
        the uncompressed size from the compressed data)
    codec : string, default 'lz4'
        Compression codec.
        Supported types: {'brotli', 'gzip', 'lz4', 'snappy', 'zstd'}
    asbytes : boolean, default False
        Return result as Python bytes object, otherwise Buffer
    memory_pool : MemoryPool, default None
        Memory pool to use for buffer allocations, if any

    Returns
    -------
    uncompressed : pyarrow.Buffer or bytes (if asbytes=True)
    """
    cdef:
        CompressionType c_codec = _get_compression_type(codec)
        unique_ptr[CCodec] compressor
        cdef CBuffer* c_buf
        cdef Buffer out_buf

    with nogil:
        check_status(CCodec.Create(c_codec, &compressor))

    # View the input through an Arrow buffer wrapper
    if not isinstance(buf, Buffer):
        buf = py_buffer(buf)

    c_buf = (<Buffer> buf).buffer.get()

    if decompressed_size is None:
        raise ValueError("Must pass decompressed_size for {0} codec"
                         .format(codec))

    cdef int64_t output_size = decompressed_size
    cdef uint8_t* output_buffer = NULL

    if asbytes:
        pybuf = cp.PyBytes_FromStringAndSize(NULL, output_size)
        output_buffer = <uint8_t*> cp.PyBytes_AS_STRING(pybuf)
    else:
        out_buf = allocate_buffer(output_size, memory_pool=memory_pool)
        output_buffer = out_buf.buffer.get().mutable_data()

    with nogil:
        check_status(compressor.get()
                     .Decompress(c_buf.size(), c_buf.data(),
                                 output_size, output_buffer))

    return pybuf if asbytes else out_buf