| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import json |
| import logging |
| from datetime import datetime |
| from distutils.util import strtobool |
| from io import BytesIO |
| from typing import Any |
| from zipfile import is_zipfile, ZipFile |
| |
| import yaml |
| from flask import g, request, Response, send_file |
| from flask_appbuilder.api import expose, protect, rison, safe |
| from flask_appbuilder.models.sqla.interface import SQLAInterface |
| from flask_babel import ngettext |
| from marshmallow import ValidationError |
| |
| from superset import event_logger, is_feature_enabled |
| from superset.commands.exceptions import CommandInvalidError |
| from superset.commands.importers.v1.utils import get_contents_from_bundle |
| from superset.connectors.sqla.models import SqlaTable |
| from superset.constants import MODEL_API_RW_METHOD_PERMISSION_MAP, RouteMethod |
| from superset.databases.filters import DatabaseFilter |
| from superset.datasets.commands.bulk_delete import BulkDeleteDatasetCommand |
| from superset.datasets.commands.create import CreateDatasetCommand |
| from superset.datasets.commands.delete import DeleteDatasetCommand |
| from superset.datasets.commands.exceptions import ( |
| DatasetBulkDeleteFailedError, |
| DatasetCreateFailedError, |
| DatasetDeleteFailedError, |
| DatasetForbiddenError, |
| DatasetImportError, |
| DatasetInvalidError, |
| DatasetNotFoundError, |
| DatasetRefreshFailedError, |
| DatasetUpdateFailedError, |
| ) |
| from superset.datasets.commands.export import ExportDatasetsCommand |
| from superset.datasets.commands.importers.dispatcher import ImportDatasetsCommand |
| from superset.datasets.commands.refresh import RefreshDatasetCommand |
| from superset.datasets.commands.update import UpdateDatasetCommand |
| from superset.datasets.dao import DatasetDAO |
| from superset.datasets.filters import DatasetIsNullOrEmptyFilter |
| from superset.datasets.schemas import ( |
| DatasetPostSchema, |
| DatasetPutSchema, |
| DatasetRelatedObjectsResponse, |
| get_delete_ids_schema, |
| get_export_ids_schema, |
| ) |
| from superset.views.base import DatasourceFilter, generate_download_headers |
| from superset.views.base_api import ( |
| BaseSupersetModelRestApi, |
| RelatedFieldFilter, |
| statsd_metrics, |
| ) |
| from superset.views.filters import FilterRelatedOwners |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| class DatasetRestApi(BaseSupersetModelRestApi): |
| datamodel = SQLAInterface(SqlaTable) |
| base_filters = [["id", DatasourceFilter, lambda: []]] |
| |
| resource_name = "dataset" |
| allow_browser_login = True |
| class_permission_name = "Dataset" |
| method_permission_name = MODEL_API_RW_METHOD_PERMISSION_MAP |
| include_route_methods = RouteMethod.REST_MODEL_VIEW_CRUD_SET | { |
| RouteMethod.EXPORT, |
| RouteMethod.IMPORT, |
| RouteMethod.RELATED, |
| RouteMethod.DISTINCT, |
| "bulk_delete", |
| "refresh", |
| "related_objects", |
| } |
| list_columns = [ |
| "id", |
| "database.id", |
| "database.database_name", |
| "changed_by_name", |
| "changed_by_url", |
| "changed_by.first_name", |
| "changed_by.username", |
| "changed_on_utc", |
| "changed_on_delta_humanized", |
| "default_endpoint", |
| "explore_url", |
| "extra", |
| "kind", |
| "owners.id", |
| "owners.username", |
| "owners.first_name", |
| "owners.last_name", |
| "schema", |
| "sql", |
| "table_name", |
| ] |
| list_select_columns = list_columns + ["changed_on", "changed_by_fk"] |
| order_columns = [ |
| "table_name", |
| "schema", |
| "changed_by.first_name", |
| "changed_on_delta_humanized", |
| "database.database_name", |
| ] |
| show_columns = [ |
| "id", |
| "database.database_name", |
| "database.id", |
| "table_name", |
| "sql", |
| "filter_select_enabled", |
| "fetch_values_predicate", |
| "schema", |
| "description", |
| "main_dttm_col", |
| "offset", |
| "default_endpoint", |
| "cache_timeout", |
| "is_sqllab_view", |
| "template_params", |
| "owners.id", |
| "owners.username", |
| "owners.first_name", |
| "owners.last_name", |
| "columns", |
| "metrics", |
| "datasource_type", |
| "url", |
| "extra", |
| ] |
| add_model_schema = DatasetPostSchema() |
| edit_model_schema = DatasetPutSchema() |
| add_columns = ["database", "schema", "table_name", "owners"] |
| edit_columns = [ |
| "table_name", |
| "sql", |
| "filter_select_enabled", |
| "fetch_values_predicate", |
| "schema", |
| "description", |
| "main_dttm_col", |
| "offset", |
| "default_endpoint", |
| "cache_timeout", |
| "is_sqllab_view", |
| "template_params", |
| "owners", |
| "columns", |
| "metrics", |
| "extra", |
| ] |
| openapi_spec_tag = "Datasets" |
| related_field_filters = { |
| "owners": RelatedFieldFilter("first_name", FilterRelatedOwners), |
| "database": "database_name", |
| } |
| search_filters = {"sql": [DatasetIsNullOrEmptyFilter]} |
| filter_rel_fields = {"database": [["id", DatabaseFilter, lambda: []]]} |
| allowed_rel_fields = {"database", "owners"} |
| allowed_distinct_fields = {"schema"} |
| |
| apispec_parameter_schemas = { |
| "get_export_ids_schema": get_export_ids_schema, |
| } |
| openapi_spec_component_schemas = (DatasetRelatedObjectsResponse,) |
| |
| @expose("/", methods=["POST"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.post", |
| log_to_statsd=False, |
| ) |
| def post(self) -> Response: |
| """Creates a new Dataset |
| --- |
| post: |
| description: >- |
| Create a new Dataset |
| requestBody: |
| description: Dataset schema |
| required: true |
| content: |
| application/json: |
| schema: |
| $ref: '#/components/schemas/{{self.__class__.__name__}}.post' |
| responses: |
| 201: |
| description: Dataset added |
| content: |
| application/json: |
| schema: |
| type: object |
| properties: |
| id: |
| type: number |
| result: |
| $ref: '#/components/schemas/{{self.__class__.__name__}}.post' |
| 400: |
| $ref: '#/components/responses/400' |
| 401: |
| $ref: '#/components/responses/401' |
| 422: |
| $ref: '#/components/responses/422' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| if not request.is_json: |
| return self.response_400(message="Request is not JSON") |
| try: |
| item = self.add_model_schema.load(request.json) |
# marshmallow's load() also runs the schema's custom validators
| except ValidationError as error: |
| return self.response_400(message=error.messages) |
| |
| try: |
| new_model = CreateDatasetCommand(g.user, item).run() |
| return self.response(201, id=new_model.id, result=item) |
| except DatasetInvalidError as ex: |
| return self.response_422(message=ex.normalized_messages()) |
| except DatasetCreateFailedError as ex: |
| logger.error( |
| "Error creating model %s: %s", self.__class__.__name__, str(ex) |
| ) |
| return self.response_422(message=str(ex)) |
| |
| @expose("/<pk>", methods=["PUT"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.put", |
| log_to_statsd=False, |
| ) |
| def put(self, pk: int) -> Response: |
| """Changes a Dataset |
| --- |
| put: |
| description: >- |
| Changes a Dataset |
| parameters: |
| - in: path |
| schema: |
| type: integer |
| name: pk |
- in: query
schema:
type: boolean
name: override_columns
| requestBody: |
| description: Dataset schema |
| required: true |
| content: |
| application/json: |
| schema: |
| $ref: '#/components/schemas/{{self.__class__.__name__}}.put' |
| responses: |
| 200: |
| description: Dataset changed |
| content: |
| application/json: |
| schema: |
| type: object |
| properties: |
| id: |
| type: number |
| result: |
| $ref: '#/components/schemas/{{self.__class__.__name__}}.put' |
| 400: |
| $ref: '#/components/responses/400' |
| 401: |
| $ref: '#/components/responses/401' |
| 403: |
| $ref: '#/components/responses/403' |
| 404: |
| $ref: '#/components/responses/404' |
| 422: |
| $ref: '#/components/responses/422' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| override_columns = ( |
| bool(strtobool(request.args["override_columns"])) |
| if "override_columns" in request.args |
| else False |
| ) |
| if not request.is_json: |
| return self.response_400(message="Request is not JSON") |
| try: |
| item = self.edit_model_schema.load(request.json) |
# marshmallow's load() also runs the schema's custom validators
| except ValidationError as error: |
| return self.response_400(message=error.messages) |
| try: |
| changed_model = UpdateDatasetCommand( |
| g.user, pk, item, override_columns |
| ).run() |
| response = self.response(200, id=changed_model.id, result=item) |
| except DatasetNotFoundError: |
| response = self.response_404() |
| except DatasetForbiddenError: |
| response = self.response_403() |
| except DatasetInvalidError as ex: |
| response = self.response_422(message=ex.normalized_messages()) |
| except DatasetUpdateFailedError as ex: |
| logger.error( |
| "Error updating model %s: %s", self.__class__.__name__, str(ex) |
| ) |
| response = self.response_422(message=str(ex)) |
| return response |
| |
| @expose("/<pk>", methods=["DELETE"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.delete", |
| log_to_statsd=False, |
| ) |
| def delete(self, pk: int) -> Response: |
| """Deletes a Dataset |
| --- |
| delete: |
| description: >- |
| Deletes a Dataset |
| parameters: |
| - in: path |
| schema: |
| type: integer |
| name: pk |
| responses: |
| 200: |
description: Dataset deleted
| content: |
| application/json: |
| schema: |
| type: object |
| properties: |
| message: |
| type: string |
| 401: |
| $ref: '#/components/responses/401' |
| 403: |
| $ref: '#/components/responses/403' |
| 404: |
| $ref: '#/components/responses/404' |
| 422: |
| $ref: '#/components/responses/422' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| try: |
| DeleteDatasetCommand(g.user, pk).run() |
| return self.response(200, message="OK") |
| except DatasetNotFoundError: |
| return self.response_404() |
| except DatasetForbiddenError: |
| return self.response_403() |
| except DatasetDeleteFailedError as ex: |
| logger.error( |
| "Error deleting model %s: %s", self.__class__.__name__, str(ex) |
| ) |
| return self.response_422(message=str(ex)) |
| |
| @expose("/export/", methods=["GET"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @rison(get_export_ids_schema) |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.export", |
| log_to_statsd=False, |
| ) |
| def export(self, **kwargs: Any) -> Response: |
| """Export datasets |
| --- |
| get: |
| description: >- |
| Exports multiple datasets and downloads them as YAML files |
| parameters: |
| - in: query |
| name: q |
| content: |
| application/json: |
| schema: |
| $ref: '#/components/schemas/get_export_ids_schema' |
| responses: |
| 200: |
| description: Dataset export |
| content: |
| text/plain: |
| schema: |
| type: string |
| 400: |
| $ref: '#/components/responses/400' |
| 401: |
| $ref: '#/components/responses/401' |
| 404: |
| $ref: '#/components/responses/404' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| requested_ids = kwargs["rison"] |
| |
| if is_feature_enabled("VERSIONED_EXPORT"): |
| timestamp = datetime.now().strftime("%Y%m%dT%H%M%S") |
| root = f"dataset_export_{timestamp}" |
| filename = f"{root}.zip" |
| |
| buf = BytesIO() |
| with ZipFile(buf, "w") as bundle: |
| try: |
| for file_name, file_content in ExportDatasetsCommand( |
| requested_ids |
| ).run(): |
| with bundle.open(f"{root}/{file_name}", "w") as fp: |
| fp.write(file_content.encode()) |
| except DatasetNotFoundError: |
| return self.response_404() |
| buf.seek(0) |
| |
| return send_file( |
| buf, |
| mimetype="application/zip", |
| as_attachment=True, |
| attachment_filename=filename, |
| ) |
| |
| query = self.datamodel.session.query(SqlaTable).filter( |
| SqlaTable.id.in_(requested_ids) |
| ) |
| query = self._base_filters.apply_all(query) |
| items = query.all() |
| ids = [item.id for item in items] |
| if len(ids) != len(requested_ids): |
| return self.response_404() |
| |
| data = [t.export_to_dict() for t in items] |
| return Response( |
| yaml.safe_dump(data), |
| headers=generate_download_headers("yaml"), |
| mimetype="application/text", |
| ) |
| |
| @expose("/<pk>/refresh", methods=["PUT"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @event_logger.log_this_with_context( |
action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.refresh",
| log_to_statsd=False, |
| ) |
| def refresh(self, pk: int) -> Response: |
| """Refresh a Dataset |
| --- |
| put: |
| description: >- |
| Refreshes and updates columns of a dataset |
| parameters: |
| - in: path |
| schema: |
| type: integer |
| name: pk |
| responses: |
| 200: |
description: Dataset refreshed
| content: |
| application/json: |
| schema: |
| type: object |
| properties: |
| message: |
| type: string |
| 401: |
| $ref: '#/components/responses/401' |
| 403: |
| $ref: '#/components/responses/403' |
| 404: |
| $ref: '#/components/responses/404' |
| 422: |
| $ref: '#/components/responses/422' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| try: |
| RefreshDatasetCommand(g.user, pk).run() |
| return self.response(200, message="OK") |
| except DatasetNotFoundError: |
| return self.response_404() |
| except DatasetForbiddenError: |
| return self.response_403() |
| except DatasetRefreshFailedError as ex: |
| logger.error( |
| "Error refreshing dataset %s: %s", self.__class__.__name__, str(ex) |
| ) |
| return self.response_422(message=str(ex)) |
| |
| @expose("/<pk>/related_objects", methods=["GET"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}" |
| f".related_objects", |
| log_to_statsd=False, |
| ) |
| def related_objects(self, pk: int) -> Response: |
| """Get charts and dashboards count associated to a dataset |
| --- |
| get: |
| description: |
| Get charts and dashboards count associated to a dataset |
| parameters: |
| - in: path |
| name: pk |
| schema: |
| type: integer |
| responses: |
| 200: |
| 200: |
| description: Query result |
| content: |
| application/json: |
| schema: |
| $ref: "#/components/schemas/DatasetRelatedObjectsResponse" |
| 401: |
| $ref: '#/components/responses/401' |
| 404: |
| $ref: '#/components/responses/404' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| dataset = DatasetDAO.find_by_id(pk) |
| if not dataset: |
| return self.response_404() |
| data = DatasetDAO.get_related_objects(pk) |
| charts = [ |
| { |
| "id": chart.id, |
| "slice_name": chart.slice_name, |
| "viz_type": chart.viz_type, |
| } |
| for chart in data["charts"] |
| ] |
| dashboards = [ |
| { |
| "id": dashboard.id, |
| "json_metadata": dashboard.json_metadata, |
| "slug": dashboard.slug, |
| "title": dashboard.dashboard_title, |
| } |
| for dashboard in data["dashboards"] |
| ] |
| return self.response( |
| 200, |
| charts={"count": len(charts), "result": charts}, |
| dashboards={"count": len(dashboards), "result": dashboards}, |
| ) |
| |
| @expose("/", methods=["DELETE"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @rison(get_delete_ids_schema) |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.bulk_delete", |
| log_to_statsd=False, |
| ) |
| def bulk_delete(self, **kwargs: Any) -> Response: |
| """Delete bulk Datasets |
| --- |
| delete: |
| description: >- |
| Deletes multiple Datasets in a bulk operation. |
| parameters: |
| - in: query |
| name: q |
| content: |
| application/json: |
| schema: |
| $ref: '#/components/schemas/get_delete_ids_schema' |
| responses: |
| 200: |
| description: Dataset bulk delete |
| content: |
| application/json: |
| schema: |
| type: object |
| properties: |
| message: |
| type: string |
| 400: |
| $ref: '#/components/responses/400' |
| 401: |
| $ref: '#/components/responses/401' |
| 403: |
| $ref: '#/components/responses/403' |
| 404: |
| $ref: '#/components/responses/404' |
| 422: |
| $ref: '#/components/responses/422' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| item_ids = kwargs["rison"] |
| try: |
| BulkDeleteDatasetCommand(g.user, item_ids).run() |
| return self.response( |
| 200, |
| message=ngettext( |
| "Deleted %(num)d dataset", |
| "Deleted %(num)d datasets", |
| num=len(item_ids), |
| ), |
| ) |
| except DatasetNotFoundError: |
| return self.response_404() |
| except DatasetForbiddenError: |
| return self.response_403() |
| except DatasetBulkDeleteFailedError as ex: |
| return self.response_422(message=str(ex)) |
| |
| @expose("/import/", methods=["POST"]) |
| @protect() |
| @safe |
| @statsd_metrics |
| @event_logger.log_this_with_context( |
| action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.import_", |
| log_to_statsd=False, |
| ) |
| def import_(self) -> Response: |
| """Import dataset(s) with associated databases |
| --- |
| post: |
| requestBody: |
| required: true |
| content: |
| multipart/form-data: |
| schema: |
| type: object |
| properties: |
| formData: |
| description: upload file (ZIP or YAML) |
| type: string |
| format: binary |
| passwords: |
| description: JSON map of passwords for each file |
| type: string |
| overwrite: |
description: overwrite existing datasets?
type: boolean
| responses: |
| 200: |
| description: Dataset import result |
| content: |
| application/json: |
| schema: |
| type: object |
| properties: |
| message: |
| type: string |
| 400: |
| $ref: '#/components/responses/400' |
| 401: |
| $ref: '#/components/responses/401' |
| 422: |
| $ref: '#/components/responses/422' |
| 500: |
| $ref: '#/components/responses/500' |
| """ |
| upload = request.files.get("formData") |
| if not upload: |
| return self.response_400() |
| if is_zipfile(upload): |
| with ZipFile(upload) as bundle: |
| contents = get_contents_from_bundle(bundle) |
| else: |
| upload.seek(0) |
| contents = {upload.filename: upload.read()} |
| |
| passwords = ( |
| json.loads(request.form["passwords"]) |
| if "passwords" in request.form |
| else None |
| ) |
| overwrite = request.form.get("overwrite") == "true" |
| |
| command = ImportDatasetsCommand( |
| contents, passwords=passwords, overwrite=overwrite |
| ) |
| try: |
| command.run() |
| return self.response(200, message="OK") |
| except CommandInvalidError as exc: |
| logger.warning("Import dataset failed") |
| return self.response_422(message=exc.normalized_messages()) |
| except DatasetImportError as exc: |
| logger.exception("Import dataset failed") |
| return self.response_500(message=str(exc)) |