| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import absolute_import |
| from __future__ import division |
| from __future__ import print_function |
| |
| import multiprocessing |
| import os |
| import pytest |
| import random |
| import signal |
| import struct |
| import subprocess |
| import sys |
| import time |
| |
| import numpy as np |
| import pyarrow as pa |
| |
| |
# Memory size handed to the Plasma store started by the test fixtures.
DEFAULT_PLASMA_STORE_MEMORY = 10 ** 8
# Run the store under Valgrind only when PLASMA_VALGRIND=1 is exported.
USE_VALGRIND = os.environ.get("PLASMA_VALGRIND") == "1"
# External store endpoint passed to the store by the eviction tests.
EXTERNAL_STORE = "hashtable://test"
# Margin added to a request size to push it past the remaining capacity.
SMALL_OBJECT_SIZE = 9000
| |
| |
def random_name():
    """Return a random integer from [0, 99999999] as a decimal string."""
    number = random.randint(0, 99999999)
    return "{}".format(number)
| |
| |
def random_object_id():
    """Return a Plasma ``ObjectID`` built from 20 random bytes."""
    import pyarrow.plasma as plasma
    raw_id = np.random.bytes(20)
    return plasma.ObjectID(raw_id)
| |
| |
def generate_metadata(length):
    """Return a ``bytearray`` of ``length`` bytes with randomized content.

    The first and last bytes are randomized, then 100 randomly chosen
    positions are overwritten with random values; bytes that are never
    picked stay zero.  A ``length`` of zero yields an empty ``bytearray``.
    """
    payload = bytearray(length)
    if length <= 0:
        return payload
    payload[0] = random.randint(0, 255)
    payload[-1] = random.randint(0, 255)
    for _ in range(100):
        # Draw the value before the position, matching the evaluation
        # order of a subscript assignment.
        value = random.randint(0, 255)
        position = random.randint(0, length - 1)
        payload[position] = value
    return payload
| |
| |
def write_to_data_buffer(buff, length):
    """Scribble random bytes into the writable buffer ``buff`` in place.

    ``buff`` is viewed as ``uint8``.  Its first and last bytes are
    randomized and 100 random positions below ``length`` are overwritten.
    Nothing is written when ``length`` is zero.
    """
    view = np.frombuffer(buff, dtype="uint8")
    if length <= 0:
        return
    view[0] = random.randint(0, 255)
    view[-1] = random.randint(0, 255)
    for _ in range(100):
        # Value first, then position: same draw order as the one-line
        # subscript assignment.
        value = random.randint(0, 255)
        position = random.randint(0, length - 1)
        view[position] = value
| |
| |
def create_object_with_id(client, object_id, data_size, metadata_size,
                          seal=True):
    """Create an object under ``object_id`` with random data and metadata.

    Random metadata of ``metadata_size`` bytes is generated, the object is
    created through ``client`` and its data buffer is filled randomly.  The
    object is sealed unless ``seal`` is false.

    Returns the tuple ``(data_buffer, metadata)``.
    """
    random_metadata = generate_metadata(metadata_size)
    data_buffer = client.create(object_id, data_size, random_metadata)
    write_to_data_buffer(data_buffer, data_size)
    created = (data_buffer, random_metadata)
    if not seal:
        return created
    client.seal(object_id)
    return created
| |
| |
def create_object(client, data_size, metadata_size=0, seal=True):
    """Create a randomly filled object under a fresh random object ID.

    Returns the tuple ``(object_id, data_buffer, metadata)``.
    """
    new_id = random_object_id()
    created = create_object_with_id(client, new_id, data_size,
                                    metadata_size, seal=seal)
    data_buffer, random_metadata = created
    return new_id, data_buffer, random_metadata
| |
| |
def assert_get_object_equal(unit_test, client1, client2, object_id,
                            memory_buffer=None, metadata=None):
    """Assert that two clients see the same data and metadata for an object.

    ``unit_test`` is accepted for call compatibility but is not used.  When
    ``memory_buffer`` and/or ``metadata`` are given, they are additionally
    compared against what ``client1`` returns.
    """
    import pyarrow.plasma as plasma
    same = plasma.buffers_equal
    data1 = client1.get_buffers([object_id])[0]
    data2 = client2.get_buffers([object_id])[0]
    meta1 = client1.get_metadata([object_id])[0]
    meta2 = client2.get_metadata([object_id])[0]
    # Cheap length checks first, then the full content comparisons.
    assert len(data1) == len(data2)
    assert len(meta1) == len(meta2)
    assert same(data1, data2)
    assert same(meta1, meta2)
    # Optional reference values supplied by the caller.
    if memory_buffer is not None:
        assert same(memory_buffer, data1)
    if metadata is not None:
        assert same(metadata, meta1)
| |
| |
| @pytest.mark.plasma |
| class TestPlasmaClient(object): |
| |
def setup_method(self, test_method):
    """Start a Plasma store and connect two clients before each test."""
    import pyarrow.plasma as plasma
    # The context manager is entered by hand here; teardown_method is
    # responsible for calling its __exit__.
    store_ctx = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
        use_valgrind=USE_VALGRIND)
    self.plasma_store_ctx = store_ctx
    store_name, store_process = store_ctx.__enter__()
    self.plasma_store_name = store_name
    self.p = store_process
    # Two independent connections to the same store.
    self.plasma_client = plasma.connect(self.plasma_store_name)
    self.plasma_client2 = plasma.connect(self.plasma_store_name)
| |
def teardown_method(self, test_method):
    """Stop the Plasma store after each test and check it exited with 0."""
    try:
        store_process = self.p
        # The store must have survived the test.
        assert store_process.poll() is None
        if USE_VALGRIND:
            # Valgrind misses SIGTERM when it arrives before the event
            # loop is ready.  Sleeping mitigates, but does not solve,
            # that race, so Valgrind and/or coverage get a clean exit.
            time.sleep(1.0)
        store_process.send_signal(signal.SIGTERM)
        if sys.version_info < (3, 3):
            # No timeout support before Python 3.3.
            store_process.wait()
        else:
            store_process.wait(timeout=5)
        assert store_process.returncode == 0
    finally:
        self.plasma_store_ctx.__exit__(None, None, None)
| |
def test_connection_failure_raises_exception(self):
    """Connecting to an unknown store name raises IOError (ARROW-1264)."""
    import pyarrow.plasma as plasma
    bogus_store_name = 'unknown-store-name'
    pytest.raises(IOError, plasma.connect, bogus_store_name, num_retries=1)
| |
def test_create(self):
    """Round-trip a 50-byte object through create, seal and get_buffers."""
    client = self.plasma_client
    object_id = random_object_id()
    length = 50
    pattern = [i % 256 for i in range(length)]
    # Fill the freshly created buffer with the pattern, then seal it.
    writable = np.frombuffer(client.create(object_id, length),
                             dtype="uint8")
    for index, value in enumerate(pattern):
        writable[index] = value
    client.seal(object_id)
    # The sealed object must read back byte for byte.
    stored = np.frombuffer(client.get_buffers([object_id])[0],
                           dtype="uint8")
    for index, value in enumerate(pattern):
        assert stored[index] == value
| |
def test_create_with_metadata(self):
    """Round-trip objects of many sizes together with random metadata."""
    client = self.plasma_client
    for length in range(0, 1000, 3):
        object_id = random_object_id()
        metadata = generate_metadata(length)
        pattern = [i % 256 for i in range(length)]
        # Write the pattern into the new object, then seal it.
        writable = np.frombuffer(client.create(object_id, length, metadata),
                                 dtype="uint8")
        for index, value in enumerate(pattern):
            writable[index] = value
        client.seal(object_id)
        # The data must read back unchanged ...
        stored = np.frombuffer(client.get_buffers([object_id])[0],
                               dtype="uint8")
        for index, value in enumerate(pattern):
            assert stored[index] == value
        # ... and so must the metadata.
        stored_metadata = np.frombuffer(
            client.get_metadata([object_id])[0], dtype="uint8")
        assert len(metadata) == len(stored_metadata)
        for expected, actual in zip(metadata, stored_metadata):
            assert expected == actual
| |
def test_create_existing(self):
    """Creating an object under an ID that already exists must fail.

    This test is partially used to exercise the code path in which an
    object is created with an ID that is already taken.
    """
    length = 100
    for _ in range(1000):
        object_id = random_object_id()
        self.plasma_client.create(object_id, length,
                                  generate_metadata(length))
        # TODO(pcm): Introduce a more specific error type here.
        # pytest.raises reports a clear failure if nothing is raised,
        # unlike a bare ``assert False``.
        with pytest.raises(pa.lib.ArrowException):
            self.plasma_client.create(object_id, length,
                                      generate_metadata(length))
| |
def test_create_and_seal(self):
    """Check ``create_and_seal`` round trips, duplicates and eviction."""
    # Create a bunch of objects.
    object_ids = []
    for i in range(1000):
        object_id = random_object_id()
        object_ids.append(object_id)
        self.plasma_client.create_and_seal(object_id, i * b'a', i * b'b')

    # Every object must return the data and metadata it was created with.
    for i in range(1000):
        [data_tuple] = self.plasma_client.get_buffers([object_ids[i]],
                                                      with_meta=True)
        assert data_tuple[1].to_pybytes() == i * b'a'
        assert (self.plasma_client.get_metadata(
            [object_ids[i]])[0].to_pybytes()
            == i * b'b')

    # Make sure that creating the same object twice raises an exception.
    object_id = random_object_id()
    self.plasma_client.create_and_seal(object_id, b'a', b'b')
    with pytest.raises(pa.PlasmaObjectExists):
        self.plasma_client.create_and_seal(object_id, b'a', b'b')

    # Make sure that these objects can be evicted.
    big_object = DEFAULT_PLASMA_STORE_MEMORY // 10 * b'a'
    object_ids = []
    for _ in range(20):
        object_id = random_object_id()
        object_ids.append(object_id)
        # Create the object under the ID that was just recorded.  Using a
        # different random ID here would make the ``contains`` checks
        # below pass trivially, since those IDs would never have existed.
        self.plasma_client.create_and_seal(object_id, big_object,
                                           big_object)
    for i in range(10):
        assert not self.plasma_client.contains(object_ids[i])
| |
def test_get(self):
    """Check ``get_buffers`` timeouts for missing and unsealed objects."""
    num_object_ids = 60
    # Test timing out of get with various timeouts.
    for timeout in [0, 10, 100, 1000]:
        object_ids = [random_object_id() for _ in range(num_object_ids)]
        results = self.plasma_client.get_buffers(object_ids,
                                                 timeout_ms=timeout)
        assert results == num_object_ids * [None]

    # Create only the even-indexed objects of the last batch of IDs.
    data_buffers = []
    metadata_buffers = []
    for i in range(num_object_ids):
        if i % 2 == 0:
            data_buffer, metadata_buffer = create_object_with_id(
                self.plasma_client, object_ids[i], 2000, 2000)
            data_buffers.append(data_buffer)
            metadata_buffers.append(metadata_buffer)

    # Test timing out from some but not all get calls with various
    # timeouts.
    for timeout in [0, 10, 100, 1000]:
        data_results = self.plasma_client.get_buffers(object_ids,
                                                      timeout_ms=timeout)
        # metadata_results = self.plasma_client.get_metadata(
        #     object_ids, timeout_ms=timeout)
        for i in range(num_object_ids):
            if i % 2 == 0:
                array1 = np.frombuffer(data_buffers[i // 2], dtype="uint8")
                array2 = np.frombuffer(data_results[i], dtype="uint8")
                np.testing.assert_equal(array1, array2)
                # TODO(rkn): We should compare the metadata as well. But
                # currently the types are different (e.g., memoryview
                # versus bytearray).
                # assert plasma.buffers_equal(
                #     metadata_buffers[i // 2], metadata_results[i])
            else:
                # Check the result of THIS call.  The stale ``results``
                # list from the first loop is all None by construction,
                # so asserting on it would verify nothing.
                assert data_results[i] is None

    # Test trying to get an object that was created by the same client but
    # not sealed.
    object_id = random_object_id()
    self.plasma_client.create(object_id, 10, b"metadata")
    assert self.plasma_client.get_buffers(
        [object_id], timeout_ms=0, with_meta=True)[0][1] is None
    assert self.plasma_client.get_buffers(
        [object_id], timeout_ms=1, with_meta=True)[0][1] is None
    self.plasma_client.seal(object_id)
    assert self.plasma_client.get_buffers(
        [object_id], timeout_ms=0, with_meta=True)[0][1] is not None
| |
def test_buffer_lifetime(self):
    """A batch read from Plasma must outlive the buffer it was read from.

    Regression test for ARROW-2195.
    """
    values = pa.array([1, 12, 23, 3, 34], pa.int32())
    batch = pa.RecordBatch.from_arrays([values], ['field1'])

    # Dry run against a mock sink to learn the serialized size.
    size_probe = pa.MockOutputStream()
    probe_writer = pa.RecordBatchStreamWriter(size_probe, batch.schema)
    probe_writer.write_batch(batch)
    probe_writer.close()

    # Serialize the batch into a Plasma object of exactly that size.
    object_id = random_object_id()
    data_buffer = self.plasma_client.create(object_id, size_probe.size())
    plasma_sink = pa.FixedSizeBufferWriter(data_buffer)
    plasma_writer = pa.RecordBatchStreamWriter(plasma_sink, batch.schema)
    plasma_writer.write_batch(batch)
    plasma_writer.close()
    self.plasma_client.seal(object_id)
    del data_buffer

    # Read the batch back through the second client.
    [data_buffer] = self.plasma_client2.get_buffers([object_id])
    reader = pa.RecordBatchStreamReader(data_buffer)
    read_batch = reader.read_next_batch()
    # Drop every reference to the returned buffer; the RecordBatch must
    # still be backed by valid memory.
    del data_buffer, reader

    assert read_batch.equals(batch)
| |
def test_put_and_get(self):
    """Values stored with ``put`` come back unchanged from ``get``."""
    client = self.plasma_client
    samples = [["hello", "world", 3, 1.0], None, "hello"]
    for expected in samples:
        stored_id = client.put(expected)

        # ``get`` accepts a list of IDs ...
        [fetched] = client.get([stored_id])
        assert fetched == expected

        # ... as well as a single ID.
        fetched = client.get(stored_id)
        assert fetched == expected

        # An ID that was never stored is reported as not available.
        missing_id = random_object_id()
        [fetched] = client.get([missing_id], timeout_ms=0)
        assert fetched == pa.plasma.ObjectNotAvailable
| |
def test_put_and_get_raw_buffer(self):
    """Raw buffers and serialized values can share one read path."""
    client = self.plasma_client
    temp_id = random_object_id()
    use_meta = b"RAW"

    def deserialize_or_output(data_tuple):
        # Raw buffers are tagged with ``use_meta`` and returned as bytes.
        if data_tuple[0] == use_meta:
            return data_tuple[1].to_pybytes()
        # A missing data buffer means the object is not in the store.
        if data_tuple[1] is None:
            return pa.plasma.ObjectNotAvailable
        return pa.deserialize(data_tuple[1])

    samples = [b"Bytes Test", temp_id.binary(), 10 * b"\x00", 123]
    for expected in samples:
        if not isinstance(expected, bytes):
            stored_id = client.put(expected)
        else:
            stored_id = client.put_raw_buffer(expected, metadata=use_meta)
        [fetched] = client.get_buffers([stored_id], with_meta=True)
        assert deserialize_or_output(fetched) == expected

        # An ID that was never stored is reported as not available.
        missing_id = random_object_id()
        [fetched] = client.get_buffers([missing_id],
                                       timeout_ms=0,
                                       with_meta=True)
        outcome = deserialize_or_output(fetched)
        assert outcome == pa.plasma.ObjectNotAvailable
| |
def test_put_and_get_serialization_context(self):
    """Custom types need a matching serialization context to round-trip."""

    class CustomType(object):
        def __init__(self, val):
            self.val = val

    client = self.plasma_client
    original = CustomType(42)

    # Without a context that knows CustomType, ``put`` fails.
    with pytest.raises(pa.ArrowSerializationError):
        client.put(original)

    context = pa.SerializationContext()
    context.register_type(CustomType, 20*"\x00")

    stored_id = client.put(original, None, serialization_context=context)

    # Reading back without the context fails as well.
    with pytest.raises(pa.ArrowSerializationError):
        client.get(stored_id)

    restored = client.get(stored_id, -1, serialization_context=context)
    assert restored.val == original.val
| |
def test_store_arrow_objects(self):
    """Write a tensor into a Plasma object and read it back unchanged."""
    client = self.plasma_client
    expected = np.random.randn(10, 4)
    # Serialize the tensor straight into a new Plasma object.
    object_id = random_object_id()
    source_tensor = pa.Tensor.from_numpy(expected)
    buf = client.create(object_id, pa.get_tensor_size(source_tensor))
    sink = pa.FixedSizeBufferWriter(buf)
    pa.write_tensor(source_tensor, sink)
    client.seal(object_id)
    # Deserialize from the sealed object and compare with the input.
    [stored] = client.get_buffers([object_id])
    actual = pa.read_tensor(pa.BufferReader(stored)).to_numpy()
    np.testing.assert_equal(expected, actual)
| |
@pytest.mark.pandas
def test_store_pandas_dataframe(self):
    """Store a pandas DataFrame as a record batch and read it back."""
    import pandas as pd
    d = {'one': pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
         'two': pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
    df = pd.DataFrame(d)

    # Write the DataFrame.
    record_batch = pa.RecordBatch.from_pandas(df)
    # Determine the size with a dry run against a mock sink.
    s = pa.MockOutputStream()
    stream_writer = pa.RecordBatchStreamWriter(s, record_batch.schema)
    stream_writer.write_batch(record_batch)
    data_size = s.size()
    # Use the module's shared helper rather than building the ID by hand.
    object_id = random_object_id()

    buf = self.plasma_client.create(object_id, data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
    stream_writer.write_batch(record_batch)

    self.plasma_client.seal(object_id)

    # Read the DataFrame.
    [data] = self.plasma_client.get_buffers([object_id])
    reader = pa.RecordBatchStreamReader(pa.BufferReader(data))
    result = reader.read_next_batch().to_pandas()

    # ``pd.util.testing`` is deprecated; ``pd.testing`` is the public
    # location of assert_frame_equal.
    pd.testing.assert_frame_equal(df, result)
| |
def test_pickle_object_ids(self):
    """Object IDs survive a pickle round trip.

    This can be used for sharing object IDs between processes.
    """
    import pickle
    original = random_object_id()
    restored = pickle.loads(pickle.dumps(original))
    assert original == restored
| |
def test_store_full(self):
    """``create`` must raise once a request exceeds the remaining space.

    The store is started with DEFAULT_PLASMA_STORE_MEMORY (10 ** 8), and
    every size below is expressed as a percentage of that capacity.
    """
    def assert_create_raises_plasma_full(unit_test, size):
        # Split the request randomly between data and metadata; only the
        # total size matters for the check.
        partial_size = np.random.randint(size)
        # TODO(pcm): More specific error here.
        with pytest.raises(pa.lib.ArrowException):
            create_object(unit_test.plasma_client, partial_size,
                          size - partial_size)

    PERCENT = DEFAULT_PLASMA_STORE_MEMORY // 100

    # Create a list to keep some of the buffers in scope.
    memory_buffers = []
    _, memory_buffer, _ = create_object(self.plasma_client, 50 * PERCENT)
    memory_buffers.append(memory_buffer)
    # Remaining space is 50%. Make sure that we can't create an object of
    # size 50% + SMALL_OBJECT_SIZE, but we can create one of size 20%.
    assert_create_raises_plasma_full(
        self, 50 * PERCENT + SMALL_OBJECT_SIZE)
    _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
    del memory_buffer
    _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
    del memory_buffer
    assert_create_raises_plasma_full(
        self, 50 * PERCENT + SMALL_OBJECT_SIZE)

    _, memory_buffer, _ = create_object(self.plasma_client, 20 * PERCENT)
    memory_buffers.append(memory_buffer)
    # Remaining space is 30%.
    assert_create_raises_plasma_full(
        self, 30 * PERCENT + SMALL_OBJECT_SIZE)

    _, memory_buffer, _ = create_object(self.plasma_client, 10 * PERCENT)
    memory_buffers.append(memory_buffer)
    # Remaining space is 20%.
    assert_create_raises_plasma_full(
        self, 20 * PERCENT + SMALL_OBJECT_SIZE)
| |
def test_contains(self):
    """``contains`` is true exactly for objects that were created."""
    client = self.plasma_client
    fake_object_ids = [random_object_id() for _ in range(100)]
    real_object_ids = [random_object_id() for _ in range(100)]
    for object_id in real_object_ids:
        # Absent before creation, present once sealed.
        assert client.contains(object_id) is False
        client.create(object_id, 100)
        client.seal(object_id)
        assert client.contains(object_id)
    # IDs that were never created stay absent; created ones stay present.
    assert not any(client.contains(oid) for oid in fake_object_ids)
    assert all(client.contains(oid) for oid in real_object_ids)
| |
def test_hash(self):
    """Check that hashes are stable and differ for different objects."""
    client = self.plasma_client
    length = 1000

    def create_sealed(object_id, metadata, shift):
        # Create the object, fill it with the byte pattern
        # (i + shift) % 256, and seal it.
        data = np.frombuffer(client.create(object_id, length, metadata),
                             dtype="uint8")
        for i in range(length):
            data[i] = (i + shift) % 256
        client.seal(object_id)

    # Check the hash of an object that doesn't exist.
    object_id1 = random_object_id()
    # TODO(pcm): Introduce a more specific error type here
    with pytest.raises(pa.lib.ArrowException):
        client.hash(object_id1)

    # Create a random object, and check that the hash function always
    # returns the same value.
    metadata = generate_metadata(length)
    create_sealed(object_id1, metadata, 0)
    assert client.hash(object_id1) == client.hash(object_id1)

    # Create a second object with the same value as the first, and check
    # that their hashes are equal.
    object_id2 = random_object_id()
    create_sealed(object_id2, metadata, 0)
    assert client.hash(object_id1) == client.hash(object_id2)

    # Create a third object with a different value from the first two, and
    # check that its hash is different.
    object_id3 = random_object_id()
    metadata = generate_metadata(length)
    create_sealed(object_id3, metadata, 1)
    assert client.hash(object_id1) != client.hash(object_id3)

    # Create a fourth object with the same value as the third, but
    # different metadata. Check that its hash is different from the
    # first and third objects.
    object_id4 = random_object_id()
    metadata4 = generate_metadata(length)
    create_sealed(object_id4, metadata4, 1)
    assert client.hash(object_id1) != client.hash(object_id4)
    assert client.hash(object_id3) != client.hash(object_id4)
| |
def test_many_hashes(self):
    """Hashes of many near-identical objects must all be distinct."""
    client = self.plasma_client
    length = 2 ** 10
    hashes = []

    def seal_and_hash(object_id):
        client.seal(object_id)
        hashes.append(client.hash(object_id))

    # Constant-filled objects, one per possible byte value.
    for byte_value in range(256):
        object_id = random_object_id()
        data = np.frombuffer(client.create(object_id, length),
                             dtype="uint8")
        data.fill(byte_value)
        seal_and_hash(object_id)

    # Fixed-length objects that are all zero except for a single byte set
    # to 1, at a different position in each object.
    for position in range(length):
        object_id = random_object_id()
        data = np.frombuffer(client.create(object_id, length),
                             dtype="uint8")
        data.fill(0)
        data[position] = 1
        seal_and_hash(object_id)

    # Objects of varying length, all with value 0.
    for size in range(length):
        object_id = random_object_id()
        data = np.frombuffer(client.create(object_id, size),
                             dtype="uint8")
        data.fill(0)
        seal_and_hash(object_id)

    # Check that all hashes were unique.
    assert len(set(hashes)) == 256 + 2 * length
| |
| # def test_individual_delete(self): |
| # length = 100 |
| # # Create an object id string. |
| # object_id = random_object_id() |
| # # Create a random metadata string. |
| # metadata = generate_metadata(100) |
| # # Create a new buffer and write to it. |
| # memory_buffer = self.plasma_client.create(object_id, length, |
| # metadata) |
| # for i in range(length): |
| # memory_buffer[i] = chr(i % 256) |
| # # Seal the object. |
| # self.plasma_client.seal(object_id) |
| # # Check that the object is present. |
| # assert self.plasma_client.contains(object_id) |
| # # Delete the object. |
| # self.plasma_client.delete(object_id) |
| # # Make sure the object is no longer present. |
| # self.assertFalse(self.plasma_client.contains(object_id)) |
| # |
| # def test_delete(self): |
| # # Create some objects. |
| # object_ids = [random_object_id() for _ in range(100)] |
| # for object_id in object_ids: |
| # length = 100 |
| # # Create a random metadata string. |
| # metadata = generate_metadata(100) |
| # # Create a new buffer and write to it. |
| # memory_buffer = self.plasma_client.create(object_id, length, |
| # metadata) |
| # for i in range(length): |
| # memory_buffer[i] = chr(i % 256) |
| # # Seal the object. |
| # self.plasma_client.seal(object_id) |
| # # Check that the object is present. |
| # assert self.plasma_client.contains(object_id) |
| # |
| # # Delete the objects and make sure they are no longer present. |
| # for object_id in object_ids: |
| # # Delete the object. |
| # self.plasma_client.delete(object_id) |
| # # Make sure the object is no longer present. |
| # self.assertFalse(self.plasma_client.contains(object_id)) |
| |
def test_illegal_functionality(self):
    """Out-of-bounds reads and writes to a fetched object must raise."""
    client = self.plasma_client
    object_id = random_object_id()
    length = 1000
    created_buffer = client.create(object_id, length)
    # Indexing one past the end must not be possible.
    with pytest.raises(Exception):
        created_buffer[length]
    client.seal(object_id)
    # This test is commented out because it currently fails.
    # # Make sure the object is read only now.
    # def illegal_assignment():
    #     memory_buffer[0] = chr(0)
    # with pytest.raises(Exception):
    #     illegal_assignment()
    fetched_buffer = client.get_buffers([object_id])[0]

    # Assigning into the buffer returned by get_buffers must fail.
    with pytest.raises(Exception):
        fetched_buffer[0] = chr(0)
| |
def test_evict(self):
    """``evict`` returns the number of bytes freed from released objects."""
    store = self.plasma_client2

    # A single sealed and released object is evicted in full.
    first_id = random_object_id()
    first_buf = store.create(first_id, 1000)
    store.seal(first_id)
    del first_buf
    assert store.evict(1) == 1000

    # Of two objects, only the sealed and released one is evicted; the
    # other is still unsealed and referenced.
    held_id = random_object_id()
    released_id = random_object_id()
    held_buf = store.create(held_id, 999)
    released_buf = store.create(released_id, 998)
    store.seal(released_id)
    del released_buf
    assert store.evict(1000) == 998

    # A newer object released before the held one is evicted first.
    newer_id = random_object_id()
    newer_buf = store.create(newer_id, 997)
    store.seal(newer_id)
    del newer_buf
    store.seal(held_id)
    del held_buf
    assert store.evict(1) == 997
    assert store.evict(1) == 999

    # A large enough request evicts several objects at once.
    sizes = (996, 995, 994)
    id_a = random_object_id()
    id_b = random_object_id()
    id_c = random_object_id()
    buf_a = store.create(id_a, sizes[0])
    buf_b = store.create(id_b, sizes[1])
    buf_c = store.create(id_c, sizes[2])
    for sealed_id in (id_a, id_b, id_c):
        store.seal(sealed_id)
    del sealed_id
    del buf_a
    del buf_b
    del buf_c
    assert store.evict(2000) == sum(sizes)
| |
# Mitigate valgrind-induced slowness: the largest batch size is only
# exercised when the store does not run under Valgrind.
SUBSCRIBE_TEST_SIZES = [1, 10, 100, 1000]
if not USE_VALGRIND:
    SUBSCRIBE_TEST_SIZES.append(10000)
| |
def test_subscribe(self):
    """Every sealed object produces one notification, in creation order."""
    client = self.plasma_client
    # Subscribe to notifications from the Plasma Store.
    client.subscribe()
    for count in self.SUBSCRIBE_TEST_SIZES:
        object_ids = [random_object_id() for _ in range(count)]
        metadata_sizes = [np.random.randint(1000) for _ in range(count)]
        data_sizes = [np.random.randint(1000) for _ in range(count)]
        created = list(zip(object_ids, data_sizes, metadata_sizes))
        for object_id, data_size, metadata_size in created:
            client.create(
                object_id, data_size,
                metadata=bytearray(np.random.bytes(metadata_size)))
            client.seal(object_id)
        # Check that we received notifications for all of the objects.
        for object_id, data_size, metadata_size in created:
            notification_info = client.get_next_notification()
            recv_objid, recv_dsize, recv_msize = notification_info
            assert object_id == recv_objid
            assert data_size == recv_dsize
            assert metadata_size == recv_msize
| |
def test_subscribe_socket(self):
    """Read notifications directly from the notification socket.

    Each message on the socket is an 8-byte length header followed by a
    payload of that length, which ``decode_notification`` turns into
    ``(object_id, data_size, metadata_size)``.
    """
    def recv_exactly(sock, num_bytes):
        # A stream socket may hand back fewer bytes than requested, so
        # keep reading until the whole frame has arrived.
        chunks = []
        remaining = num_bytes
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise IOError("notification socket closed with {} bytes "
                              "still expected".format(remaining))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    # Subscribe to notifications from the Plasma Store.
    self.plasma_client.subscribe()
    rsock = self.plasma_client.get_notification_socket()
    for i in self.SUBSCRIBE_TEST_SIZES:
        # Get notification from socket.
        object_ids = [random_object_id() for _ in range(i)]
        metadata_sizes = [np.random.randint(1000) for _ in range(i)]
        data_sizes = [np.random.randint(1000) for _ in range(i)]

        for j in range(i):
            self.plasma_client.create(
                object_ids[j], data_sizes[j],
                metadata=bytearray(np.random.bytes(metadata_sizes[j])))
            self.plasma_client.seal(object_ids[j])

        # Check that we received notifications for all of the objects.
        for j in range(i):
            # Assume the plasma store will not be full,
            # so we always get the data size instead of -1.
            # 'Q' is 8 bytes in native mode on every platform, whereas
            # 'L' is 8 bytes only where C ``long`` is 64 bits wide.
            msg_len, = struct.unpack('Q', recv_exactly(rsock, 8))
            content = recv_exactly(rsock, msg_len)
            recv_objid, recv_dsize, recv_msize = (
                self.plasma_client.decode_notification(content))
            assert object_ids[j] == recv_objid
            assert data_sizes[j] == recv_dsize
            assert metadata_sizes[j] == recv_msize
| |
def test_subscribe_deletions(self):
    """Check creation and deletion notifications as objects are evicted.

    plasma_client2 is used throughout to make sure that all used objects
    will get evicted properly.
    """
    client = self.plasma_client2

    def expect_notification(object_id, data_size, metadata_size):
        # The next notification must describe exactly this object.
        recv_objid, recv_dsize, recv_msize = client.get_next_notification()
        assert object_id == recv_objid
        assert data_size == recv_dsize
        assert metadata_size == recv_msize

    def create_sealed(ids, dsizes, msizes):
        # Create and seal one object per ID, then drop the last buffer.
        for oid, dsize, msize in zip(ids, dsizes, msizes):
            buf = client.create(
                oid, dsize, metadata=bytearray(np.random.bytes(msize)))
            client.seal(oid)
        del buf

    # Subscribe to notifications from the Plasma Store.
    client.subscribe()
    for count in self.SUBSCRIBE_TEST_SIZES:
        object_ids = [random_object_id() for _ in range(count)]
        # Add 1 to the sizes to make sure we have nonzero object sizes.
        metadata_sizes = [np.random.randint(1000) + 1
                          for _ in range(count)]
        data_sizes = [np.random.randint(1000) + 1 for _ in range(count)]
        create_sealed(object_ids, data_sizes, metadata_sizes)
        created = list(zip(object_ids, data_sizes, metadata_sizes))
        # Check that we received notifications for creating all of the
        # objects.
        for oid, dsize, msize in created:
            expect_notification(oid, dsize, msize)

        # Check that we receive notifications for deleting all objects, as
        # we evict them.  Deletions are reported with sizes of -1.
        for oid, dsize, msize in created:
            assert client.evict(1) == dsize + msize
            expect_notification(oid, -1, -1)

    # Test multiple deletion notifications. The first 9 object IDs have
    # size 0, and the last has a randomly drawn size. When Plasma evicts
    # 1 byte, it will evict all objects, so we should receive deletion
    # notifications for each.
    num_object_ids = 10
    object_ids = [random_object_id() for _ in range(num_object_ids)]
    metadata_sizes = [0] * (num_object_ids - 1)
    data_sizes = [0] * (num_object_ids - 1)
    metadata_sizes.append(np.random.randint(1000))
    data_sizes.append(np.random.randint(1000))
    create_sealed(object_ids, data_sizes, metadata_sizes)
    for oid, dsize, msize in zip(object_ids, data_sizes, metadata_sizes):
        expect_notification(oid, dsize, msize)
    assert client.evict(1) == data_sizes[-1] + metadata_sizes[-1]
    for oid in object_ids:
        expect_notification(oid, -1, -1)
| |
def test_use_full_memory(self):
    """Objects as large as the whole store fit; larger ones do not."""
    client = self.plasma_client2
    # Fill the object store up with a large number of small objects and let
    # them go out of scope.
    small_limit = DEFAULT_PLASMA_STORE_MEMORY // 20
    for _ in range(100):
        create_object(client, np.random.randint(1, small_limit), 0)
    # Create large objects that require the full object store size, and
    # verify that they fit.
    for _ in range(2):
        create_object(client, DEFAULT_PLASMA_STORE_MEMORY, 0)
    # Verify that an object that is too large does not fit.
    oversized = DEFAULT_PLASMA_STORE_MEMORY + SMALL_OBJECT_SIZE
    with pytest.raises(pa.lib.PlasmaStoreFull):
        create_object(client, oversized, 0)
| |
def test_client_death_during_get(self):
    """The store must survive a client dying while blocked in ``get``."""
    import pyarrow.plasma as plasma

    object_id = random_object_id()

    def client_blocked_in_get(plasma_store_name):
        # Connect through the name passed to the child process rather
        # than reaching back into the test instance.
        client = plasma.connect(plasma_store_name)
        # Try to get an object ID that doesn't exist. This should block.
        client.get([object_id])

    p = multiprocessing.Process(target=client_blocked_in_get,
                                args=(self.plasma_store_name, ))
    p.start()
    # Make sure the process is running.
    time.sleep(0.2)
    assert p.is_alive()

    # Kill the client process and reap it so that no zombie is left
    # behind.
    p.terminate()
    p.join()
    # Wait a little for the store to process the disconnect event.
    time.sleep(0.1)

    # Create the object.
    self.plasma_client.put(1, object_id=object_id)

    # Check that the store is still alive. This will raise an exception if
    # the store is dead.
    self.plasma_client.contains(random_object_id())
| |
def test_client_getting_multiple_objects(self):
    """A client blocked on several objects finishes once all exist."""
    import pyarrow.plasma as plasma

    object_ids = [random_object_id() for _ in range(10)]

    def client_get_multiple(plasma_store_name):
        # Connect through the name passed to the child process rather
        # than reaching back into the test instance.
        client = plasma.connect(plasma_store_name)
        # Try to get objects that don't exist yet. This should block
        # until every one of them has been created.
        client.get(object_ids)

    p = multiprocessing.Process(target=client_get_multiple,
                                args=(self.plasma_store_name, ))
    p.start()
    # Make sure the process is running.
    time.sleep(0.2)
    assert p.is_alive()

    # Create the objects one by one.
    for object_id in object_ids:
        self.plasma_client.put(1, object_id=object_id)

    # Check that the store is still alive. This will raise an exception if
    # the store is dead.
    self.plasma_client.contains(random_object_id())

    # Make sure that the blocked client finishes.  ``join`` with a timeout
    # waits without spinning the CPU and also reaps the child.
    p.join(5)
    if p.is_alive():
        # Do not leave the stuck child running after the failure.
        p.terminate()
        raise Exception("Timing out while waiting for blocked client "
                        "to finish.")
| |
| |
@pytest.mark.plasma
class TestEvictionToExternalStore(object):
    """Eviction tests run against a small store with an external store."""

    def setup_method(self, test_method):
        """Start a Plasma store with an external store and connect to it."""
        import pyarrow.plasma as plasma

        # Start Plasma store.
        store_ctx = plasma.start_plasma_store(
            plasma_store_memory=1000 * 1024,
            use_valgrind=USE_VALGRIND,
            external_store=EXTERNAL_STORE)
        self.plasma_store_ctx = store_ctx
        self.plasma_store_name, self.p = store_ctx.__enter__()

        # Connect to Plasma.
        self.plasma_client = plasma.connect(self.plasma_store_name)

    def teardown_method(self, test_method):
        """Verify the store is still running, stop it, and release it."""
        try:
            store_proc = self.p
            # Check that the Plasma store is still alive.
            assert store_proc.poll() is None
            store_proc.send_signal(signal.SIGTERM)
            # The timeout argument is only passed on Python >= 3.3.
            if sys.version_info < (3, 3):
                store_proc.wait()
            else:
                store_proc.wait(timeout=5)
        finally:
            # Leave the store context even if the checks above failed.
            self.plasma_store_ctx.__exit__(None, None, None)

    def test_eviction(self):
        """Check that every object stays readable after the store fills up.

        Twenty objects of 100 KiB each are created in a store configured
        with 1000 KiB of memory, so they cannot all be resident at once.
        """
        client = self.plasma_client

        num_objects = 20
        payload = b'x' * 100 * 1024
        no_metadata = b''
        object_ids = [random_object_id() for _ in range(num_objects)]

        for object_id in object_ids:
            # Test for object non-existence.
            assert not client.contains(object_id)

            # Create and seal the object.
            client.create_and_seal(object_id, payload, no_metadata)

            # Test that the client can get the object.
            assert client.contains(object_id)

        for object_id in object_ids:
            # Since we are accessing objects sequentially, every object we
            # access would be a cache "miss" owing to LRU eviction.
            # Try and access the object from the plasma store first, and
            # then try external store on failure. This should succeed to
            # fetch the object. However, it may evict the next few objects.
            (fetched,) = client.get_buffers([object_id])
            assert fetched.to_pybytes() == payload

        # Make sure we still cannot fetch objects that do not exist
        missing_id = random_object_id()
        (fetched,) = client.get_buffers([missing_id], timeout_ms=100)
        assert fetched is None
| |
| |
@pytest.mark.plasma
def test_object_id_size():
    """Check which inputs ``plasma.ObjectID`` accepts.

    ``"hello"`` must be rejected with ``ValueError``, while a 20-byte
    value must be accepted without raising.
    """
    import pyarrow.plasma as plasma

    # Invalid input: the constructor has to raise.
    with pytest.raises(ValueError):
        plasma.ObjectID("hello")

    # Valid input: exactly 20 bytes, no exception expected.
    valid_id_bytes = 20 * b"0"
    plasma.ObjectID(valid_id_bytes)
| |
| |
@pytest.mark.plasma
def test_object_id_equality_operators():
    """Check ``==`` and ``!=`` between ObjectIDs and against other types."""
    import pyarrow.plasma as plasma

    # Two IDs built from identical bytes, and one differing in its last
    # byte only.
    same_a = plasma.ObjectID(20 * b'0')
    same_b = plasma.ObjectID(20 * b'0')
    different = plasma.ObjectID(19 * b'0' + b'1')

    assert same_a == same_b
    assert same_b != different
    # Comparing against a non-ObjectID value must report inequality.
    assert same_a != 'foo'
| |
| |
@pytest.mark.xfail(reason="often fails on travis")
@pytest.mark.skipif(not os.path.exists("/mnt/hugepages"),
                    reason="requires hugepage support")
def test_use_huge_pages():
    """Create a large object in a store started with huge pages enabled.

    Skipped when ``/mnt/hugepages`` does not exist, and marked as an
    expected failure because it often fails on CI.
    """
    import pyarrow.plasma as plasma

    store = plasma.start_plasma_store(
        plasma_store_memory=2 * 10 ** 9,
        plasma_directory="/mnt/hugepages",
        use_hugepages=True)
    with store as (plasma_store_name, p):
        client = plasma.connect(plasma_store_name)
        create_object(client, 10 ** 8)
| |
| |
# This is checking to make sure plasma_clients cannot be destroyed
# before all the PlasmaBuffers that have handles to them are
# destroyed, see ARROW-2448.
@pytest.mark.plasma
def test_plasma_client_sharing():
    """Delete the client before a value fetched from it, then use the value.

    The client name is deleted while the fetched value is still referenced;
    the value must remain readable, and deleting it afterwards must not
    crash.
    """
    import pyarrow.plasma as plasma

    store = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store as (plasma_store_name, p):
        client = plasma.connect(plasma_store_name)
        object_id = client.put(np.zeros(3))
        fetched = client.get(object_id)
        # Drop the client first; the fetched value must outlive it.
        del client
        expected = np.zeros(3)
        assert (fetched == expected).all()
        del fetched  # This segfaulted pre ARROW-2448.
| |
| |
@pytest.mark.plasma
def test_plasma_list():
    """Check the per-object fields reported by the client's ``list()``.

    Covers ``data_size`` / ``metadata_size``, ``ref_count``, ``state``
    (``"created"`` then ``"sealed"``), and the ``create_time`` /
    ``construct_duration`` timestamps, the latter two within a tolerance.
    """
    import pyarrow.plasma as plasma

    store = plasma.start_plasma_store(
        plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY)
    with store as (plasma_store_name, p):
        client = plasma.connect(plasma_store_name)

        # Test sizes
        sized_id, _, _ = create_object(
            client, 11, metadata_size=7, seal=False)
        sized_entry = client.list()[sized_id]
        assert sized_entry["data_size"] == 11
        assert sized_entry["metadata_size"] == 7

        # Test ref_count
        put_id = client.put(np.zeros(3))
        # Ref count has already been released
        # XXX flaky test, disabled (ARROW-3344)
        # released_listing = client.list()
        # assert released_listing[put_id]["ref_count"] == 0
        held_value = client.get(put_id)
        # Holding the fetched value is expected to count as one reference.
        assert client.list()[put_id]["ref_count"] == 1
        del held_value

        # Test state
        state_id, _, _ = create_object(
            client, 3, metadata_size=0, seal=False)
        assert client.list()[state_id]["state"] == "created"
        client.seal(state_id)
        assert client.list()[state_id]["state"] == "sealed"

        # Test timestamps
        slack = 1.5  # seconds
        before_create = time.time()
        timed_id, _, _ = create_object(
            client, 3, metadata_size=0, seal=False)
        after_create = time.time()
        create_time = client.list()[timed_id]["create_time"]
        assert before_create - slack <= create_time <= after_create + slack

        # Let a measurable amount of time pass between create and seal.
        time.sleep(2.0)
        before_seal = time.time()
        client.seal(timed_id)
        after_seal = time.time()
        construct_duration = client.list()[timed_id]["construct_duration"]
        assert before_seal - after_create - slack <= construct_duration
        assert construct_duration <= after_seal - before_create + slack
| |
| |
@pytest.mark.plasma
def test_object_id_randomness():
    """Check that separate processes generate different random ObjectIDs.

    The same snippet is run in two fresh interpreter processes and the
    printed IDs are required to differ.
    """
    cmd = "from pyarrow import plasma; print(plasma.ObjectID.from_random())"
    # Run the snippet with the interpreter that is executing this test.
    # A bare "python" may resolve to a different installation (without
    # pyarrow) or may not exist on the PATH at all.
    argv = [sys.executable, "-c", cmd]
    first_object_id = subprocess.check_output(argv)
    second_object_id = subprocess.check_output(argv)
    assert first_object_id != second_object_id
| |
| |
@pytest.mark.plasma
def test_store_capacity():
    """Check that the client reports the capacity the store started with."""
    import pyarrow.plasma as plasma

    capacity = 10000
    store = plasma.start_plasma_store(plasma_store_memory=capacity)
    with store as (name, p):
        client = plasma.connect(name)
        assert client.store_capacity() == capacity