# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-many-ancestors
import json
import logging
from datetime import datetime

from flask import flash, Markup, redirect
from flask_appbuilder import CompactCRUDMixin, expose
from flask_appbuilder.fieldwidgets import Select2Widget
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_appbuilder.security.decorators import has_access
from flask_babel import lazy_gettext as _
from wtforms import StringField
from wtforms.ext.sqlalchemy.fields import QuerySelectField

from superset import db, security_manager
from superset.connectors.base.views import BS3TextFieldROWidget, DatasourceModelView
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.druid import models
from superset.constants import RouteMethod
from superset.typing import FlaskResponse
from superset.utils import core as utils
from superset.views.base import (
BaseSupersetView,
DatasourceFilter,
DeleteMixin,
get_dataset_exist_error_msg,
ListWidgetWithCheckboxes,
SupersetModelView,
validate_json,
YamlExportMixin,
)

logger = logging.getLogger(__name__)


class DruidColumnInlineView(CompactCRUDMixin, SupersetModelView):
datamodel = SQLAInterface(models.DruidColumn)
include_route_methods = RouteMethod.RELATED_VIEW_SET
list_title = _("Columns")
show_title = _("Show Druid Column")
add_title = _("Add Druid Column")
edit_title = _("Edit Druid Column")
list_widget = ListWidgetWithCheckboxes
edit_columns = [
"column_name",
"verbose_name",
"description",
"dimension_spec_json",
"datasource",
"groupby",
"filterable",
]
add_columns = edit_columns
list_columns = ["column_name", "verbose_name", "type", "groupby", "filterable"]
can_delete = False
page_size = 500
label_columns = {
"column_name": _("Column"),
"type": _("Type"),
"datasource": _("Datasource"),
"groupby": _("Groupable"),
"filterable": _("Filterable"),
}
description_columns = {
"filterable": _(
"Whether this column is exposed in the `Filters` section "
"of the explore view."
),
"dimension_spec_json": utils.markdown(
"this field can be used to specify "
"a `dimensionSpec` as documented [here]"
"(http://druid.io/docs/latest/querying/dimensionspecs.html). "
"Make sure to input valid JSON and that the "
"`outputName` matches the `column_name` defined "
"above.",
True,
),
}
add_form_extra_fields = {
"datasource": QuerySelectField(
"Datasource",
query_factory=lambda: db.session.query(models.DruidDatasource),
allow_blank=True,
widget=Select2Widget(extra_classes="readonly"),
)
}
edit_form_extra_fields = add_form_extra_fields

    def pre_update(self, item: models.DruidColumn) -> None:
        # If a dimension spec JSON is given, ensure that it is
        # valid JSON and that `outputName` is specified
        if item.dimension_spec_json:
            try:
                dimension_spec = json.loads(item.dimension_spec_json)
            except ValueError as ex:
                raise ValueError("Invalid Dimension Spec JSON: " + str(ex)) from ex
if not isinstance(dimension_spec, dict):
raise ValueError("Dimension Spec must be a JSON object")
if "outputName" not in dimension_spec:
raise ValueError("Dimension Spec does not contain `outputName`")
if "dimension" not in dimension_spec:
raise ValueError("Dimension Spec is missing `dimension`")
# `outputName` should be the same as the `column_name`
if dimension_spec["outputName"] != item.column_name:
raise ValueError(
"`outputName` [{}] unequal to `column_name` [{}]".format(
dimension_spec["outputName"], item.column_name
)
)
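
    # Illustrative example (an assumption, not part of the original code): for a
    # column named "country", a dimension_spec_json that passes the checks in
    # pre_update above could look like:
    #
    #   {
    #       "type": "extraction",
    #       "dimension": "country_code",
    #       "outputName": "country",
    #       "extractionFn": {"type": "upper"}
    #   }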

    def post_update(self, item: models.DruidColumn) -> None:
        item.refresh_metrics()

    def post_add(self, item: models.DruidColumn) -> None:
        self.post_update(item)


class DruidMetricInlineView(CompactCRUDMixin, SupersetModelView):
datamodel = SQLAInterface(models.DruidMetric)
include_route_methods = RouteMethod.RELATED_VIEW_SET
list_title = _("Metrics")
show_title = _("Show Druid Metric")
add_title = _("Add Druid Metric")
edit_title = _("Edit Druid Metric")
list_columns = ["metric_name", "verbose_name", "metric_type"]
edit_columns = [
"metric_name",
"description",
"verbose_name",
"metric_type",
"json",
"datasource",
"d3format",
"warning_text",
]
add_columns = edit_columns
page_size = 500
validators_columns = {"json": [validate_json]}
description_columns = {
"metric_type": utils.markdown(
"use `postagg` as the metric type if you are defining a "
"[Druid Post Aggregation]"
"(http://druid.io/docs/latest/querying/post-aggregations.html)",
True,
)
}
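
    # Illustrative example (an assumption, not part of the original code): a
    # `postagg` metric's JSON could hold a Druid post-aggregation such as the
    # arithmetic one below; `total_price` and `num_rows` are hypothetical
    # aggregators assumed to exist on the same datasource:
    #
    #   {
    #       "type": "arithmetic",
    #       "name": "avg_price",
    #       "fn": "/",
    #       "fields": [
    #           {"type": "fieldAccess", "fieldName": "total_price"},
    #           {"type": "fieldAccess", "fieldName": "num_rows"}
    #       ]
    #   }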
label_columns = {
"metric_name": _("Metric"),
"description": _("Description"),
"verbose_name": _("Verbose Name"),
"metric_type": _("Type"),
"json": _("JSON"),
"datasource": _("Druid Datasource"),
"warning_text": _("Warning Message"),
}
add_form_extra_fields = {
"datasource": QuerySelectField(
"Datasource",
query_factory=lambda: db.session.query(models.DruidDatasource),
allow_blank=True,
widget=Select2Widget(extra_classes="readonly"),
)
}
edit_form_extra_fields = add_form_extra_fields


class DruidClusterModelView(SupersetModelView, DeleteMixin, YamlExportMixin):
datamodel = SQLAInterface(models.DruidCluster)
include_route_methods = RouteMethod.CRUD_SET
list_title = _("Druid Clusters")
show_title = _("Show Druid Cluster")
add_title = _("Add Druid Cluster")
edit_title = _("Edit Druid Cluster")
add_columns = [
"verbose_name",
"broker_host",
"broker_port",
"broker_user",
"broker_pass",
"broker_endpoint",
"cache_timeout",
"cluster_name",
]
edit_columns = add_columns
list_columns = ["cluster_name", "metadata_last_refreshed"]
search_columns = ("cluster_name",)
label_columns = {
"cluster_name": _("Cluster Name"),
"broker_host": _("Broker Host"),
"broker_port": _("Broker Port"),
"broker_user": _("Broker Username"),
"broker_pass": _("Broker Password"),
"broker_endpoint": _("Broker Endpoint"),
"verbose_name": _("Verbose Name"),
"cache_timeout": _("Cache Timeout"),
"metadata_last_refreshed": _("Metadata Last Refreshed"),
}
description_columns = {
"cache_timeout": _(
"Duration (in seconds) of the caching timeout for this cluster. "
"A timeout of 0 indicates that the cache never expires. "
"Note this defaults to the global timeout if undefined."
),
"broker_user": _(
"Druid supports basic authentication. See "
"[auth](http://druid.io/docs/latest/design/auth.html) and "
"druid-basic-security extension"
),
"broker_pass": _(
"Druid supports basic authentication. See "
"[auth](http://druid.io/docs/latest/design/auth.html) and "
"druid-basic-security extension"
),
}
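    # Top-level key under which exported clusters are grouped by YamlExportMixin.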
yaml_dict_key = "databases"

    def pre_add(self, item: models.DruidCluster) -> None:
        security_manager.add_permission_view_menu("database_access", item.perm)

    def pre_update(self, item: models.DruidCluster) -> None:
        self.pre_add(item)

    def _delete(self, pk: int) -> None:
        DeleteMixin._delete(self, pk)


class DruidDatasourceModelView(DatasourceModelView, DeleteMixin, YamlExportMixin):
datamodel = SQLAInterface(models.DruidDatasource)
include_route_methods = RouteMethod.CRUD_SET
list_title = _("Druid Datasources")
show_title = _("Show Druid Datasource")
add_title = _("Add Druid Datasource")
edit_title = _("Edit Druid Datasource")
list_columns = ["datasource_link", "cluster", "changed_by_", "modified"]
order_columns = ["datasource_link", "modified"]
related_views = [DruidColumnInlineView, DruidMetricInlineView]
edit_columns = [
"datasource_name",
"cluster",
"description",
"owners",
"is_hidden",
"filter_select_enabled",
"fetch_values_from",
"default_endpoint",
"offset",
"cache_timeout",
]
search_columns = ("datasource_name", "cluster", "description", "owners")
add_columns = edit_columns
show_columns = add_columns + ["perm", "slices"]
page_size = 500
base_order = ("datasource_name", "asc")
description_columns = {
"slices": _(
"The list of charts associated with this table. By "
"altering this datasource, you may change how these associated "
"charts behave. "
"Also note that charts need to point to a datasource, so "
"this form will fail at saving if removing charts from a "
"datasource. If you want to change the datasource for a chart, "
"overwrite the chart from the 'explore view'"
),
"offset": _("Timezone offset (in hours) for this datasource"),
"description": Markup(
'Supports <a href="'
'https://daringfireball.net/projects/markdown/">markdown</a>'
),
"fetch_values_from": _(
"Time expression to use as a predicate when retrieving "
"distinct values to populate the filter component. "
"Only applies when `Enable Filter Select` is on. If "
"you enter `7 days ago`, the distinct list of values in "
"the filter will be populated based on the distinct value over "
"the past week"
),
"filter_select_enabled": _(
"Whether to populate the filter's dropdown in the explore "
"view's filter section with a list of distinct values fetched "
"from the backend on the fly"
),
"default_endpoint": _(
"Redirects to this endpoint when clicking on the datasource "
"from the datasource list"
),
"cache_timeout": _(
"Duration (in seconds) of the caching timeout for this datasource. "
"A timeout of 0 indicates that the cache never expires. "
"Note this defaults to the cluster timeout if undefined."
),
}
base_filters = [["id", DatasourceFilter, lambda: []]]
label_columns = {
"slices": _("Associated Charts"),
"datasource_link": _("Data Source"),
"cluster": _("Cluster"),
"description": _("Description"),
"owners": _("Owners"),
"is_hidden": _("Is Hidden"),
"filter_select_enabled": _("Enable Filter Select"),
"default_endpoint": _("Default Endpoint"),
"offset": _("Time Offset"),
"cache_timeout": _("Cache Timeout"),
"datasource_name": _("Datasource Name"),
"fetch_values_from": _("Fetch Values From"),
"changed_by_": _("Changed By"),
"modified": _("Modified"),
}
edit_form_extra_fields = {
"cluster": QuerySelectField(
"Cluster",
query_factory=lambda: db.session.query(models.DruidCluster),
widget=Select2Widget(extra_classes="readonly"),
),
"datasource_name": StringField(
"Datasource Name", widget=BS3TextFieldROWidget()
),
}

    def pre_add(self, item: models.DruidDatasource) -> None:
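        # Reject duplicate (datasource_name, cluster) pairs. The `no_autoflush`
        # block keeps the pending item from being flushed and then matched by
        # its own existence query.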
with db.session.no_autoflush:
query = db.session.query(models.DruidDatasource).filter(
models.DruidDatasource.datasource_name == item.datasource_name,
models.DruidDatasource.cluster_id == item.cluster_id,
)
if db.session.query(query.exists()).scalar():
raise Exception(get_dataset_exist_error_msg(item.full_name))

    def post_add(self, item: models.DruidDatasource) -> None:
        item.refresh_metrics()
        security_manager.add_permission_view_menu("datasource_access", item.get_perm())
        if item.schema:
            security_manager.add_permission_view_menu("schema_access", item.schema_perm)

    def post_update(self, item: models.DruidDatasource) -> None:
        self.post_add(item)

    def _delete(self, pk: int) -> None:
        DeleteMixin._delete(self, pk)


class Druid(BaseSupersetView):
    """Base view exposing endpoints that refresh Druid datasource metadata."""

    @has_access
@expose("/refresh_datasources/")
def refresh_datasources( # pylint: disable=no-self-use
self, refresh_all: bool = True
) -> FlaskResponse:
"""endpoint that refreshes druid datasources metadata"""
session = db.session()
DruidCluster = ConnectorRegistry.sources[ # pylint: disable=invalid-name
"druid"
].cluster_class
for cluster in session.query(DruidCluster).all():
cluster_name = cluster.cluster_name
valid_cluster = True
try:
cluster.refresh_datasources(refresh_all=refresh_all)
except Exception as ex: # pylint: disable=broad-except
valid_cluster = False
flash(
"Error while processing cluster '{}'\n{}".format(
cluster_name, utils.error_msg_from_exception(ex)
),
"danger",
)
logger.exception(ex)
if valid_cluster:
cluster.metadata_last_refreshed = datetime.now()
flash(
_("Refreshed metadata from cluster [{}]").format(
cluster.cluster_name
),
"info",
)
session.commit()
return redirect("/druiddatasourcemodelview/list/")
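
    # Illustrative usage (an assumption, not part of the original module): with
    # Flask-AppBuilder's default route base this view is served under /druid,
    # so an authenticated session could trigger a full refresh with e.g.
    #
    #   curl -b cookies.txt http://localhost:8088/druid/refresh_datasources/
    #
    # where localhost:8088 is the default address of a local Superset instance.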

    @has_access
@expose("/scan_new_datasources/")
def scan_new_datasources(self) -> FlaskResponse:
"""
Calling this endpoint will cause a scan for new
datasources only and add them.
"""
return self.refresh_datasources(refresh_all=False)
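
# Illustrative only (an assumption, not part of this module): in Superset these
# views are registered with Flask-AppBuilder elsewhere, roughly like:
#
#   appbuilder.add_view(
#       DruidClusterModelView,
#       "Druid Clusters",
#       category="Sources",
#       icon="fa-cubes",
#   )
#   appbuilder.add_view_no_menu(Druid)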