# blob: 857bc19b9bee60f544ffaedbff0fb54714b9c388 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from typing import Any, Optional
from unittest import mock
import pytest
import pytz
from pyhive.sqlalchemy_presto import PrestoDialect
from pytest_mock import MockerFixture
from sqlalchemy import column, sql, text, types
from sqlalchemy.engine.interfaces import Dialect
from sqlalchemy.engine.url import make_url
from superset.sql.parse import Table
from superset.utils.core import GenericDataType
from tests.unit_tests.db_engine_specs.utils import (
assert_column_spec,
assert_convert_dttm,
)
@pytest.mark.parametrize(
    "target_type,dttm,expected_result",
    [
        ("VARCHAR", datetime(2022, 1, 1), None),
        ("DATE", datetime(2022, 1, 1), "DATE '2022-01-01'"),
        (
            "TIMESTAMP",
            datetime(2022, 1, 1, 1, 23, 45, 600000),
            "TIMESTAMP '2022-01-01 01:23:45.600000'",
        ),
        (
            "TIMESTAMP WITH TIME ZONE",
            datetime(2022, 1, 1, 1, 23, 45, 600000),
            "TIMESTAMP '2022-01-01 01:23:45.600000'",
        ),
        (
            "TIMESTAMP WITH TIME ZONE",
            datetime(2022, 1, 1, 1, 23, 45, 600000, tzinfo=pytz.UTC),
            "TIMESTAMP '2022-01-01 01:23:45.600000+00:00'",
        ),
    ],
)
def test_convert_dttm(
    target_type: str,
    dttm: datetime,
    expected_result: Optional[str],
) -> None:
    """
    Test datetime conversion for the Presto engine spec.

    Each case hands a target column type, a datetime and the expected result
    (a SQL literal string, or ``None``) to the shared ``assert_convert_dttm``
    helper.
    """
    # Imported inside the test, like every other test in this module.
    from superset.db_engine_specs.presto import PrestoEngineSpec

    assert_convert_dttm(PrestoEngineSpec, target_type, expected_result, dttm)
@pytest.mark.parametrize(
    "native_type,sqla_type,attrs,generic_type,is_dttm",
    [
        ("varchar(255)", types.VARCHAR, {"length": 255}, GenericDataType.STRING, False),
        ("varchar", types.String, None, GenericDataType.STRING, False),
        ("char(255)", types.CHAR, {"length": 255}, GenericDataType.STRING, False),
        ("char", types.String, None, GenericDataType.STRING, False),
        ("integer", types.Integer, None, GenericDataType.NUMERIC, False),
        ("time", types.Time, None, GenericDataType.TEMPORAL, True),
        ("timestamp", types.TIMESTAMP, None, GenericDataType.TEMPORAL, True),
    ],
)
def test_get_column_spec(
    native_type: str,
    sqla_type: type[types.TypeEngine],
    attrs: Optional[dict[str, Any]],
    generic_type: GenericDataType,
    is_dttm: bool,
) -> None:
    """
    Test column spec resolution for the Presto engine spec.

    Each case hands a native type name, together with the SQLAlchemy type,
    type attributes, generic data type and temporal flag expected for it, to
    the shared ``assert_column_spec`` helper.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    assert_column_spec(
        PrestoEngineSpec,
        native_type,
        sqla_type,
        attrs,
        generic_type,
        is_dttm,
    )
def test_get_schema_from_engine_params() -> None:
    """
    Test the ``get_schema_from_engine_params`` method.

    A URL whose path is ``hive/default`` is expected to yield ``"default"``,
    while a URL whose path is only ``hive`` is expected to yield ``None``.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    # URL path with two segments.
    schema = PrestoEngineSpec.get_schema_from_engine_params(
        make_url("presto://localhost:8080/hive/default"),
        {},
    )
    assert schema == "default"

    # URL path with a single segment.
    schema = PrestoEngineSpec.get_schema_from_engine_params(
        make_url("presto://localhost:8080/hive"),
        {},
    )
    assert schema is None
@mock.patch("superset.db_engine_specs.presto.PrestoEngineSpec.latest_partition")
@pytest.mark.parametrize(
    ["column_type", "column_value", "expected_value"],
    [
        ("DATE", "2023-05-01", "DATE '2023-05-01'"),
        ("TIMESTAMP", "2023-05-01", "TIMESTAMP '2023-05-01'"),
        ("VARCHAR", "2023-05-01", "'2023-05-01'"),
        ("INT", 1234, "1234"),
    ],
)
def test_where_latest_partition(
    mock_latest_partition,
    column_type: str,
    column_value: Any,
    expected_value: str,
) -> None:
    """
    Test the ``where_latest_partition`` method.

    ``latest_partition`` is patched to report a single ``partition_key``
    column holding the parametrized value. The resulting query, compiled with
    the Presto dialect and literal binds, must filter on that value rendered
    as ``expected_value``.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    mock_latest_partition.return_value = (["partition_key"], [column_value])

    rendered = str(
        PrestoEngineSpec.where_latest_partition(  # type: ignore
            database=mock.MagicMock(),
            table=Table("table"),
            query=sql.select(text("* FROM table")),
            columns=[
                {
                    "column_name": "partition_key",
                    "name": "partition_key",
                    "type": column_type,
                    "is_dttm": False,
                }
            ],
        ).compile(
            dialect=PrestoDialect(),
            # Inline the bound value so it shows up in the SQL text.
            compile_kwargs={"literal_binds": True},
        )
    )
    expected = f"""SELECT * FROM table \nWHERE "partition_key" = {expected_value}"""  # noqa: S608

    assert rendered == expected
def test_adjust_engine_params_fully_qualified() -> None:
    """
    Test the ``adjust_engine_params`` method when the URL has catalog and schema.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    url = make_url("presto://localhost:8080/hive/default")

    # (keyword overrides, expected URI) pairs: no override, schema only,
    # catalog only, and both together.
    scenarios: list[tuple[dict[str, str], str]] = [
        ({}, "presto://localhost:8080/hive/default"),
        ({"schema": "new_schema"}, "presto://localhost:8080/hive/new_schema"),
        ({"catalog": "new_catalog"}, "presto://localhost:8080/new_catalog/default"),
        (
            {"catalog": "new_catalog", "schema": "new_schema"},
            "presto://localhost:8080/new_catalog/new_schema",
        ),
    ]

    for overrides, expected_uri in scenarios:
        # The first element of the result is the adjusted URL.
        uri = PrestoEngineSpec.adjust_engine_params(url, {}, **overrides)[0]
        assert str(uri) == expected_uri
def test_adjust_engine_params_catalog_only() -> None:
    """
    Test the ``adjust_engine_params`` method when the URL has only the catalog.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    url = make_url("presto://localhost:8080/hive")

    # (keyword overrides, expected URI) pairs: no override, schema only,
    # catalog only, and both together.
    scenarios: list[tuple[dict[str, str], str]] = [
        ({}, "presto://localhost:8080/hive"),
        ({"schema": "new_schema"}, "presto://localhost:8080/hive/new_schema"),
        ({"catalog": "new_catalog"}, "presto://localhost:8080/new_catalog"),
        (
            {"catalog": "new_catalog", "schema": "new_schema"},
            "presto://localhost:8080/new_catalog/new_schema",
        ),
    ]

    for overrides, expected_uri in scenarios:
        # The first element of the result is the adjusted URL.
        uri = PrestoEngineSpec.adjust_engine_params(url, {}, **overrides)[0]
        assert str(uri) == expected_uri
def test_get_default_catalog() -> None:
    """
    Test the ``get_default_catalog`` method.

    Both a URI whose path is ``hive`` and one whose path is ``hive/default``
    are expected to report ``hive`` as the default catalog.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec
    from superset.models.core import Database

    for sqlalchemy_uri in (
        "presto://localhost:8080/hive",
        "presto://localhost:8080/hive/default",
    ):
        database = Database(
            database_name="my_db",
            sqlalchemy_uri=sqlalchemy_uri,
        )
        assert PrestoEngineSpec.get_default_catalog(database) == "hive"
@pytest.mark.parametrize(
    "time_grain,expected_result",
    [
        ("PT1S", "date_trunc('second', CAST(col AS TIMESTAMP))"),
        (
            "PT5S",
            "date_trunc('second', CAST(col AS TIMESTAMP)) - interval '1' second * (second(CAST(col AS TIMESTAMP)) % 5)",  # noqa: E501
        ),
        (
            "PT30S",
            "date_trunc('second', CAST(col AS TIMESTAMP)) - interval '1' second * (second(CAST(col AS TIMESTAMP)) % 30)",  # noqa: E501
        ),
        ("PT1M", "date_trunc('minute', CAST(col AS TIMESTAMP))"),
        (
            "PT5M",
            "date_trunc('minute', CAST(col AS TIMESTAMP)) - interval '1' minute * (minute(CAST(col AS TIMESTAMP)) % 5)",  # noqa: E501
        ),
        (
            "PT10M",
            "date_trunc('minute', CAST(col AS TIMESTAMP)) - interval '1' minute * (minute(CAST(col AS TIMESTAMP)) % 10)",  # noqa: E501
        ),
        (
            "PT15M",
            "date_trunc('minute', CAST(col AS TIMESTAMP)) - interval '1' minute * (minute(CAST(col AS TIMESTAMP)) % 15)",  # noqa: E501
        ),
        (
            "PT0.5H",
            "date_trunc('minute', CAST(col AS TIMESTAMP)) - interval '1' minute * (minute(CAST(col AS TIMESTAMP)) % 30)",  # noqa: E501
        ),
        ("PT1H", "date_trunc('hour', CAST(col AS TIMESTAMP))"),
        (
            "PT6H",
            "date_trunc('hour', CAST(col AS TIMESTAMP)) - interval '1' hour * (hour(CAST(col AS TIMESTAMP)) % 6)",  # noqa: E501
        ),
        ("P1D", "date_trunc('day', CAST(col AS TIMESTAMP))"),
        ("P1W", "date_trunc('week', CAST(col AS TIMESTAMP))"),
        ("P1M", "date_trunc('month', CAST(col AS TIMESTAMP))"),
        ("P3M", "date_trunc('quarter', CAST(col AS TIMESTAMP))"),
        ("P1Y", "date_trunc('year', CAST(col AS TIMESTAMP))"),
        (
            "1969-12-28T00:00:00Z/P1W",
            "date_trunc('week', CAST(col AS TIMESTAMP) + interval '1' day) - interval '1' day",  # noqa: E501
        ),
        ("1969-12-29T00:00:00Z/P1W", "date_trunc('week', CAST(col AS TIMESTAMP))"),
        (
            "P1W/1970-01-03T00:00:00Z",
            "date_trunc('week', CAST(col AS TIMESTAMP) + interval '1' day) + interval '5' day",  # noqa: E501
        ),
        (
            "P1W/1970-01-04T00:00:00Z",
            "date_trunc('week', CAST(col AS TIMESTAMP)) + interval '6' day",
        ),
    ],
)
def test_timegrain_expressions(time_grain: str, expected_result: str) -> None:
    """
    Test the ``get_timestamp_expr`` method for each supported time grain.

    The expression built for a column named ``col`` (with no ``pdf``) is
    stringified and compared with the expected Presto SQL.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    timestamp_expr = PrestoEngineSpec.get_timestamp_expr(
        col=column("col"),
        pdf=None,
        time_grain=time_grain,
    )

    assert str(timestamp_expr) == expected_result
def test_select_star(mocker: MockerFixture) -> None:
    """
    Test the ``select_star`` method.

    ``quote_table`` is swapped for a plain dotted join of the table parts, and
    the first query passed to ``database.compile_sqla_query`` is compared with
    the expected ``SELECT *`` statement.
    """
    from superset.db_engine_specs.presto import PrestoEngineSpec

    def quote_table(table: Table, dialect: Dialect) -> str:
        # Join whichever of catalog/schema/table are set, without quoting;
        # ``dialect`` is accepted only to match the patched method's signature.
        return ".".join(filter(None, (table.catalog, table.schema, table.table)))

    mocker.patch.object(PrestoEngineSpec, "quote_table", quote_table)

    database = mocker.MagicMock()
    PrestoEngineSpec.select_star(
        database=database,
        table=Table("my_table", "my_schema", "my_catalog"),
        engine=mocker.MagicMock(),
        limit=100,
        show_cols=False,
        indent=True,
        latest_partition=False,
        cols=None,
    )

    # First positional argument of the first ``compile_sqla_query`` call.
    first_call = database.compile_sqla_query.mock_calls[0]
    query = first_call.args[0]

    expected_sql = """
SELECT * \nFROM my_catalog.my_schema.my_table
LIMIT :param_1
""".strip()
    assert str(query) == expected_sql