# blob: 5125f77109708efbae2357dbdbc4e5153faf9b11 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import io
from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream
from nanoarrow._lib import CArrayStream, _obj_is_buffer
from nanoarrow import _repr_utils
class Stream:
    """Stream of serialized Arrow data

    Reads file paths or otherwise readable file objects that contain
    serialized Arrow data. Arrow documentation typically refers to this format
    as "Arrow IPC" because its origin was as a means to transmit tables between
    processes; however, this format can also be written to and read from files
    or URLs and is essentially a high-performance equivalent of a CSV file that
    does a better job maintaining type fidelity.

    Use :meth:`from_readable`, :meth:`from_path`, or :meth:`from_url` to
    construct these streams.

    Examples
    --------

    >>> import nanoarrow as na
    >>> from nanoarrow.ipc import Stream
    >>> with Stream.example() as inp, na.c_array_stream(inp) as stream:
    ...     stream
    <nanoarrow.c_lib.CArrayStream>
    - get_schema(): struct<some_col: int32>
    """

    def __init__(self):
        # Both attributes are filled in by the from_*() constructors:
        # _stream is the underlying CIpcInputStream and _desc is the repr()
        # of the input source, which is only used by __repr__().
        self._stream = None
        self._desc = None

    def _is_valid(self) -> bool:
        """Return True if an input stream is attached and reports itself valid."""
        return self._stream is not None and self._stream.is_valid()

    def __arrow_c_stream__(self, requested_schema=None):
        """Export this stream as an ArrowArrayStream

        Implements the Arrow PyCapsule interface by transferring ownership of this
        input stream to an ArrowArrayStream wrapped by a PyCapsule.

        Parameters
        ----------
        requested_schema : optional
            Forwarded unchanged to the ``__arrow_c_stream__()`` method of the
            array stream that is created here.

        Raises
        ------
        RuntimeError
            If this stream was never initialized or is no longer valid.
        """
        if not self._is_valid():
            raise RuntimeError("nanoarrow.ipc.Stream is no longer valid")

        with CArrayStream.allocate() as array_stream:
            init_array_stream(self._stream, array_stream._addr())
            # NOTE(review): presumably this reads the schema eagerly so that a
            # malformed input fails here rather than in the consumer -- confirm.
            array_stream._get_cached_schema()
            return array_stream.__arrow_c_stream__(requested_schema=requested_schema)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        # Release the underlying input stream (if one was ever attached) when
        # leaving the ``with`` block. Exceptions are not suppressed.
        if self._stream is not None:
            self._stream.release()

    @staticmethod
    def _from_owned_readable(readable, source):
        """Wrap ``readable`` as an owning Stream described by ``repr(source)``.

        ``readable`` was opened on behalf of the caller, so if wrapping it
        fails nobody else holds a reference that could close it: close it here
        before propagating the error instead of leaking the handle.
        """
        out = Stream()
        try:
            out._stream = CIpcInputStream.from_readable(readable, close_obj=True)
        except Exception:
            readable.close()
            raise

        out._desc = repr(source)
        return out

    @staticmethod
    def from_readable(obj):
        """Wrap an open readable file or buffer as an Arrow IPC stream

        Wraps a readable object (specifically, an object that implements a
        ``readinto()`` method) as a non-owning Stream. Closing ``obj`` remains
        the caller's responsibility: neither this stream nor the resulting array
        stream will call ``obj.close()``.

        An object that has no ``readinto()`` method but implements the buffer
        protocol is wrapped in an ``io.BytesIO``; that wrapper (not ``obj``)
        is handed to the input stream with ``close_obj=True``.

        Parameters
        ----------
        obj : readable file-like or buffer
            An object implementing the Python buffer protocol or ``readinto()``.

        Examples
        --------

        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import Stream
        >>> ipc_stream = Stream.from_readable(Stream.example_bytes())
        >>> na.c_array_stream(ipc_stream)
        <nanoarrow.c_lib.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        if not hasattr(obj, "readinto") and _obj_is_buffer(obj):
            close_obj = True
            obj = io.BytesIO(obj)
        else:
            close_obj = False

        out = Stream()
        out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
        # For buffer input this describes the BytesIO wrapper, not the buffer.
        out._desc = repr(obj)
        return out

    @staticmethod
    def from_path(obj, *args, **kwargs):
        """Wrap a local file as an IPC stream

        Wraps a pathlike object (specifically, one that can be passed to ``open()``)
        as an owning Stream. The file will be opened in binary mode and will be closed
        when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : path-like
            A string or path-like object that can be passed to ``open()``
        *args, **kwargs
            Passed to ``open()`` after the path and the ``"rb"`` mode.

        Examples
        --------

        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import Stream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(Stream.example_bytes())
        ...
        ...     with Stream.from_path(path) as inp, na.c_array_stream(inp) as stream:
        ...         stream
        <nanoarrow.c_lib.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        return Stream._from_owned_readable(open(obj, "rb", *args, **kwargs), obj)

    @staticmethod
    def from_url(obj, *args, **kwargs):
        """Wrap a URL as an IPC stream

        Wraps a URL (specifically, one that can be passed to
        ``urllib.request.urlopen()``) as an owning Stream. The URL will be
        closed when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : str
            A URL that can be passed to ``urllib.request.urlopen()``
        *args, **kwargs
            Passed to ``urllib.request.urlopen()`` after the URL.

        Examples
        --------

        >>> import pathlib
        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import Stream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(Stream.example_bytes())
        ...
        ...     uri = pathlib.Path(path).as_uri()
        ...     with Stream.from_url(uri) as inp, na.c_array_stream(inp) as stream:
        ...         stream
        <nanoarrow.c_lib.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        # Imported lazily so urllib is only loaded when a URL is actually used.
        import urllib.request

        return Stream._from_owned_readable(
            urllib.request.urlopen(obj, *args, **kwargs), obj
        )

    @staticmethod
    def example():
        """Example IPC Stream

        A self-contained example whose value is the serialized version of
        ``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
        and documentation and is useful because nanoarrow does not implement
        a writer to generate test data.

        Examples
        --------

        >>> from nanoarrow.ipc import Stream
        >>> Stream.example()
        <nanoarrow.ipc.Stream <_io.BytesIO object at ...>>
        """
        return Stream.from_readable(Stream.example_bytes())

    @staticmethod
    def example_bytes():
        """Example stream bytes

        The underlying bytes of the :meth:`example` Stream. This is useful
        for writing files or creating other types of test input.

        Examples
        --------

        >>> import os
        >>> import tempfile
        >>> from nanoarrow.ipc import Stream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         f.write(Stream.example_bytes())
        440
        """
        return _EXAMPLE_IPC_SCHEMA + _EXAMPLE_IPC_BATCH

    def __repr__(self) -> str:
        class_label = _repr_utils.make_class_label(self)
        if self._is_valid():
            return f"<{class_label} {self._desc}>"
        else:
            return f"<{class_label} <invalid>>"
# A self-contained example whose value is the serialized version of
# DataFrame({"some_col": [1, 2, 3]}). Used to make the tests and documentation
# self-contained since we don't have an IPC writer.
#
# The example is stored as two byte strings that Stream.example_bytes()
# concatenates (schema first, then batch). Each one starts with
# b"\xff\xff\xff\xff" followed by a 4-byte field.
# NOTE(review): presumably these are the Arrow IPC continuation marker and the
# little-endian metadata length of an encapsulated message -- confirm against
# the Arrow IPC format specification before editing any byte by hand.
_EXAMPLE_IPC_SCHEMA = (
    b"\xff\xff\xff\xff\x10\x01\x00\x00\x10\x00\x00\x00\x00\x00\x0a\x00\x0e\x00\x06"
    b"\x00\x05\x00\x08\x00\x0a\x00\x00\x00\x00\x01\x04\x00\x10\x00\x00\x00\x00\x00"
    b"\x0a\x00\x0c\x00\x00\x00\x04\x00\x08\x00\x0a\x00\x00\x00\x3c\x00\x00\x00\x04"
    b"\x00\x00\x00\x01\x00\x00\x00\x04\x00\x00\x00\x84\xff\xff\xff\x18\x00\x00\x00"
    b"\x04\x00\x00\x00\x0a\x00\x00\x00\x73\x6f\x6d\x65\x5f\x76\x61\x6c\x75\x65\x00"
    b"\x00\x08\x00\x00\x00\x73\x6f\x6d\x65\x5f\x6b\x65\x79\x00\x00\x00\x00\x01\x00"
    b"\x00\x00\x18\x00\x00\x00\x00\x00\x12\x00\x18\x00\x08\x00\x06\x00\x07\x00\x0c"
    b"\x00\x00\x00\x10\x00\x14\x00\x12\x00\x00\x00\x00\x00\x01\x02\x14\x00\x00\x00"
    b"\x70\x00\x00\x00\x08\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00"
    b"\x00\x73\x6f\x6d\x65\x5f\x63\x6f\x6c\x00\x00\x00\x00\x01\x00\x00\x00\x0c\x00"
    b"\x00\x00\x08\x00\x0c\x00\x04\x00\x08\x00\x08\x00\x00\x00\x20\x00\x00\x00\x04"
    b"\x00\x00\x00\x10\x00\x00\x00\x73\x6f\x6d\x65\x5f\x76\x61\x6c\x75\x65\x5f\x66"
    b"\x69\x65\x6c\x64\x00\x00\x00\x00\x0e\x00\x00\x00\x73\x6f\x6d\x65\x5f\x6b\x65"
    b"\x79\x5f\x66\x69\x65\x6c\x64\x00\x00\x08\x00\x0c\x00\x08\x00\x07\x00\x08\x00"
    b"\x00\x00\x00\x00\x00\x01\x20\x00\x00\x00\x00\x00\x00\x00"
)

# The second half of the example. Its last 16 bytes are the values 1, 2, 3 as
# 4-byte little-endian integers followed by four zero bytes.
# NOTE(review): presumably that tail is the int32 data buffer of "some_col"
# padded to an 8-byte boundary -- confirm.
_EXAMPLE_IPC_BATCH = (
    b"\xff\xff\xff\xff\x88\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x16"
    b"\x00\x06\x00\x05\x00\x08\x00\x0c\x00\x0c\x00\x00\x00\x00\x03\x04\x00\x18\x00"
    b"\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0a\x00\x18\x00\x0c\x00\x04"
    b"\x00\x08\x00\x0a\x00\x00\x00\x3c\x00\x00\x00\x10\x00\x00\x00\x03\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
    b"\x03\x00\x00\x00\x00\x00\x00\x00"
)