| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from copy import deepcopy |
| |
| import pytest |
| |
| from superset.utils.core import ( |
| AdhocMetric, |
| ExtraFiltersReasonType, |
| ExtraFiltersTimeColumnType, |
| GenericDataType, |
| get_metric_name, |
| get_metric_names, |
| get_time_filter_status, |
| is_adhoc_metric, |
| NO_TIME_RANGE, |
| ) |
| from tests.unit_tests.fixtures.datasets import get_dataset_mock |
| |
| STR_METRIC = "my_metric" |
| SIMPLE_SUM_ADHOC_METRIC: AdhocMetric = { |
| "aggregate": "SUM", |
| "column": { |
| "column_name": "my_col", |
| "type": "INT", |
| "type_generic": GenericDataType.NUMERIC, |
| }, |
| "expressionType": "SIMPLE", |
| "label": "my SUM", |
| } |
| SQL_ADHOC_METRIC: AdhocMetric = { |
| "expressionType": "SQL", |
| "label": "my_sql", |
| "sqlExpression": "SUM(my_col)", |
| } |
| |
| |
| def test_get_metric_name_saved_metric(): |
| assert get_metric_name(STR_METRIC) == "my_metric" |
| |
| |
| def test_get_metric_name_adhoc(): |
| metric = deepcopy(SIMPLE_SUM_ADHOC_METRIC) |
| assert get_metric_name(metric) == "my SUM" |
| del metric["label"] |
| assert get_metric_name(metric) == "SUM(my_col)" |
| metric["label"] = "" |
| assert get_metric_name(metric) == "SUM(my_col)" |
| del metric["aggregate"] |
| assert get_metric_name(metric) == "my_col" |
| metric["aggregate"] = "" |
| assert get_metric_name(metric) == "my_col" |
| |
| metric = deepcopy(SQL_ADHOC_METRIC) |
| assert get_metric_name(metric) == "my_sql" |
| del metric["label"] |
| assert get_metric_name(metric) == "SUM(my_col)" |
| metric["label"] = "" |
| assert get_metric_name(metric) == "SUM(my_col)" |
| |
| |
| def test_get_metric_name_invalid_metric(): |
| metric = deepcopy(SIMPLE_SUM_ADHOC_METRIC) |
| del metric["label"] |
| del metric["column"] |
| with pytest.raises(ValueError): |
| get_metric_name(metric) |
| |
| metric = deepcopy(SIMPLE_SUM_ADHOC_METRIC) |
| del metric["label"] |
| metric["expressionType"] = "FOO" |
| with pytest.raises(ValueError): |
| get_metric_name(metric) |
| |
| metric = deepcopy(SQL_ADHOC_METRIC) |
| del metric["label"] |
| metric["expressionType"] = "FOO" |
| with pytest.raises(ValueError): |
| get_metric_name(metric) |
| |
| |
| def test_get_metric_names(): |
| assert get_metric_names( |
| [STR_METRIC, SIMPLE_SUM_ADHOC_METRIC, SQL_ADHOC_METRIC] |
| ) == ["my_metric", "my SUM", "my_sql"] |
| |
| |
| def test_is_adhoc_metric(): |
| assert is_adhoc_metric(STR_METRIC) is False |
| assert is_adhoc_metric(SIMPLE_SUM_ADHOC_METRIC) is True |
| assert is_adhoc_metric(SQL_ADHOC_METRIC) is True |
| |
| |
| def test_get_time_filter_status_time_col(): |
| dataset = get_dataset_mock() |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_COL: "ds"} |
| ) == ([{"column": ExtraFiltersTimeColumnType.TIME_COL}], []) |
| |
| |
| def test_get_time_filter_status_time_range(): |
| dataset = get_dataset_mock() |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_RANGE: NO_TIME_RANGE} |
| ) == ([], []) |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_RANGE: "1 year ago"} |
| ) == ([{"column": ExtraFiltersTimeColumnType.TIME_RANGE}], []) |
| |
| |
| def test_get_time_filter_status_time_grain(): |
| dataset = get_dataset_mock() |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_GRAIN: "PT1M"} |
| ) == ([{"column": ExtraFiltersTimeColumnType.TIME_GRAIN}], []) |
| |
| |
| def test_get_time_filter_status_no_temporal_col(): |
| dataset = get_dataset_mock() |
| dataset.columns[0].is_dttm = False |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_COL: "foobar"} |
| ) == ( |
| [], |
| [ |
| { |
| "reason": ExtraFiltersReasonType.COL_NOT_IN_DATASOURCE, |
| "column": ExtraFiltersTimeColumnType.TIME_COL, |
| } |
| ], |
| ) |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_RANGE: "1 year ago"} |
| ) == ( |
| [], |
| [ |
| { |
| "reason": ExtraFiltersReasonType.NO_TEMPORAL_COLUMN, |
| "column": ExtraFiltersTimeColumnType.TIME_RANGE, |
| } |
| ], |
| ) |
| |
| assert get_time_filter_status( |
| dataset, {ExtraFiltersTimeColumnType.TIME_GRAIN: "PT1M"} |
| ) == ( |
| [], |
| [ |
| { |
| "reason": ExtraFiltersReasonType.NO_TEMPORAL_COLUMN, |
| "column": ExtraFiltersTimeColumnType.TIME_GRAIN, |
| } |
| ], |
| ) |