feat(python): Add user-facing ArrayStream class (#439)

This class provides an interface to the ArrowArrayStream whose methods
return `Schema`s and `Array`s. It also provides a more ergonomic
interface to the `ipc.Stream` interface.

```python
import nanoarrow as na

na.ArrayStream([1, 2, 3], na.int32())
#> <nanoarrow.ArrayStream: Schema(INT32)>

na.ArrayStream([1, 2, 3], na.int32()).read_all()
#> nanoarrow.Array<int32>[3]
#> 1
#> 2
#> 3

url = "https://github.com/apache/arrow-experiments/raw/main/data/arrow-commits/arrow-commits.arrows"
na.ArrayStream.from_url(url).read_all()
#> nanoarrow.Array<struct<commit: string, time: timestamp('us', 'UTC'), ...>[15487]
#> {'commit': '49cdb0fe4e98fda19031c864a18e6156c6edbf3c', 'time': datetime.datet...
#> {'commit': '1d966e98e41ce817d1f8c5159c0b9caa4de75816', 'time': datetime.datet...
#> {'commit': '96f26a89bd73997f7532643cdb27d04b70971530', 'time': datetime.datet...
#> {'commit': 'ee1a8c39a55f3543a82fed900dadca791f6e9f88', 'time': datetime.datet...
#> {'commit': '3d467ac7bfae03cf2db09807054c5672e1959aec', 'time': datetime.datet...
#> {'commit': 'ef6ea6beed071ed070daf03508f4c14b4072d6f2', 'time': datetime.datet...
#> {'commit': '53e0c745ad491af98a5bf18b67541b12d7790beb', 'time': datetime.datet...
#> {'commit': '3ba6d286caad328b8572a3b9228045da8c8d2043', 'time': datetime.datet...
#> {'commit': '4ce9a5edd2710fb8bf0c642fd0e3863b01c2ea20', 'time': datetime.datet...
#> {'commit': '2445975162905bd8d9a42ffc9cd0daa0e19d3251', 'time': datetime.datet...
#> ...and 15477 more items
```

---------

Co-authored-by: Dane Pitkin <48041712+danepitkin@users.noreply.github.com>
diff --git a/python/src/nanoarrow/__init__.py b/python/src/nanoarrow/__init__.py
index 1e22093..a86e869 100644
--- a/python/src/nanoarrow/__init__.py
+++ b/python/src/nanoarrow/__init__.py
@@ -74,10 +74,12 @@
     struct,
 )
 from nanoarrow.array import array, Array
+from nanoarrow.array_stream import ArrayStream
 from nanoarrow._version import __version__  # noqa: F401
 
 # Helps Sphinx automatically populate an API reference section
 __all__ = [
+    "ArrayStream",
     "Schema",
     "TimeUnit",
     "Type",
diff --git a/python/src/nanoarrow/array_stream.py b/python/src/nanoarrow/array_stream.py
new file mode 100644
index 0000000..1f03dcc
--- /dev/null
+++ b/python/src/nanoarrow/array_stream.py
@@ -0,0 +1,279 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from functools import cached_property
+from typing import Iterable, Tuple
+
+from nanoarrow._lib import CMaterializedArrayStream
+from nanoarrow._repr_utils import make_class_label
+from nanoarrow.array import Array
+from nanoarrow.c_lib import c_array_stream
+from nanoarrow.iterator import iter_py, iter_tuples
+from nanoarrow.schema import Schema
+
+
+class ArrayStream:
+    """High-level ArrayStream representation
+
+    The ArrayStream is nanoarrow's high-level representation of zero
+    or more contiguous arrays that have not neccessarily been materialized.
+    This is in constrast to the nanoarrow :class:`Array`, which consists
+    of zero or more contiguous arrays but is always fully-materialized.
+
+    The :class:`ArrayStream` is similar to pyarrow's ``RecordBatchReader``
+    except it can also represent streams of non-struct arrays. Its scope
+    maps to that of an``ArrowArrayStream`` as represented by the Arrow C
+    Stream interface.
+
+    Parameters
+    ----------
+    obj : array or array stream-like
+        An array-like or array stream-like object as sanitized by
+        :func:`c_array_stream`.
+    schema : schema-like, optional
+        An optional schema, passed to :func:`c_array_stream`.
+
+    Examples
+    --------
+
+    >>> import nanoarrow as na
+    >>> na.ArrayStream([1, 2, 3], na.int32())
+    <nanoarrow.ArrayStream: Schema(INT32)>
+    """
+
+    def __init__(self, obj, schema=None) -> None:
+        self._c_array_stream = c_array_stream(obj, schema)
+
+    @cached_property
+    def schema(self):
+        """The :class:`Schema` associated with this stream
+
+        >>> import nanoarrow as na
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> stream.schema
+        Schema(INT32)
+        """
+        return Schema(self._c_array_stream._get_cached_schema())
+
+    def __arrow_c_stream__(self, requested_schema=None):
+        return self._c_array_stream.__arrow_c_stream__(
+            requested_schema=requested_schema
+        )
+
+    def read_all(self) -> Array:
+        """Materialize the entire stream into an :class:`Array`
+
+        >>> import nanoarrow as na
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> stream.read_all()
+        nanoarrow.Array<int32>[3]
+        1
+        2
+        3
+        """
+        return Array(self._c_array_stream)
+
+    def read_next(self) -> Array:
+        """Materialize the next contiguous :class:`Array` in this stream
+
+        This method raises ``StopIteration`` if there are no more arrays
+        in this stream.
+
+        >>> import nanoarrow as na
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> stream.read_next()
+        nanoarrow.Array<int32>[3]
+        1
+        2
+        3
+        """
+        c_array = self._c_array_stream.get_next()
+        return Array(CMaterializedArrayStream.from_c_array(c_array))
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+    def close(self) -> None:
+        """Release resources associated with this stream
+
+        Note that it is usually preferred to use the context manager to ensure
+        prompt release of resources (e.g., open files) associated with
+        this stream.
+
+        Examples
+        --------
+        >>> import nanoarrow as na
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> with stream:
+        ...     pass
+        >>> stream.read_all()
+        Traceback (most recent call last):
+        ...
+        RuntimeError: array stream is released
+
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> stream.close()
+        >>> stream.read_all()
+        Traceback (most recent call last):
+        ...
+        RuntimeError: array stream is released
+        """
+        self._c_array_stream.release()
+
+    def __iter__(self) -> Iterable[Array]:
+        for c_array in self._c_array_stream:
+            yield Array(CMaterializedArrayStream.from_c_array(c_array))
+
+    def iter_chunks(self) -> Iterable[Array]:
+        """Iterate over contiguous Arrays in this stream
+
+        For the :class:`ArrayStream`, this is the same as iterating over
+        the stream itself.
+
+        Examples
+        --------
+
+        >>> import nanoarrow as na
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> for chunk in stream:
+        ...     print(chunk)
+        nanoarrow.Array<int32>[3]
+        1
+        2
+        3
+        """
+        return iter(self)
+
+    def iter_py(self) -> Iterable:
+        """Iterate over the default Python representation of each element.
+
+        Examples
+        --------
+
+        >>> import nanoarrow as na
+        >>> stream = na.ArrayStream([1, 2, 3], na.int32())
+        >>> for item in stream.iter_py():
+        ...     print(item)
+        1
+        2
+        3
+        """
+        return iter_py(self)
+
+    def iter_tuples(self) -> Iterable[Tuple]:
+        """Iterate over rows of a struct stream as tuples
+
+        Examples
+        --------
+
+        >>> import nanoarrow as na
+        >>> import pyarrow as pa
+        >>> batch = pa.record_batch(
+        ...     [pa.array([1, 2, 3]), pa.array(["a", "b", "c"])],
+        ...     names=["col1", "col2"]
+        ... )
+        >>> stream = na.ArrayStream(batch)
+        >>> for item in stream.iter_tuples():
+        ...     print(item)
+        (1, 'a')
+        (2, 'b')
+        (3, 'c')
+        """
+        return iter_tuples(self)
+
+    def __repr__(self) -> str:
+        cls = make_class_label(self, "nanoarrow")
+        return f"<{cls}: {self.schema}>"
+
+    @staticmethod
+    def from_readable(obj):
+        """Create an ArrayStream from an IPC stream in a readable file or buffer
+
+        Examples
+        --------
+        >>> import nanoarrow as na
+        >>> from nanoarrow.ipc import Stream
+        >>> with na.ArrayStream.from_readable(Stream.example_bytes()) as stream:
+        ...     stream.read_all()
+        nanoarrow.Array<struct<some_col: int32>>[3]
+        {'some_col': 1}
+        {'some_col': 2}
+        {'some_col': 3}
+        """
+        from nanoarrow.ipc import Stream
+
+        with Stream.from_readable(obj) as ipc_stream:
+            return ArrayStream(ipc_stream)
+
+    @staticmethod
+    def from_path(obj, *args, **kwargs):
+        """Create an ArrayStream from an IPC stream at a local file path
+
+        Examples
+        --------
+        >>> import tempfile
+        >>> import os
+        >>> import nanoarrow as na
+        >>> from nanoarrow.ipc import Stream
+        >>> with tempfile.TemporaryDirectory() as td:
+        ...     path = os.path.join(td, "test.arrows")
+        ...     with open(path, "wb") as f:
+        ...         nbytes = f.write(Stream.example_bytes())
+        ...
+        ...     with na.ArrayStream.from_path(path) as stream:
+        ...         stream.read_all()
+        nanoarrow.Array<struct<some_col: int32>>[3]
+        {'some_col': 1}
+        {'some_col': 2}
+        {'some_col': 3}
+        """
+        from nanoarrow.ipc import Stream
+
+        with Stream.from_path(obj, *args, **kwargs) as ipc_stream:
+            return ArrayStream(ipc_stream)
+
+    @staticmethod
+    def from_url(obj, *args, **kwargs):
+        """Create an ArrayStream from an IPC stream at a URL
+
+        Examples
+        --------
+        >>> import pathlib
+        >>> import tempfile
+        >>> import os
+        >>> import nanoarrow as na
+        >>> from nanoarrow.ipc import Stream
+        >>> with tempfile.TemporaryDirectory() as td:
+        ...     path = os.path.join(td, "test.arrows")
+        ...     with open(path, "wb") as f:
+        ...         nbytes = f.write(Stream.example_bytes())
+        ...
+        ...     uri = pathlib.Path(path).as_uri()
+        ...     with na.ArrayStream.from_url(uri) as stream:
+        ...         stream.read_all()
+        nanoarrow.Array<struct<some_col: int32>>[3]
+        {'some_col': 1}
+        {'some_col': 2}
+        {'some_col': 3}
+        """
+        from nanoarrow.ipc import Stream
+
+        with Stream.from_url(obj, *args, **kwargs) as ipc_stream:
+            return ArrayStream(ipc_stream)
diff --git a/python/src/nanoarrow/ipc.py b/python/src/nanoarrow/ipc.py
index 645c395..5102a60 100644
--- a/python/src/nanoarrow/ipc.py
+++ b/python/src/nanoarrow/ipc.py
@@ -18,7 +18,7 @@
 import io
 
 from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream
-from nanoarrow._lib import CArrayStream
+from nanoarrow._lib import CArrayStream, _obj_is_buffer
 
 from nanoarrow import _repr_utils
 
@@ -77,7 +77,7 @@
 
     @staticmethod
     def from_readable(obj):
-        """Wrap an open readable object as an Arrow stream
+        """Wrap an open readable file or buffer as an Arrow IPC stream
 
         Wraps a readable object (specificially, an object that implements a
         ``readinto()`` method) as a non-owning Stream. Closing ``obj`` remains
@@ -86,29 +86,33 @@
 
         Parameters
         ----------
-        obj : readable file-like
-            An object implementing ``readinto()``.
+        obj : readable file-like or buffer
+            An object implementing the Python buffer protocol or ``readinto()``.
 
         Examples
         --------
 
-        >>> import io
         >>> import nanoarrow as na
         >>> from nanoarrow.ipc import Stream
-        >>> with io.BytesIO(Stream.example_bytes()) as f:
-        ...     inp = Stream.from_readable(f)
-        ...     na.c_array_stream(inp)
+        >>> ipc_stream = Stream.from_readable(Stream.example_bytes())
+        >>> na.c_array_stream(ipc_stream)
         <nanoarrow.c_lib.CArrayStream>
         - get_schema(): struct<some_col: int32>
         """
+        if _obj_is_buffer(obj):
+            close_obj = True
+            obj = io.BytesIO(obj)
+        else:
+            close_obj = False
+
         out = Stream()
-        out._stream = CIpcInputStream.from_readable(obj)
+        out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
         out._desc = repr(obj)
         return out
 
     @staticmethod
     def from_path(obj, *args, **kwargs):
-        """Wrap a local file as an Arrow stream
+        """Wrap a local file as an IPC stream
 
         Wraps a pathlike object (specificially, one that can be passed to ``open()``)
         as an owning Stream. The file will be opened in binary mode and will be closed
@@ -145,7 +149,7 @@
 
     @staticmethod
     def from_url(obj, *args, **kwargs):
-        """Wrap a URL as an Arrow stream
+        """Wrap a URL as an IPC stream
 
         Wraps a URL (specificially, one that can be passed to
         ``urllib.request.urlopen()``) as an owning Stream. The URL will be
@@ -186,7 +190,7 @@
 
     @staticmethod
     def example():
-        """Example Stream
+        """Example IPC Stream
 
         A self-contained example whose value is the serialized version of
         ``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
@@ -200,7 +204,7 @@
         >>> Stream.example()
         <nanoarrow.ipc.Stream <_io.BytesIO object at ...>>
         """
-        return Stream.from_readable(io.BytesIO(Stream.example_bytes()))
+        return Stream.from_readable(Stream.example_bytes())
 
     @staticmethod
     def example_bytes():
diff --git a/python/tests/test_array_stream.py b/python/tests/test_array_stream.py
new file mode 100644
index 0000000..035949f
--- /dev/null
+++ b/python/tests/test_array_stream.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+
+import os
+import pathlib
+import tempfile
+
+import pytest
+from nanoarrow.ipc import Stream
+
+import nanoarrow as na
+
+
+def test_array_stream_iter():
+    stream = na.ArrayStream([1, 2, 3], na.int32())
+    assert stream.schema.type == na.Type.INT32
+    stream_iter = iter(stream)
+
+    assert list(next(stream_iter).iter_py()) == [1, 2, 3]
+    with pytest.raises(StopIteration):
+        next(stream_iter)
+
+
+def test_array_stream_read_all():
+    stream = na.ArrayStream([1, 2, 3], na.int32())
+    array = stream.read_all()
+    assert array.schema.type == na.Type.INT32
+    assert list(array.iter_py()) == [1, 2, 3]
+
+
+def test_array_stream_read_next():
+    stream = na.ArrayStream([1, 2, 3], na.int32())
+    array = stream.read_next()
+    assert array.schema.type == na.Type.INT32
+    assert list(array.iter_py()) == [1, 2, 3]
+
+    with pytest.raises(StopIteration):
+        stream.read_next()
+
+
+def test_array_stream_close():
+    stream = na.ArrayStream([], na.int32())
+    stream.close()
+    with pytest.raises(RuntimeError, match="array stream is released"):
+        stream.read_all()
+
+
+def test_array_stream_context_manager():
+    stream = na.ArrayStream([], na.int32())
+    with stream:
+        pass
+
+    with pytest.raises(RuntimeError, match="array stream is released"):
+        stream.read_all()
+
+
+def test_array_stream_from_readable():
+    stream = na.ArrayStream.from_readable(Stream.example_bytes())
+    assert stream.schema.type == na.Type.STRUCT
+    assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
+
+
+def test_array_stream_from_path():
+    with tempfile.TemporaryDirectory() as td:
+        path = os.path.join(td, "test.arrows")
+        with open(path, "wb") as f:
+            f.write(Stream.example_bytes())
+
+        stream = na.ArrayStream.from_path(path)
+        assert stream.schema.type == na.Type.STRUCT
+        assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
+
+
+def test_array_stream_from_url():
+    with tempfile.TemporaryDirectory() as td:
+        path = os.path.join(td, "test.arrows")
+        with open(path, "wb") as f:
+            f.write(Stream.example_bytes())
+
+        uri = pathlib.Path(path).as_uri()
+        with na.ArrayStream.from_url(uri) as stream:
+            assert stream.schema.type == na.Type.STRUCT
+            assert list(stream.read_all().iter_tuples()) == [(1,), (2,), (3,)]
diff --git a/python/tests/test_ipc.py b/python/tests/test_ipc.py
index d23abdf..dc8ee85 100644
--- a/python/tests/test_ipc.py
+++ b/python/tests/test_ipc.py
@@ -53,6 +53,18 @@
         assert list(batch.child(0).buffer(1)) == [1, 2, 3]
 
 
+def test_ipc_stream_from_readable():
+    with io.BytesIO(Stream.example_bytes()) as f:
+        with Stream.from_readable(f) as input:
+            assert input._is_valid() is True
+            assert "BytesIO object" in repr(input)
+
+            with na.c_array_stream(input) as stream:
+                batches = list(stream)
+                assert len(batches) == 1
+                assert batches[0].length == 3
+
+
 def test_ipc_stream_from_path():
     with tempfile.TemporaryDirectory() as td:
         path = os.path.join(td, "test.arrows")