fix: return query if it already exists (#15207)

* check if query exists before saving a new one

* fix test

(cherry picked from commit 58cc78d2c142c9eb61e585a054044987180808c7)
diff --git a/superset/views/core.py b/superset/views/core.py
index 622b117..e991ee9 100755
--- a/superset/views/core.py
+++ b/superset/views/core.py
@@ -2467,14 +2467,34 @@
             CtasMethod, query_params.get("ctas_method", CtasMethod.TABLE)
         )
         tmp_table_name: str = cast(str, query_params.get("tmp_table_name"))
-        client_id: str = cast(
-            str, query_params.get("client_id") or utils.shortid()[:10]
-        )
+        client_id: str = cast(str, query_params.get("client_id"))
+        client_id_or_short_id: str = cast(str, client_id or utils.shortid()[:10])
         sql_editor_id: str = cast(str, query_params.get("sql_editor_id"))
         tab_name: str = cast(str, query_params.get("tab"))
         status: str = QueryStatus.PENDING if async_flag else QueryStatus.RUNNING
+        user_id: int = g.user.get_id() if g.user else None
 
         session = db.session()
+
+        # check if this query was already submitted (running, pending or timed out)
+        query = (
+            session.query(Query)
+            .filter_by(
+                client_id=client_id, user_id=user_id, sql_editor_id=sql_editor_id
+            )
+            .one_or_none()
+        )
+        if query is not None and query.status in [
+            QueryStatus.RUNNING,
+            QueryStatus.PENDING,
+            QueryStatus.TIMED_OUT,
+        ]:
+            # return the existing query
+            payload = json.dumps(
+                {"query": query.to_dict()}, default=utils.json_int_dttm_ser
+            )
+            return json_success(payload)
+
         mydb = session.query(Database).get(database_id)
         if not mydb:
             return json_error_response("Database with id %i is missing.", database_id)
@@ -2502,8 +2522,8 @@
             sql_editor_id=sql_editor_id,
             tmp_table_name=tmp_table_name,
             tmp_schema_name=tmp_schema_name,
-            user_id=g.user.get_id() if g.user else None,
-            client_id=client_id,
+            user_id=user_id,
+            client_id=client_id_or_short_id,
         )
         try:
             session.add(query)
diff --git a/superset/views/sql_lab.py b/superset/views/sql_lab.py
index ef3d901..e3a09e6 100644
--- a/superset/views/sql_lab.py
+++ b/superset/views/sql_lab.py
@@ -228,10 +228,12 @@
     @has_access_api
     @expose("<int:tab_state_id>/query/<client_id>", methods=["DELETE"])
     def delete_query(  # pylint: disable=no-self-use
-        self, tab_state_id: str, client_id: str
+        self, tab_state_id: int, client_id: str
     ) -> FlaskResponse:
         db.session.query(Query).filter_by(
-            client_id=client_id, user_id=g.user.get_id(), sql_editor_id=tab_state_id
+            client_id=client_id,
+            user_id=g.user.get_id(),
+            sql_editor_id=str(tab_state_id),
         ).delete(synchronize_session=False)
         db.session.commit()
         return json_success(json.dumps("OK"))
diff --git a/tests/core_tests.py b/tests/core_tests.py
index abf0ac3..9ad5806 100644
--- a/tests/core_tests.py
+++ b/tests/core_tests.py
@@ -1412,7 +1412,7 @@
             "client_id_1",
             user_name=username,
             raise_on_error=True,
-            sql_editor_id=tab_state_id,
+            sql_editor_id=str(tab_state_id),
         )
         # run an orphan query (no tab)
         self.run_sql(