# blob: 4b2b45bb66c07070b533aeaf8290206e7669258f [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# pylint: disable=too-many-ancestors
import json
import logging
from datetime import datetime
from flask import flash, Markup, redirect
from flask_appbuilder import CompactCRUDMixin, expose
from flask_appbuilder.fieldwidgets import Select2Widget
from flask_appbuilder.models.sqla.interface import SQLAInterface
from import has_access
from flask_babel import lazy_gettext as _
from wtforms import StringField
from wtforms.ext.sqlalchemy.fields import QuerySelectField
from superset import db, security_manager
from superset.connectors.base.views import BS3TextFieldROWidget, DatasourceModelView
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.druid import models
from superset.constants import RouteMethod
from superset.typing import FlaskResponse
from superset.utils import core as utils
from superset.views.base import (
logger = logging.getLogger(__name__)
# Compact CRUD view over Druid columns; it is listed in
# DruidDatasourceModelView.related_views further down in this file.
# NOTE(review): this copy of the file has lost its indentation and a number
# of lines (closing brackets, list contents, a URL). The comments below only
# describe what is still visible.
class DruidColumnInlineView(CompactCRUDMixin, SupersetModelView):
# Back the view with the DruidColumn model through the SQLAlchemy interface.
datamodel = SQLAInterface(models.DruidColumn)
# Expose only the route set defined for related views.
include_route_methods = RouteMethod.RELATED_VIEW_SET
# Translatable page titles.
list_title = _("Columns")
show_title = _("Show Druid Column")
add_title = _("Add Druid Column")
edit_title = _("Edit Druid Column")
# NOTE(review): ListWidgetWithCheckboxes is presumably imported by the
# truncated `from superset.views.base import (` statement - confirm.
list_widget = ListWidgetWithCheckboxes
# NOTE(review): the entries of this list and its closing bracket are missing.
edit_columns = [
# The add form uses the same column list as the edit form.
add_columns = edit_columns
list_columns = ["column_name", "verbose_name", "type", "groupby", "filterable"]
can_delete = False
page_size = 500
# Translatable labels keyed by column name.
# NOTE(review): the closing brace of this dict is missing.
label_columns = {
"column_name": _("Column"),
"type": _("Type"),
"datasource": _("Datasource"),
"groupby": _("Groupable"),
"filterable": _("Filterable"),
# Help texts keyed by column name.
# NOTE(review): the closers of the `_(` / `utils.markdown(` calls and of the
# dict itself are missing.
description_columns = {
"filterable": _(
"Whether this column is exposed in the `Filters` section "
"of the explore view."
# The dimension spec help text is written in markdown and passed through
# utils.markdown.
"dimension_spec_json": utils.markdown(
"this field can be used to specify "
"a `dimensionSpec` as documented [here]"
# NOTE(review): the link target inside the parentheses below is missing.
"( "
"Make sure to input valid JSON and that the "
"`outputName` matches the `column_name` defined "
# Extra `datasource` form field populated from a query over all
# DruidDatasource rows.
# NOTE(review): the remaining field arguments and closing brackets are missing.
add_form_extra_fields = {
"datasource": QuerySelectField(
query_factory=lambda: db.session.query(models.DruidDatasource),
# The edit form shares the add form's extra fields (same dict object).
edit_form_extra_fields = add_form_extra_fields
def pre_update(self, item: "DruidColumnInlineView") -> None:
    """Validate the column's dimension spec JSON before the update is saved.

    Nothing is checked when ``item.dimension_spec_json`` is empty. Otherwise
    the value must parse as a JSON object that contains both ``outputName``
    and ``dimension``, and ``outputName`` must equal ``item.column_name``.

    :param item: object being updated; its ``dimension_spec_json`` and
        ``column_name`` attributes are read
    :raises ValueError: if the JSON is invalid, is not an object, lacks
        ``outputName`` or ``dimension``, or ``outputName`` differs from
        ``column_name``
    """
    # If a dimension spec JSON is given, ensure that it is
    # valid JSON and that `outputName` is specified
    if item.dimension_spec_json:
        try:
            dimension_spec = json.loads(item.dimension_spec_json)
        except ValueError as ex:
            # Keep the parser error as the cause so the traceback shows where
            # the JSON is malformed.
            raise ValueError("Invalid Dimension Spec JSON: " + str(ex)) from ex
        if not isinstance(dimension_spec, dict):
            raise ValueError("Dimension Spec must be a JSON object")
        if "outputName" not in dimension_spec:
            raise ValueError("Dimension Spec does not contain `outputName`")
        if "dimension" not in dimension_spec:
            raise ValueError("Dimension Spec is missing `dimension`")
        # `outputName` should be the same as the `column_name`
        if dimension_spec["outputName"] != item.column_name:
            raise ValueError(
                "`outputName` [{}] unequal to `column_name` [{}]".format(
                    dimension_spec["outputName"], item.column_name
                )
            )
# NOTE(review): presumably the hook invoked after a column has been updated;
# its body is missing from this copy of the file - restore from upstream.
def post_update(self, item: "DruidColumnInlineView") -> None:
# NOTE(review): presumably the hook invoked after a column has been added;
# its body is missing from this copy of the file - restore from upstream.
def post_add(self, item: "DruidColumnInlineView") -> None:
# Compact CRUD view over Druid metrics; it is listed in
# DruidDatasourceModelView.related_views further down in this file.
# NOTE(review): this copy of the file has lost its indentation and a number
# of lines (closing brackets, list contents). The comments below only
# describe what is still visible.
class DruidMetricInlineView(CompactCRUDMixin, SupersetModelView):
# Back the view with the DruidMetric model through the SQLAlchemy interface.
datamodel = SQLAInterface(models.DruidMetric)
# Expose only the route set defined for related views.
include_route_methods = RouteMethod.RELATED_VIEW_SET
# Translatable page titles.
list_title = _("Metrics")
show_title = _("Show Druid Metric")
add_title = _("Add Druid Metric")
edit_title = _("Edit Druid Metric")
list_columns = ["metric_name", "verbose_name", "metric_type"]
# NOTE(review): the entries of this list and its closing bracket are missing.
edit_columns = [
# The add form uses the same column list as the edit form.
add_columns = edit_columns
page_size = 500
# The `json` column is checked with validate_json.
# NOTE(review): validate_json is presumably imported by the truncated
# `from superset.views.base import (` statement - confirm.
validators_columns = {"json": [validate_json]}
# Help texts keyed by column name; the metric type text is markdown.
# NOTE(review): the link target, the closer of `utils.markdown(` and the
# closing brace of the dict are missing.
description_columns = {
"metric_type": utils.markdown(
"use `postagg` as the metric type if you are defining a "
"[Druid Post Aggregation]"
# Translatable labels keyed by column name.
# NOTE(review): the closing brace of this dict is missing.
label_columns = {
"metric_name": _("Metric"),
"description": _("Description"),
"verbose_name": _("Verbose Name"),
"metric_type": _("Type"),
"json": _("JSON"),
"datasource": _("Druid Datasource"),
"warning_text": _("Warning Message"),
# Extra `datasource` form field populated from a query over all
# DruidDatasource rows.
# NOTE(review): the remaining field arguments and closing brackets are missing.
add_form_extra_fields = {
"datasource": QuerySelectField(
query_factory=lambda: db.session.query(models.DruidDatasource),
# The edit form shares the add form's extra fields (same dict object).
edit_form_extra_fields = add_form_extra_fields
# CRUD view over Druid clusters, combined with DeleteMixin and YamlExportMixin.
# NOTE(review): this copy of the file has lost its indentation and a number
# of lines (closing brackets, list contents, URLs, one method body). The
# comments below only describe what is still visible.
class DruidClusterModelView(SupersetModelView, DeleteMixin, YamlExportMixin):
# Back the view with the DruidCluster model through the SQLAlchemy interface.
datamodel = SQLAInterface(models.DruidCluster)
# Expose the CRUD route set.
include_route_methods = RouteMethod.CRUD_SET
# Translatable page titles.
list_title = _("Druid Clusters")
show_title = _("Show Druid Cluster")
add_title = _("Add Druid Cluster")
edit_title = _("Edit Druid Cluster")
# NOTE(review): the entries of this list and its closing bracket are missing.
add_columns = [
# The edit form uses the same column list as the add form.
edit_columns = add_columns
list_columns = ["cluster_name", "metadata_last_refreshed"]
search_columns = ("cluster_name",)
# Translatable labels keyed by column name.
# NOTE(review): the closing brace of this dict is missing.
label_columns = {
"cluster_name": _("Cluster Name"),
"broker_host": _("Broker Host"),
"broker_port": _("Broker Port"),
"broker_user": _("Broker Username"),
"broker_pass": _("Broker Password"),
"broker_endpoint": _("Broker Endpoint"),
"verbose_name": _("Verbose Name"),
"cache_timeout": _("Cache Timeout"),
"metadata_last_refreshed": _("Metadata Last Refreshed"),
# Translatable help texts keyed by column name.
# NOTE(review): the closers of the `_(` calls and of the dict are missing,
# and the link targets in the two `[auth](` strings have been cut.
description_columns = {
"cache_timeout": _(
"Duration (in seconds) of the caching timeout for this cluster. "
"A timeout of 0 indicates that the cache never expires. "
"Note this defaults to the global timeout if undefined."
"broker_user": _(
"Druid supports basic authentication. See "
"[auth]( and "
"druid-basic-security extension"
"broker_pass": _(
"Druid supports basic authentication. See "
"[auth]( and "
"druid-basic-security extension"
# NOTE(review): presumably the top-level key used by YamlExportMixin when
# exporting - confirm against the mixin.
yaml_dict_key = "databases"
# Registers a "database_access" permission for the cluster's `perm` value.
def pre_add(self, item: "DruidClusterModelView") -> None:
security_manager.add_permission_view_menu("database_access", item.perm)
# NOTE(review): the body of this method is missing from this copy of the
# file - restore from upstream.
def pre_update(self, item: "DruidClusterModelView") -> None:
# Deletes the row with primary key `pk` by calling DeleteMixin._delete
# explicitly.
def _delete(self, pk: int) -> None:
DeleteMixin._delete(self, pk)
# CRUD view over Druid datasources, combined with DeleteMixin and
# YamlExportMixin.
# NOTE(review): this copy of the file has lost its indentation and a number
# of lines (closing brackets, list contents, part of an HTML string). The
# comments below only describe what is still visible.
class DruidDatasourceModelView(DatasourceModelView, DeleteMixin, YamlExportMixin):
# Back the view with the DruidDatasource model through the SQLAlchemy interface.
datamodel = SQLAInterface(models.DruidDatasource)
# Expose the CRUD route set.
include_route_methods = RouteMethod.CRUD_SET
# Translatable page titles.
list_title = _("Druid Datasources")
show_title = _("Show Druid Datasource")
add_title = _("Add Druid Datasource")
edit_title = _("Edit Druid Datasource")
list_columns = ["datasource_link", "cluster", "changed_by_", "modified"]
order_columns = ["datasource_link", "modified"]
# The column and metric inline views defined above are attached as related views.
related_views = [DruidColumnInlineView, DruidMetricInlineView]
# NOTE(review): the entries of this list and its closing bracket are missing.
edit_columns = [
search_columns = ("datasource_name", "cluster", "description", "owners")
# The add form uses the same column list as the edit form.
add_columns = edit_columns
# The show page lists the add columns plus `perm` and `slices`.
show_columns = add_columns + ["perm", "slices"]
page_size = 500
base_order = ("datasource_name", "asc")
# Help texts keyed by column name.
# NOTE(review): the closers of the individual `_(` / `Markup(` calls and of
# the dict itself are missing.
description_columns = {
"slices": _(
"The list of charts associated with this table. By "
"altering this datasource, you may change how these associated "
"charts behave. "
"Also note that charts need to point to a datasource, so "
"this form will fail at saving if removing charts from a "
"datasource. If you want to change the datasource for a chart, "
"overwrite the chart from the 'explore view'"
"offset": _("Timezone offset (in hours) for this datasource"),
# NOTE(review): the rest of this HTML snippet (link target and text) is missing.
"description": Markup(
'Supports <a href="'
"fetch_values_from": _(
"Time expression to use as a predicate when retrieving "
"distinct values to populate the filter component. "
"Only applies when `Enable Filter Select` is on. If "
"you enter `7 days ago`, the distinct list of values in "
"the filter will be populated based on the distinct value over "
"the past week"
"filter_select_enabled": _(
"Whether to populate the filter's dropdown in the explore "
"view's filter section with a list of distinct values fetched "
"from the backend on the fly"
"default_endpoint": _(
"Redirects to this endpoint when clicking on the datasource "
"from the datasource list"
"cache_timeout": _(
"Duration (in seconds) of the caching timeout for this datasource. "
"A timeout of 0 indicates that the cache never expires. "
"Note this defaults to the cluster timeout if undefined."
# Base filter on `id` using DatasourceFilter; the filter value is supplied by
# a callable that returns an empty list.
# NOTE(review): DatasourceFilter is presumably imported by the truncated
# `from superset.views.base import (` statement - confirm.
base_filters = [["id", DatasourceFilter, lambda: []]]
# Translatable labels keyed by column name.
# NOTE(review): the closing brace of this dict is missing.
label_columns = {
"slices": _("Associated Charts"),
"datasource_link": _("Data Source"),
"cluster": _("Cluster"),
"description": _("Description"),
"owners": _("Owners"),
"is_hidden": _("Is Hidden"),
"filter_select_enabled": _("Enable Filter Select"),
"default_endpoint": _("Default Endpoint"),
"offset": _("Time Offset"),
"cache_timeout": _("Cache Timeout"),
"datasource_name": _("Datasource Name"),
"fetch_values_from": _("Fetch Values From"),
"changed_by_": _("Changed By"),
"modified": _("Modified"),
# Extra fields for the edit form: `cluster` is populated from a query over all
# DruidCluster rows, and `datasource_name` is rendered with BS3TextFieldROWidget.
# NOTE(review): the remaining field arguments and closing brackets are missing.
edit_form_extra_fields = {
"cluster": QuerySelectField(
query_factory=lambda: db.session.query(models.DruidCluster),
"datasource_name": StringField(
"Datasource Name", widget=BS3TextFieldROWidget()
def pre_add(self, item: "DruidDatasourceModelView") -> None:
    """Reject a new datasource that duplicates an existing one.

    Looks for a ``DruidDatasource`` row with the same ``datasource_name`` and
    ``cluster_id`` as ``item``.

    :param item: object about to be added; its ``datasource_name``,
        ``cluster_id`` and ``full_name`` attributes are read
    :raises Exception: with the message built by
        ``get_dataset_exist_error_msg(item.full_name)`` when a matching row
        already exists
    """
    # The lookup runs with autoflush disabled so that executing the query
    # does not flush pending session state first.
    with db.session.no_autoflush:
        query = db.session.query(models.DruidDatasource).filter(
            models.DruidDatasource.datasource_name == item.datasource_name,
            models.DruidDatasource.cluster_id == item.cluster_id,
        )
        if db.session.query(query.exists()).scalar():
            raise Exception(get_dataset_exist_error_msg(item.full_name))
# Registers permissions for a newly added datasource.
def post_add(self, item: "DruidDatasourceModelView") -> None:
# Datasource-level permission, named by item.get_perm().
security_manager.add_permission_view_menu("datasource_access", item.get_perm())
# Schema-level permission, only when the datasource has a schema.
if item.schema:
security_manager.add_permission_view_menu("schema_access", item.schema_perm)
# NOTE(review): the body of this method is missing from this copy of the
# file - restore from upstream.
def post_update(self, item: "DruidDatasourceModelView") -> None:
# Deletes the row with primary key `pk` by calling DeleteMixin._delete
# explicitly.
def _delete(self, pk: int) -> None:
DeleteMixin._delete(self, pk)
# NOTE(review): this copy of the file has lost its indentation and several
# lines of this class (decorators if any, `try:`, the refresh call, the
# flash/log calls and various closing brackets). The comments below only
# describe what is still visible.
class Druid(BaseSupersetView):
"""The base views for Superset!"""
# `refresh_all` defaults to True; scan_new_datasources below calls this
# method with refresh_all=False.
def refresh_datasources( # pylint: disable=no-self-use
self, refresh_all: bool = True
) -> FlaskResponse:
"""endpoint that refreshes druid datasources metadata"""
session = db.session()
# The cluster model class is looked up in the connector registry.
# NOTE(review): the subscript key and closing bracket are missing.
DruidCluster = ConnectorRegistry.sources[ # pylint: disable=invalid-name
# Visit every cluster row returned by the session.
for cluster in session.query(DruidCluster).all():
cluster_name = cluster.cluster_name
valid_cluster = True
# NOTE(review): the `try:` and the statement(s) it guarded are missing; only
# the handler remains. It marks the cluster as invalid on any exception.
except Exception as ex: # pylint: disable=broad-except
valid_cluster = False
# NOTE(review): the call that consumed this formatted message is missing.
"Error while processing cluster '{}'\n{}".format(
cluster_name, utils.error_msg_from_exception(ex)
# Only clusters that did not raise get their refresh marker updated.
if valid_cluster:
# NOTE(review): the assigned value is missing (presumably a timestamp, given
# the `datetime` import - confirm against upstream).
cluster.metadata_last_refreshed =
# NOTE(review): the call that consumed this message and the format argument
# are missing.
_("Refreshed metadata from cluster [{}]").format(
# Finish by redirecting to the datasource list page.
return redirect("/druiddatasourcemodelview/list/")
def scan_new_datasources(self) -> FlaskResponse:
    """
    Calling this endpoint will cause a scan for new
    datasources only and add them.

    Delegates to ``refresh_datasources`` with ``refresh_all=False`` and
    returns its response unchanged.
    """
    return self.refresh_datasources(refresh_all=False)