feat(python): Add user-facing ArrayStream class (#439)
This class provides an interface to the ArrowArrayStream whose methods
return `Schema`s and `Array`s. It also provides a more ergonomic
interface to the `ipc.Stream` interface.
```python
import nanoarrow as na
na.ArrayStream([1, 2, 3], na.int32())
#> <nanoarrow.ArrayStream: Schema(INT32)>
na.ArrayStream([1, 2, 3], na.int32()).read_all()
#> nanoarrow.Array<int32>[3]
#> 1
#> 2
#> 3
url = "https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.arrows"
na.ArrayStream.from_url(url).read_all()
#> nanoarrow.Array<struct<commit: string, time: timestamp('us', 'UTC'), ...>[15487]
#> {'commit': '49cdb0fe4e98fda19031c864a18e6156c6edbf3c', 'time': datetime.datet...
#> {'commit': '1d966e98e41ce817d1f8c5159c0b9caa4de75816', 'time': datetime.datet...
#> {'commit': '96f26a89bd73997f7532643cdb27d04b70971530', 'time': datetime.datet...
#> {'commit': 'ee1a8c39a55f3543a82fed900dadca791f6e9f88', 'time': datetime.datet...
#> {'commit': '3d467ac7bfae03cf2db09807054c5672e1959aec', 'time': datetime.datet...
#> {'commit': 'ef6ea6beed071ed070daf03508f4c14b4072d6f2', 'time': datetime.datet...
#> {'commit': '53e0c745ad491af98a5bf18b67541b12d7790beb', 'time': datetime.datet...
#> {'commit': '3ba6d286caad328b8572a3b9228045da8c8d2043', 'time': datetime.datet...
#> {'commit': '4ce9a5edd2710fb8bf0c642fd0e3863b01c2ea20', 'time': datetime.datet...
#> {'commit': '2445975162905bd8d9a42ffc9cd0daa0e19d3251', 'time': datetime.datet...
#> ...and 15477 more items
```
---------
Co-authored-by: Dane Pitkin <48041712+danepitkin@users.noreply.github.com>
diff --git a/python/src/nanoarrow/__init__.py b/python/src/nanoarrow/__init__.py
index 1e22093..a86e869 100644
--- a/python/src/nanoarrow/__init__.py
+++ b/python/src/nanoarrow/__init__.py
@@ -74,10 +74,12 @@
struct,
)
from nanoarrow.array import array, Array
+from nanoarrow.array_stream import ArrayStream
from nanoarrow._version import __version__ # noqa: F401
# Helps Sphinx automatically populate an API reference section
__all__ = [
+ "ArrayStream",
"Schema",
"TimeUnit",
"Type",
diff --git a/python/src/nanoarrow/array_stream.py b/python/src/nanoarrow/array_stream.py
new file mode 100644
index 0000000..1f03dcc
--- /dev/null
+++ b/python/src/nanoarrow/array_stream.py
@@ -0,0 +1,279 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from functools import cached_property
+from typing import Iterable, Tuple
+
+from nanoarrow._lib import CMaterializedArrayStream
+from nanoarrow._repr_utils import make_class_label
+from nanoarrow.array import Array
+from nanoarrow.c_lib import c_array_stream
+from nanoarrow.iterator import iter_py, iter_tuples
+from nanoarrow.schema import Schema
+
+
+class ArrayStream:
+ """High-level ArrayStream representation
+
+ The ArrayStream is nanoarrow's high-level representation of zero
+ or more contiguous arrays that have not neccessarily been materialized.
+ This is in constrast to the nanoarrow :class:`Array`, which consists
+ of zero or more contiguous arrays but is always fully-materialized.
+
+ The :class:`ArrayStream` is similar to pyarrow's ``RecordBatchReader``
+ except it can also represent streams of non-struct arrays. Its scope
+ maps to that of an``ArrowArrayStream`` as represented by the Arrow C
+ Stream interface.
+
+ Parameters
+ ----------
+ obj : array or array stream-like
+ An array-like or array stream-like object as sanitized by
+ :func:`c_array_stream`.
+ schema : schema-like, optional
+ An optional schema, passed to :func:`c_array_stream`.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> na.ArrayStream([1, 2, 3], na.int32())
+ <nanoarrow.ArrayStream: Schema(INT32)>
+ """
+
+ def __init__(self, obj, schema=None) -> None:
+ self._c_array_stream = c_array_stream(obj, schema)
+
+ @cached_property
+ def schema(self):
+ """The :class:`Schema` associated with this stream
+
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> stream.schema
+ Schema(INT32)
+ """
+ return Schema(self._c_array_stream._get_cached_schema())
+
+ def __arrow_c_stream__(self, requested_schema=None):
+ return self._c_array_stream.__arrow_c_stream__(
+ requested_schema=requested_schema
+ )
+
+ def read_all(self) -> Array:
+ """Materialize the entire stream into an :class:`Array`
+
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> stream.read_all()
+ nanoarrow.Array<int32>[3]
+ 1
+ 2
+ 3
+ """
+ return Array(self._c_array_stream)
+
+ def read_next(self) -> Array:
+ """Materialize the next contiguous :class:`Array` in this stream
+
+ This method raises ``StopIteration`` if there are no more arrays
+ in this stream.
+
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> stream.read_next()
+ nanoarrow.Array<int32>[3]
+ 1
+ 2
+ 3
+ """
+ c_array = self._c_array_stream.get_next()
+ return Array(CMaterializedArrayStream.from_c_array(c_array))
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *args, **kwargs):
+ self.close()
+
+ def close(self) -> None:
+ """Release resources associated with this stream
+
+ Note that it is usually preferred to use the context manager to ensure
+ prompt release of resources (e.g., open files) associated with
+ this stream.
+
+ Examples
+ --------
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> with stream:
+ ... pass
+ >>> stream.read_all()
+ Traceback (most recent call last):
+ ...
+ RuntimeError: array stream is released
+
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> stream.close()
+ >>> stream.read_all()
+ Traceback (most recent call last):
+ ...
+ RuntimeError: array stream is released
+ """
+ self._c_array_stream.release()
+
+ def __iter__(self) -> Iterable[Array]:
+ for c_array in self._c_array_stream:
+ yield Array(CMaterializedArrayStream.from_c_array(c_array))
+
+ def iter_chunks(self) -> Iterable[Array]:
+ """Iterate over contiguous Arrays in this stream
+
+ For the :class:`ArrayStream`, this is the same as iterating over
+ the stream itself.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> for chunk in stream:
+ ... print(chunk)
+ nanoarrow.Array<int32>[3]
+ 1
+ 2
+ 3
+ """
+ return iter(self)
+
+ def iter_py(self) -> Iterable:
+ """Iterate over the default Python representation of each element.
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+ >>> for item in stream.iter_py():
+ ... print(item)
+ 1
+ 2
+ 3
+ """
+ return iter_py(self)
+
+ def iter_tuples(self) -> Iterable[Tuple]:
+ """Iterate over rows of a struct stream as tuples
+
+ Examples
+ --------
+
+ >>> import nanoarrow as na
+ >>> import pyarrow as pa
+ >>> batch = pa.record_batch(
+ ... [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
+ ... names=["col1", "col2"]
+ ... )
+ >>> stream = na.ArrayStream(batch)
+ >>> for item in stream.iter_tuples():
+ ... print(item)
+ (1, 'a')
+ (2, 'b')
+ (3, 'c')
+ """
+ return iter_tuples(self)
+
+ def __repr__(self) -> str:
+ cls = make_class_label(self, "nanoarrow")
+ return f"<{cls}: {self.schema}>"
+
+ @staticmethod
+ def from_readable(obj):
+ """Create an ArrayStream from an IPC stream in a readable file or buffer
+
+ Examples
+ --------
+ >>> import nanoarrow as na
+ >>> from nanoarrow.ipc import Stream
+ >>> with na.ArrayStream.from_readable(Stream.example_bytes()) as stream:
+ ... stream.read_all()
+ nanoarrow.Array<struct<some_col: int32>>[3]
+ {'some_col': 1}
+ {'some_col': 2}
+ {'some_col': 3}
+ """
+ from nanoarrow.ipc import Stream
+
+ with Stream.from_readable(obj) as ipc_stream:
+ return ArrayStream(ipc_stream)
+
+ @staticmethod
+ def from_path(obj, *args, **kwargs):
+ """Create an ArrayStream from an IPC stream at a local file path
+
+ Examples
+ --------
+ >>> import tempfile
+ >>> import os
+ >>> import nanoarrow as na
+ >>> from nanoarrow.ipc import Stream
+ >>> with tempfile.TemporaryDirectory() as td:
+ ... path = os.path.join(td, "test.arrows")
+ ... with open(path, "wb") as f:
+ ... nbytes = f.write(Stream.example_bytes())
+ ...
+ ... with na.ArrayStream.from_path(path) as stream:
+ ... stream.read_all()
+ nanoarrow.Array<struct<some_col: int32>>[3]
+ {'some_col': 1}
+ {'some_col': 2}
+ {'some_col': 3}
+ """
+ from nanoarrow.ipc import Stream
+
+ with Stream.from_path(obj, *args, **kwargs) as ipc_stream:
+ return ArrayStream(ipc_stream)
+
+ @staticmethod
+ def from_url(obj, *args, **kwargs):
+ """Create an ArrayStream from an IPC stream at a URL
+
+ Examples
+ --------
+ >>> import pathlib
+ >>> import tempfile
+ >>> import os
+ >>> import nanoarrow as na
+ >>> from nanoarrow.ipc import Stream
+ >>> with tempfile.TemporaryDirectory() as td:
+ ... path = os.path.join(td, "test.arrows")
+ ... with open(path, "wb") as f:
+ ... nbytes = f.write(Stream.example_bytes())
+ ...
+ ... uri = pathlib.Path(path).as_uri()
+ ... with na.ArrayStream.from_url(uri) as stream:
+ ... stream.read_all()
+ nanoarrow.Array<struct<some_col: int32>>[3]
+ {'some_col': 1}
+ {'some_col': 2}
+ {'some_col': 3}
+ """
+ from nanoarrow.ipc import Stream
+
+ with Stream.from_url(obj, *args, **kwargs) as ipc_stream:
+ return ArrayStream(ipc_stream)
diff --git a/python/src/nanoarrow/ipc.py b/python/src/nanoarrow/ipc.py
index 645c395..5102a60 100644
--- a/python/src/nanoarrow/ipc.py
+++ b/python/src/nanoarrow/ipc.py
@@ -18,7 +18,7 @@
import io
from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream
-from nanoarrow._lib import CArrayStream
+from nanoarrow._lib import CArrayStream, _obj_is_buffer
from nanoarrow import _repr_utils
@@ -77,7 +77,7 @@
@staticmethod
def from_readable(obj):
- """Wrap an open readable object as an Arrow stream
+ """Wrap an open readable file or buffer as an Arrow IPC stream
Wraps a readable object (specificially, an object that implements a
``readinto()`` method) as a non-owning Stream. Closing ``obj`` remains
@@ -86,29 +86,33 @@
Parameters
----------
- obj : readable file-like
- An object implementing ``readinto()``.
+ obj : readable file-like or buffer
+ An object implementing the Python buffer protocol or ``readinto()``.
Examples
--------
- >>> import io
>>> import nanoarrow as na
>>> from nanoarrow.ipc import Stream
- >>> with io.BytesIO(Stream.example_bytes()) as f:
- ... inp = Stream.from_readable(f)
- ... na.c_array_stream(inp)
+ >>> ipc_stream = Stream.from_readable(Stream.example_bytes())
+ >>> na.c_array_stream(ipc_stream)
<nanoarrow.c_lib.CArrayStream>
- get_schema(): struct<some_col: int32>
"""
+ if _obj_is_buffer(obj):
+ close_obj = True
+ obj = io.BytesIO(obj)
+ else:
+ close_obj = False
+
out = Stream()
- out._stream = CIpcInputStream.from_readable(obj)
+ out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
out._desc = repr(obj)
return out
@staticmethod
def from_path(obj, *args, **kwargs):
- """Wrap a local file as an Arrow stream
+ """Wrap a local file as an IPC stream
Wraps a pathlike object (specificially, one that can be passed to ``open()``)
as an owning Stream. The file will be opened in binary mode and will be closed
@@ -145,7 +149,7 @@
@staticmethod
def from_url(obj, *args, **kwargs):
- """Wrap a URL as an Arrow stream
+ """Wrap a URL as an IPC stream
Wraps a URL (specificially, one that can be passed to
``urllib.request.urlopen()``) as an owning Stream. The URL will be
@@ -186,7 +190,7 @@
@staticmethod
def example():
- """Example Stream
+ """Example IPC Stream
A self-contained example whose value is the serialized version of
``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
@@ -200,7 +204,7 @@
>>> Stream.example()
<nanoarrow.ipc.Stream <_io.BytesIO object at ...>>
"""
- return Stream.from_readable(io.BytesIO(Stream.example_bytes()))
+ return Stream.from_readable(Stream.example_bytes())
@staticmethod
def example_bytes():
diff --git a/python/tests/test_array_stream.py b/python/tests/test_array_stream.py
new file mode 100644
index 0000000..035949f
--- /dev/null
+++ b/python/tests/test_array_stream.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+import pathlib
+import tempfile
+
+import pytest
+from nanoarrow.ipc import Stream
+
+import nanoarrow as na
+
+
+def test_array_stream_iter():
+ stream = na.ArrayStream([1, 2, 3], na.int32())
+ assert stream.schema.type == na.Type.INT32
+ stream_iter = iter(stream)
+
+ assert list(next(stream_iter).iter_py()) == [1, 2, 3]
+ with pytest.raises(StopIteration):
+ next(stream_iter)
+
+
+def test_array_stream_read_all():
+ stream = na.ArrayStream([1, 2, 3], na.int32())
+ array = stream.read_all()
+ assert array.schema.type == na.Type.INT32
+ assert list(array.iter_py()) == [1, 2, 3]
+
+
+def test_array_stream_read_next():
+ stream = na.ArrayStream([1, 2, 3], na.int32())
+ array = stream.read_next()
+ assert array.schema.type == na.Type.INT32
+ assert list(array.iter_py()) == [1, 2, 3]
+
+ with pytest.raises(StopIteration):
+ stream.read_next()
+
+
+def test_array_stream_close():
+ stream = na.ArrayStream([], na.int32())
+ stream.close()
+ with pytest.raises(RuntimeError, match="array stream is released"):
+ stream.read_all()
+
+
+def test_array_stream_context_manager():
+ stream = na.ArrayStream([], na.int32())
+ with stream:
+ pass
+
+ with pytest.raises(RuntimeError, match="array stream is released"):
+ stream.read_all()
+
+
+def test_array_stream_from_readable():
+ stream = na.ArrayStream.from_readable(Stream.example_bytes())
+ assert stream.schema.type == na.Type.STRUCT
+ assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
+
+
+def test_array_stream_from_path():
+ with tempfile.TemporaryDirectory() as td:
+ path = os.path.join(td, "test.arrows")
+ with open(path, "wb") as f:
+ f.write(Stream.example_bytes())
+
+ stream = na.ArrayStream.from_path(path)
+ assert stream.schema.type == na.Type.STRUCT
+ assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
+
+
+def test_array_stream_from_url():
+ with tempfile.TemporaryDirectory() as td:
+ path = os.path.join(td, "test.arrows")
+ with open(path, "wb") as f:
+ f.write(Stream.example_bytes())
+
+ uri = pathlib.Path(path).as_uri()
+ with na.ArrayStream.from_url(uri) as stream:
+ assert stream.schema.type == na.Type.STRUCT
+ assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py
index d23abdf..dc8ee85 100644
--- a/python/tests/test_ipc.py
+++ b/python/tests/test_ipc.py
@@ -53,6 +53,18 @@
assert list(batch.child(0).buffer(1)) == [1, 2, 3]
+def test_ipc_stream_from_readable():
+ with io.BytesIO(Stream.example_bytes()) as f:
+ with Stream.from_readable(f) as input:
+ assert input._is_valid() is True
+ assert "BytesIO object" in repr(input)
+
+ with na.c_array_stream(input) as stream:
+ batches = list(stream)
+ assert len(batches) == 1
+ assert batches[0].length == 3
+
+
def test_ipc_stream_from_path():
with tempfile.TemporaryDirectory() as td:
path = os.path.join(td, "test.arrows")