# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# isort:skip_file
import json
import unittest
from unittest.mock import Mock, patch
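# tests.test_app is imported first for its side effect of initializing the test app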
import tests.test_app
import superset.connectors.druid.models as models
from superset.connectors.druid.models import DruidColumn, DruidDatasource, DruidMetric
from superset.exceptions import SupersetException
from .base_tests import SupersetTestCase
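# pydruid is an optional dependency; each test below is skipped when it is missing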
try:
from pydruid.utils.dimensions import (
MapLookupExtraction,
RegexExtraction,
RegisteredLookupExtraction,
TimeFormatExtraction,
)
import pydruid.utils.postaggregator as postaggs
except ImportError:
pass
def mock_metric(metric_name, is_postagg=False):
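    """Return a Mock metric with the given name, typed as a postagg or a regular metric."""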
metric = Mock()
metric.metric_name = metric_name
metric.metric_type = "postagg" if is_postagg else "metric"
return metric
def emplace(metrics_dict, metric_name, is_postagg=False):
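    """Insert a mock metric into metrics_dict, keyed by metric_name."""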
metrics_dict[metric_name] = mock_metric(metric_name, is_postagg)
# Unit tests that can be run without initializing base tests
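# The class-level patch below enables the SIP-38 feature flag for every test in the case.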
@patch.dict(
"superset.extensions.feature_flag_manager._feature_flags",
{"SIP_38_VIZ_REARCHITECTURE": True},
clear=True,
)
class TestDruidFunc(SupersetTestCase):
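    """Unit tests for DruidDatasource filter, query, and aggregation helpers."""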
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_extraction_fn_map(self):
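        """A map-lookup dimension spec should produce a MapLookupExtraction filter."""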
filters = [{"col": "deviceName", "val": ["iPhone X"], "op": "in"}]
dimension_spec = {
"type": "extraction",
"dimension": "device",
"outputName": "deviceName",
"outputType": "STRING",
"extractionFn": {
"type": "lookup",
"dimension": "dimensionName",
"outputName": "dimensionOutputName",
"replaceMissingValueWith": "missing_value",
"retainMissingValue": False,
"lookup": {
"type": "map",
"map": {
"iPhone10,1": "iPhone 8",
"iPhone10,4": "iPhone 8",
"iPhone10,2": "iPhone 8 Plus",
"iPhone10,5": "iPhone 8 Plus",
"iPhone10,3": "iPhone X",
"iPhone10,6": "iPhone X",
},
"isOneToOne": False,
},
},
}
spec_json = json.dumps(dimension_spec)
col = DruidColumn(column_name="deviceName", dimension_spec_json=spec_json)
column_dict = {"deviceName": col}
f = DruidDatasource.get_filters(filters, [], column_dict)
assert isinstance(f.extraction_function, MapLookupExtraction)
dim_ext_fn = dimension_spec["extractionFn"]
f_ext_fn = f.extraction_function
self.assertEqual(dim_ext_fn["lookup"]["map"], f_ext_fn._mapping)
self.assertEqual(dim_ext_fn["lookup"]["isOneToOne"], f_ext_fn._injective)
self.assertEqual(
dim_ext_fn["replaceMissingValueWith"], f_ext_fn._replace_missing_values
)
self.assertEqual(
dim_ext_fn["retainMissingValue"], f_ext_fn._retain_missing_values
)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_extraction_fn_regex(self):
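        """A regex dimension spec should produce a RegexExtraction filter."""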
filters = [{"col": "buildPrefix", "val": ["22B"], "op": "in"}]
dimension_spec = {
"type": "extraction",
"dimension": "build",
"outputName": "buildPrefix",
"outputType": "STRING",
"extractionFn": {"type": "regex", "expr": "(^[0-9A-Za-z]{3})"},
}
spec_json = json.dumps(dimension_spec)
col = DruidColumn(column_name="buildPrefix", dimension_spec_json=spec_json)
column_dict = {"buildPrefix": col}
f = DruidDatasource.get_filters(filters, [], column_dict)
assert isinstance(f.extraction_function, RegexExtraction)
dim_ext_fn = dimension_spec["extractionFn"]
f_ext_fn = f.extraction_function
self.assertEqual(dim_ext_fn["expr"], f_ext_fn._expr)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_extraction_fn_registered_lookup_extraction(self):
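        """A registeredLookup dimension spec should produce a RegisteredLookupExtraction filter."""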
filters = [{"col": "country", "val": ["Spain"], "op": "in"}]
dimension_spec = {
"type": "extraction",
"dimension": "country_name",
"outputName": "country",
"outputType": "STRING",
"extractionFn": {"type": "registeredLookup", "lookup": "country_name"},
}
spec_json = json.dumps(dimension_spec)
col = DruidColumn(column_name="country", dimension_spec_json=spec_json)
column_dict = {"country": col}
f = DruidDatasource.get_filters(filters, [], column_dict)
assert isinstance(f.extraction_function, RegisteredLookupExtraction)
dim_ext_fn = dimension_spec["extractionFn"]
self.assertEqual(dim_ext_fn["type"], f.extraction_function.extraction_type)
self.assertEqual(dim_ext_fn["lookup"], f.extraction_function._lookup)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_extraction_fn_time_format(self):
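        """A timeFormat dimension spec should produce a TimeFormatExtraction filter."""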
filters = [{"col": "dayOfMonth", "val": ["1", "20"], "op": "in"}]
dimension_spec = {
"type": "extraction",
"dimension": "__time",
"outputName": "dayOfMonth",
"extractionFn": {
"type": "timeFormat",
"format": "d",
"timeZone": "Asia/Kolkata",
"locale": "en",
},
}
spec_json = json.dumps(dimension_spec)
col = DruidColumn(column_name="dayOfMonth", dimension_spec_json=spec_json)
column_dict = {"dayOfMonth": col}
f = DruidDatasource.get_filters(filters, [], column_dict)
assert isinstance(f.extraction_function, TimeFormatExtraction)
dim_ext_fn = dimension_spec["extractionFn"]
self.assertEqual(dim_ext_fn["type"], f.extraction_function.extraction_type)
self.assertEqual(dim_ext_fn["format"], f.extraction_function._format)
self.assertEqual(dim_ext_fn["timeZone"], f.extraction_function._time_zone)
self.assertEqual(dim_ext_fn["locale"], f.extraction_function._locale)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_ignores_invalid_filter_objects(self):
filtr = {"col": "col1", "op": "=="}
filters = [filtr]
col = DruidColumn(column_name="col1")
column_dict = {"col1": col}
self.assertIsNone(DruidDatasource.get_filters(filters, [], column_dict))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_filter_in(self):
filtr = {"col": "A", "op": "in", "val": ["a", "b", "c"]}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIn("filter", res.filter)
self.assertIn("fields", res.filter["filter"])
self.assertEqual("or", res.filter["filter"]["type"])
self.assertEqual(3, len(res.filter["filter"]["fields"]))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_filter_not_in(self):
filtr = {"col": "A", "op": "not in", "val": ["a", "b", "c"]}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIn("filter", res.filter)
self.assertIn("type", res.filter["filter"])
self.assertEqual("not", res.filter["filter"]["type"])
self.assertIn("field", res.filter["filter"])
self.assertEqual(
3, len(res.filter["filter"]["field"].filter["filter"]["fields"])
)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_filter_equals(self):
filtr = {"col": "A", "op": "==", "val": "h"}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("selector", res.filter["filter"]["type"])
self.assertEqual("A", res.filter["filter"]["dimension"])
self.assertEqual("h", res.filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_filter_not_equals(self):
filtr = {"col": "A", "op": "!=", "val": "h"}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("not", res.filter["filter"]["type"])
self.assertEqual("h", res.filter["filter"]["field"].filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_bounds_filter(self):
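        """>=, >, <= and < should build bounds filters with the right strictness and ordering."""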
filtr = {"col": "A", "op": ">=", "val": "h"}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertFalse(res.filter["filter"]["lowerStrict"])
self.assertEqual("A", res.filter["filter"]["dimension"])
self.assertEqual("h", res.filter["filter"]["lower"])
self.assertEqual("lexicographic", res.filter["filter"]["ordering"])
filtr["op"] = ">"
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertTrue(res.filter["filter"]["lowerStrict"])
filtr["op"] = "<="
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertFalse(res.filter["filter"]["upperStrict"])
self.assertEqual("h", res.filter["filter"]["upper"])
filtr["op"] = "<"
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertTrue(res.filter["filter"]["upperStrict"])
filtr["val"] = 1
res = DruidDatasource.get_filters([filtr], ["A"], column_dict)
self.assertEqual("numeric", res.filter["filter"]["ordering"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_is_null_filter(self):
filtr = {"col": "A", "op": "IS NULL"}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("selector", res.filter["filter"]["type"])
self.assertEqual("", res.filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_is_not_null_filter(self):
filtr = {"col": "A", "op": "IS NOT NULL"}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("not", res.filter["filter"]["type"])
self.assertIn("field", res.filter["filter"])
self.assertEqual(
"selector", res.filter["filter"]["field"].filter["filter"]["type"]
)
self.assertEqual("", res.filter["filter"]["field"].filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_regex_filter(self):
filtr = {"col": "A", "op": "regex", "val": "[abc]"}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("regex", res.filter["filter"]["type"])
self.assertEqual("[abc]", res.filter["filter"]["pattern"])
self.assertEqual("A", res.filter["filter"]["dimension"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_composes_multiple_filters(self):
filtr1 = {"col": "A", "op": "!=", "val": "y"}
filtr2 = {"col": "B", "op": "in", "val": ["a", "b", "c"]}
cola = DruidColumn(column_name="A")
colb = DruidColumn(column_name="B")
column_dict = {"A": cola, "B": colb}
res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
self.assertEqual("and", res.filter["filter"]["type"])
self.assertEqual(2, len(res.filter["filter"]["fields"]))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_ignores_in_not_in_with_empty_value(self):
filtr1 = {"col": "A", "op": "in", "val": []}
filtr2 = {"col": "A", "op": "not in", "val": []}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr1, filtr2], [], column_dict)
self.assertIsNone(res)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_constructs_equals_for_in_not_in_single_value(self):
filtr = {"col": "A", "op": "in", "val": ["a"]}
cola = DruidColumn(column_name="A")
colb = DruidColumn(column_name="B")
column_dict = {"A": cola, "B": colb}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("selector", res.filter["filter"]["type"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_handles_arrays_for_string_types(self):
filtr = {"col": "A", "op": "==", "val": ["a", "b"]}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("a", res.filter["filter"]["value"])
filtr = {"col": "A", "op": "==", "val": []}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIsNone(res.filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_handles_none_for_string_types(self):
filtr = {"col": "A", "op": "==", "val": None}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertIsNone(res)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_extracts_values_in_quotes(self):
filtr = {"col": "A", "op": "in", "val": ['"a"']}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("a", res.filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_keeps_trailing_spaces(self):
filtr = {"col": "A", "op": "in", "val": ["a "]}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], [], column_dict)
self.assertEqual("a ", res.filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_filters_converts_strings_to_num(self):
filtr = {"col": "A", "op": "in", "val": ["6"]}
col = DruidColumn(column_name="A")
column_dict = {"A": col}
res = DruidDatasource.get_filters([filtr], ["A"], column_dict)
self.assertEqual(6, res.filter["filter"]["value"])
filtr = {"col": "A", "op": "==", "val": "6"}
res = DruidDatasource.get_filters([filtr], ["A"], column_dict)
self.assertEqual(6, res.filter["filter"]["value"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_run_query_no_groupby(self):
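        """run_query with no groupby columns should issue a timeseries query."""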
client = Mock()
from_dttm = Mock()
to_dttm = Mock()
from_dttm.replace = Mock(return_value=from_dttm)
to_dttm.replace = Mock(return_value=to_dttm)
from_dttm.isoformat = Mock(return_value="from")
to_dttm.isoformat = Mock(return_value="to")
timezone = "timezone"
from_dttm.tzname = Mock(return_value=timezone)
ds = DruidDatasource(datasource_name="datasource")
metric1 = DruidMetric(metric_name="metric1")
metric2 = DruidMetric(metric_name="metric2")
ds.metrics = [metric1, metric2]
col1 = DruidColumn(column_name="col1")
col2 = DruidColumn(column_name="col2")
ds.columns = [col1, col2]
aggs = []
post_aggs = ["some_agg"]
ds._metrics_and_post_aggs = Mock(return_value=(aggs, post_aggs))
columns = []
metrics = ["metric1"]
ds.get_having_filters = Mock(return_value=[])
client.query_builder = Mock()
client.query_builder.last_query = Mock()
client.query_builder.last_query.query_dict = {"mock": 0}
        # with no groupby columns, run_query uses client.timeseries
ds.run_query(
metrics,
None,
from_dttm,
to_dttm,
groupby=columns,
client=client,
filter=[],
row_limit=100,
)
self.assertEqual(0, len(client.topn.call_args_list))
self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(1, len(client.timeseries.call_args_list))
        # check that there is no dimensions entry but post aggregations are passed
called_args = client.timeseries.call_args_list[0][1]
self.assertNotIn("dimensions", called_args)
self.assertIn("post_aggregations", called_args)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_run_query_with_adhoc_metric(self):
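        """run_query should accept an adhoc metric and still issue a timeseries query."""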
client = Mock()
from_dttm = Mock()
to_dttm = Mock()
from_dttm.replace = Mock(return_value=from_dttm)
to_dttm.replace = Mock(return_value=to_dttm)
from_dttm.isoformat = Mock(return_value="from")
to_dttm.isoformat = Mock(return_value="to")
timezone = "timezone"
from_dttm.tzname = Mock(return_value=timezone)
ds = DruidDatasource(datasource_name="datasource")
metric1 = DruidMetric(metric_name="metric1")
metric2 = DruidMetric(metric_name="metric2")
ds.metrics = [metric1, metric2]
col1 = DruidColumn(column_name="col1")
col2 = DruidColumn(column_name="col2")
ds.columns = [col1, col2]
all_metrics = []
post_aggs = ["some_agg"]
ds._metrics_and_post_aggs = Mock(return_value=(all_metrics, post_aggs))
columns = []
metrics = [
{
"expressionType": "SIMPLE",
"column": {"type": "DOUBLE", "column_name": "col1"},
"aggregate": "SUM",
"label": "My Adhoc Metric",
}
]
ds.get_having_filters = Mock(return_value=[])
client.query_builder = Mock()
client.query_builder.last_query = Mock()
client.query_builder.last_query.query_dict = {"mock": 0}
        # with no groupby columns, run_query uses client.timeseries
ds.run_query(
metrics,
None,
from_dttm,
to_dttm,
groupby=columns,
client=client,
filter=[],
row_limit=100,
)
self.assertEqual(0, len(client.topn.call_args_list))
self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(1, len(client.timeseries.call_args_list))
        # check that there is no dimensions entry but post aggregations are passed
called_args = client.timeseries.call_args_list[0][1]
self.assertNotIn("dimensions", called_args)
self.assertIn("post_aggregations", called_args)
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_run_query_single_groupby(self):
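        """A single groupby column should use topn or groupby depending on the ordering."""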
client = Mock()
from_dttm = Mock()
to_dttm = Mock()
from_dttm.replace = Mock(return_value=from_dttm)
to_dttm.replace = Mock(return_value=to_dttm)
from_dttm.isoformat = Mock(return_value="from")
to_dttm.isoformat = Mock(return_value="to")
timezone = "timezone"
from_dttm.tzname = Mock(return_value=timezone)
ds = DruidDatasource(datasource_name="datasource")
metric1 = DruidMetric(metric_name="metric1")
metric2 = DruidMetric(metric_name="metric2")
ds.metrics = [metric1, metric2]
col1 = DruidColumn(column_name="col1")
col2 = DruidColumn(column_name="col2")
ds.columns = [col1, col2]
aggs = ["metric1"]
post_aggs = ["some_agg"]
ds._metrics_and_post_aggs = Mock(return_value=(aggs, post_aggs))
columns = ["col1"]
metrics = ["metric1"]
ds.get_having_filters = Mock(return_value=[])
client.query_builder.last_query.query_dict = {"mock": 0}
        # order_desc with a timeseries limit issues two topn queries (pre-query and final)
ds.run_query(
metrics,
None,
from_dttm,
to_dttm,
groupby=columns,
timeseries_limit=100,
client=client,
order_desc=True,
filter=[],
)
self.assertEqual(2, len(client.topn.call_args_list))
self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
        # topn passes the singular 'dimension' key, not 'dimensions'
called_args_pre = client.topn.call_args_list[0][1]
self.assertNotIn("dimensions", called_args_pre)
self.assertIn("dimension", called_args_pre)
called_args = client.topn.call_args_list[1][1]
self.assertIn("dimension", called_args)
self.assertEqual("col1", called_args["dimension"])
        # with order_desc=False the same query falls back to client.groupby
client = Mock()
client.query_builder.last_query.query_dict = {"mock": 0}
ds.run_query(
metrics,
None,
from_dttm,
to_dttm,
groupby=columns,
client=client,
order_desc=False,
filter=[],
row_limit=100,
)
self.assertEqual(0, len(client.topn.call_args_list))
self.assertEqual(1, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
self.assertIn("dimensions", client.groupby.call_args_list[0][1])
self.assertEqual(["col1"], client.groupby.call_args_list[0][1]["dimensions"])
        # order_desc with a timeseries limit and a dimension spec
        # still calls topn, passing a single dimension spec as 'dimension'
spec = {"outputName": "hello", "dimension": "matcho"}
spec_json = json.dumps(spec)
col3 = DruidColumn(column_name="col3", dimension_spec_json=spec_json)
ds.columns.append(col3)
groupby = ["col3"]
client = Mock()
client.query_builder.last_query.query_dict = {"mock": 0}
ds.run_query(
metrics,
None,
from_dttm,
to_dttm,
groupby=groupby,
client=client,
order_desc=True,
timeseries_limit=5,
filter=[],
row_limit=100,
)
self.assertEqual(2, len(client.topn.call_args_list))
self.assertEqual(0, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
self.assertIn("dimension", client.topn.call_args_list[0][1])
self.assertIn("dimension", client.topn.call_args_list[1][1])
        # uses the plain dimension for the pre-query and the full spec for the final query
self.assertEqual("matcho", client.topn.call_args_list[0][1]["dimension"])
self.assertEqual(spec, client.topn.call_args_list[1][1]["dimension"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_run_query_multiple_groupby(self):
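        """Multiple groupby columns should issue a single groupby query."""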
client = Mock()
from_dttm = Mock()
to_dttm = Mock()
from_dttm.replace = Mock(return_value=from_dttm)
to_dttm.replace = Mock(return_value=to_dttm)
from_dttm.isoformat = Mock(return_value="from")
to_dttm.isoformat = Mock(return_value="to")
timezone = "timezone"
from_dttm.tzname = Mock(return_value=timezone)
ds = DruidDatasource(datasource_name="datasource")
metric1 = DruidMetric(metric_name="metric1")
metric2 = DruidMetric(metric_name="metric2")
ds.metrics = [metric1, metric2]
col1 = DruidColumn(column_name="col1")
col2 = DruidColumn(column_name="col2")
ds.columns = [col1, col2]
aggs = []
post_aggs = ["some_agg"]
ds._metrics_and_post_aggs = Mock(return_value=(aggs, post_aggs))
columns = ["col1", "col2"]
metrics = ["metric1"]
ds.get_having_filters = Mock(return_value=[])
client.query_builder = Mock()
client.query_builder.last_query = Mock()
client.query_builder.last_query.query_dict = {"mock": 0}
        # multiple groupby columns dispatch to client.groupby
ds.run_query(
metrics,
None,
from_dttm,
to_dttm,
groupby=columns,
client=client,
row_limit=100,
filter=[],
)
self.assertEqual(0, len(client.topn.call_args_list))
self.assertEqual(1, len(client.groupby.call_args_list))
self.assertEqual(0, len(client.timeseries.call_args_list))
        # check that the dimensions are passed through
called_args = client.groupby.call_args_list[0][1]
self.assertIn("dimensions", called_args)
self.assertEqual(["col1", "col2"], called_args["dimensions"])
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_post_agg_returns_correct_agg_type(self):
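        """get_post_agg should map each config type to the matching pydruid post-aggregator."""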
get_post_agg = DruidDatasource.get_post_agg
# javascript PostAggregators
function = "function(field1, field2) { return field1 + field2; }"
conf = {
"type": "javascript",
"name": "postagg_name",
"fieldNames": ["field1", "field2"],
"function": function,
}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, models.JavascriptPostAggregator)
self.assertEqual(postagg.name, "postagg_name")
self.assertEqual(postagg.post_aggregator["type"], "javascript")
self.assertEqual(postagg.post_aggregator["fieldNames"], ["field1", "field2"])
self.assertEqual(postagg.post_aggregator["name"], "postagg_name")
self.assertEqual(postagg.post_aggregator["function"], function)
# Quantile
conf = {"type": "quantile", "name": "postagg_name", "probability": "0.5"}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, postaggs.Quantile)
self.assertEqual(postagg.name, "postagg_name")
self.assertEqual(postagg.post_aggregator["probability"], "0.5")
# Quantiles
conf = {
"type": "quantiles",
"name": "postagg_name",
"probabilities": "0.4,0.5,0.6",
}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, postaggs.Quantiles)
self.assertEqual(postagg.name, "postagg_name")
self.assertEqual(postagg.post_aggregator["probabilities"], "0.4,0.5,0.6")
# FieldAccess
conf = {"type": "fieldAccess", "name": "field_name"}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, postaggs.Field)
self.assertEqual(postagg.name, "field_name")
# constant
conf = {"type": "constant", "value": 1234, "name": "postagg_name"}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, postaggs.Const)
self.assertEqual(postagg.name, "postagg_name")
self.assertEqual(postagg.post_aggregator["value"], 1234)
# hyperUniqueCardinality
conf = {"type": "hyperUniqueCardinality", "name": "unique_name"}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, postaggs.HyperUniqueCardinality)
self.assertEqual(postagg.name, "unique_name")
# arithmetic
conf = {
"type": "arithmetic",
"fn": "+",
"fields": ["field1", "field2"],
"name": "postagg_name",
}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, postaggs.Postaggregator)
self.assertEqual(postagg.name, "postagg_name")
self.assertEqual(postagg.post_aggregator["fn"], "+")
self.assertEqual(postagg.post_aggregator["fields"], ["field1", "field2"])
# custom post aggregator
conf = {"type": "custom", "name": "custom_name", "stuff": "more_stuff"}
postagg = get_post_agg(conf)
        self.assertIsInstance(postagg, models.CustomPostAggregator)
self.assertEqual(postagg.name, "custom_name")
self.assertEqual(postagg.post_aggregator["stuff"], "more_stuff")
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_find_postaggs_for_returns_postaggs_and_removes(self):
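        """find_postaggs_for should return matching postaggs and remove their names from the set."""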
find_postaggs_for = DruidDatasource.find_postaggs_for
        postagg_names = {"pa2", "pa3", "pa4", "m1", "m2", "m3", "m4"}
metrics = {}
for i in range(1, 6):
emplace(metrics, "pa" + str(i), True)
emplace(metrics, "m" + str(i), False)
postagg_list = find_postaggs_for(postagg_names, metrics)
self.assertEqual(3, len(postagg_list))
self.assertEqual(4, len(postagg_names))
expected_metrics = ["m1", "m2", "m3", "m4"]
        expected_postaggs = {"pa2", "pa3", "pa4"}
for postagg in postagg_list:
expected_postaggs.remove(postagg.metric_name)
for metric in expected_metrics:
postagg_names.remove(metric)
self.assertEqual(0, len(expected_postaggs))
self.assertEqual(0, len(postagg_names))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_recursive_get_fields(self):
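        """recursive_get_fields should collect unique fieldNames from nested configs, skipping unrecognized types."""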
conf = {
"type": "quantile",
"fieldName": "f1",
"field": {
"type": "custom",
"fields": [
{"type": "fieldAccess", "fieldName": "f2"},
{"type": "fieldAccess", "fieldName": "f3"},
{
"type": "quantiles",
"fieldName": "f4",
"field": {"type": "custom"},
},
{
"type": "custom",
"fields": [
{"type": "fieldAccess", "fieldName": "f5"},
{
"type": "fieldAccess",
"fieldName": "f2",
"fields": [
{"type": "fieldAccess", "fieldName": "f3"},
{"type": "fieldIgnoreMe", "fieldName": "f6"},
],
},
],
},
],
},
}
fields = DruidDatasource.recursive_get_fields(conf)
        expected = {"f1", "f2", "f3", "f4", "f5"}
self.assertEqual(5, len(fields))
for field in fields:
expected.remove(field)
self.assertEqual(0, len(expected))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_metrics_and_post_aggs_tree(self):
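        """metrics_and_post_aggs should resolve the full dependency tree of the requested metrics."""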
metrics = ["A", "B", "m1", "m2"]
metrics_dict = {}
for i in range(ord("A"), ord("K") + 1):
emplace(metrics_dict, chr(i), True)
for i in range(1, 10):
emplace(metrics_dict, "m" + str(i), False)
def depends_on(index, fields):
dependents = fields if isinstance(fields, list) else [fields]
metrics_dict[index].json_obj = {"fieldNames": dependents}
depends_on("A", ["m1", "D", "C"])
depends_on("B", ["B", "C", "E", "F", "m3"])
depends_on("C", ["H", "I"])
depends_on("D", ["m2", "m5", "G", "C"])
depends_on("E", ["H", "I", "J"])
depends_on("F", ["J", "m5"])
depends_on("G", ["m4", "m7", "m6", "A"])
depends_on("H", ["A", "m4", "I"])
depends_on("I", ["H", "K"])
depends_on("J", "K")
depends_on("K", ["m8", "m9"])
aggs, postaggs = DruidDatasource.metrics_and_post_aggs(metrics, metrics_dict)
expected_metrics = set(aggs.keys())
self.assertEqual(9, len(aggs))
for i in range(1, 10):
expected_metrics.remove("m" + str(i))
self.assertEqual(0, len(expected_metrics))
self.assertEqual(11, len(postaggs))
for i in range(ord("A"), ord("K") + 1):
del postaggs[chr(i)]
self.assertEqual(0, len(postaggs))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_metrics_and_post_aggs(self):
"""
Test generation of metrics and post-aggregations from an initial list
of superset metrics (which may include the results of either). This
primarily tests that specifying a post-aggregator metric will also
require the raw aggregation of the associated druid metric column.
"""
metrics_dict = {
"unused_count": DruidMetric(
metric_name="unused_count",
verbose_name="COUNT(*)",
metric_type="count",
json=json.dumps({"type": "count", "name": "unused_count"}),
),
"some_sum": DruidMetric(
metric_name="some_sum",
verbose_name="SUM(*)",
metric_type="sum",
json=json.dumps({"type": "sum", "name": "sum"}),
),
"a_histogram": DruidMetric(
metric_name="a_histogram",
verbose_name="APPROXIMATE_HISTOGRAM(*)",
metric_type="approxHistogramFold",
json=json.dumps({"type": "approxHistogramFold", "name": "a_histogram"}),
),
"aCustomMetric": DruidMetric(
metric_name="aCustomMetric",
verbose_name="MY_AWESOME_METRIC(*)",
metric_type="aCustomType",
json=json.dumps({"type": "customMetric", "name": "aCustomMetric"}),
),
"quantile_p95": DruidMetric(
metric_name="quantile_p95",
verbose_name="P95(*)",
metric_type="postagg",
json=json.dumps(
{
"type": "quantile",
"probability": 0.95,
"name": "p95",
"fieldName": "a_histogram",
}
),
),
"aCustomPostAgg": DruidMetric(
metric_name="aCustomPostAgg",
verbose_name="CUSTOM_POST_AGG(*)",
metric_type="postagg",
json=json.dumps(
{
"type": "customPostAgg",
"name": "aCustomPostAgg",
"field": {"type": "fieldAccess", "fieldName": "aCustomMetric"},
}
),
),
}
adhoc_metric = {
"expressionType": "SIMPLE",
"column": {"type": "DOUBLE", "column_name": "value"},
"aggregate": "SUM",
"label": "My Adhoc Metric",
}
metrics = ["some_sum"]
saved_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics, metrics_dict
)
assert set(saved_metrics.keys()) == {"some_sum"}
assert post_aggs == {}
metrics = [adhoc_metric]
saved_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics, metrics_dict
)
        assert set(saved_metrics.keys()) == {adhoc_metric["label"]}
assert post_aggs == {}
metrics = ["some_sum", adhoc_metric]
saved_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics, metrics_dict
)
assert set(saved_metrics.keys()) == {"some_sum", adhoc_metric["label"]}
assert post_aggs == {}
metrics = ["quantile_p95"]
saved_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics, metrics_dict
)
        result_postaggs = {"quantile_p95"}
assert set(saved_metrics.keys()) == {"a_histogram"}
assert set(post_aggs.keys()) == result_postaggs
metrics = ["aCustomPostAgg"]
saved_metrics, post_aggs = DruidDatasource.metrics_and_post_aggs(
metrics, metrics_dict
)
        result_postaggs = {"aCustomPostAgg"}
assert set(saved_metrics.keys()) == {"aCustomMetric"}
assert set(post_aggs.keys()) == result_postaggs
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_druid_type_from_adhoc_metric(self):
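        """The Druid aggregator type should be derived from the column type and aggregate."""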
druid_type = DruidDatasource.druid_type_from_adhoc_metric(
{
"column": {"type": "DOUBLE", "column_name": "value"},
"aggregate": "SUM",
"label": "My Adhoc Metric",
}
)
assert druid_type == "doubleSum"
druid_type = DruidDatasource.druid_type_from_adhoc_metric(
{
"column": {"type": "LONG", "column_name": "value"},
"aggregate": "MAX",
"label": "My Adhoc Metric",
}
)
assert druid_type == "longMax"
druid_type = DruidDatasource.druid_type_from_adhoc_metric(
{
"column": {"type": "VARCHAR(255)", "column_name": "value"},
"aggregate": "COUNT",
"label": "My Adhoc Metric",
}
)
assert druid_type == "count"
druid_type = DruidDatasource.druid_type_from_adhoc_metric(
{
"column": {"type": "VARCHAR(255)", "column_name": "value"},
"aggregate": "COUNT_DISTINCT",
"label": "My Adhoc Metric",
}
)
assert druid_type == "cardinality"
druid_type = DruidDatasource.druid_type_from_adhoc_metric(
{
"column": {"type": "hyperUnique", "column_name": "value"},
"aggregate": "COUNT_DISTINCT",
"label": "My Adhoc Metric",
}
)
assert druid_type == "hyperUnique"
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_run_query_order_by_metrics(self):
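        """Ordering by a metric should pull that metric's dependencies into the query."""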
client = Mock()
client.query_builder.last_query.query_dict = {"mock": 0}
from_dttm = Mock()
to_dttm = Mock()
ds = DruidDatasource(datasource_name="datasource")
ds.get_having_filters = Mock(return_value=[])
dim1 = DruidColumn(column_name="dim1")
dim2 = DruidColumn(column_name="dim2")
metrics_dict = {
"count1": DruidMetric(
metric_name="count1",
metric_type="count",
json=json.dumps({"type": "count", "name": "count1"}),
),
"sum1": DruidMetric(
metric_name="sum1",
metric_type="doubleSum",
json=json.dumps({"type": "doubleSum", "name": "sum1"}),
),
"sum2": DruidMetric(
metric_name="sum2",
metric_type="doubleSum",
json=json.dumps({"type": "doubleSum", "name": "sum2"}),
),
"div1": DruidMetric(
metric_name="div1",
metric_type="postagg",
json=json.dumps(
{
"fn": "/",
"type": "arithmetic",
"name": "div1",
"fields": [
{"fieldName": "sum1", "type": "fieldAccess"},
{"fieldName": "sum2", "type": "fieldAccess"},
],
}
),
),
}
ds.columns = [dim1, dim2]
ds.metrics = list(metrics_dict.values())
columns = ["dim1"]
metrics = ["count1"]
granularity = "all"
# get the counts of the top 5 'dim1's, order by 'sum1'
ds.run_query(
metrics,
granularity,
from_dttm,
to_dttm,
groupby=columns,
timeseries_limit=5,
timeseries_limit_metric="sum1",
client=client,
order_desc=True,
filter=[],
)
qry_obj = client.topn.call_args_list[0][1]
self.assertEqual("dim1", qry_obj["dimension"])
self.assertEqual("sum1", qry_obj["metric"])
aggregations = qry_obj["aggregations"]
post_aggregations = qry_obj["post_aggregations"]
self.assertEqual({"count1", "sum1"}, set(aggregations.keys()))
self.assertEqual(set(), set(post_aggregations.keys()))
# get the counts of the top 5 'dim1's, order by 'div1'
ds.run_query(
metrics,
granularity,
from_dttm,
to_dttm,
groupby=columns,
timeseries_limit=5,
timeseries_limit_metric="div1",
client=client,
order_desc=True,
filter=[],
)
qry_obj = client.topn.call_args_list[1][1]
self.assertEqual("dim1", qry_obj["dimension"])
self.assertEqual("div1", qry_obj["metric"])
aggregations = qry_obj["aggregations"]
post_aggregations = qry_obj["post_aggregations"]
self.assertEqual({"count1", "sum1", "sum2"}, set(aggregations.keys()))
self.assertEqual({"div1"}, set(post_aggregations.keys()))
columns = ["dim1", "dim2"]
# get the counts of the top 5 ['dim1', 'dim2']s, order by 'sum1'
ds.run_query(
metrics,
granularity,
from_dttm,
to_dttm,
groupby=columns,
timeseries_limit=5,
timeseries_limit_metric="sum1",
client=client,
order_desc=True,
filter=[],
)
qry_obj = client.groupby.call_args_list[0][1]
self.assertEqual({"dim1", "dim2"}, set(qry_obj["dimensions"]))
self.assertEqual("sum1", qry_obj["limit_spec"]["columns"][0]["dimension"])
aggregations = qry_obj["aggregations"]
post_aggregations = qry_obj["post_aggregations"]
self.assertEqual({"count1", "sum1"}, set(aggregations.keys()))
self.assertEqual(set(), set(post_aggregations.keys()))
# get the counts of the top 5 ['dim1', 'dim2']s, order by 'div1'
ds.run_query(
metrics,
granularity,
from_dttm,
to_dttm,
groupby=columns,
timeseries_limit=5,
timeseries_limit_metric="div1",
client=client,
order_desc=True,
filter=[],
)
qry_obj = client.groupby.call_args_list[1][1]
self.assertEqual({"dim1", "dim2"}, set(qry_obj["dimensions"]))
self.assertEqual("div1", qry_obj["limit_spec"]["columns"][0]["dimension"])
aggregations = qry_obj["aggregations"]
post_aggregations = qry_obj["post_aggregations"]
self.assertEqual({"count1", "sum1", "sum2"}, set(aggregations.keys()))
self.assertEqual({"div1"}, set(post_aggregations.keys()))
@unittest.skipUnless(
SupersetTestCase.is_module_installed("pydruid"), "pydruid not installed"
)
def test_get_aggregations(self):
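        """get_aggregations should return aggregation configs and reject unknown or postagg metrics."""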
ds = DruidDatasource(datasource_name="datasource")
metrics_dict = {
"sum1": DruidMetric(
metric_name="sum1",
metric_type="doubleSum",
json=json.dumps({"type": "doubleSum", "name": "sum1"}),
),
"sum2": DruidMetric(
metric_name="sum2",
metric_type="doubleSum",
json=json.dumps({"type": "doubleSum", "name": "sum2"}),
),
"div1": DruidMetric(
metric_name="div1",
metric_type="postagg",
json=json.dumps(
{
"fn": "/",
"type": "arithmetic",
"name": "div1",
"fields": [
{"fieldName": "sum1", "type": "fieldAccess"},
{"fieldName": "sum2", "type": "fieldAccess"},
],
}
),
),
}
metric_names = ["sum1", "sum2"]
aggs = ds.get_aggregations(metrics_dict, metric_names)
expected_agg = {name: metrics_dict[name].json_obj for name in metric_names}
self.assertEqual(expected_agg, aggs)
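        # an unknown metric name should raise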
metric_names = ["sum1", "col1"]
self.assertRaises(
SupersetException, ds.get_aggregations, metrics_dict, metric_names
)
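        # requesting a post-aggregation as an aggregation should raise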
metric_names = ["sum1", "div1"]
self.assertRaises(
SupersetException, ds.get_aggregations, metrics_dict, metric_names
)