# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=import-outside-toplevel, unused-argument, redefined-outer-name, invalid-name

from functools import partial
from typing import Any, TYPE_CHECKING

import pytest
from pytest_mock import MockerFixture
from sqlalchemy.orm.session import Session

from superset import db
from tests.conftest import with_config

if TYPE_CHECKING:
    from superset.connectors.sqla.models import SqlaTable

FULL_DTTM_DEFAULTS_EXAMPLE = {
    "main_dttm_col": "id",
    "dttm_columns": {
        "dttm": {
            "python_date_format": "epoch_s",
            "expression": "CAST(dttm as INTEGER)",
        },
        "id": {"python_date_format": "epoch_ms"},
        "month": {
            "python_date_format": "%Y-%m-%d",
            "expression": (
                "CASE WHEN length(month) = 7 THEN month || '-01' ELSE month END"
            ),
        },
    },
}


def apply_dttm_defaults(table: "SqlaTable", dttm_defaults: dict[str, Any]) -> None:
    """Applies dttm defaults to the table, mutates in place."""
    for dbcol in table.columns:
        # Set is_dttm is column is listed in dttm_columns.
        if dbcol.column_name in dttm_defaults.get("dttm_columns", {}):
            dbcol.is_dttm = True

        # Skip non dttm columns.
        if dbcol.column_name not in dttm_defaults.get("dttm_columns", {}):
            continue

        # Set table main_dttm_col.
        if dbcol.column_name == dttm_defaults.get("main_dttm_col"):
            table.main_dttm_col = dbcol.column_name

        # Apply defaults if empty.
        dttm_column_defaults = dttm_defaults.get("dttm_columns", {}).get(
            dbcol.column_name, {}
        )
        dbcol.is_dttm = True
        if (
            not dbcol.python_date_format
            and "python_date_format" in dttm_column_defaults
        ):
            dbcol.python_date_format = dttm_column_defaults["python_date_format"]
        if not dbcol.expression and "expression" in dttm_column_defaults:
            dbcol.expression = dttm_column_defaults["expression"]


@pytest.fixture
def test_table(session: Session) -> "SqlaTable":
    """
    Fixture that generates an in-memory table.
    """
    from superset.connectors.sqla.models import SqlaTable, TableColumn
    from superset.models.core import Database

    engine = db.session.get_bind()
    SqlaTable.metadata.create_all(engine)  # pylint: disable=no-member

    columns = [
        TableColumn(column_name="ds", is_dttm=1, type="TIMESTAMP"),
        TableColumn(column_name="event_time", is_dttm=1, type="TIMESTAMP"),
        TableColumn(column_name="id", type="INTEGER"),
        TableColumn(column_name="dttm", type="INTEGER"),
        TableColumn(column_name="duration_ms", type="INTEGER"),
    ]

    return SqlaTable(
        table_name="test_table",
        columns=columns,
        metrics=[],
        main_dttm_col=None,
        database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"),
    )


@with_config(
    {
        "SQLA_TABLE_MUTATOR": partial(
            apply_dttm_defaults,
            dttm_defaults={
                "main_dttm_col": "event_time",
                "dttm_columns": {"ds": {}, "event_time": {}},
            },
        )
    }
)
def test_main_dttm_col(mocker: MockerFixture, test_table: "SqlaTable") -> None:
    """
    Test the ``SQLA_TABLE_MUTATOR`` config.
    """
    mocker.patch(
        "superset.connectors.sqla.models.get_physical_table_metadata",
        return_value=[
            {"column_name": "ds", "type": "TIMESTAMP", "is_dttm": True},
            {"column_name": "event_time", "type": "TIMESTAMP", "is_dttm": True},
            {"column_name": "id", "type": "INTEGER", "is_dttm": False},
        ],
    )

    assert test_table.main_dttm_col is None
    test_table.fetch_metadata()
    assert test_table.main_dttm_col == "event_time"


@with_config(
    {
        "SQLA_TABLE_MUTATOR": partial(
            apply_dttm_defaults,
            dttm_defaults={
                "main_dttm_col": "nonexistent",
            },
        )
    }
)
def test_main_dttm_col_nonexistent(
    mocker: MockerFixture,
    test_table: "SqlaTable",
) -> None:
    """
    Test the ``SQLA_TABLE_MUTATOR`` config when main datetime column doesn't exist.
    """
    mocker.patch(
        "superset.connectors.sqla.models.get_physical_table_metadata",
        return_value=[
            {"column_name": "ds", "type": "TIMESTAMP", "is_dttm": True},
            {"column_name": "event_time", "type": "TIMESTAMP", "is_dttm": True},
            {"column_name": "id", "type": "INTEGER", "is_dttm": False},
        ],
    )

    assert test_table.main_dttm_col is None
    test_table.fetch_metadata()
    # fall back to ds
    assert test_table.main_dttm_col == "ds"


@with_config(
    {
        "SQLA_TABLE_MUTATOR": partial(
            apply_dttm_defaults,
            dttm_defaults={
                "main_dttm_col": "id",
            },
        )
    }
)
def test_main_dttm_col_nondttm(
    mocker: MockerFixture,
    test_table: "SqlaTable",
) -> None:
    """
    Test the ``SQLA_TABLE_MUTATOR`` config when main datetime column has wrong type.
    """
    mocker.patch(
        "superset.connectors.sqla.models.get_physical_table_metadata",
        return_value=[
            {"column_name": "ds", "type": "TIMESTAMP", "is_dttm": True},
            {"column_name": "event_time", "type": "TIMESTAMP", "is_dttm": True},
            {"column_name": "id", "type": "INTEGER", "is_dttm": False},
        ],
    )

    assert test_table.main_dttm_col is None
    test_table.fetch_metadata()
    # fall back to ds
    assert test_table.main_dttm_col == "ds"


@with_config(
    {
        "SQLA_TABLE_MUTATOR": partial(
            apply_dttm_defaults,
            dttm_defaults={
                "dttm_columns": {
                    "id": {"python_date_format": "epoch_ms"},
                    "dttm": {"python_date_format": "epoch_s"},
                },
            },
        )
    }
)
def test_python_date_format_by_column_name(
    mocker: MockerFixture,
    test_table: "SqlaTable",
) -> None:
    """
    Test the ``SQLA_TABLE_MUTATOR`` setting for "python_date_format".
    """
    mocker.patch(
        "superset.connectors.sqla.models.get_physical_table_metadata",
        return_value=[
            {"column_name": "id", "type": "INTEGER", "is_dttm": False},
            {"column_name": "dttm", "type": "INTEGER", "is_dttm": False},
        ],
    )

    test_table.fetch_metadata()

    id_col = [c for c in test_table.columns if c.column_name == "id"][0]
    assert id_col.is_dttm
    assert id_col.python_date_format == "epoch_ms"

    dttm_col = [c for c in test_table.columns if c.column_name == "dttm"][0]
    assert dttm_col.is_dttm
    assert dttm_col.python_date_format == "epoch_s"


@with_config(
    {
        "SQLA_TABLE_MUTATOR": partial(
            apply_dttm_defaults,
            dttm_defaults={
                "dttm_columns": {
                    "dttm": {"expression": "CAST(dttm as INTEGER)"},
                    "duration_ms": {"expression": "CAST(duration_ms as DOUBLE)"},
                },
            },
        )
    }
)
def test_expression_by_column_name(
    mocker: MockerFixture,
    test_table: "SqlaTable",
) -> None:
    """
    Test the ``SQLA_TABLE_MUTATOR`` setting for expression.
    """
    mocker.patch(
        "superset.connectors.sqla.models.get_physical_table_metadata",
        return_value=[
            {"column_name": "dttm", "type": "INTEGER", "is_dttm": False},
            {"column_name": "duration_ms", "type": "INTEGER", "is_dttm": False},
        ],
    )

    test_table.fetch_metadata()

    dttm_col = [c for c in test_table.columns if c.column_name == "dttm"][0]
    assert dttm_col.is_dttm
    assert dttm_col.expression == "CAST(dttm as INTEGER)"

    duration_ms_col = [c for c in test_table.columns if c.column_name == "duration_ms"][
        0
    ]
    assert duration_ms_col.is_dttm
    assert duration_ms_col.expression == "CAST(duration_ms as DOUBLE)"


@with_config(
    {
        "SQLA_TABLE_MUTATOR": partial(
            apply_dttm_defaults,
            dttm_defaults=FULL_DTTM_DEFAULTS_EXAMPLE,
        )
    }
)
def test_full_setting(
    mocker: MockerFixture,
    test_table: "SqlaTable",
) -> None:
    """
    Test the ``SQLA_TABLE_MUTATOR`` with full settings.
    """
    mocker.patch(
        "superset.connectors.sqla.models.get_physical_table_metadata",
        return_value=[
            {"column_name": "id", "type": "INTEGER", "is_dttm": False},
            {"column_name": "dttm", "type": "INTEGER", "is_dttm": False},
            {"column_name": "duration_ms", "type": "INTEGER", "is_dttm": False},
        ],
    )

    test_table.fetch_metadata()

    id_col = [c for c in test_table.columns if c.column_name == "id"][0]
    assert id_col.is_dttm
    assert id_col.python_date_format == "epoch_ms"
    assert id_col.expression == ""

    dttm_col = [c for c in test_table.columns if c.column_name == "dttm"][0]
    assert dttm_col.is_dttm
    assert dttm_col.python_date_format == "epoch_s"
    assert dttm_col.expression == "CAST(dttm as INTEGER)"
