# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
import logging
from datetime import datetime
from distutils.util import strtobool
from io import BytesIO
from typing import Any
from zipfile import ZipFile
import yaml
from flask import g, request, Response, send_file
from flask_appbuilder.api import expose, protect, rison, safe
from flask_appbuilder.models.sqla.interface import SQLAInterface
from flask_babel import ngettext
from marshmallow import ValidationError
from superset import event_logger, is_feature_enabled
from superset.commands.exceptions import CommandInvalidError
from superset.commands.importers.v1.utils import remove_root
from superset.connectors.sqla.models import SqlaTable
from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod
from superset.databases.filters import DatabaseFilter
from superset.datasets.commands.bulk_delete import BulkDeleteDatasetCommand
from superset.datasets.commands.create import CreateDatasetCommand
from superset.datasets.commands.delete import DeleteDatasetCommand
from superset.datasets.commands.exceptions import (
DatasetBulkDeleteFailedError,
DatasetCreateFailedError,
DatasetDeleteFailedError,
DatasetForbiddenError,
DatasetImportError,
DatasetInvalidError,
DatasetNotFoundError,
DatasetRefreshFailedError,
DatasetUpdateFailedError,
)
from superset.datasets.commands.export import ExportDatasetsCommand
from superset.datasets.commands.importers.dispatcher import ImportDatasetsCommand
from superset.datasets.commands.refresh import RefreshDatasetCommand
from superset.datasets.commands.update import UpdateDatasetCommand
from superset.datasets.dao import DatasetDAO
from superset.datasets.filters import DatasetIsNullOrEmptyFilter
from superset.datasets.schemas import (
DatasetPostSchema,
DatasetPutSchema,
DatasetRelatedObjectsResponse,
get_delete_ids_schema,
get_export_ids_schema,
)
from superset.views.base import DatasourceFilter, generate_download_headers
from superset.views.base_api import (
BaseSupersetModelRestApi,
RelatedFieldFilter,
statsd_metrics,
)
from superset.views.filters import FilterRelatedOwners
logger = logging.getLogger(__name__)
class DatasetRestApi(BaseSupersetModelRestApi):
datamodel = SQLAInterface(SqlaTable)
base_filters = [["id", DatasourceFilter, lambda: []]]
resource_name = "dataset"
allow_browser_login = True
class_permission_name = "Dataset"
method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP
include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | {
RouteMethod.EXPORT,
RouteMethod.IMPORT,
RouteMethod.RELATED,
RouteMethod.DISTINCT,
"bulk_delete",
"refresh",
"related_objects",
}
list_columns = [
"id",
"database.id",
"database.database_name",
"changed_by_name",
"changed_by_url",
"changed_by.first_name",
"changed_by.username",
"changed_on_utc",
"changed_on_delta_humanized",
"default_endpoint",
"explore_url",
"extra",
"kind",
"owners.id",
"owners.username",
"owners.first_name",
"owners.last_name",
"schema",
"sql",
"table_name",
]
list_select_columns = list_columns + ["changed_on", "changed_by_fk"]
order_columns = [
"table_name",
"schema",
"changed_by.first_name",
"changed_on_delta_humanized",
"database.database_name",
]
show_columns = [
"id",
"database.database_name",
"database.id",
"table_name",
"sql",
"filter_select_enabled",
"fetch_values_predicate",
"schema",
"description",
"main_dttm_col",
"offset",
"default_endpoint",
"cache_timeout",
"is_sqllab_view",
"template_params",
"owners.id",
"owners.username",
"owners.first_name",
"owners.last_name",
"columns",
"metrics",
"datasource_type",
"url",
"extra",
]
add_model_schema = DatasetPostSchema()
edit_model_schema = DatasetPutSchema()
add_columns = ["database", "schema", "table_name", "owners"]
edit_columns = [
"table_name",
"sql",
"filter_select_enabled",
"fetch_values_predicate",
"schema",
"description",
"main_dttm_col",
"offset",
"default_endpoint",
"cache_timeout",
"is_sqllab_view",
"template_params",
"owners",
"columns",
"metrics",
"extra",
]
openapi_spec_tag = "Datasets"
related_field_filters = {
"owners": RelatedFieldFilter("first_name", FilterRelatedOwners),
"database": "database_name",
}
search_filters = {"sql": [DatasetIsNullOrEmptyFilter]}
filter_rel_fields = {"database": [["id", DatabaseFilter, lambda: []]]}
allowed_rel_fields = {"database", "owners"}
allowed_distinct_fields = {"schema"}
apispec_parameter_schemas = {
"get_export_ids_schema": get_export_ids_schema,
}
openapi_spec_component_schemas = (DatasetRelatedObjectsResponse,)
@expose("/", methods=["POST"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(log_to_statsd=False)
def post(self) -> Response:
"""Creates a new Dataset
---
post:
description: >-
Create a new Dataset
requestBody:
description: Dataset schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/{{self.__class__.__name__}}.post'
responses:
201:
description: Dataset added
content:
application/json:
schema:
type: object
properties:
id:
type: number
result:
$ref: '#/components/schemas/{{self.__class__.__name__}}.post'
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
if not request.is_json:
return self.response_400(message="Request is not JSON")
try:
item = self.add_model_schema.load(request.json)
# Loading the schema also runs its custom validators
except ValidationError as error:
return self.response_400(message=error.messages)
try:
new_model = CreateDatasetCommand(g.user, item).run()
return self.response(201, id=new_model.id, result=item)
except DatasetInvalidError as ex:
return self.response_422(message=ex.normalized_messages())
except DatasetCreateFailedError as ex:
logger.error(
"Error creating model %s: %s", self.__class__.__name__, str(ex)
)
return self.response_422(message=str(ex))
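# Illustrative client call for the POST endpoint above; a sketch, not part of
# the original module. The host, access token and field values are assumptions;
# the payload keys mirror add_columns / DatasetPostSchema:
#
#   import requests
#   resp = requests.post(
#       "http://localhost:8088/api/v1/dataset/",
#       headers={"Authorization": f"Bearer {access_token}"},
#       json={"database": 1, "schema": "public", "table_name": "my_table", "owners": [1]},
#   )
#   resp.json()  # on 201: {"id": <new dataset id>, "result": <the validated payload>}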
@expose("/<pk>", methods=["PUT"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(log_to_statsd=False)
def put(self, pk: int) -> Response:
"""Changes a Dataset
---
put:
description: >-
Changes a Dataset
parameters:
- in: path
schema:
type: integer
name: pk
- in: query
schema:
type: boolean
name: override_columns
requestBody:
description: Dataset schema
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/{{self.__class__.__name__}}.put'
responses:
200:
description: Dataset changed
content:
application/json:
schema:
type: object
properties:
id:
type: number
result:
$ref: '#/components/schemas/{{self.__class__.__name__}}.put'
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
override_columns = (
bool(strtobool(request.args["override_columns"]))
if "override_columns" in request.args
else False
)
if not request.is_json:
return self.response_400(message="Request is not JSON")
try:
item = self.edit_model_schema.load(request.json)
# Loading the schema also runs its custom validators
except ValidationError as error:
return self.response_400(message=error.messages)
try:
changed_model = UpdateDatasetCommand(
g.user, pk, item, override_columns
).run()
response = self.response(200, id=changed_model.id, result=item)
except DatasetNotFoundError:
response = self.response_404()
except DatasetForbiddenError:
response = self.response_403()
except DatasetInvalidError as ex:
response = self.response_422(message=ex.normalized_messages())
except DatasetUpdateFailedError as ex:
logger.error(
"Error updating model %s: %s", self.__class__.__name__, str(ex)
)
response = self.response_422(message=str(ex))
return response
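# Illustrative client call for the PUT endpoint above (a sketch; the dataset id
# and field values are hypothetical). override_columns is passed as a query
# string flag, matching the request.args handling above:
#
#   resp = requests.put(
#       "http://localhost:8088/api/v1/dataset/42",
#       params={"override_columns": "true"},
#       headers={"Authorization": f"Bearer {access_token}"},
#       json={"description": "Cleaned sales data", "owners": [1, 2]},
#   )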
@expose("/<pk>", methods=["DELETE"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(log_to_statsd=False)
def delete(self, pk: int) -> Response:
"""Deletes a Dataset
---
delete:
description: >-
Deletes a Dataset
parameters:
- in: path
schema:
type: integer
name: pk
responses:
200:
description: Dataset deleted
content:
application/json:
schema:
type: object
properties:
message:
type: string
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
try:
DeleteDatasetCommand(g.user, pk).run()
return self.response(200, message="OK")
except DatasetNotFoundError:
return self.response_404()
except DatasetForbiddenError:
return self.response_403()
except DatasetDeleteFailedError as ex:
logger.error(
"Error deleting model %s: %s", self.__class__.__name__, str(ex)
)
return self.response_422(message=str(ex))
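# Illustrative client call for the DELETE endpoint above (hypothetical id and
# host): returns {"message": "OK"} on success, 403 when forbidden, 404 when no
# visible dataset has that id:
#
#   requests.delete(
#       "http://localhost:8088/api/v1/dataset/42",
#       headers={"Authorization": f"Bearer {access_token}"},
#   )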
@expose("/export/", methods=["GET"])
@protect()
@safe
@statsd_metrics
@rison(get_export_ids_schema)
@event_logger.log_this_with_context(log_to_statsd=False)
def export(self, **kwargs: Any) -> Response:
"""Export datasets
---
get:
description: >-
Exports multiple datasets and downloads them as YAML files
parameters:
- in: query
name: q
content:
application/json:
schema:
$ref: '#/components/schemas/get_export_ids_schema'
responses:
200:
description: Dataset export
content:
text/plain:
schema:
type: string
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
requested_ids = kwargs["rison"]
if is_feature_enabled("VERSIONED_EXPORT"):
timestamp = datetime.now().strftime("%Y%m%dT%H%M%S")
root = f"dataset_export_{timestamp}"
filename = f"{root}.zip"
buf = BytesIO()
with ZipFile(buf, "w") as bundle:
try:
for file_name, file_content in ExportDatasetsCommand(
requested_ids
).run():
with bundle.open(f"{root}/{file_name}", "w") as fp:
fp.write(file_content.encode())
except DatasetNotFoundError:
return self.response_404()
buf.seek(0)
return send_file(
buf,
mimetype="application/zip",
as_attachment=True,
attachment_filename=filename,
)
query = self.datamodel.session.query(SqlaTable).filter(
SqlaTable.id.in_(requested_ids)
)
query = self._base_filters.apply_all(query)
items = query.all()
ids = [item.id for item in items]
if len(ids) != len(requested_ids):
return self.response_404()
data = [t.export_to_dict() for t in items]
return Response(
yaml.safe_dump(data),
headers=generate_download_headers("yaml"),
mimetype="application/text",
)
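# Illustrative client call for the export endpoint above. The q argument is a
# rison-encoded list of dataset ids (the ids here are hypothetical); with the
# VERSIONED_EXPORT feature flag enabled the body is a zip bundle, otherwise a
# single YAML document:
#
#   resp = requests.get(
#       "http://localhost:8088/api/v1/dataset/export/",
#       params={"q": "!(1,2,3)"},
#       headers={"Authorization": f"Bearer {access_token}"},
#   )
#   open("dataset_export.zip", "wb").write(resp.content)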
@expose("/<pk>/refresh", methods=["PUT"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(log_to_statsd=False)
def refresh(self, pk: int) -> Response:
"""Refresh a Dataset
---
put:
description: >-
Refreshes and updates columns of a dataset
parameters:
- in: path
schema:
type: integer
name: pk
responses:
200:
description: Dataset refreshed
content:
application/json:
schema:
type: object
properties:
message:
type: string
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
try:
RefreshDatasetCommand(g.user, pk).run()
return self.response(200, message="OK")
except DatasetNotFoundError:
return self.response_404()
except DatasetForbiddenError:
return self.response_403()
except DatasetRefreshFailedError as ex:
logger.error(
"Error refreshing dataset %s: %s", self.__class__.__name__, str(ex)
)
return self.response_422(message=str(ex))
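# Illustrative client call for the refresh endpoint above (hypothetical id);
# it re-reads the physical table and syncs the dataset's columns:
#
#   requests.put(
#       "http://localhost:8088/api/v1/dataset/42/refresh",
#       headers={"Authorization": f"Bearer {access_token}"},
#   )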
@expose("/<pk>/related_objects", methods=["GET"])
@protect()
@safe
@statsd_metrics
@event_logger.log_this_with_context(log_to_statsd=False)
def related_objects(self, pk: int) -> Response:
"""Get charts and dashboards count associated to a dataset
---
get:
description:
Get charts and dashboards count associated with a dataset
parameters:
- in: path
name: pk
schema:
type: integer
responses:
200:
description: Query result
content:
application/json:
schema:
$ref: "#/components/schemas/DatasetRelatedObjectsResponse"
401:
$ref: '#/components/responses/401'
404:
$ref: '#/components/responses/404'
500:
$ref: '#/components/responses/500'
"""
dataset = DatasetDAO.find_by_id(pk)
if not dataset:
return self.response_404()
data = DatasetDAO.get_related_objects(pk)
charts = [
{
"id": chart.id,
"slice_name": chart.slice_name,
"viz_type": chart.viz_type,
}
for chart in data["charts"]
]
dashboards = [
{
"id": dashboard.id,
"json_metadata": dashboard.json_metadata,
"slug": dashboard.slug,
"title": dashboard.dashboard_title,
}
for dashboard in data["dashboards"]
]
return self.response(
200,
charts={"count": len(charts), "result": charts},
dashboards={"count": len(dashboards), "result": dashboards},
)
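# Illustrative call and response shape for the related_objects endpoint above
# (id, names and counts are hypothetical; the shape follows the code above):
#
#   resp = requests.get(
#       "http://localhost:8088/api/v1/dataset/42/related_objects",
#       headers={"Authorization": f"Bearer {access_token}"},
#   )
#   # {"charts": {"count": 1, "result": [{"id": 10, "slice_name": "Sales", "viz_type": "table"}]},
#   #  "dashboards": {"count": 0, "result": []}}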
@expose("/", methods=["DELETE"])
@protect()
@safe
@statsd_metrics
@rison(get_delete_ids_schema)
@event_logger.log_this_with_context(log_to_statsd=False)
def bulk_delete(self, **kwargs: Any) -> Response:
"""Delete bulk Datasets
---
delete:
description: >-
Deletes multiple Datasets in a bulk operation.
parameters:
- in: query
name: q
content:
application/json:
schema:
$ref: '#/components/schemas/get_delete_ids_schema'
responses:
200:
description: Dataset bulk delete
content:
application/json:
schema:
type: object
properties:
message:
type: string
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
403:
$ref: '#/components/responses/403'
404:
$ref: '#/components/responses/404'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
item_ids = kwargs["rison"]
try:
BulkDeleteDatasetCommand(g.user, item_ids).run()
return self.response(
200,
message=ngettext(
"Deleted %(num)d dataset",
"Deleted %(num)d datasets",
num=len(item_ids),
),
)
except DatasetNotFoundError:
return self.response_404()
except DatasetForbiddenError:
return self.response_403()
except DatasetBulkDeleteFailedError as ex:
return self.response_422(message=str(ex))
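# Illustrative client call for the bulk delete endpoint above; q is a
# rison-encoded list of dataset ids (the ids are hypothetical):
#
#   requests.delete(
#       "http://localhost:8088/api/v1/dataset/",
#       params={"q": "!(1,2,3)"},
#       headers={"Authorization": f"Bearer {access_token}"},
#   )
#   # 200 with a message such as "Deleted 3 datasets"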
@expose("/import/", methods=["POST"])
@protect()
@safe
@statsd_metrics
def import_(self) -> Response:
"""Import dataset(s) with associated databases
---
post:
requestBody:
required: true
content:
multipart/form-data:
schema:
type: object
properties:
formData:
type: string
format: binary
passwords:
type: string
overwrite:
type: boolean
responses:
200:
description: Dataset import result
content:
application/json:
schema:
type: object
properties:
message:
type: string
400:
$ref: '#/components/responses/400'
401:
$ref: '#/components/responses/401'
422:
$ref: '#/components/responses/422'
500:
$ref: '#/components/responses/500'
"""
upload = request.files.get("formData")
if not upload:
return self.response_400()
with ZipFile(upload) as bundle:
contents = {
remove_root(file_name): bundle.read(file_name).decode()
for file_name in bundle.namelist()
}
passwords = (
json.loads(request.form["passwords"])
if "passwords" in request.form
else None
)
overwrite = request.form.get("overwrite") == "true"
command = ImportDatasetsCommand(
contents, passwords=passwords, overwrite=overwrite
)
try:
command.run()
return self.response(200, message="OK")
except CommandInvalidError as exc:
logger.warning("Import dataset failed")
return self.response_422(message=exc.normalized_messages())
except DatasetImportError as exc:
logger.exception("Import dataset failed")
return self.response_500(message=str(exc))
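# Illustrative client call for the import endpoint above (a sketch; the host,
# file name, bundle entry name and password are assumptions). The bundle is a
# zip as produced by the export endpoint, and "passwords" maps file paths
# inside the bundle to the database passwords that are stripped on export:
#
#   import json
#   import requests
#   resp = requests.post(
#       "http://localhost:8088/api/v1/dataset/import/",
#       headers={"Authorization": f"Bearer {access_token}"},
#       files={"formData": open("dataset_export_20210101T000000.zip", "rb")},
#       data={
#           "passwords": json.dumps({"databases/MyDatabase.yaml": "my_password"}),
#           "overwrite": "true",
#       },
#   )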