# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint:disable=protected-access
import json
from typing import Callable

import pytest

from pyiceberg.avro.decoder import BinaryDecoder, StreamingBinaryDecoder
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder
from pyiceberg.avro.file import AvroFile
from pyiceberg.avro.reader import (
    BinaryReader,
    BooleanReader,
    DateReader,
    DecimalReader,
    DoubleReader,
    FixedReader,
    FloatReader,
    IntegerReader,
    StringReader,
    StructReader,
    TimeReader,
    TimestampNanoReader,
    TimestampReader,
    TimestamptzNanoReader,
    TimestamptzReader,
    UnknownReader,
    UUIDReader,
)
from pyiceberg.avro.resolver import construct_reader
from pyiceberg.io.pyarrow import PyArrowFileIO
from pyiceberg.manifest import MANIFEST_ENTRY_SCHEMAS, DataFile, ManifestEntry
from pyiceberg.schema import Schema
from pyiceberg.typedef import Record
from pyiceberg.types import (
    BinaryType,
    BooleanType,
    DateType,
    DecimalType,
    DoubleType,
    FixedType,
    FloatType,
    IntegerType,
    LongType,
    NestedField,
    StringType,
    StructType,
    TimestampNanoType,
    TimestampType,
    TimestamptzNanoType,
    TimestamptzType,
    TimeType,
    UnknownType,
    UUIDType,
)

AVAILABLE_DECODERS = [StreamingBinaryDecoder, CythonBinaryDecoder]


def test_read_header(generated_manifest_entry_file: str, iceberg_manifest_entry_schema: Schema) -> None:
    with AvroFile[ManifestEntry](
        PyArrowFileIO().new_input(generated_manifest_entry_file),
        MANIFEST_ENTRY_SCHEMAS[2],
        {-1: ManifestEntry, 2: DataFile},
    ) as reader:
        header = reader.header

    assert header.magic == b"Obj\x01"
    assert json.loads(header.meta["avro.schema"]) == {
        "type": "record",
        "name": "manifest_entry",
        "fields": [
            {"field-id": 0, "name": "status", "type": "int"},
            {"field-id": 1, "default": None, "name": "snapshot_id", "type": ["null", "long"]},
            {
                "field-id": 2,
                "name": "data_file",
                "type": {
                    "type": "record",
                    "name": "r2",
                    "fields": [
                        {"field-id": 100, "doc": "Location URI with FS scheme", "name": "file_path", "type": "string"},
                        {
                            "field-id": 101,
                            "doc": "File format name: avro, orc, or parquet",
                            "name": "file_format",
                            "type": "string",
                        },
                        {
                            "field-id": 102,
                            "name": "partition",
                            "type": {
                                "type": "record",
                                "name": "r102",
                                "fields": [
                                    {"field-id": 1000, "default": None, "name": "VendorID", "type": ["null", "int"]},
                                    {
                                        "field-id": 1001,
                                        "default": None,
                                        "name": "tpep_pickup_datetime",
                                        "type": ["null", {"type": "int", "logicalType": "date"}],
                                    },
                                ],
                            },
                        },
                        {"field-id": 103, "doc": "Number of records in the file", "name": "record_count", "type": "long"},
                        {"field-id": 104, "doc": "Total file size in bytes", "name": "file_size_in_bytes", "type": "long"},
                        {"field-id": 105, "name": "block_size_in_bytes", "type": "long"},
                        {
                            "field-id": 108,
                            "doc": "Map of column id to total size on disk",
                            "default": None,
                            "name": "column_sizes",
                            "type": [
                                "null",
                                {
                                    "logicalType": "map",
                                    "type": "array",
                                    "items": {
                                        "type": "record",
                                        "name": "k117_v118",
                                        "fields": [
                                            {"field-id": 117, "name": "key", "type": "int"},
                                            {"field-id": 118, "name": "value", "type": "long"},
                                        ],
                                    },
                                },
                            ],
                        },
                        {
                            "field-id": 109,
                            "doc": "Map of column id to total count, including null and NaN",
                            "default": None,
                            "name": "value_counts",
                            "type": [
                                "null",
                                {
                                    "logicalType": "map",
                                    "type": "array",
                                    "items": {
                                        "type": "record",
                                        "name": "k119_v120",
                                        "fields": [
                                            {"field-id": 119, "name": "key", "type": "int"},
                                            {"field-id": 120, "name": "value", "type": "long"},
                                        ],
                                    },
                                },
                            ],
                        },
                        {
                            "field-id": 110,
                            "doc": "Map of column id to null value count",
                            "default": None,
                            "name": "null_value_counts",
                            "type": [
                                "null",
                                {
                                    "logicalType": "map",
                                    "type": "array",
                                    "items": {
                                        "type": "record",
                                        "name": "k121_v122",
                                        "fields": [
                                            {"field-id": 121, "name": "key", "type": "int"},
                                            {"field-id": 122, "name": "value", "type": "long"},
                                        ],
                                    },
                                },
                            ],
                        },
                        {
                            "field-id": 137,
                            "doc": "Map of column id to number of NaN values in the column",
                            "default": None,
                            "name": "nan_value_counts",
                            "type": [
                                "null",
                                {
                                    "logicalType": "map",
                                    "type": "array",
                                    "items": {
                                        "type": "record",
                                        "name": "k138_v139",
                                        "fields": [
                                            {"field-id": 138, "name": "key", "type": "int"},
                                            {"field-id": 139, "name": "value", "type": "long"},
                                        ],
                                    },
                                },
                            ],
                        },
                        {
                            "field-id": 125,
                            "doc": "Map of column id to lower bound",
                            "default": None,
                            "name": "lower_bounds",
                            "type": [
                                "null",
                                {
                                    "logicalType": "map",
                                    "type": "array",
                                    "items": {
                                        "type": "record",
                                        "name": "k126_v127",
                                        "fields": [
                                            {"field-id": 126, "name": "key", "type": "int"},
                                            {"field-id": 127, "name": "value", "type": "bytes"},
                                        ],
                                    },
                                },
                            ],
                        },
                        {
                            "field-id": 128,
                            "doc": "Map of column id to upper bound",
                            "default": None,
                            "name": "upper_bounds",
                            "type": [
                                "null",
                                {
                                    "logicalType": "map",
                                    "type": "array",
                                    "items": {
                                        "type": "record",
                                        "name": "k129_v130",
                                        "fields": [
                                            {"field-id": 129, "name": "key", "type": "int"},
                                            {"field-id": 130, "name": "value", "type": "bytes"},
                                        ],
                                    },
                                },
                            ],
                        },
                        {
                            "field-id": 131,
                            "doc": "Encryption key metadata blob",
                            "default": None,
                            "name": "key_metadata",
                            "type": ["null", "bytes"],
                        },
                        {
                            "field-id": 132,
                            "doc": "Splittable offsets",
                            "default": None,
                            "name": "split_offsets",
                            "type": ["null", {"element-id": 133, "type": "array", "items": "long"}],
                        },
                        {
                            "field-id": 140,
                            "doc": "Sort order ID",
                            "default": None,
                            "name": "sort_order_id",
                            "type": ["null", "int"],
                        },
                    ],
                },
            },
        ],
    }

    assert header.get_schema() == iceberg_manifest_entry_schema


def test_fixed_reader() -> None:
    assert construct_reader(FixedType(22)) == FixedReader(22)


def test_decimal_reader() -> None:
    assert construct_reader(DecimalType(19, 25)) == DecimalReader(19, 25)


def test_boolean_reader() -> None:
    assert construct_reader(BooleanType()) == BooleanReader()


def test_integer_reader() -> None:
    assert construct_reader(IntegerType()) == IntegerReader()


def test_long_reader() -> None:
    assert construct_reader(LongType()) == IntegerReader()


def test_float_reader() -> None:
    assert construct_reader(FloatType()) == FloatReader()


def test_double_reader() -> None:
    assert construct_reader(DoubleType()) == DoubleReader()


def test_date_reader() -> None:
    assert construct_reader(DateType()) == DateReader()


def test_time_reader() -> None:
    assert construct_reader(TimeType()) == TimeReader()


def test_timestamp_reader() -> None:
    assert construct_reader(TimestampType()) == TimestampReader()


def test_timestamp_ns_reader() -> None:
    assert construct_reader(TimestampNanoType()) == TimestampNanoReader()


def test_timestamptz_reader() -> None:
    assert construct_reader(TimestamptzType()) == TimestamptzReader()


def test_timestamptz_ns_reader() -> None:
    assert construct_reader(TimestamptzNanoType()) == TimestamptzNanoReader()


def test_string_reader() -> None:
    assert construct_reader(StringType()) == StringReader()


def test_binary_reader() -> None:
    assert construct_reader(BinaryType()) == BinaryReader()


def test_unknown_type() -> None:
    assert construct_reader(UnknownType()) == UnknownReader()


def test_uuid_reader() -> None:
    assert construct_reader(UUIDType()) == UUIDReader()


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    decoder = decoder_class(b"\x18")
    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
    result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder)
    assert repr(result) == "Record[12]"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_struct_lambda(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    decoder = decoder_class(b"\x18")

    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
    # You can also pass in an arbitrary function that returns a struct
    result = StructReader(
        ((0, IntegerReader()),),
        Record,
        struct,  # pylint: disable=unnecessary-lambda
    ).read(decoder)
    assert repr(result) == "Record[12]"


@pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS)
def test_read_not_struct_type(decoder_class: Callable[[bytes], BinaryDecoder]) -> None:
    decoder = decoder_class(b"\x18")

    struct = StructType(NestedField(1, "id", IntegerType(), required=True))
    with pytest.raises(ValueError) as exc_info:
        _ = StructReader(((0, IntegerReader()),), str, struct).read(decoder)  # type: ignore

    assert "Incompatible with StructProtocol: <class 'str'>" in str(exc_info.value)
