| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import bz2 |
| from io import (BytesIO, StringIO, TextIOWrapper, BufferedIOBase, IOBase) |
| import gc |
| import gzip |
| import os |
| import pickle |
| import pytest |
| import sys |
| import tempfile |
| import weakref |
| |
| try: |
| import pathlib |
| except ImportError: |
| import pathlib2 as pathlib |
| |
| import numpy as np |
| |
| from pyarrow.compat import u, guid, PY2 |
| import pyarrow as pa |
| |
| |
def check_large_seeks(file_factory):
    """Check that files produced by *file_factory* support seeks past 4GB.

    A sparse file slightly larger than 2**32 bytes is created, a marker is
    written past the 4GB boundary, and the factory's file is expected to
    seek/read it back correctly.
    """
    if sys.platform in ('win32', 'darwin'):
        pytest.skip("need sparse file support")
    # Assign the temp filename *before* entering the try block: if this
    # assignment were inside and failed, the finally clause would raise
    # NameError on an unbound `filename` instead of the real error.
    filename = tempfile.mktemp(prefix='test_io')
    try:
        with open(filename, 'wb') as f:
            f.truncate(2 ** 32 + 10)
            f.seek(2 ** 32 + 5)
            f.write(b'mark\n')
        with file_factory(filename) as f:
            assert f.seek(2 ** 32 + 5) == 2 ** 32 + 5
            assert f.tell() == 2 ** 32 + 5
            assert f.read(5) == b'mark\n'
            assert f.tell() == 2 ** 32 + 10
    finally:
        # The file may not exist if creation failed; don't mask the
        # original exception with a spurious unlink error.
        try:
            os.unlink(filename)
        except OSError:
            pass
| |
| |
| # ---------------------------------------------------------------------- |
| # Python file-like objects |
| |
| |
def test_python_file_write():
    """Writes through a pa.PythonFile land in the wrapped binary sink."""
    sink = BytesIO()
    pf = pa.PythonFile(sink)

    assert pf.tell() == 0

    first = b'enga\xc3\xb1ado'
    second = b'foobar'

    pf.write(first)
    assert pf.tell() == len(first)

    pf.write(second)

    # Both chunks must appear, in order, in the underlying BytesIO
    assert sink.getvalue() == first + second

    assert not pf.closed
    pf.close()
    assert pf.closed

    # Text-mode (str) streams are rejected up front
    with pytest.raises(TypeError, match="binary file expected"):
        pa.PythonFile(StringIO())
| |
| |
def test_python_file_read():
    """Reading, seeking and closing a pa.PythonFile wrapping a BytesIO."""
    payload = b'some sample data'
    pf = pa.PythonFile(BytesIO(payload), mode='r')

    assert pf.size() == len(payload)
    assert pf.tell() == 0

    assert pf.read(4) == b'some'
    assert pf.tell() == 4

    pf.seek(0)
    assert pf.tell() == 0

    pf.seek(5)
    assert pf.tell() == 5

    # Reading more than remains returns only the remainder
    tail = pf.read(50)
    assert tail == b'sample data'
    assert len(tail) == 11

    assert pf.size() == len(payload)

    assert not pf.closed
    pf.close()
    assert pf.closed

    # Text-mode (str) streams are rejected up front
    with pytest.raises(TypeError, match="binary file expected"):
        pa.PythonFile(StringIO(), mode='r')
| |
| |
| def test_python_file_read_at(): |
| data = b'some sample data' |
| |
| buf = BytesIO(data) |
| f = pa.PythonFile(buf, mode='r') |
| |
| # test simple read at |
| v = f.read_at(nbytes=5, offset=3) |
| assert v == b'e sam' |
| assert len(v) == 5 |
| |
| # test reading entire file when nbytes > len(file) |
| w = f.read_at(nbytes=50, offset=0) |
| assert w == data |
| assert len(w) == 16 |
| |
| |
def test_python_file_readall():
    """readall() drains the entire wrapped stream in one call."""
    payload = b'some sample data'
    with pa.PythonFile(BytesIO(payload), mode='r') as pf:
        assert pf.readall() == payload
| |
| |
def test_python_file_readinto():
    """readinto() fills exactly the destination buffer's length."""
    size = 10
    payload = b'some sample data longer than 10'
    target = bytearray(size)

    with pa.PythonFile(BytesIO(payload), mode='r') as pf:
        # Only `size` bytes fit, so only `size` bytes are read
        assert pf.readinto(target) == size

    assert len(target) == size
    assert target[:size] == payload[:size]
| |
| |
| def test_python_file_correct_abc(): |
| with pa.PythonFile(BytesIO(b''), mode='r') as f: |
| assert isinstance(f, BufferedIOBase) |
| assert isinstance(f, IOBase) |
| |
| |
def test_python_file_iterable():
    """Line iteration over a PythonFile matches iteration over the source."""
    payload = b'line1\nline2\nline3\n'

    wrapped = BytesIO(payload)
    reference = BytesIO(payload)

    with pa.PythonFile(wrapped, mode='r') as pf:
        for got, want in zip(pf, reference):
            assert got == want
| |
| |
| def test_python_file_large_seeks(): |
| def factory(filename): |
| return pa.PythonFile(open(filename, 'rb')) |
| |
| check_large_seeks(factory) |
| |
| |
| def test_bytes_reader(): |
| # Like a BytesIO, but zero-copy underneath for C++ consumers |
| data = b'some sample data' |
| f = pa.BufferReader(data) |
| assert f.tell() == 0 |
| |
| assert f.size() == len(data) |
| |
| assert f.read(4) == b'some' |
| assert f.tell() == 4 |
| |
| f.seek(0) |
| assert f.tell() == 0 |
| |
| f.seek(0, 2) |
| assert f.tell() == len(data) |
| |
| f.seek(5) |
| assert f.tell() == 5 |
| |
| assert f.read(50) == b'sample data' |
| |
| assert not f.closed |
| f.close() |
| assert f.closed |
| |
| |
| def test_bytes_reader_non_bytes(): |
| with pytest.raises(TypeError): |
| pa.BufferReader(u('some sample data')) |
| |
| |
def test_bytes_reader_retains_parent_reference():
    # ARROW-421: a buffer sliced out of a BufferReader must keep the
    # parent data alive even after the reader itself is collected.
    # (The redundant function-local `import gc` was removed; gc is
    # already imported at module level.)
    def get_buffer():
        data = b'some sample data' * 1000
        reader = pa.BufferReader(data)
        reader.seek(5)
        return reader.read_buffer(6)

    buf = get_buffer()
    gc.collect()  # provoke collection of the now-unreferenced reader
    assert buf.to_pybytes() == b'sample'
    assert buf.parent is not None
| |
| |
def test_python_file_implicit_mode(tmpdir):
    # PythonFile without an explicit mode= infers read/write capability
    # from the wrapped object.
    path = os.path.join(str(tmpdir), 'foo.txt')
    with open(path, 'wb') as f:
        # Wrapping a file opened 'wb' yields a write-only stream
        pf = pa.PythonFile(f)
        assert pf.writable()
        assert not pf.readable()
        assert not pf.seekable()  # PyOutputStream isn't seekable
        f.write(b'foobar\n')

    with open(path, 'rb') as f:
        # Wrapping a file opened 'rb' yields a readable, seekable stream
        pf = pa.PythonFile(f)
        assert pf.readable()
        assert not pf.writable()
        assert pf.seekable()
        assert pf.read() == b'foobar\n'

    # A fresh BytesIO is wrapped as an output stream here (write-only)
    bio = BytesIO()
    pf = pa.PythonFile(bio)
    assert pf.writable()
    assert not pf.readable()
    assert not pf.seekable()
    pf.write(b'foobar\n')
    assert bio.getvalue() == b'foobar\n'
| |
| |
def test_python_file_writelines(tmpdir):
    """writelines() writes each chunk; the file ends up as their join."""
    # NOTE: the original list had a missing comma (b'line2\n' b'line3'),
    # which implicit string concatenation silently merged into one
    # two-line element. Fixed to three distinct lines.
    lines = [b'line1\n', b'line2\n', b'line3']
    path = os.path.join(str(tmpdir), 'foo.txt')
    with open(path, 'wb') as f:
        try:
            f = pa.PythonFile(f, mode='w')
            assert f.writable()
            f.writelines(lines)
        finally:
            f.close()

    with open(path, 'rb') as f:
        try:
            f = pa.PythonFile(f, mode='r')
            assert f.readable()
            assert f.read() == b''.join(lines)
        finally:
            f.close()
| |
| |
| def test_python_file_closing(): |
| bio = BytesIO() |
| pf = pa.PythonFile(bio) |
| wr = weakref.ref(pf) |
| del pf |
| assert wr() is None # object was destroyed |
| assert not bio.closed |
| pf = pa.PythonFile(bio) |
| pf.close() |
| assert bio.closed |
| |
| |
| # ---------------------------------------------------------------------- |
| # Buffers |
| |
| |
def test_buffer_bytes():
    """py_buffer over bytes is immutable and survives pickling."""
    payload = b'some data'

    buf = pa.py_buffer(payload)
    assert isinstance(buf, pa.Buffer)
    assert not buf.is_mutable

    assert buf.to_pybytes() == payload

    # Check that buffers survive a pickle roundtrip
    roundtripped = pickle.loads(pickle.dumps(buf))
    assert roundtripped.to_pybytes() == payload
| |
| |
| def test_buffer_memoryview(): |
| val = b'some data' |
| |
| buf = pa.py_buffer(val) |
| assert isinstance(buf, pa.Buffer) |
| assert not buf.is_mutable |
| |
| result = memoryview(buf) |
| |
| assert result == val |
| |
| |
| def test_buffer_bytearray(): |
| val = bytearray(b'some data') |
| |
| buf = pa.py_buffer(val) |
| assert isinstance(buf, pa.Buffer) |
| assert buf.is_mutable |
| |
| result = bytearray(buf) |
| |
| assert result == val |
| |
| |
| def test_buffer_invalid(): |
| with pytest.raises(TypeError, |
| match="(bytes-like object|buffer interface)"): |
| pa.py_buffer(None) |
| |
| |
@pytest.mark.parametrize('val, expected_hex_buffer',
                         [(b'check', b'636865636B'),
                          (b'\a0', b'0730'),
                          (b'', b'')])
def test_buffer_hex(val, expected_hex_buffer):
    """Buffer.hex() renders the underlying bytes as a hex bytestring."""
    assert pa.py_buffer(val).hex() == expected_hex_buffer
| |
| |
| def test_buffer_to_numpy(): |
| # Make sure creating a numpy array from an arrow buffer works |
| byte_array = bytearray(20) |
| byte_array[0] = 42 |
| buf = pa.py_buffer(byte_array) |
| array = np.frombuffer(buf, dtype="uint8") |
| assert array[0] == byte_array[0] |
| byte_array[0] += 1 |
| assert array[0] == byte_array[0] |
| assert array.base == buf |
| |
| |
| def test_buffer_from_numpy(): |
| # C-contiguous |
| arr = np.arange(12, dtype=np.int8).reshape((3, 4)) |
| buf = pa.py_buffer(arr) |
| assert buf.to_pybytes() == arr.tobytes() |
| # F-contiguous; note strides informations is lost |
| buf = pa.py_buffer(arr.T) |
| assert buf.to_pybytes() == arr.tobytes() |
| # Non-contiguous |
| with pytest.raises(ValueError, match="not contiguous"): |
| buf = pa.py_buffer(arr.T[::2]) |
| |
| |
def test_buffer_address():
    """Buffer.address reflects the identity of the backing memory."""
    source = b'some data!'
    copy_a = bytearray(source)
    copy_b = bytearray(source)

    buf_src1 = pa.py_buffer(source)
    buf_src2 = pa.py_buffer(source)
    buf_a = pa.py_buffer(copy_a)
    buf_b = pa.py_buffer(copy_b)

    assert buf_src1.address > 0
    # Two buffers over the same bytes object share one address...
    assert buf_src1.address == buf_src2.address
    # ...while equal-valued but distinct bytearrays each get their own
    assert buf_a.address != buf_src2.address
    assert buf_b.address != buf_a.address

    arr = np.arange(5)
    assert pa.py_buffer(arr).address == arr.ctypes.data
| |
| |
def test_buffer_equals():
    # Buffer.equals() returns true iff the buffers have the same contents
    def eq(a, b):
        # The method and the ==/!= operators must all agree
        assert a.equals(b)
        assert a == b
        assert not (a != b)

    def ne(a, b):
        assert not a.equals(b)
        assert not (a == b)
        assert a != b

    b1 = b'some data!'
    b2 = bytearray(b1)
    b3 = bytearray(b1)
    b3[0] = 42  # b3 now differs from b1/b2 in the first byte only
    buf1 = pa.py_buffer(b1)
    buf2 = pa.py_buffer(b2)
    buf3 = pa.py_buffer(b2)
    buf4 = pa.py_buffer(b3)
    # Same underlying bytes viewed with a different element width
    buf5 = pa.py_buffer(np.frombuffer(b2, dtype=np.int16))
    eq(buf1, buf1)
    eq(buf1, buf2)
    eq(buf2, buf3)
    ne(buf2, buf4)
    # Data type is indifferent
    eq(buf2, buf5)
| |
| |
def test_buffer_getitem():
    """Indexing a Buffer mirrors indexing the source bytearray."""
    data = bytearray(b'some data!')
    buf = pa.py_buffer(data)

    n = len(data)
    # Check every valid index, negative and positive. The upper bound is
    # exclusive, so range(-n, n) covers -n..n-1; the original
    # range(-n, n - 1) silently skipped the last element.
    for ix in range(-n, n):
        assert buf[ix] == data[ix]

    # One past either end must raise
    with pytest.raises(IndexError):
        buf[n]

    with pytest.raises(IndexError):
        buf[-n - 1]
| |
| def test_buffer_slicing(): |
| data = b'some data!' |
| buf = pa.py_buffer(data) |
| |
| sliced = buf.slice(2) |
| expected = pa.py_buffer(b'me data!') |
| assert sliced.equals(expected) |
| |
| sliced2 = buf.slice(2, 4) |
| expected2 = pa.py_buffer(b'me d') |
| assert sliced2.equals(expected2) |
| |
| # 0 offset |
| assert buf.slice(0).equals(buf) |
| |
| # Slice past end of buffer |
| assert len(buf.slice(len(buf))) == 0 |
| |
| with pytest.raises(IndexError): |
| buf.slice(-1) |
| |
| # Test slice notation |
| assert buf[2:].equals(buf.slice(2)) |
| assert buf[2:5].equals(buf.slice(2, 3)) |
| assert buf[-5:].equals(buf.slice(len(buf) - 5)) |
| with pytest.raises(IndexError): |
| buf[::-1] |
| with pytest.raises(IndexError): |
| buf[::2] |
| |
| n = len(buf) |
| for start in range(-n * 2, n * 2): |
| for stop in range(-n * 2, n * 2): |
| assert buf[start:stop].to_pybytes() == buf.to_pybytes()[start:stop] |
| |
| |
def test_buffer_hashing():
    """Buffers deliberately do not support hashing."""
    with pytest.raises(TypeError, match="unhashable"):
        hash(pa.py_buffer(b'123'))
| |
| |
| def test_buffer_protocol_respects_immutability(): |
| # ARROW-3228; NumPy's frombuffer ctor determines whether a buffer-like |
| # object is mutable by first attempting to get a mutable buffer using |
| # PyObject_FromBuffer. If that fails, it assumes that the object is |
| # immutable |
| a = b'12345' |
| arrow_ref = pa.py_buffer(a) |
| numpy_ref = np.frombuffer(arrow_ref, dtype=np.uint8) |
| assert not numpy_ref.flags.writeable |
| |
| |
| def test_foreign_buffer(): |
| obj = np.array([1, 2], dtype=np.int32) |
| addr = obj.__array_interface__["data"][0] |
| size = obj.nbytes |
| buf = pa.foreign_buffer(addr, size, obj) |
| wr = weakref.ref(obj) |
| del obj |
| assert np.frombuffer(buf, dtype=np.int32).tolist() == [1, 2] |
| assert wr() is not None |
| del buf |
| assert wr() is None |
| |
| |
| def test_allocate_buffer(): |
| buf = pa.allocate_buffer(100) |
| assert buf.size == 100 |
| assert buf.is_mutable |
| assert buf.parent is None |
| |
| bit = b'abcde' |
| writer = pa.FixedSizeBufferWriter(buf) |
| writer.write(bit) |
| |
| assert buf.to_pybytes()[:5] == bit |
| |
| |
| def test_allocate_buffer_resizable(): |
| buf = pa.allocate_buffer(100, resizable=True) |
| assert isinstance(buf, pa.ResizableBuffer) |
| |
| buf.resize(200) |
| assert buf.size == 200 |
| |
| |
def test_compress_decompress():
    """Roundtrip random data through every supported codec.

    Checks both the Buffer-returning and bytes-returning variants of
    pa.compress / pa.decompress, and that decompress requires a size.
    """
    INPUT_SIZE = 10000
    # ndarray.tostring() is deprecated (removed in NumPy >= 1.23);
    # tobytes() returns the identical raw bytes.
    test_data = (np.random.randint(0, 255, size=INPUT_SIZE)
                 .astype(np.uint8)
                 .tobytes())
    test_buf = pa.py_buffer(test_data)

    codecs = ['lz4', 'snappy', 'gzip', 'zstd', 'brotli']
    for codec in codecs:
        compressed_buf = pa.compress(test_buf, codec=codec)
        compressed_bytes = pa.compress(test_data, codec=codec, asbytes=True)

        assert isinstance(compressed_bytes, bytes)

        decompressed_buf = pa.decompress(compressed_buf, INPUT_SIZE,
                                         codec=codec)
        decompressed_bytes = pa.decompress(compressed_bytes, INPUT_SIZE,
                                           codec=codec, asbytes=True)

        assert isinstance(decompressed_bytes, bytes)

        assert decompressed_buf.equals(test_buf)
        assert decompressed_bytes == test_data

        # The decompressed size is required
        with pytest.raises(ValueError):
            pa.decompress(compressed_bytes, codec=codec)
| |
| |
| def test_buffer_memoryview_is_immutable(): |
| val = b'some data' |
| |
| buf = pa.py_buffer(val) |
| assert not buf.is_mutable |
| assert isinstance(buf, pa.Buffer) |
| |
| result = memoryview(buf) |
| assert result.readonly |
| |
| with pytest.raises(TypeError) as exc: |
| result[0] = b'h' |
| assert 'cannot modify read-only' in str(exc.value) |
| |
| b = bytes(buf) |
| with pytest.raises(TypeError) as exc: |
| b[0] = b'h' |
| assert 'cannot modify read-only' in str(exc.value) |
| |
| |
| def test_uninitialized_buffer(): |
| # ARROW-2039: calling Buffer() directly creates an uninitialized object |
| # ARROW-2638: prevent calling extension class constructors directly |
| with pytest.raises(TypeError): |
| pa.Buffer() |
| |
| |
def test_memory_output_stream():
    """BufferOutputStream accumulates every chunk written to it."""
    chunk = b'dataabcdef'  # 10 bytes
    stream = pa.BufferOutputStream()

    repeats = 1000
    for _ in range(repeats):
        stream.write(chunk)

    result = stream.getvalue()
    assert len(result) == len(chunk) * repeats
    assert result.to_pybytes() == chunk * repeats
| |
| |
def test_inmemory_write_after_closed():
    """getvalue() finalizes the stream; writing afterwards must fail."""
    stream = pa.BufferOutputStream()
    stream.write(b'ok')
    assert not stream.closed

    stream.getvalue()  # closes the stream as a side effect
    assert stream.closed

    with pytest.raises(ValueError):
        stream.write(b'not ok')
| |
| |
| def test_buffer_protocol_ref_counting(): |
| def make_buffer(bytes_obj): |
| return bytearray(pa.py_buffer(bytes_obj)) |
| |
| buf = make_buffer(b'foo') |
| gc.collect() |
| assert buf == b'foo' |
| |
| # ARROW-1053 |
| val = b'foo' |
| refcount_before = sys.getrefcount(val) |
| for i in range(10): |
| make_buffer(val) |
| gc.collect() |
| assert refcount_before == sys.getrefcount(val) |
| |
| |
| def test_nativefile_write_memoryview(): |
| f = pa.BufferOutputStream() |
| data = b'ok' |
| |
| arr = np.frombuffer(data, dtype='S1') |
| |
| f.write(arr) |
| f.write(bytearray(data)) |
| |
| buf = f.getvalue() |
| |
| assert buf.to_pybytes() == data * 2 |
| |
| |
| # ---------------------------------------------------------------------- |
| # Mock output stream |
| |
| |
| def test_mock_output_stream(): |
| # Make sure that the MockOutputStream and the BufferOutputStream record the |
| # same size |
| |
| # 10 bytes |
| val = b'dataabcdef' |
| |
| f1 = pa.MockOutputStream() |
| f2 = pa.BufferOutputStream() |
| |
| K = 1000 |
| for i in range(K): |
| f1.write(val) |
| f2.write(val) |
| |
| assert f1.size() == len(f2.getvalue()) |
| |
| # Do the same test with a table |
| record_batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['a']) |
| |
| f1 = pa.MockOutputStream() |
| f2 = pa.BufferOutputStream() |
| |
| stream_writer1 = pa.RecordBatchStreamWriter(f1, record_batch.schema) |
| stream_writer2 = pa.RecordBatchStreamWriter(f2, record_batch.schema) |
| |
| stream_writer1.write_batch(record_batch) |
| stream_writer2.write_batch(record_batch) |
| stream_writer1.close() |
| stream_writer2.close() |
| |
| assert f1.size() == len(f2.getvalue()) |
| |
| |
| # ---------------------------------------------------------------------- |
| # OS files and memory maps |
| |
| |
| @pytest.fixture |
| def sample_disk_data(request, tmpdir): |
| SIZE = 4096 |
| arr = np.random.randint(0, 256, size=SIZE).astype('u1') |
| data = arr.tobytes()[:SIZE] |
| |
| path = os.path.join(str(tmpdir), guid()) |
| |
| with open(path, 'wb') as f: |
| f.write(data) |
| |
| def teardown(): |
| _try_delete(path) |
| |
| request.addfinalizer(teardown) |
| return path, data |
| |
| |
| def _check_native_file_reader(FACTORY, sample_data): |
| path, data = sample_data |
| |
| f = FACTORY(path, mode='r') |
| |
| assert f.read(10) == data[:10] |
| assert f.read(0) == b'' |
| assert f.tell() == 10 |
| |
| assert f.read() == data[10:] |
| |
| assert f.size() == len(data) |
| |
| f.seek(0) |
| assert f.tell() == 0 |
| |
| # Seeking past end of file not supported in memory maps |
| f.seek(len(data) + 1) |
| assert f.tell() == len(data) + 1 |
| assert f.read(5) == b'' |
| |
| # Test whence argument of seek, ARROW-1287 |
| assert f.seek(3) == 3 |
| assert f.seek(3, os.SEEK_CUR) == 6 |
| assert f.tell() == 6 |
| |
| ex_length = len(data) - 2 |
| assert f.seek(-2, os.SEEK_END) == ex_length |
| assert f.tell() == ex_length |
| |
| |
| def test_memory_map_reader(sample_disk_data): |
| _check_native_file_reader(pa.memory_map, sample_disk_data) |
| |
| |
| def test_memory_map_retain_buffer_reference(sample_disk_data): |
| path, data = sample_disk_data |
| |
| cases = [] |
| with pa.memory_map(path, 'rb') as f: |
| cases.append((f.read_buffer(100), data[:100])) |
| cases.append((f.read_buffer(100), data[100:200])) |
| cases.append((f.read_buffer(100), data[200:300])) |
| |
| # Call gc.collect() for good measure |
| gc.collect() |
| |
| for buf, expected in cases: |
| assert buf.to_pybytes() == expected |
| |
| |
| def test_os_file_reader(sample_disk_data): |
| _check_native_file_reader(pa.OSFile, sample_disk_data) |
| |
| |
| def test_os_file_large_seeks(): |
| check_large_seeks(pa.OSFile) |
| |
| |
def _try_delete(path):
    """Remove *path*, silently ignoring OS-level failures (e.g. missing file)."""
    try:
        os.remove(path)
    except OSError:
        # `os.error` is just a legacy alias of OSError; use the real name.
        pass
| |
| |
def test_memory_map_writer(tmpdir):
    # Writes through one memory map must be visible to other maps of the
    # same file, and 'w' mode must not truncate an existing file.
    SIZE = 4096
    arr = np.random.randint(0, 256, size=SIZE).astype('u1')
    data = arr.tobytes()[:SIZE]

    path = os.path.join(str(tmpdir), guid())
    with open(path, 'wb') as f:
        f.write(data)

    f = pa.memory_map(path, mode='r+b')

    f.seek(10)
    f.write(b'peekaboo')
    assert f.tell() == 18

    f.seek(10)
    assert f.read(8) == b'peekaboo'

    # A second map over the same file sees and can overwrite f's data
    f2 = pa.memory_map(path, mode='r+b')

    f2.seek(10)
    f2.write(b'booapeak')
    f2.seek(10)

    f.seek(10)
    assert f.read(8) == b'booapeak'

    # Does not truncate file
    f3 = pa.memory_map(path, mode='w')
    f3.write(b'foo')

    with pa.memory_map(path) as f4:
        assert f4.size() == SIZE

    # A map opened write-only cannot be read from
    with pytest.raises(IOError):
        f3.read(5)

    # f3's write is visible through the original map f
    f.seek(0)
    assert f.read(3) == b'foo'
| |
| |
def test_memory_map_resize(tmpdir):
    """A memory map can be grown with resize() and written past its old end."""
    SIZE = 4096
    arr = np.random.randint(0, 256, size=SIZE).astype(np.uint8)
    data1 = arr.tobytes()[:(SIZE // 2)]
    data2 = arr.tobytes()[(SIZE // 2):]

    path = os.path.join(str(tmpdir), guid())

    # Use integer division: a file size must be an int, and SIZE / 2 is a
    # float (consistent with the SIZE // 2 slicing above).
    mmap = pa.create_memory_map(path, SIZE // 2)
    mmap.write(data1)

    mmap.resize(SIZE)
    mmap.write(data2)

    mmap.close()

    # Both halves must have landed in the underlying file
    with open(path, 'rb') as f:
        assert f.read() == arr.tobytes()
| |
| |
| def test_memory_zero_length(tmpdir): |
| path = os.path.join(str(tmpdir), guid()) |
| f = open(path, 'wb') |
| f.close() |
| with pa.memory_map(path, mode='r+b') as memory_map: |
| assert memory_map.size() == 0 |
| |
| |
| def test_memory_map_large_seeks(): |
| check_large_seeks(pa.memory_map) |
| |
| |
| def test_os_file_writer(tmpdir): |
| SIZE = 4096 |
| arr = np.random.randint(0, 256, size=SIZE).astype('u1') |
| data = arr.tobytes()[:SIZE] |
| |
| path = os.path.join(str(tmpdir), guid()) |
| with open(path, 'wb') as f: |
| f.write(data) |
| |
| # Truncates file |
| f2 = pa.OSFile(path, mode='w') |
| f2.write(b'foo') |
| |
| with pa.OSFile(path) as f3: |
| assert f3.size() == 3 |
| |
| with pytest.raises(IOError): |
| f2.read(5) |
| |
| |
| def test_native_file_write_reject_unicode(): |
| # ARROW-3227 |
| nf = pa.BufferOutputStream() |
| with pytest.raises(TypeError): |
| nf.write(u'foo') |
| |
| |
| def test_native_file_modes(tmpdir): |
| path = os.path.join(str(tmpdir), guid()) |
| with open(path, 'wb') as f: |
| f.write(b'foooo') |
| |
| with pa.OSFile(path, mode='r') as f: |
| assert f.mode == 'rb' |
| assert f.readable() |
| assert not f.writable() |
| assert f.seekable() |
| |
| with pa.OSFile(path, mode='rb') as f: |
| assert f.mode == 'rb' |
| assert f.readable() |
| assert not f.writable() |
| assert f.seekable() |
| |
| with pa.OSFile(path, mode='w') as f: |
| assert f.mode == 'wb' |
| assert not f.readable() |
| assert f.writable() |
| assert not f.seekable() |
| |
| with pa.OSFile(path, mode='wb') as f: |
| assert f.mode == 'wb' |
| assert not f.readable() |
| assert f.writable() |
| assert not f.seekable() |
| |
| with open(path, 'wb') as f: |
| f.write(b'foooo') |
| |
| with pa.memory_map(path, 'r') as f: |
| assert f.mode == 'rb' |
| assert f.readable() |
| assert not f.writable() |
| assert f.seekable() |
| |
| with pa.memory_map(path, 'r+') as f: |
| assert f.mode == 'rb+' |
| assert f.readable() |
| assert f.writable() |
| assert f.seekable() |
| |
| with pa.memory_map(path, 'r+b') as f: |
| assert f.mode == 'rb+' |
| assert f.readable() |
| assert f.writable() |
| assert f.seekable() |
| |
| |
| def test_native_file_raises_ValueError_after_close(tmpdir): |
| path = os.path.join(str(tmpdir), guid()) |
| with open(path, 'wb') as f: |
| f.write(b'foooo') |
| |
| with pa.OSFile(path, mode='rb') as os_file: |
| assert not os_file.closed |
| assert os_file.closed |
| |
| with pa.memory_map(path, mode='rb') as mmap_file: |
| assert not mmap_file.closed |
| assert mmap_file.closed |
| |
| files = [os_file, |
| mmap_file] |
| |
| methods = [('tell', ()), |
| ('seek', (0,)), |
| ('size', ()), |
| ('flush', ()), |
| ('readable', ()), |
| ('writable', ()), |
| ('seekable', ())] |
| |
| for f in files: |
| for method, args in methods: |
| with pytest.raises(ValueError): |
| getattr(f, method)(*args) |
| |
| |
| def test_native_file_TextIOWrapper(tmpdir): |
| data = (u'foooo\n' |
| u'barrr\n' |
| u'bazzz\n') |
| |
| path = os.path.join(str(tmpdir), guid()) |
| with open(path, 'wb') as f: |
| f.write(data.encode('utf-8')) |
| |
| with TextIOWrapper(pa.OSFile(path, mode='rb')) as fil: |
| assert fil.readable() |
| res = fil.read() |
| assert res == data |
| assert fil.closed |
| |
| with TextIOWrapper(pa.OSFile(path, mode='rb')) as fil: |
| # Iteration works |
| lines = list(fil) |
| assert ''.join(lines) == data |
| |
| # Writing |
| path2 = os.path.join(str(tmpdir), guid()) |
| with TextIOWrapper(pa.OSFile(path2, mode='wb')) as fil: |
| assert fil.writable() |
| fil.write(data) |
| |
| with TextIOWrapper(pa.OSFile(path2, mode='rb')) as fil: |
| res = fil.read() |
| assert res == data |
| |
| |
| # ---------------------------------------------------------------------- |
| # Compressed input and output streams |
| |
def check_compressed_input(data, fn, compression):
    # Reading the file `fn` through a CompressedInputStream must yield
    # `data`; closing the compressed stream also closes the raw file.
    raw = pa.OSFile(fn, mode="rb")
    with pa.CompressedInputStream(raw, compression) as compressed:
        assert not compressed.closed
        assert compressed.readable()
        assert not compressed.writable()
        assert not compressed.seekable()
        got = compressed.read()
        assert got == data
    # Exiting the with-block closes both layers
    assert compressed.closed
    assert raw.closed

    # Same with read_buffer()
    raw = pa.OSFile(fn, mode="rb")
    with pa.CompressedInputStream(raw, compression) as compressed:
        buf = compressed.read_buffer()
        assert isinstance(buf, pa.Buffer)
        assert buf.to_pybytes() == data
| |
| |
| def test_compressed_input_gzip(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| fn = str(tmpdir / "compressed_input_test.gz") |
| with gzip.open(fn, "wb") as f: |
| f.write(data) |
| check_compressed_input(data, fn, "gzip") |
| |
| |
| def test_compressed_input_bz2(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| fn = str(tmpdir / "compressed_input_test.bz2") |
| with bz2.BZ2File(fn, "w") as f: |
| f.write(data) |
| try: |
| check_compressed_input(data, fn, "bz2") |
| except NotImplementedError as e: |
| pytest.skip(str(e)) |
| |
| |
| def test_compressed_input_invalid(): |
| data = b"foo" * 10 |
| raw = pa.BufferReader(data) |
| with pytest.raises(ValueError): |
| pa.CompressedInputStream(raw, "unknown_compression") |
| with pytest.raises(ValueError): |
| pa.CompressedInputStream(raw, None) |
| |
| with pa.CompressedInputStream(raw, "gzip") as compressed: |
| with pytest.raises(IOError, match="zlib inflate failed"): |
| compressed.read() |
| |
| |
| def make_compressed_output(data, fn, compression): |
| raw = pa.BufferOutputStream() |
| with pa.CompressedOutputStream(raw, compression) as compressed: |
| assert not compressed.closed |
| assert not compressed.readable() |
| assert compressed.writable() |
| assert not compressed.seekable() |
| compressed.write(data) |
| assert compressed.closed |
| assert raw.closed |
| with open(fn, "wb") as f: |
| f.write(raw.getvalue()) |
| |
| |
| def test_compressed_output_gzip(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| fn = str(tmpdir / "compressed_output_test.gz") |
| make_compressed_output(data, fn, "gzip") |
| with gzip.open(fn, "rb") as f: |
| got = f.read() |
| assert got == data |
| |
| |
| def test_compressed_output_bz2(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| fn = str(tmpdir / "compressed_output_test.bz2") |
| try: |
| make_compressed_output(data, fn, "bz2") |
| except NotImplementedError as e: |
| pytest.skip(str(e)) |
| with bz2.BZ2File(fn, "r") as f: |
| got = f.read() |
| assert got == data |
| |
| |
| @pytest.mark.parametrize("compression", |
| ["bz2", "brotli", "gzip", "lz4", "zstd"]) |
| def test_compressed_roundtrip(compression): |
| data = b"some test data\n" * 10 + b"eof\n" |
| raw = pa.BufferOutputStream() |
| try: |
| with pa.CompressedOutputStream(raw, compression) as compressed: |
| compressed.write(data) |
| except NotImplementedError as e: |
| if compression == "bz2": |
| pytest.skip(str(e)) |
| else: |
| raise |
| cdata = raw.getvalue() |
| assert len(cdata) < len(data) |
| raw = pa.BufferReader(cdata) |
| with pa.CompressedInputStream(raw, compression) as compressed: |
| got = compressed.read() |
| assert got == data |
| |
| |
| # ---------------------------------------------------------------------- |
| # High-level API |
| |
# Compatibility shims: on Python 2 the gzip module has no one-shot
# compress()/decompress() helpers (see the else branch using them on
# Python 3), so emulate them through temporary files.
if PY2:
    def gzip_compress(data):
        # gzip-compress `data` and return the compressed bytes
        fd, fn = tempfile.mkstemp(suffix='.gz')
        try:
            os.close(fd)  # GzipFile reopens by name; drop the raw fd
            with gzip.GzipFile(fn, 'wb') as f:
                f.write(data)
            with open(fn, 'rb') as f:
                return f.read()
        finally:
            os.unlink(fn)

    def gzip_decompress(data):
        # Decompress gzip-compressed `data` and return the payload
        fd, fn = tempfile.mkstemp(suffix='.gz')
        try:
            os.close(fd)
            with open(fn, 'wb') as f:
                f.write(data)
            with gzip.GzipFile(fn, 'rb') as f:
                return f.read()
        finally:
            os.unlink(fn)
else:
    gzip_compress = gzip.compress
    gzip_decompress = gzip.decompress
| |
| |
| def test_input_stream_buffer(): |
| data = b"some test data\n" * 10 + b"eof\n" |
| for arg in [pa.py_buffer(data), memoryview(data)]: |
| stream = pa.input_stream(arg) |
| assert stream.read() == data |
| |
| gz_data = gzip_compress(data) |
| stream = pa.input_stream(memoryview(gz_data)) |
| assert stream.read() == gz_data |
| stream = pa.input_stream(memoryview(gz_data), compression='gzip') |
| assert stream.read() == data |
| |
| |
| def test_input_stream_file_path(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| file_path = tmpdir / 'input_stream' |
| with open(str(file_path), 'wb') as f: |
| f.write(data) |
| |
| stream = pa.input_stream(file_path) |
| assert stream.read() == data |
| stream = pa.input_stream(str(file_path)) |
| assert stream.read() == data |
| stream = pa.input_stream(pathlib.Path(str(file_path))) |
| assert stream.read() == data |
| |
| |
| def test_input_stream_file_path_compressed(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| gz_data = gzip_compress(data) |
| file_path = tmpdir / 'input_stream.gz' |
| with open(str(file_path), 'wb') as f: |
| f.write(gz_data) |
| |
| stream = pa.input_stream(file_path) |
| assert stream.read() == data |
| stream = pa.input_stream(str(file_path)) |
| assert stream.read() == data |
| stream = pa.input_stream(pathlib.Path(str(file_path))) |
| assert stream.read() == data |
| |
| stream = pa.input_stream(file_path, compression='gzip') |
| assert stream.read() == data |
| stream = pa.input_stream(file_path, compression=None) |
| assert stream.read() == gz_data |
| |
| |
| def test_input_stream_file_path_buffered(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| file_path = tmpdir / 'input_stream.buffered' |
| with open(str(file_path), 'wb') as f: |
| f.write(data) |
| |
| stream = pa.input_stream(file_path, buffer_size=32) |
| assert stream.read() == data |
| stream = pa.input_stream(str(file_path), buffer_size=64) |
| assert stream.read() == data |
| stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024) |
| assert stream.read() == data |
| |
| unbuffered_stream = pa.input_stream(file_path, buffer_size=0) |
| assert isinstance(unbuffered_stream, pa.OSFile) |
| |
| msg = 'Buffer size must be larger than zero' |
| with pytest.raises(ValueError, match=msg): |
| pa.input_stream(file_path, buffer_size=-1) |
| with pytest.raises(TypeError): |
| pa.input_stream(file_path, buffer_size='million') |
| |
| |
| def test_input_stream_file_path_compressed_and_buffered(tmpdir): |
| data = b"some test data\n" * 100 + b"eof\n" |
| gz_data = gzip_compress(data) |
| file_path = tmpdir / 'input_stream_compressed_and_buffered.gz' |
| with open(str(file_path), 'wb') as f: |
| f.write(gz_data) |
| |
| stream = pa.input_stream(file_path, buffer_size=32, compression='gzip') |
| assert stream.read() == data |
| stream = pa.input_stream(str(file_path), buffer_size=64) |
| assert stream.read() == data |
| stream = pa.input_stream(pathlib.Path(str(file_path)), buffer_size=1024) |
| assert stream.read() == data |
| |
| |
| def test_input_stream_python_file(tmpdir): |
| data = b"some test data\n" * 10 + b"eof\n" |
| bio = BytesIO(data) |
| |
| stream = pa.input_stream(bio) |
| assert stream.read() == data |
| |
| gz_data = gzip_compress(data) |
| bio = BytesIO(gz_data) |
| stream = pa.input_stream(bio) |
| assert stream.read() == gz_data |
| bio.seek(0) |
| stream = pa.input_stream(bio, compression='gzip') |
| assert stream.read() == data |
| |
| file_path = tmpdir / 'input_stream' |
| with open(str(file_path), 'wb') as f: |
| f.write(data) |
| with open(str(file_path), 'rb') as f: |
| stream = pa.input_stream(f) |
| assert stream.read() == data |
| |
| |
def test_input_stream_native_file():
    """pa.input_stream over Arrow-native readers."""
    payload = b"some test data\n" * 10 + b"eof\n"
    compressed = gzip_compress(payload)

    # A native file with no codec requested is returned unwrapped.
    reader = pa.BufferReader(compressed)
    assert pa.input_stream(reader) is reader

    # An explicit codec wraps the reader in a decompressing stream.
    wrapped = pa.input_stream(pa.BufferReader(compressed),
                              compression='gzip')
    assert wrapped.read() == payload
| |
| |
def test_input_stream_errors(tmpdir):
    """Invalid arguments to pa.input_stream raise the expected errors."""
    # Unknown compression codec name.
    with pytest.raises(ValueError):
        pa.input_stream(memoryview(b""), compression="foo")

    # Unsupported source object types.
    for bad_source in (bytearray(), StringIO()):
        with pytest.raises(TypeError):
            pa.input_stream(bad_source)

    # Nonexistent path.
    with pytest.raises(IOError):
        pa.input_stream("non_existent_file")

    # A write-only file object is rejected.
    with open(str(tmpdir / 'new_file'), 'wb') as sink:
        with pytest.raises(TypeError, match="readable file expected"):
            pa.input_stream(sink)
| |
| |
def test_output_stream_buffer():
    """Writing through pa.output_stream mutates the backing buffer."""
    payload = b"some test data\n" * 10 + b"eof\n"

    # Via an Arrow buffer wrapping a bytearray.
    target = bytearray(len(payload))
    sink = pa.output_stream(pa.py_buffer(target))
    sink.write(payload)
    assert target == payload

    # Via a plain memoryview over a bytearray.
    target = bytearray(len(payload))
    sink = pa.output_stream(memoryview(target))
    sink.write(payload)
    assert target == payload
| |
| |
def test_output_stream_file_path(tmpdir):
    """pa.output_stream accepts every supported flavour of file path."""
    payload = b"some test data\n" * 10 + b"eof\n"
    path = tmpdir / 'output_stream'

    def roundtrip(target, expected):
        # Write via pa.output_stream, read back with plain open().
        with pa.output_stream(target) as sink:
            sink.write(expected)
        with open(str(target), 'rb') as source:
            assert source.read() == expected

    roundtrip(path, payload)
    roundtrip(str(path), payload)
    roundtrip(pathlib.Path(str(path)), payload)
| |
| |
def test_output_stream_file_path_compressed(tmpdir):
    """Compression selection when writing to a .gz file path."""
    payload = b"some test data\n" * 10 + b"eof\n"
    path = tmpdir / 'output_stream.gz'

    def written_bytes(target, payload, **kwargs):
        # Write via pa.output_stream and return the raw on-disk bytes.
        with pa.output_stream(target, **kwargs) as sink:
            sink.write(payload)
        with open(str(target), 'rb') as source:
            return source.read()

    # Codec inferred from the .gz suffix for every path flavour.
    assert gzip_decompress(written_bytes(path, payload)) == payload
    assert gzip_decompress(written_bytes(str(path), payload)) == payload
    assert gzip_decompress(
        written_bytes(pathlib.Path(str(path)), payload)) == payload

    # Explicit codec selection also works.
    assert gzip_decompress(
        written_bytes(path, payload, compression='gzip')) == payload
    # compression=None disables suffix-based inference entirely.
    assert written_bytes(path, payload, compression=None) == payload

    # An unknown codec name is rejected.
    with pytest.raises(ValueError, match='Unrecognized compression type'):
        written_bytes(path, payload, compression='rabbit')
| |
| |
def test_output_stream_file_path_buffered(tmpdir):
    """Buffered writes to a file path, including buffer_size validation."""
    payload = b"some test data\n" * 10 + b"eof\n"
    path = tmpdir / 'output_stream.buffered'

    def written_bytes(target, payload, **kwargs):
        # Write via pa.output_stream and return the raw on-disk bytes.
        with pa.output_stream(target, **kwargs) as sink:
            sink.write(payload)
        with open(str(target), 'rb') as source:
            return source.read()

    # buffer_size=0 yields a plain unbuffered OSFile.
    unbuffered = pa.output_stream(path, buffer_size=0)
    assert isinstance(unbuffered, pa.OSFile)

    # Negative buffer sizes are rejected.
    with pytest.raises(ValueError,
                       match='Buffer size must be larger than zero'):
        written_bytes(path, payload, buffer_size=-128)

    # Buffered writes round-trip for all buffer sizes and path flavours.
    assert written_bytes(path, payload, buffer_size=32) == payload
    assert written_bytes(path, payload, buffer_size=1024) == payload
    assert written_bytes(str(path), payload, buffer_size=32) == payload
    assert written_bytes(pathlib.Path(str(path)), payload,
                         buffer_size=32) == payload
| |
| |
def test_output_stream_file_path_compressed_and_buffered(tmpdir):
    """Buffered writes combined with gzip compression to a .gz path."""
    payload = b"some test data\n" * 100 + b"eof\n"
    path = tmpdir / 'output_stream_compressed_and_buffered.gz'

    def written_bytes(target, payload, **kwargs):
        # Write via pa.output_stream and return the raw on-disk bytes.
        with pa.output_stream(target, **kwargs) as sink:
            sink.write(payload)
        with open(str(target), 'rb') as source:
            return source.read()

    # Codec inferred from the .gz suffix (first two) or given explicitly
    # (third), with various buffer sizes.
    for kwargs in ({'buffer_size': 32},
                   {'buffer_size': 1024},
                   {'buffer_size': 1024, 'compression': 'gzip'}):
        raw = written_bytes(path, payload, **kwargs)
        assert gzip_decompress(raw) == payload
| |
| |
def test_output_stream_python_file(tmpdir):
    """pa.output_stream over a Python file object, raw and compressed."""
    payload = b"some test data\n" * 10 + b"eof\n"

    def roundtrip(payload, **kwargs):
        # XXX cannot use BytesIO because stream.close() is necessary
        # to finish writing compressed data, but it will also close the
        # underlying BytesIO
        path = str(tmpdir / 'output_stream_file')
        with open(path, 'wb') as sink:
            with pa.output_stream(sink, **kwargs) as stream:
                stream.write(payload)
        with open(path, 'rb') as source:
            return source.read()

    assert roundtrip(payload) == payload
    assert gzip_decompress(roundtrip(payload, compression='gzip')) == payload
| |
| |
def test_output_stream_errors(tmpdir):
    """Invalid arguments to pa.output_stream raise the expected errors."""
    # Unknown compression codec name.
    with pytest.raises(ValueError):
        pa.output_stream(memoryview(bytearray()), compression="foo")

    # Unsupported sink object types.
    for bad_sink in (bytearray(), StringIO()):
        with pytest.raises(TypeError):
            pa.output_stream(bad_sink)

    # A read-only file object is rejected.
    path = str(tmpdir / 'new_file')
    with open(path, 'wb'):
        pass
    with open(path, 'rb') as source:
        with pytest.raises(TypeError, match="writable file expected"):
            pa.output_stream(source)