# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import io

from nanoarrow._ipc_lib import CIpcInputStream, init_array_stream
from nanoarrow._lib import CArrayStream, _obj_is_buffer

from nanoarrow import _repr_utils


class Stream:
    """Stream of serialized Arrow data

    Reads file paths or otherwise readable file objects that contain
    serialized Arrow data. Arrow documentation typically refers to this format
    as "Arrow IPC" because its origin was as a means to transmit tables between
    processes; however, this format can also be written to and read from files
    or URLs and is essentially a high-performance equivalent of a CSV file that
    does a better job maintaining type fidelity.

    Use :staticmethod:`from_readable`, :staticmethod:`from_path`, or
    :staticmethod:`from_url` to construct these streams.

    Examples
    --------

    >>> import nanoarrow as na
    >>> from nanoarrow.ipc import Stream
    >>> with Stream.example() as inp, na.c_array_stream(inp) as stream:
    ...     stream
    <nanoarrow.c_lib.CArrayStream>
    - get_schema(): struct<some_col: int32>
    """

    def __init__(self):
        self._stream = None
        self._desc = None

    def _is_valid(self) -> bool:
        return self._stream is not None and self._stream.is_valid()

    def __arrow_c_stream__(self, requested_schema=None):
        """Export this stream as an ArrowArrayStream

        Implements the Arrow PyCapsule interface by transferring ownership of this
        input stream to an ArrowArrayStream wrapped by a PyCapsule.
        """
        if not self._is_valid():
            raise RuntimeError("nanoarrow.ipc.Stream is no longer valid")

        with CArrayStream.allocate() as array_stream:
            init_array_stream(self._stream, array_stream._addr())
            array_stream._get_cached_schema()
            return array_stream.__arrow_c_stream__(requested_schema=requested_schema)

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        if self._stream is not None:
            self._stream.release()

    @staticmethod
    def from_readable(obj):
        """Wrap an open readable file or buffer as an Arrow IPC stream

        Wraps a readable object (specificially, an object that implements a
        ``readinto()`` method) as a non-owning Stream. Closing ``obj`` remains
        the caller's responsibility: neither this stream nor the resulting array
        stream will call ``obj.close()``.

        Parameters
        ----------
        obj : readable file-like or buffer
            An object implementing the Python buffer protocol or ``readinto()``.

        Examples
        --------

        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import Stream
        >>> ipc_stream = Stream.from_readable(Stream.example_bytes())
        >>> na.c_array_stream(ipc_stream)
        <nanoarrow.c_lib.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        if not hasattr(obj, "readinto") and _obj_is_buffer(obj):
            close_obj = True
            obj = io.BytesIO(obj)
        else:
            close_obj = False

        out = Stream()
        out._stream = CIpcInputStream.from_readable(obj, close_obj=close_obj)
        out._desc = repr(obj)
        return out

    @staticmethod
    def from_path(obj, *args, **kwargs):
        """Wrap a local file as an IPC stream

        Wraps a pathlike object (specificially, one that can be passed to ``open()``)
        as an owning Stream. The file will be opened in binary mode and will be closed
        when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : path-like
            A string or path-like object that can be passed to ``open()``

        Examples
        --------

        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import Stream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(Stream.example_bytes())
        ...
        ...     with Stream.from_path(path) as inp, na.c_array_stream(inp) as stream:
        ...         stream
        <nanoarrow.c_lib.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        out = Stream()
        out._stream = CIpcInputStream.from_readable(
            open(obj, "rb", *args, **kwargs), close_obj=True
        )
        out._desc = repr(obj)
        return out

    @staticmethod
    def from_url(obj, *args, **kwargs):
        """Wrap a URL as an IPC stream

        Wraps a URL (specificially, one that can be passed to
        ``urllib.request.urlopen()``) as an owning Stream. The URL will be
        closed when this stream or the resulting array stream is released.

        Parameters
        ----------
        obj : str
            A URL that can be passed to ``urllib.request.urlopen()``

        Examples
        --------

        >>> import pathlib
        >>> import tempfile
        >>> import os
        >>> import nanoarrow as na
        >>> from nanoarrow.ipc import Stream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         nbytes = f.write(Stream.example_bytes())
        ...
        ...     uri = pathlib.Path(path).as_uri()
        ...     with Stream.from_url(uri) as inp, na.c_array_stream(inp) as stream:
        ...         stream
        <nanoarrow.c_lib.CArrayStream>
        - get_schema(): struct<some_col: int32>
        """
        import urllib.request

        out = Stream()
        out._stream = CIpcInputStream.from_readable(
            urllib.request.urlopen(obj, *args, **kwargs), close_obj=True
        )
        out._desc = repr(obj)
        return out

    @staticmethod
    def example():
        """Example IPC Stream

        A self-contained example whose value is the serialized version of
        ``DataFrame({"some_col": [1, 2, 3]})``. This may be used for testing
        and documentation and is useful because nanoarrow does not implement
        a writer to generate test data.

        Examples
        --------

        >>> from nanoarrow.ipc import Stream
        >>> Stream.example()
        <nanoarrow.ipc.Stream <_io.BytesIO object at ...>>
        """
        return Stream.from_readable(Stream.example_bytes())

    @staticmethod
    def example_bytes():
        """Example stream bytes

        The underlying bytes of the :staticmethod:`example` Stream. This is useful
        for writing files or creating other types of test input.

        Examples
        --------

        >>> import os
        >>> import tempfile
        >>> from nanoarrow.ipc import Stream
        >>> with tempfile.TemporaryDirectory() as td:
        ...     path = os.path.join(td, "test.arrows")
        ...     with open(path, "wb") as f:
        ...         f.write(Stream.example_bytes())
        440
        """
        return _EXAMPLE_IPC_SCHEMA + _EXAMPLE_IPC_BATCH

    def __repr__(self) -> str:
        class_label = _repr_utils.make_class_label(self)
        if self._is_valid():
            return f"<{class_label} {self._desc}>"
        else:
            return f"<{class_label} <invalid>>"


# A self-contained example whose value is the serialized verison of
# DataFrame({"some_col": [1, 2, 3]}). Used to make the tests and documentation
# self-contained since we don't have an IPC writer.
_EXAMPLE_IPC_SCHEMA = (
    b"\xff\xff\xff\xff\x10\x01\x00\x00\x10\x00\x00\x00\x00\x00\x0a\x00\x0e\x00\x06"
    b"\x00\x05\x00\x08\x00\x0a\x00\x00\x00\x00\x01\x04\x00\x10\x00\x00\x00\x00\x00"
    b"\x0a\x00\x0c\x00\x00\x00\x04\x00\x08\x00\x0a\x00\x00\x00\x3c\x00\x00\x00\x04"
    b"\x00\x00\x00\x01\x00\x00\x00\x04\x00\x00\x00\x84\xff\xff\xff\x18\x00\x00\x00"
    b"\x04\x00\x00\x00\x0a\x00\x00\x00\x73\x6f\x6d\x65\x5f\x76\x61\x6c\x75\x65\x00"
    b"\x00\x08\x00\x00\x00\x73\x6f\x6d\x65\x5f\x6b\x65\x79\x00\x00\x00\x00\x01\x00"
    b"\x00\x00\x18\x00\x00\x00\x00\x00\x12\x00\x18\x00\x08\x00\x06\x00\x07\x00\x0c"
    b"\x00\x00\x00\x10\x00\x14\x00\x12\x00\x00\x00\x00\x00\x01\x02\x14\x00\x00\x00"
    b"\x70\x00\x00\x00\x08\x00\x00\x00\x18\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00"
    b"\x00\x73\x6f\x6d\x65\x5f\x63\x6f\x6c\x00\x00\x00\x00\x01\x00\x00\x00\x0c\x00"
    b"\x00\x00\x08\x00\x0c\x00\x04\x00\x08\x00\x08\x00\x00\x00\x20\x00\x00\x00\x04"
    b"\x00\x00\x00\x10\x00\x00\x00\x73\x6f\x6d\x65\x5f\x76\x61\x6c\x75\x65\x5f\x66"
    b"\x69\x65\x6c\x64\x00\x00\x00\x00\x0e\x00\x00\x00\x73\x6f\x6d\x65\x5f\x6b\x65"
    b"\x79\x5f\x66\x69\x65\x6c\x64\x00\x00\x08\x00\x0c\x00\x08\x00\x07\x00\x08\x00"
    b"\x00\x00\x00\x00\x00\x01\x20\x00\x00\x00\x00\x00\x00\x00"
)

_EXAMPLE_IPC_BATCH = (
    b"\xff\xff\xff\xff\x88\x00\x00\x00\x14\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x16"
    b"\x00\x06\x00\x05\x00\x08\x00\x0c\x00\x0c\x00\x00\x00\x00\x03\x04\x00\x18\x00"
    b"\x00\x00\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0a\x00\x18\x00\x0c\x00\x04"
    b"\x00\x08\x00\x0a\x00\x00\x00\x3c\x00\x00\x00\x10\x00\x00\x00\x03\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0c\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x00"
    b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00"
    b"\x03\x00\x00\x00\x00\x00\x00\x00"
)
