blob: 2b5ed892ff173a467520e78a051cd6bf995d94cc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import json
from collections import Counter
from typing import Any
from flask import g, request
from flask_appbuilder import expose
from flask_appbuilder.api import rison
from flask_appbuilder.security.decorators import has_access_api
from flask_babel import _
from marshmallow import ValidationError
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.orm.exc import NoResultFound
from superset import app, db, event_logger
from superset.commands.utils import populate_owners
from superset.connectors.connector_registry import ConnectorRegistry
from superset.connectors.sqla.utils import get_physical_table_metadata
from superset.datasets.commands.exceptions import (
DatasetForbiddenError,
DatasetNotFoundError,
)
from superset.exceptions import SupersetException, SupersetSecurityException
from superset.extensions import security_manager
from superset.models.core import Database
from superset.typing import FlaskResponse
from superset.views.base import (
api,
BaseSupersetView,
check_ownership,
handle_api_exception,
json_error_response,
)
from superset.views.datasource.schemas import (
ExternalMetadataParams,
ExternalMetadataSchema,
get_external_metadata_schema,
)
class Datasource(BaseSupersetView):
    """Datasource-related views.

    Exposes endpoints to save a datasource definition, to fetch a
    datasource's serialized ``data``, and to look up column metadata either
    by datasource type/id or by database/schema/table name.
    """

    @expose("/save/", methods=["POST"])
    @event_logger.log_this_with_context(
        action=lambda self, *args, **kwargs: f"{self.__class__.__name__}.save",
        log_to_statsd=False,
    )
    @has_access_api
    @api
    @handle_api_exception
    def save(self) -> FlaskResponse:
        """Update a datasource from the JSON document in the ``data`` form field.

        The document is expected to be a JSON object carrying at least
        ``id``, ``type``, ``database`` (an object with an ``id``) and
        ``columns`` (a list of objects with a ``column_name``). When it also
        carries ``owners`` and the datasource has an ``owner_class``, the
        owner ids are resolved to model instances before the update.

        :returns: the updated datasource's ``data`` as a JSON response; a
            500 error response when the ``data`` form field is missing; a
            400 error response when it is not a JSON object; a 409 error
            response when ``columns`` contains duplicate column names.
        :raises DatasetForbiddenError: when the
            ``OLD_API_CHECK_DATASET_OWNERSHIP`` config flag is set and the
            ownership check fails.
        """
        raw_payload = request.form.get("data")
        if not isinstance(raw_payload, str):
            # NOTE(review): 400 would describe this client error better; 500
            # is kept so existing clients observe the same status as before.
            return json_error_response(_("Request missing data field."), status=500)

        # Reject malformed or non-object payloads up front, before any ORM
        # object is loaded or mutated, instead of failing later on `.get`.
        try:
            datasource_dict = json.loads(raw_payload)
        except json.JSONDecodeError:
            datasource_dict = None
        if not isinstance(datasource_dict, dict):
            return json_error_response(
                _("Request data field must be a JSON object."), status=400
            )

        datasource_id = datasource_dict.get("id")
        datasource_type = datasource_dict.get("type")
        database_id = datasource_dict["database"].get("id")
        orm_datasource = ConnectorRegistry.get_datasource(
            datasource_type, datasource_id, db.session
        )
        orm_datasource.database_id = database_id

        if "owners" in datasource_dict and orm_datasource.owner_class is not None:
            # Check ownership
            if app.config["OLD_API_CHECK_DATASET_OWNERSHIP"]:
                # mimic the behavior of the new dataset command that
                # checks ownership and ensures that non-admins aren't locked out
                # of the object
                try:
                    check_ownership(orm_datasource)
                except SupersetSecurityException as ex:
                    raise DatasetForbiddenError() from ex
                user = security_manager.get_user_by_id(g.user.id)
                datasource_dict["owners"] = populate_owners(
                    user, datasource_dict["owners"], default_to_user=False
                )
            else:
                # legacy behavior: swap the submitted owner ids for the
                # matching owner_class rows, without any ownership check
                datasource_dict["owners"] = (
                    db.session.query(orm_datasource.owner_class)
                    .filter(
                        orm_datasource.owner_class.id.in_(
                            datasource_dict["owners"] or []
                        )
                    )
                    .all()
                )

        # Column names must be unique within the submitted definition.
        column_name_counts = Counter(
            col["column_name"] for col in datasource_dict["columns"]
        )
        duplicates = [name for name, count in column_name_counts.items() if count > 1]
        if duplicates:
            return json_error_response(
                _(
                    "Duplicate column name(s): %(columns)s",
                    columns=",".join(duplicates),
                ),
                status=409,
            )

        orm_datasource.update_from_object(datasource_dict)
        # The response payload is read before the commit, as in the original
        # ordering (presumably to avoid reloading attributes afterwards).
        response_payload = orm_datasource.data
        db.session.commit()
        return self.json_response(response_payload)

    @expose("/get/<datasource_type>/<datasource_id>/")
    @has_access_api
    @api
    @handle_api_exception
    def get(self, datasource_type: str, datasource_id: int) -> FlaskResponse:
        """Return the ``data`` of the datasource with the given type and id.

        :param datasource_type: datasource type taken from the URL
        :param datasource_id: datasource id taken from the URL
        :returns: the datasource's ``data`` as a JSON response
        """
        datasource = ConnectorRegistry.get_datasource(
            datasource_type, datasource_id, db.session
        )
        return self.json_response(datasource.data)

    @expose("/external_metadata/<datasource_type>/<datasource_id>/")
    @has_access_api
    @api
    @handle_api_exception
    def external_metadata(
        self, datasource_type: str, datasource_id: int
    ) -> FlaskResponse:
        """Gets column info from the source system.

        :param datasource_type: datasource type taken from the URL
        :param datasource_id: datasource id taken from the URL
        :returns: the datasource's external metadata as a JSON response, or
            a 400 error response carrying the message of a
            ``SupersetException`` raised while fetching it
        """
        datasource = ConnectorRegistry.get_datasource(
            datasource_type, datasource_id, db.session
        )
        try:
            external_metadata = datasource.external_metadata()
        except SupersetException as ex:
            return json_error_response(str(ex), status=400)
        return self.json_response(external_metadata)

    @expose("/external_metadata_by_name/")
    @has_access_api
    @api
    @handle_api_exception
    @rison(get_external_metadata_schema)
    def external_metadata_by_name(self, **kwargs: Any) -> FlaskResponse:
        """Gets table metadata from the source system and SQLAlchemy inspector.

        The rison-decoded query arguments are validated with
        ``ExternalMetadataSchema``. If a datasource matching the given
        database, schema and table names is found, its own external metadata
        is returned; otherwise the physical table's metadata is looked up
        through the named database.

        :param kwargs: keyword arguments supplied by the ``rison`` decorator;
            only the ``rison`` entry is read
        :returns: the column metadata as a JSON response, or a 400 error
            response when the arguments fail schema validation
        :raises DatasetNotFoundError: when the database row or the physical
            table cannot be found
        """
        try:
            params: ExternalMetadataParams = ExternalMetadataSchema().load(
                kwargs.get("rison")
            )
        except ValidationError as err:
            return json_error_response(str(err), status=400)

        datasource = ConnectorRegistry.get_datasource_by_name(
            session=db.session,
            datasource_type=params["datasource_type"],
            database_name=params["database_name"],
            schema=params["schema_name"],
            datasource_name=params["table_name"],
        )
        try:
            if datasource is not None:
                # Get columns from Superset metadata
                external_metadata = datasource.external_metadata()
            else:
                # Use the SQLAlchemy inspector to get columns
                database = (
                    db.session.query(Database)
                    .filter_by(database_name=params["database_name"])
                    .one()
                )
                external_metadata = get_physical_table_metadata(
                    database=database,
                    table_name=params["table_name"],
                    schema_name=params["schema_name"],
                )
        except (NoResultFound, NoSuchTableError) as ex:
            # `.one()` found no database row, or the table does not exist.
            raise DatasetNotFoundError() from ex
        return self.json_response(external_metadata)