| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # pylint: disable=invalid-name, unused-argument |
| from __future__ import annotations |
| |
| from datetime import datetime |
| from typing import Any |
| |
| import pytest |
| from flask import current_app |
| from flask_appbuilder.security.sqla.models import Role |
| from freezegun import freeze_time |
| from jinja2 import DebugUndefined |
| from jinja2.sandbox import SandboxedEnvironment |
| from pytest_mock import MockerFixture |
| from sqlalchemy.dialects import mysql |
| from sqlalchemy.dialects.postgresql import dialect |
| |
| from superset.commands.dataset.exceptions import DatasetNotFoundError |
| from superset.connectors.sqla.models import ( |
| RowLevelSecurityFilter, |
| SqlaTable, |
| SqlMetric, |
| TableColumn, |
| ) |
| from superset.exceptions import SupersetTemplateException |
| from superset.jinja_context import ( |
| dataset_macro, |
| ExtraCache, |
| get_template_processor, |
| metric_macro, |
| safe_proxy, |
| TimeFilter, |
| to_datetime, |
| WhereInMacro, |
| ) |
| from superset.models.core import Database |
| from superset.models.slice import Slice |
| from superset.utils import json |
| from tests.unit_tests.conftest import with_feature_flags |
| |
| |
def test_filter_values_adhoc_filters() -> None:
    """
    Check that ``filter_values`` picks up values from ``adhoc_filters``.

    A scalar comparator comes back wrapped in a list, a list comparator comes
    back unchanged, and the column is recorded in ``applied_filters`` each time.
    """
    scenarios = [
        ("foo", ["foo"]),
        (["foo", "bar"], ["foo", "bar"]),
    ]
    for comparator, expected_values in scenarios:
        form_data = {
            "adhoc_filters": [
                {
                    "clause": "WHERE",
                    "comparator": comparator,
                    "expressionType": "SIMPLE",
                    "operator": "in",
                    "subject": "name",
                }
            ],
        }
        with current_app.test_request_context(
            data={"form_data": json.dumps(form_data)}
        ):
            extra_cache = ExtraCache()
            assert extra_cache.filter_values("name") == expected_values
            assert extra_cache.applied_filters == ["name"]
| |
| |
def test_filter_values_extra_filters() -> None:
    """
    Check that ``filter_values`` picks up values from ``extra_filters``.
    """
    form_data = {"extra_filters": [{"col": "name", "op": "in", "val": "foo"}]}
    request_data = {"form_data": json.dumps(form_data)}
    with current_app.test_request_context(data=request_data):
        extra_cache = ExtraCache()
        assert extra_cache.filter_values("name") == ["foo"]
        assert extra_cache.applied_filters == ["name"]
| |
| |
def test_filter_values_default() -> None:
    """
    Check that ``filter_values`` falls back to the supplied default.

    The default comes back as a single-item list and nothing is recorded in
    ``removed_filters``.
    """
    extra_cache = ExtraCache()
    values = extra_cache.filter_values("name", "foo")
    assert values == ["foo"]
    assert extra_cache.removed_filters == []
| |
| |
def test_filter_values_remove_not_present() -> None:
    """
    Check ``filter_values`` with ``remove_filter=True`` when nothing matches.

    The result is empty and no column is added to ``removed_filters``.
    """
    extra_cache = ExtraCache()
    values = extra_cache.filter_values("name", remove_filter=True)
    assert values == []
    assert extra_cache.removed_filters == []
| |
| |
def test_filter_values_no_default() -> None:
    """
    Check that ``filter_values`` returns an empty list when nothing matches
    and no default is given.
    """
    values = ExtraCache().filter_values("name")
    assert values == []
| |
| |
def test_get_filters_adhoc_filters() -> None:
    """
    Exercise the ``get_filters`` macro against ``adhoc_filters``.

    Covers a scalar comparator, a list comparator, and a list comparator
    combined with ``remove_filter=True``.
    """

    def request_data(comparator: Any) -> dict[str, Any]:
        """
        Build the request payload for a simple ``in`` filter on ``name``.
        """
        form_data = {
            "adhoc_filters": [
                {
                    "clause": "WHERE",
                    "comparator": comparator,
                    "expressionType": "SIMPLE",
                    "operator": "in",
                    "subject": "name",
                }
            ],
        }
        return {"form_data": json.dumps(form_data)}

    # Scalar comparator: the value is reported as a one-item list.
    with current_app.test_request_context(data=request_data("foo")):
        extra_cache = ExtraCache()
        filters = extra_cache.get_filters("name")
        assert filters == [{"op": "IN", "col": "name", "val": ["foo"]}]

        assert extra_cache.removed_filters == []
        assert extra_cache.applied_filters == ["name"]

    # List comparator: the values are reported unchanged.
    with current_app.test_request_context(data=request_data(["foo", "bar"])):
        extra_cache = ExtraCache()
        filters = extra_cache.get_filters("name")
        assert filters == [{"op": "IN", "col": "name", "val": ["foo", "bar"]}]
        assert extra_cache.removed_filters == []

    # ``remove_filter=True``: the column is also recorded as removed.
    with current_app.test_request_context(data=request_data(["foo", "bar"])):
        extra_cache = ExtraCache()
        filters = extra_cache.get_filters("name", remove_filter=True)
        assert filters == [{"op": "IN", "col": "name", "val": ["foo", "bar"]}]
        assert extra_cache.removed_filters == ["name"]
        assert extra_cache.applied_filters == ["name"]
| |
| |
def test_get_filters_is_null_operator() -> None:
    """
    Exercise ``get_filters`` with an ``IS NULL`` filter whose comparator is
    ``None``.
    """
    is_null_filter = {
        "clause": "WHERE",
        "expressionType": "SIMPLE",
        "operator": "IS NULL",
        "subject": "name",
        "comparator": None,
    }
    request_data = {"form_data": json.dumps({"adhoc_filters": [is_null_filter]})}
    with current_app.test_request_context(data=request_data):
        extra_cache = ExtraCache()
        filters = extra_cache.get_filters("name", remove_filter=True)
        assert filters == [{"op": "IS NULL", "col": "name", "val": None}]
        assert extra_cache.removed_filters == ["name"]
        assert extra_cache.applied_filters == ["name"]
| |
| |
def test_get_filters_remove_not_present() -> None:
    """
    Check ``get_filters`` with ``remove_filter=True`` when nothing matches.

    The result is empty and no column is added to ``removed_filters``.
    """
    extra_cache = ExtraCache()
    filters = extra_cache.get_filters("name", remove_filter=True)
    assert filters == []
    assert extra_cache.removed_filters == []
| |
| |
def test_url_param_query() -> None:
    """
    Check that ``url_param`` reads a value from the request query string.
    """
    query = {"foo": "bar"}
    with current_app.test_request_context(query_string=query):
        extra_cache = ExtraCache()
        assert extra_cache.url_param("foo") == "bar"
| |
| |
def test_url_param_default() -> None:
    """
    Check that ``url_param`` returns the supplied default when the parameter
    is absent from the request.
    """
    with current_app.test_request_context():
        value = ExtraCache().url_param("foo", "bar")
        assert value == "bar"
| |
| |
def test_url_param_no_default() -> None:
    """
    Check that ``url_param`` returns ``None`` when the parameter is absent and
    no default is given.
    """
    with current_app.test_request_context():
        value = ExtraCache().url_param("foo")
        assert value is None
| |
| |
def test_url_param_form_data() -> None:
    """
    Check that ``url_param`` reads values from ``url_params`` inside
    ``form_data``.
    """
    form_data = json.dumps({"url_params": {"foo": "bar"}})
    with current_app.test_request_context(query_string={"form_data": form_data}):
        extra_cache = ExtraCache()
        assert extra_cache.url_param("foo") == "bar"
| |
| |
def test_url_param_escaped_form_data() -> None:
    """
    Check that a ``url_params`` value containing a single quote is returned
    with the quote doubled when a dialect is supplied.
    """
    form_data = json.dumps({"url_params": {"foo": "O'Brien"}})
    with current_app.test_request_context(query_string={"form_data": form_data}):
        extra_cache = ExtraCache(dialect=dialect())
        assert extra_cache.url_param("foo") == "O''Brien"
| |
| |
def test_url_param_escaped_default_form_data() -> None:
    """
    Check that a default value containing a single quote is returned with the
    quote doubled when the requested parameter is missing.
    """
    form_data = json.dumps({"url_params": {"foo": "O'Brien"}})
    with current_app.test_request_context(query_string={"form_data": form_data}):
        extra_cache = ExtraCache(dialect=dialect())
        assert extra_cache.url_param("bar", "O'Malley") == "O''Malley"
| |
| |
def test_url_param_unescaped_form_data() -> None:
    """
    Check that ``escape_result=False`` returns a ``url_params`` value with its
    single quote left untouched.
    """
    form_data = json.dumps({"url_params": {"foo": "O'Brien"}})
    with current_app.test_request_context(query_string={"form_data": form_data}):
        extra_cache = ExtraCache(dialect=dialect())
        value = extra_cache.url_param("foo", escape_result=False)
        assert value == "O'Brien"
| |
| |
def test_url_param_unescaped_default_form_data() -> None:
    """
    Check that ``escape_result=False`` returns a default value with its single
    quote left untouched.
    """
    form_data = json.dumps({"url_params": {"foo": "O'Brien"}})
    with current_app.test_request_context(query_string={"form_data": form_data}):
        extra_cache = ExtraCache(dialect=dialect())
        value = extra_cache.url_param("bar", "O'Malley", escape_result=False)
        assert value == "O'Malley"
| |
| |
def test_safe_proxy_primitive() -> None:
    """
    Check that ``safe_proxy`` passes through a ``str`` return value.
    """

    def identity(value: Any) -> Any:
        return value

    result = safe_proxy(identity, "foo")
    assert result == "foo"
| |
| |
def test_safe_proxy_dict() -> None:
    """
    Check that ``safe_proxy`` passes through a ``dict`` return value.
    """

    def identity(value: Any) -> Any:
        return value

    result = safe_proxy(identity, {"foo": "bar"})
    assert result == {"foo": "bar"}
| |
| |
def test_safe_proxy_lambda() -> None:
    """
    Check that ``safe_proxy`` raises ``SupersetTemplateException`` when the
    wrapped function returns a ``lambda``.
    """

    def identity(value: Any) -> Any:
        return value

    returns_bar = lambda: "bar"  # noqa: E731
    with pytest.raises(SupersetTemplateException):
        safe_proxy(identity, returns_bar)
| |
| |
def test_safe_proxy_nested_lambda() -> None:
    """
    Check that ``safe_proxy`` raises ``SupersetTemplateException`` when the
    wrapped function returns a ``dict`` holding a ``lambda`` value.
    """

    def identity(value: Any) -> Any:
        return value

    payload = {"foo": lambda: "bar"}
    with pytest.raises(SupersetTemplateException):
        safe_proxy(identity, payload)
| |
| |
@pytest.mark.parametrize(
    "add_to_cache_keys,mock_cache_key_wrapper_call_count",
    [
        (True, 4),
        (False, 0),
    ],
)
def test_user_macros(
    mocker: MockerFixture,
    add_to_cache_keys: bool,
    mock_cache_key_wrapper_call_count: int,
) -> None:
    """
    Test all user macros:
    - ``current_user_id``
    - ``current_username``
    - ``current_user_email``
    - ``current_user_roles``
    - ``current_user_rls_rules``

    :param mocker: pytest-mock fixture used to patch ``g``, the security
        manager lookups and ``ExtraCache.cache_key_wrapper``
    :param add_to_cache_keys: value forwarded to each of the first four macros
    :param mock_cache_key_wrapper_call_count: number of
        ``cache_key_wrapper`` calls expected after those four macros have run
    """
    mock_g = mocker.patch("superset.utils.core.g")
    mock_get_user_roles = mocker.patch("superset.security_manager.get_user_roles")
    mock_get_user_rls = mocker.patch("superset.security_manager.get_rls_filters")
    mock_cache_key_wrapper = mocker.patch(
        "superset.jinja_context.ExtraCache.cache_key_wrapper"
    )
    mock_g.user.id = 1
    mock_g.user.username = "my_username"
    mock_g.user.email = "my_email@test.com"
    mock_get_user_roles.return_value = [Role(name="my_role1"), Role(name="my_role2")]
    mock_get_user_rls.return_value = [
        RowLevelSecurityFilter(group_key="test", clause="1=1"),
        RowLevelSecurityFilter(group_key="other_test", clause="product_id=1"),
    ]
    cache = ExtraCache(table=mocker.MagicMock())
    assert cache.current_user_id(add_to_cache_keys) == 1
    assert cache.current_username(add_to_cache_keys) == "my_username"
    assert cache.current_user_email(add_to_cache_keys) == "my_email@test.com"
    assert cache.current_user_roles(add_to_cache_keys) == ["my_role1", "my_role2"]
    # Checked before the RLS macro runs so only the four macros above count.
    assert mock_cache_key_wrapper.call_count == mock_cache_key_wrapper_call_count

    # Testing {{ current_user_rls_rules() }} macro isolated and always without
    # the param because it does not support it to avoid shared cache.
    assert cache.current_user_rls_rules() == ["1=1", "product_id=1"]
| |
| |
def test_user_macros_without_user_info(mocker: MockerFixture) -> None:
    """
    Test all user macros when no user info is available.

    With ``g.user`` set to ``None`` every user macro is expected to return
    ``None``.

    :param mocker: pytest-mock fixture used to patch ``superset.utils.core.g``
    """
    mock_g = mocker.patch("superset.utils.core.g")
    mock_g.user = None
    cache = ExtraCache(table=mocker.MagicMock())
    assert cache.current_user_id() is None
    assert cache.current_username() is None
    assert cache.current_user_email() is None
    assert cache.current_user_roles() is None
    assert cache.current_user_rls_rules() is None
| |
| |
def test_current_user_rls_rules_with_no_table(mocker: MockerFixture) -> None:
    """
    Test the ``current_user_rls_rules`` macro when no table is provided.

    The macro is expected to return ``None`` without touching the cache key
    wrapper, the RLS filter lookup or the guest-user check.

    :param mocker: pytest-mock fixture used to patch ``g``, the security
        manager lookups and ``ExtraCache.cache_key_wrapper``
    """
    mock_g = mocker.patch("superset.utils.core.g")
    mock_get_user_rls = mocker.patch("superset.security_manager.get_rls_filters")
    mock_is_guest_user = mocker.patch("superset.security_manager.is_guest_user")
    mock_cache_key_wrapper = mocker.patch(
        "superset.jinja_context.ExtraCache.cache_key_wrapper"
    )
    mock_g.user.id = 1
    mock_g.user.username = "my_username"
    mock_g.user.email = "my_email@test.com"
    # No ``table`` argument: this is the condition under test.
    cache = ExtraCache()
    assert cache.current_user_rls_rules() is None
    assert mock_cache_key_wrapper.call_count == 0
    assert mock_get_user_rls.call_count == 0
    assert mock_is_guest_user.call_count == 0
| |
| |
@with_feature_flags(EMBEDDED_SUPERSET=True)
def test_current_user_rls_rules_guest_user(mocker: MockerFixture) -> None:
    """
    Test the ``current_user_rls_rules`` with an embedded user.

    The guest RLS lookup is patched to return two rules, and the macro is
    expected to return their clauses in order.

    :param mocker: pytest-mock fixture used to patch the ``g`` objects and
        ``get_guest_rls_filters``
    """
    # The same mock user is installed on each patched ``g`` object.
    mock_g = mocker.patch("superset.utils.core.g")
    mock_gg = mocker.patch("superset.tasks.utils.g")
    mock_ggg = mocker.patch("superset.security.manager.g")
    mock_get_user_rls = mocker.patch("superset.security_manager.get_guest_rls_filters")
    mock_user = mocker.MagicMock()
    mock_user.username = "my_username"
    mock_user.is_guest_user = True
    mock_user.is_anonymous = False
    mock_g.user = mock_gg.user = mock_ggg.user = mock_user

    mock_get_user_rls.return_value = [
        {"group_key": "test", "clause": "1=1"},
        {"group_key": "other_test", "clause": "product_id=1"},
    ]
    cache = ExtraCache(table=mocker.MagicMock())
    assert cache.current_user_rls_rules() == ["1=1", "product_id=1"]
| |
| |
def test_where_in() -> None:
    """
    Exercise the ``where_in`` Jinja2 filter with a MySQL dialect.

    Covers mixed int/str values, the removed ``mark`` argument (which appends
    a warning comment) and escaping of embedded single quotes.
    """
    macro = WhereInMacro(mysql.dialect())

    plain = macro([1, "b", 3])
    assert plain == "(1, 'b', 3)"

    with_mark = macro([1, "b", 3], '"')
    assert with_mark == (
        "(1, 'b', 3)\n-- WARNING: the `mark` parameter was removed from the "
        "`where_in` macro for security reasons\n"
    )

    quoted = macro(["O'Malley's"])
    assert quoted == "('O''Malley''s')"
| |
| |
def test_where_in_empty_list() -> None:
    """
    Exercise the ``where_in`` Jinja2 filter with an empty list.
    """
    macro = WhereInMacro(mysql.dialect())

    # Default behaviour: an empty pair of parentheses, as a string.
    default_result = macro([])
    assert default_result == "()"

    # ``default_to_none=True`` switches the empty-list result to ``None``.
    none_result = macro([], default_to_none=True)
    assert none_result is None
| |
| |
@pytest.mark.parametrize(
    "value,format,output",
    [
        ("2025-03-20 15:55:00", None, datetime(2025, 3, 20, 15, 55)),
        (None, None, None),
        ("2025-03-20", "%Y-%m-%d", datetime(2025, 3, 20)),
        ("'2025-03-20'", "%Y-%m-%d", datetime(2025, 3, 20)),
    ],
)
def test_to_datetime(
    value: str | None, format: str | None, output: datetime | None
) -> None:
    """
    Exercise the ``to_datetime`` custom filter.

    ``format`` is only forwarded when the case provides one, so the filter's
    own default applies otherwise.
    """
    format_kwargs = {} if format is None else {"format": format}
    parsed = to_datetime(value, **format_kwargs)
    assert parsed == output
| |
| |
@pytest.mark.parametrize(
    "value,format,match",
    [
        (
            "2025-03-20",
            None,
            "time data '2025-03-20' does not match format '%Y-%m-%d %H:%M:%S'",
        ),
        (
            "2025-03-20 15:55:00",
            "%Y-%m-%d",
            "unconverted data remains: 15:55:00",
        ),
    ],
)
def test_to_datetime_raises(value: str, format: str | None, match: str) -> None:
    """
    Check that ``to_datetime`` raises ``ValueError`` when the value does not
    fit the format in use.
    """
    format_kwargs = {} if format is None else {"format": format}
    with pytest.raises(ValueError, match=match):
        to_datetime(value, **format_kwargs)
| |
| |
def test_dataset_macro(mocker: MockerFixture) -> None:
    """
    Test the ``dataset_macro`` macro.

    Builds a physical dataset with five columns (one of them an expression)
    and one metric, then checks the SQL produced for the default call, for
    ``include_metrics=True``, and for ``include_metrics=True`` with a column
    subset. Finally checks that a missing dataset raises
    ``DatasetNotFoundError``.

    :param mocker: pytest-mock fixture used to patch the guest RLS lookup and
        ``DatasetDAO``
    """
    # Patched once for the whole test so no guest RLS filters are applied.
    mocker.patch(
        "superset.connectors.sqla.models.security_manager.get_guest_rls_filters",
        return_value=[],
    )

    columns = [
        TableColumn(column_name="ds", is_dttm=1, type="TIMESTAMP"),
        TableColumn(column_name="num_boys", type="INTEGER"),
        TableColumn(column_name="revenue", type="INTEGER"),
        TableColumn(column_name="expenses", type="INTEGER"),
        TableColumn(
            column_name="profit", type="INTEGER", expression="revenue-expenses"
        ),
    ]
    metrics = [
        SqlMetric(metric_name="cnt", expression="COUNT(*)"),
    ]

    dataset = SqlaTable(
        table_name="old_dataset",
        columns=columns,
        metrics=metrics,
        main_dttm_col="ds",
        default_endpoint="https://www.youtube.com/watch?v=dQw4w9WgXcQ",  # not used
        database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"),
        offset=-8,
        description="This is the description",
        is_featured=1,
        cache_timeout=3600,
        schema="my_schema",
        sql=None,
        params=json.dumps(
            {
                "remote_id": 64,
                "database_name": "examples",
                "import_time": 1606677834,
            }
        ),
        perm=None,
        filter_select_enabled=1,
        fetch_values_predicate="foo IN (1, 2)",
        is_sqllab_view=0,  # no longer used?
        template_params=json.dumps({"answer": "42"}),
        schema_perm=None,
        extra=json.dumps({"warning_markdown": "*WARNING*"}),
    )
    DatasetDAO = mocker.patch("superset.daos.dataset.DatasetDAO")  # noqa: N806
    DatasetDAO.find_by_id.return_value = dataset

    # Interpolated so the expected trailing space survives whitespace trimming.
    space = " "

    assert (
        dataset_macro(1)
        == f"""(
SELECT ds AS ds, num_boys AS num_boys, revenue AS revenue, expenses AS expenses, revenue-expenses AS profit{space}
FROM my_schema.old_dataset
) AS dataset_1"""  # noqa: S608, E501
    )

    assert (
        dataset_macro(1, include_metrics=True)
        == f"""(
SELECT ds AS ds, num_boys AS num_boys, revenue AS revenue, expenses AS expenses, revenue-expenses AS profit, COUNT(*) AS cnt{space}
FROM my_schema.old_dataset GROUP BY ds, num_boys, revenue, expenses, revenue-expenses
) AS dataset_1"""  # noqa: S608, E501
    )

    assert (
        dataset_macro(1, include_metrics=True, columns=["ds"])
        == f"""(
SELECT ds AS ds, COUNT(*) AS cnt{space}
FROM my_schema.old_dataset GROUP BY ds
) AS dataset_1"""  # noqa: S608
    )

    # A lookup that finds nothing must surface as ``DatasetNotFoundError``.
    DatasetDAO.find_by_id.return_value = None
    with pytest.raises(DatasetNotFoundError) as excinfo:
        dataset_macro(1)
    assert str(excinfo.value) == "Dataset 1 not found!"
| |
| |
def test_dataset_macro_mutator_with_comments(mocker: MockerFixture) -> None:
    """
    Check ``dataset_macro`` when the dataset SQL is wrapped in comments.

    The SQL returned by the mocked dataset begins and ends with a ``--``
    comment line, and the macro output must keep both lines inside the
    parenthesised subquery.
    """

    def wrap_in_comments(sql: str) -> str:
        """
        Surround the query with a leading and a trailing comment line.
        """
        return f"-- begin\n{sql}\n-- end"

    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id().get_query_str_extended().sql = wrap_in_comments(
        "SELECT 1"
    )
    rendered = dataset_macro(1)
    assert (
        rendered
        == """(
-- begin
SELECT 1
-- end
) AS dataset_1"""
    )
| |
| |
def test_metric_macro_with_dataset_id(mocker: MockerFixture) -> None:
    """
    Check that ``metric_macro`` resolves a metric when the dataset ID is
    passed explicitly.

    ``get_form_data`` is patched so the test can assert it is never called.
    """
    get_form_data_mock = mocker.patch("superset.views.utils.get_form_data")
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    count_metric = SqlMetric(metric_name="count", expression="COUNT(*)")
    dataset_dao.find_by_id.return_value = SqlaTable(
        table_name="test_dataset",
        metrics=[count_metric],
        database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"),
        schema="my_schema",
        sql=None,
    )
    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    rendered = metric_macro(jinja_env, {}, "count", 1)
    assert rendered == "COUNT(*)"
    get_form_data_mock.assert_not_called()
| |
| |
def test_metric_macro_recursive(mocker: MockerFixture) -> None:
    """
    Check that ``metric_macro`` follows a chain of metric references.

    Metric ``c`` points at ``b``, which points at ``a``; rendering ``c`` must
    yield the expression of ``a``.
    """
    database = Database(id=1, database_name="my_database", sqlalchemy_uri="sqlite://")
    chained_metrics = [
        SqlMetric(metric_name="a", expression="COUNT(*)"),
        SqlMetric(metric_name="b", expression="{{ metric('a') }}"),
        SqlMetric(metric_name="c", expression="{{ metric('b') }}"),
    ]
    dataset = SqlaTable(
        id=1,
        metrics=chained_metrics,
        table_name="test_dataset",
        database=database,
        schema="my_schema",
        sql=None,
    )

    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {"datasource": {"id": 1}}
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id.return_value = dataset

    template_processor = get_template_processor(database=database)
    rendered = template_processor.process_template("{{ metric('c', 1) }}")
    assert rendered == "COUNT(*)"
| |
| |
def test_metric_macro_expansion(mocker: MockerFixture) -> None:
    """
    Check that ``metric_macro`` expands other macros found in a metric.

    Metric ``a`` uses ``current_user_id()``; with ``get_user_id`` patched to
    return 42, rendering ``c`` (``c`` -> ``b`` -> ``a``) must yield ``"42"``.
    """
    database = Database(id=1, database_name="my_database", sqlalchemy_uri="sqlite://")
    chained_metrics = [
        SqlMetric(metric_name="a", expression="{{ current_user_id() }}"),
        SqlMetric(metric_name="b", expression="{{ metric('a') }}"),
        SqlMetric(metric_name="c", expression="{{ metric('b') }}"),
    ]
    dataset = SqlaTable(
        id=1,
        metrics=chained_metrics,
        table_name="test_dataset",
        database=database,
        schema="my_schema",
        sql=None,
    )

    mocker.patch("superset.jinja_context.get_user_id", return_value=42)
    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {"datasource": {"id": 1}}
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id.return_value = dataset

    template_processor = get_template_processor(database=database)
    rendered = template_processor.process_template("{{ metric('c') }}")
    assert rendered == "42"
| |
| |
def test_metric_macro_recursive_compound(mocker: MockerFixture) -> None:
    """
    Check ``metric_macro`` when one metric combines two others.

    Metric ``c`` is defined as ``a / b``; rendering it must substitute both
    referenced expressions.
    """
    database = Database(id=1, database_name="my_database", sqlalchemy_uri="sqlite://")
    numerator = SqlMetric(metric_name="a", expression="SUM(*)")
    denominator = SqlMetric(metric_name="b", expression="COUNT(*)")
    ratio = SqlMetric(
        metric_name="c",
        expression="{{ metric('a') }} / {{ metric('b') }}",
    )
    dataset = SqlaTable(
        id=1,
        metrics=[numerator, denominator, ratio],
        table_name="test_dataset",
        database=database,
        schema="my_schema",
        sql=None,
    )

    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {"datasource": {"id": 1}}
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id.return_value = dataset

    template_processor = get_template_processor(database=database)
    rendered = template_processor.process_template("{{ metric('c') }}")
    assert rendered == "SUM(*) / COUNT(*)"
| |
| |
def test_metric_macro_recursive_cyclic(mocker: MockerFixture) -> None:
    """
    Check ``metric_macro`` when the metric definitions form a cycle.

    Metrics ``a`` -> ``c`` -> ``b`` -> ``a`` reference each other; rendering
    must fail with ``SupersetTemplateException`` rather than loop forever.
    """
    database = Database(id=1, database_name="my_database", sqlalchemy_uri="sqlite://")
    cyclic_metrics = [
        SqlMetric(metric_name="a", expression="{{ metric('c') }}"),
        SqlMetric(metric_name="b", expression="{{ metric('a') }}"),
        SqlMetric(metric_name="c", expression="{{ metric('b') }}"),
    ]
    dataset = SqlaTable(
        id=1,
        metrics=cyclic_metrics,
        table_name="test_dataset",
        database=database,
        schema="my_schema",
        sql=None,
    )

    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {"datasource": {"id": 1}}
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id.return_value = dataset

    template_processor = get_template_processor(database=database)
    with pytest.raises(SupersetTemplateException) as excinfo:
        template_processor.process_template("{{ metric('c') }}")
    message = str(excinfo.value)
    assert message == "Infinite recursion detected in template"
| |
| |
def test_metric_macro_recursive_infinite(mocker: MockerFixture) -> None:
    """
    Check ``metric_macro`` when a metric references itself.

    Metric ``a`` is defined in terms of ``a``; rendering must fail with
    ``SupersetTemplateException`` rather than loop forever.
    """
    database = Database(id=1, database_name="my_database", sqlalchemy_uri="sqlite://")
    self_referencing = SqlMetric(metric_name="a", expression="{{ metric('a') }}")
    dataset = SqlaTable(
        id=1,
        metrics=[self_referencing],
        table_name="test_dataset",
        database=database,
        schema="my_schema",
        sql=None,
    )

    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {"datasource": {"id": 1}}
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id.return_value = dataset

    template_processor = get_template_processor(database=database)
    with pytest.raises(SupersetTemplateException) as excinfo:
        template_processor.process_template("{{ metric('a') }}")
    message = str(excinfo.value)
    assert message == "Infinite recursion detected in template"
| |
| |
def test_metric_macro_with_dataset_id_invalid_key(mocker: MockerFixture) -> None:
    """
    Check ``metric_macro`` with a valid dataset ID but an unknown metric key.

    The macro must raise ``SupersetTemplateException`` naming the metric and
    the dataset, without calling ``get_form_data``.
    """
    get_form_data_mock = mocker.patch("superset.views.utils.get_form_data")
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    count_metric = SqlMetric(metric_name="count", expression="COUNT(*)")
    dataset_dao.find_by_id.return_value = SqlaTable(
        table_name="test_dataset",
        metrics=[count_metric],
        database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"),
        schema="my_schema",
        sql=None,
    )
    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    with pytest.raises(SupersetTemplateException) as excinfo:
        metric_macro(jinja_env, {}, "blah", 1)
    message = str(excinfo.value)
    assert message == "Metric ``blah`` not found in test_dataset."
    get_form_data_mock.assert_not_called()
| |
| |
def test_metric_macro_invalid_dataset_id(mocker: MockerFixture) -> None:
    """
    Check ``metric_macro`` when the requested dataset does not exist.

    The DAO lookup returns ``None``, so the macro must raise
    ``DatasetNotFoundError`` without calling ``get_form_data``.
    """
    get_form_data_mock = mocker.patch("superset.views.utils.get_form_data")
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    dataset_dao.find_by_id.return_value = None
    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    with pytest.raises(DatasetNotFoundError) as excinfo:
        metric_macro(jinja_env, {}, "macro_key", 100)
    message = str(excinfo.value)
    assert message == "Dataset ID 100 not found."
    get_form_data_mock.assert_not_called()
| |
| |
def test_metric_macro_no_dataset_id_no_context(mocker: MockerFixture) -> None:
    """
    Check ``metric_macro`` when no dataset ID is passed and neither
    ``g.form_data`` nor the request provides one.

    The macro must raise ``SupersetTemplateException`` and never query the
    dataset DAO.
    """
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {}
    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    expected_message = (
        "Please specify the Dataset ID for the ``macro_key`` metric in the Jinja macro."  # noqa: E501
    )
    with current_app.test_request_context():
        with pytest.raises(SupersetTemplateException) as excinfo:
            metric_macro(jinja_env, {}, "macro_key")
        assert str(excinfo.value) == expected_message
        dataset_dao.find_by_id.assert_not_called()
| |
| |
def test_metric_macro_no_dataset_id_with_context_missing_info(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when no dataset ID is passed and the available
    context carries form data but neither a dataset nor a chart ID.

    The macro must raise ``SupersetTemplateException`` and never query the
    dataset DAO.
    """
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {"queries": []}

    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    form_data = {
        "adhoc_filters": [
            {
                "clause": "WHERE",
                "comparator": "foo",
                "expressionType": "SIMPLE",
                "operator": "in",
                "subject": "name",
            }
        ],
    }
    expected_message = (
        "Please specify the Dataset ID for the ``macro_key`` metric in the Jinja macro."  # noqa: E501
    )
    with current_app.test_request_context(
        data={"form_data": json.dumps(form_data)}
    ):
        with pytest.raises(SupersetTemplateException) as excinfo:
            metric_macro(jinja_env, {}, "macro_key")
        assert str(excinfo.value) == expected_message
        dataset_dao.find_by_id.assert_not_called()
| |
| |
def test_metric_macro_no_dataset_id_with_context_datasource_id(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when no dataset ID is passed but
    ``url_params.datasource_id`` is available in the context.

    The ID is supplied first through the request's form data and then through
    ``g.form_data``; both must resolve the metric.
    """

    def form_data_with_datasource_id() -> dict[str, Any]:
        """
        Build a fresh form data payload carrying ``datasource_id`` 1.
        """
        return {
            "queries": [
                {
                    "url_params": {
                        "datasource_id": 1,
                    }
                }
            ],
        }

    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    macro_metric = SqlMetric(metric_name="macro_key", expression="COUNT(*)")
    dataset_dao.find_by_id.return_value = SqlaTable(
        table_name="test_dataset",
        metrics=[macro_metric],
        database=Database(database_name="my_database", sqlalchemy_uri="sqlite://"),
        schema="my_schema",
        sql=None,
    )
    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {}

    # Getting the data from the request context
    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    request_data = {"form_data": json.dumps(form_data_with_datasource_id())}
    with current_app.test_request_context(data=request_data):
        assert metric_macro(jinja_env, {}, "macro_key") == "COUNT(*)"

    # Getting data from g's form_data
    g_mock.form_data = form_data_with_datasource_id()
    with current_app.test_request_context():
        assert metric_macro(jinja_env, {}, "macro_key") == "COUNT(*)"
| |
| |
def test_metric_macro_no_dataset_id_with_context_datasource_id_none(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when no dataset ID is passed and
    ``url_params.datasource_id`` is present in the context but ``None``.

    The ``None`` value is supplied first through the request's form data and
    then through ``g.form_data``; both must raise
    ``SupersetTemplateException``.
    """

    def form_data_with_null_datasource_id() -> dict[str, Any]:
        """
        Build a fresh form data payload whose ``datasource_id`` is ``None``.
        """
        return {
            "queries": [
                {
                    "url_params": {
                        "datasource_id": None,
                    }
                }
            ],
        }

    expected_message = (
        "Please specify the Dataset ID for the ``macro_key`` metric in the Jinja macro."  # noqa: E501
    )
    g_mock = mocker.patch("superset.jinja_context.g")
    g_mock.form_data = {}

    # Getting the data from the request context
    jinja_env = SandboxedEnvironment(undefined=DebugUndefined)
    request_data = {"form_data": json.dumps(form_data_with_null_datasource_id())}
    with current_app.test_request_context(data=request_data):
        with pytest.raises(SupersetTemplateException) as excinfo:
            metric_macro(jinja_env, {}, "macro_key")
        assert str(excinfo.value) == expected_message

    # Getting data from g's form_data
    g_mock.form_data = form_data_with_null_datasource_id()
    with current_app.test_request_context():
        with pytest.raises(SupersetTemplateException) as excinfo:
            metric_macro(jinja_env, {}, "macro_key")
        assert str(excinfo.value) == expected_message
| |
| |
def test_metric_macro_no_dataset_id_with_context_chart_id(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when the context only provides a chart ID.

    ``ChartDAO.find_by_id`` is mocked to return a chart whose ``datasource_id``
    is 1, and ``DatasetDAO.find_by_id`` is mocked to return a dataset defining
    the ``macro_key`` metric. With ``url_params.slice_id`` set in the request's
    form data, and then in ``g.form_data``, the macro must return the metric
    expression.
    """
    chart_dao = mocker.patch("superset.daos.chart.ChartDAO")
    chart_dao.find_by_id.return_value = Slice(datasource_id=1)

    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    count_metric = SqlMetric(metric_name="macro_key", expression="COUNT(*)")
    database = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
    dataset_dao.find_by_id.return_value = SqlaTable(
        table_name="test_dataset",
        metrics=[count_metric],
        database=database,
        schema="my_schema",
        sql=None,
    )

    form_data = {"queries": [{"url_params": {"slice_id": 1}}]}
    patched_g = mocker.patch("superset.jinja_context.g")
    sandbox = SandboxedEnvironment(undefined=DebugUndefined)

    # Scenario 1: the form data travels with the request, ``g`` holds nothing
    patched_g.form_data = {}
    request_payload = {"form_data": json.dumps(form_data)}
    with current_app.test_request_context(data=request_payload):
        rendered = metric_macro(sandbox, {}, "macro_key")
        assert rendered == "COUNT(*)"

    # Scenario 2: the request is empty, the form data is stored on ``g``
    patched_g.form_data = form_data
    with current_app.test_request_context():
        rendered = metric_macro(sandbox, {}, "macro_key")
        assert rendered == "COUNT(*)"
| |
| |
def test_metric_macro_no_dataset_id_with_context_slice_id_none(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when the only chart ID in the context is ``None``.

    The macro is called without a dataset ID while ``url_params.slice_id`` is
    explicitly ``None``, first in the request's form data and then in
    ``g.form_data``. Both lookups must raise ``SupersetTemplateException``
    asking for the dataset ID.
    """
    form_data = {"queries": [{"url_params": {"slice_id": None}}]}
    expected_error = (
        "Please specify the Dataset ID for the ``macro_key`` metric in the Jinja macro."  # noqa: E501
    )

    patched_g = mocker.patch("superset.jinja_context.g")
    sandbox = SandboxedEnvironment(undefined=DebugUndefined)

    # Scenario 1: the form data travels with the request, ``g`` holds nothing
    patched_g.form_data = {}
    request_payload = {"form_data": json.dumps(form_data)}
    with current_app.test_request_context(data=request_payload):
        with pytest.raises(SupersetTemplateException) as raised:
            metric_macro(sandbox, {}, "macro_key")
        assert str(raised.value) == expected_error

    # Scenario 2: the request is empty, the form data is stored on ``g``
    patched_g.form_data = form_data
    with current_app.test_request_context():
        with pytest.raises(SupersetTemplateException) as raised:
            metric_macro(sandbox, {}, "macro_key")
        assert str(raised.value) == expected_error
| |
| |
def test_metric_macro_no_dataset_id_with_context_deleted_chart(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when the chart ID in the context cannot be found.

    ``ChartDAO.find_by_id`` is mocked to return ``None``, standing in for a
    deleted chart. With ``url_params.slice_id`` set in the request's form data,
    and then in ``g.form_data``, the macro must raise
    ``SupersetTemplateException`` asking for the dataset ID.
    """
    mocker.patch("superset.daos.chart.ChartDAO").find_by_id.return_value = None

    form_data = {"queries": [{"url_params": {"slice_id": 1}}]}
    expected_error = (
        "Please specify the Dataset ID for the ``macro_key`` metric in the Jinja macro."  # noqa: E501
    )

    patched_g = mocker.patch("superset.jinja_context.g")
    sandbox = SandboxedEnvironment(undefined=DebugUndefined)

    # Scenario 1: the form data travels with the request, ``g`` holds nothing
    patched_g.form_data = {}
    request_payload = {"form_data": json.dumps(form_data)}
    with current_app.test_request_context(data=request_payload):
        with pytest.raises(SupersetTemplateException) as raised:
            metric_macro(sandbox, {}, "macro_key")
        assert str(raised.value) == expected_error

    # Scenario 2: the request is empty, the form data is stored on ``g``
    patched_g.form_data = form_data
    with current_app.test_request_context():
        with pytest.raises(SupersetTemplateException) as raised:
            metric_macro(sandbox, {}, "macro_key")
        assert str(raised.value) == expected_error
| |
| |
def test_metric_macro_no_dataset_id_available_in_request_form_data(
    mocker: MockerFixture,
) -> None:
    """
    Check ``metric_macro`` when the form data carries the dataset reference.

    ``DatasetDAO.find_by_id`` is mocked to return a dataset defining the
    ``macro_key`` metric. The dataset is referenced as ``datasource.id`` in the
    request's form data, and then as the ``"1__table"`` string under
    ``datasource`` in ``g.form_data``; in both cases the macro must return the
    metric expression.
    """
    dataset_dao = mocker.patch("superset.daos.dataset.DatasetDAO")
    count_metric = SqlMetric(metric_name="macro_key", expression="COUNT(*)")
    database = Database(database_name="my_database", sqlalchemy_uri="sqlite://")
    dataset_dao.find_by_id.return_value = SqlaTable(
        table_name="test_dataset",
        metrics=[count_metric],
        database=database,
        schema="my_schema",
        sql=None,
    )

    patched_g = mocker.patch("superset.jinja_context.g")
    sandbox = SandboxedEnvironment(undefined=DebugUndefined)

    # Scenario 1: the request's form data holds the datasource as a mapping
    patched_g.form_data = {}
    request_payload = {"form_data": json.dumps({"datasource": {"id": 1}})}
    with current_app.test_request_context(data=request_payload):
        rendered = metric_macro(sandbox, {}, "macro_key")
        assert rendered == "COUNT(*)"

    # Scenario 2: ``g.form_data`` holds the datasource as an "<id>__<type>" string
    patched_g.form_data = {"datasource": "1__table"}
    with current_app.test_request_context():
        rendered = metric_macro(sandbox, {}, "macro_key")
        assert rendered == "COUNT(*)"
| |
| |
@pytest.mark.parametrize(
    "description,args,kwargs,sqlalchemy_uri,queries,time_filter,removed_filters,applied_filters",
    [
        (
            "Missing time_range and filter will return a No filter result",
            [],
            {"target_type": "TIMESTAMP"},
            "postgresql://mydb",
            [{}],
            TimeFilter(from_expr=None, to_expr=None, time_range="No filter"),
            [],
            [],
        ),
        (
            "Missing time range and filter with default value will return a result with the defaults",  # noqa: E501
            [],
            {"default": "Last week", "target_type": "TIMESTAMP"},
            "postgresql://mydb",
            [{}],
            TimeFilter(
                from_expr="TO_TIMESTAMP('2024-08-27 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')",  # noqa: E501
                to_expr="TO_TIMESTAMP('2024-09-03 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')",  # noqa: E501
                time_range="Last week",
            ),
            [],
            [],
        ),
        (
            "Time range is extracted with the expected format, and default is ignored",
            [],
            {"default": "Last month", "target_type": "TIMESTAMP"},
            "postgresql://mydb",
            [{"time_range": "Last week"}],
            TimeFilter(
                from_expr="TO_TIMESTAMP('2024-08-27 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')",  # noqa: E501
                to_expr="TO_TIMESTAMP('2024-09-03 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')",  # noqa: E501
                time_range="Last week",
            ),
            [],
            [],
        ),
        (
            "Filter is extracted with the native format of the column (TIMESTAMP)",
            ["dttm"],
            {},
            "postgresql://mydb",
            [
                {
                    "filters": [
                        {"col": "dttm", "op": "TEMPORAL_RANGE", "val": "Last week"},
                    ],
                }
            ],
            TimeFilter(
                from_expr="TO_TIMESTAMP('2024-08-27 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')",  # noqa: E501
                to_expr="TO_TIMESTAMP('2024-09-03 00:00:00.000000', 'YYYY-MM-DD HH24:MI:SS.US')",  # noqa: E501
                time_range="Last week",
            ),
            [],
            ["dttm"],
        ),
        (
            "Filter is extracted with the native format of the column (DATE)",
            ["dt"],
            {"remove_filter": True},
            "postgresql://mydb",
            [
                {
                    "filters": [
                        {"col": "dt", "op": "TEMPORAL_RANGE", "val": "Last week"},
                    ],
                }
            ],
            TimeFilter(
                from_expr="TO_DATE('2024-08-27', 'YYYY-MM-DD')",
                to_expr="TO_DATE('2024-09-03', 'YYYY-MM-DD')",
                time_range="Last week",
            ),
            ["dt"],
            ["dt"],
        ),
        (
            "Filter is extracted with the overridden format (TIMESTAMP to DATE)",
            ["dttm"],
            {"target_type": "DATE", "remove_filter": True},
            "trino://mydb",
            [
                {
                    "filters": [
                        {"col": "dttm", "op": "TEMPORAL_RANGE", "val": "Last month"},
                    ],
                }
            ],
            TimeFilter(
                from_expr="DATE '2024-08-03'",
                to_expr="DATE '2024-09-03'",
                time_range="Last month",
            ),
            ["dttm"],
            ["dttm"],
        ),
        (
            "Filter is formatted with the custom format, ignoring target_type",
            ["dttm"],
            {"target_type": "DATE", "strftime": "%Y%m%d", "remove_filter": True},
            "trino://mydb",
            [
                {
                    "filters": [
                        {"col": "dttm", "op": "TEMPORAL_RANGE", "val": "Last month"},
                    ],
                }
            ],
            TimeFilter(
                from_expr="20240803",
                to_expr="20240903",
                time_range="Last month",
            ),
            ["dttm"],
            ["dttm"],
        ),
    ],
)
def test_get_time_filter(
    description: str,
    args: list[Any],
    kwargs: dict[str, Any],
    sqlalchemy_uri: str,
    queries: list[Any] | None,
    time_filter: TimeFilter,
    removed_filters: list[str],
    applied_filters: list[str],
) -> None:
    """
    Check ``ExtraCache.get_time_filter`` against each parametrized scenario.

    A dataset with a ``DATE`` column (``dt``, also its main datetime column)
    and a ``TIMESTAMP`` column (``dttm``) is attached to a database created
    from ``sqlalchemy_uri``. With the clock frozen at 2024-09-03 and ``queries``
    sent as the request's JSON payload, the macro is called with ``args`` and
    ``kwargs``; its return value and the cache's ``removed_filters`` and
    ``applied_filters`` are compared with the expected values.
    """
    database = Database(database_name="my_database", sqlalchemy_uri=sqlalchemy_uri)
    dataset = SqlaTable(
        table_name="my_dataset",
        columns=[
            TableColumn(column_name="dt", is_dttm=1, type="DATE"),
            TableColumn(column_name="dttm", is_dttm=1, type="TIMESTAMP"),
        ],
        main_dttm_col="dt",
        database=database,
    )

    # The clock is frozen first so the relative ranges ("Last week", ...) are stable
    with freeze_time("2024-09-03"):
        with current_app.test_request_context(json={"queries": queries}):
            extra_cache = ExtraCache(database=database, table=dataset)
            result = extra_cache.get_time_filter(*args, **kwargs)

            assert result == time_filter, description
            assert extra_cache.removed_filters == removed_filters
            assert extra_cache.applied_filters == applied_filters