fix: better handle datasource exceptions (#13578)

* fix: handle datasource injected security exception

* add tests

* fix error text on create update dbs

* fix lint

* revert create update message

* fix test

* add sqlalchemy exceptions

(cherry picked from commit 1e884084189a520ba25c9d577a82020a27edff10)
diff --git a/superset/databases/commands/update.py b/superset/databases/commands/update.py
index 5d33c08..7c48e74 100644
--- a/superset/databases/commands/update.py
+++ b/superset/databases/commands/update.py
@@ -54,7 +54,7 @@
             # TODO Improve this simplistic implementation for catching DB conn fails
             try:
                 schemas = database.get_all_schema_names()
-            except Exception:
+            except Exception as ex:
                 db.session.rollback()
                 raise DatabaseConnectionFailedError()
             for schema in schemas:
diff --git a/superset/models/dashboard.py b/superset/models/dashboard.py
index b73027d..0f21c52 100644
--- a/superset/models/dashboard.py
+++ b/superset/models/dashboard.py
@@ -38,6 +38,7 @@
     UniqueConstraint,
 )
 from sqlalchemy.engine.base import Connection
+from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.orm import relationship, sessionmaker, subqueryload
 from sqlalchemy.orm.mapper import Mapper
 from sqlalchemy.orm.session import object_session
@@ -48,6 +49,7 @@
 from superset.connectors.base.models import BaseDatasource
 from superset.connectors.druid.models import DruidColumn, DruidMetric
 from superset.connectors.sqla.models import SqlMetric, TableColumn
+from superset.exceptions import SupersetException
 from superset.extensions import cache_manager
 from superset.models.helpers import AuditMixinNullable, ImportExportMixin
 from superset.models.slice import Slice
@@ -241,18 +243,22 @@
         """Bootstrap data for rendering the dashboard page."""
         slices = self.slices
         datasource_slices = utils.indexed(slices, "datasource")
+        try:
+            datasources = {
+                # Filter out unneeded fields from the datasource payload
+                datasource.uid: datasource.data_for_slices(slices)
+                for datasource, slices in datasource_slices.items()
+                if datasource
+            }
+        except (SupersetException, SQLAlchemyError):
+            datasources = {}
         return {
             # dashboard metadata
             "dashboard": self.data,
             # slices metadata
             "slices": [slc.data for slc in slices],
             # datasource metadata
-            "datasources": {
-                # Filter out unneeded fields from the datasource payload
-                datasource.uid: datasource.data_for_slices(slices)
-                for datasource, slices in datasource_slices.items()
-                if datasource
-            },
+            "datasources": datasources,
         }
 
     @property  # type: ignore
diff --git a/superset/views/core.py b/superset/views/core.py
index 2c390c4..d807dc5 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -666,7 +666,7 @@
     @event_logger.log_this
     @expose("/explore/<datasource_type>/<int:datasource_id>/", methods=["GET", "POST"])
     @expose("/explore/", methods=["GET", "POST"])
-    def explore(  # pylint: disable=too-many-locals,too-many-return-statements
+    def explore(  # pylint: disable=too-many-locals,too-many-return-statements,too-many-statements
         self, datasource_type: Optional[str] = None, datasource_id: Optional[int] = None
     ) -> FlaskResponse:
         user_id = g.user.get_id() if g.user else None
@@ -789,12 +789,18 @@
             "name": datasource_name,
             "columns": [],
             "metrics": [],
+            "database": {"id": 0, "backend": "",},
         }
+        try:
+            datasource_data = datasource.data if datasource else dummy_datasource_data
+        except (SupersetException, SQLAlchemyError):
+            datasource_data = dummy_datasource_data
+
         bootstrap_data = {
             "can_add": slice_add_perm,
             "can_download": slice_download_perm,
             "can_overwrite": slice_overwrite_perm,
-            "datasource": datasource.data if datasource else dummy_datasource_data,
+            "datasource": datasource_data,
             "form_data": form_data,
             "datasource_id": datasource_id,
             "datasource_type": datasource_type,
diff --git a/tests/core_tests.py b/tests/core_tests.py
index fb1721d..f176d8f 100644
--- a/tests/core_tests.py
+++ b/tests/core_tests.py
@@ -36,7 +36,7 @@
 
 import pandas as pd
 import sqlalchemy as sqla
-
+from sqlalchemy.exc import SQLAlchemyError
 from superset.models.cache import CacheKey
 from superset.utils.core import get_example_database
 from tests.fixtures.energy_dashboard import load_energy_table_with_slice
@@ -53,6 +53,7 @@
 from superset.connectors.sqla.models import SqlaTable
 from superset.db_engine_specs.base import BaseEngineSpec
 from superset.db_engine_specs.mssql import MssqlEngineSpec
+from superset.exceptions import SupersetException
 from superset.extensions import async_query_manager
 from superset.models import core as models
 from superset.models.annotations import Annotation, AnnotationLayer
@@ -1474,6 +1475,59 @@
             "my_col"
         ]
 
+    @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
+    @mock.patch("superset.models.core.DB_CONNECTION_MUTATOR")
+    def test_explore_injected_exceptions(self, mock_db_connection_mutator):
+        """
+        Handle injected exceptions from the db mutator
+        """
+        # Assert we can handle a custom exception at the mutator level
+        exception = SupersetException("Error message")
+        mock_db_connection_mutator.side_effect = exception
+        slice = db.session.query(Slice).first()
+        url = f"/superset/explore/?form_data=%7B%22slice_id%22%3A%20{slice.id}%7D"
+
+        self.login()
+        data = self.get_resp(url)
+        self.assertIn("Error message", data)
+
+        # Assert we can handle a driver exception at the mutator level
+        exception = SQLAlchemyError("Error message")
+        mock_db_connection_mutator.side_effect = exception
+        slice = db.session.query(Slice).first()
+        url = f"/superset/explore/?form_data=%7B%22slice_id%22%3A%20{slice.id}%7D"
+
+        self.login()
+        data = self.get_resp(url)
+        self.assertIn("Error message", data)
+
+    @pytest.mark.usefixtures("load_world_bank_dashboard_with_slices")
+    @mock.patch("superset.models.core.DB_CONNECTION_MUTATOR")
+    def test_dashboard_injected_exceptions(self, mock_db_connection_mutator):
+        """
+        Handle injected exceptions from the db mutator
+        """
+
+        # Assert we can handle a custom excetion at the mutator level
+        exception = SupersetException("Error message")
+        mock_db_connection_mutator.side_effect = exception
+        dash = db.session.query(Dashboard).first()
+        url = f"/superset/dashboard/{dash.id}/"
+
+        self.login()
+        data = self.get_resp(url)
+        self.assertIn("Error message", data)
+
+        # Assert we can handle a driver exception at the mutator level
+        exception = SQLAlchemyError("Error message")
+        mock_db_connection_mutator.side_effect = exception
+        dash = db.session.query(Dashboard).first()
+        url = f"/superset/dashboard/{dash.id}/"
+
+        self.login()
+        data = self.get_resp(url)
+        self.assertIn("Error message", data)
+
 
 if __name__ == "__main__":
     unittest.main()