| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| import functools |
| import logging |
| from typing import Any, Callable, Optional |
| |
| from flask import g |
| from flask_babel import lazy_gettext as _ |
| |
| from superset.models.core import Database |
| from superset.sql_parse import Table |
| from superset.utils.core import parse_js_uri_path_item |
| from superset.views.base_api import BaseSupersetModelRestApi |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| def check_datasource_access(f: Callable[..., Any]) -> Callable[..., Any]: |
| """ |
| A Decorator that checks if a user has datasource access |
| """ |
| |
| def wraps( |
| self: BaseSupersetModelRestApi, |
| pk: int, |
| table_name: str, |
| schema_name: Optional[str] = None, |
| ) -> Any: |
| schema_name_parsed = parse_js_uri_path_item(schema_name, eval_undefined=True) |
| table_name_parsed = parse_js_uri_path_item(table_name) |
| if not table_name_parsed: |
| return self.response_422(message=_("Table name undefined")) |
| database: Database = self.datamodel.get(pk) |
| if not database: |
| self.stats_logger.incr( |
| f"database_not_found_{self.__class__.__name__}.select_star" |
| ) |
| return self.response_404() |
| if not self.appbuilder.sm.can_access_table( |
| database, Table(table_name_parsed, schema_name_parsed) |
| ): |
| self.stats_logger.incr( |
| f"permisssion_denied_{self.__class__.__name__}.select_star" |
| ) |
| logger.warning( |
| "Permission denied for user %s on table: %s schema: %s", |
| g.user, |
| table_name_parsed, |
| schema_name_parsed, |
| ) |
| return self.response_404() |
| return f(self, database, table_name_parsed, schema_name_parsed) |
| |
| return functools.update_wrapper(wraps, f) |