disable `with_create_default_catalog_and_schema` if the default catalog exists (#11991)

diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index 0a057d6..e9c8762 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -987,8 +987,24 @@
 
     /// Returns a new [SessionStateBuilder] based on an existing [SessionState]
     /// The session id for the new builder will be unset; all other fields will
-    /// be cloned from what is set in the provided session state
+    /// be cloned from what is set in the provided session state. If the default
+    /// catalog exists in existing session state, the new session state will not
+    /// create default catalog and schema.
     pub fn new_from_existing(existing: SessionState) -> Self {
+        let default_catalog_exist = existing
+            .catalog_list()
+            .catalog(&existing.config.options().catalog.default_catalog)
+            .is_some();
+        // The new `with_create_default_catalog_and_schema` should be false if the default catalog exists
+        let create_default_catalog_and_schema = existing
+            .config
+            .options()
+            .catalog
+            .create_default_catalog_and_schema
+            && !default_catalog_exist;
+        let new_config = existing
+            .config
+            .with_create_default_catalog_and_schema(create_default_catalog_and_schema);
         Self {
             session_id: None,
             analyzer: Some(existing.analyzer),
@@ -1005,7 +1021,7 @@
             window_functions: Some(existing.window_functions.into_values().collect_vec()),
             serializer_registry: Some(existing.serializer_registry),
             file_formats: Some(existing.file_formats.into_values().collect_vec()),
-            config: Some(existing.config),
+            config: Some(new_config),
             table_options: Some(existing.table_options),
             execution_props: Some(existing.execution_props),
             table_factories: Some(existing.table_factories),
@@ -1801,17 +1817,19 @@
 
 #[cfg(test)]
 mod tests {
-    use std::collections::HashMap;
-
+    use super::{SessionContextProvider, SessionStateBuilder};
+    use crate::catalog_common::MemoryCatalogProviderList;
+    use crate::datasource::MemTable;
+    use crate::execution::context::SessionState;
+    use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
     use arrow_schema::{DataType, Field, Schema};
     use datafusion_common::DFSchema;
     use datafusion_common::Result;
+    use datafusion_execution::config::SessionConfig;
     use datafusion_expr::Expr;
     use datafusion_sql::planner::{PlannerContext, SqlToRel};
-
-    use crate::execution::context::SessionState;
-
-    use super::{SessionContextProvider, SessionStateBuilder};
+    use std::collections::HashMap;
+    use std::sync::Arc;
 
     #[test]
     fn test_session_state_with_default_features() {
@@ -1841,4 +1859,68 @@
 
         assert!(sql_to_expr(&state).is_err())
     }
+
+    #[test]
+    fn test_from_existing() -> Result<()> {
+        fn employee_batch() -> RecordBatch {
+            let name: ArrayRef =
+                Arc::new(StringArray::from_iter_values(["Andy", "Andrew"]));
+            let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22]));
+            RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap()
+        }
+        let batch = employee_batch();
+        let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+
+        let session_state = SessionStateBuilder::new()
+            .with_catalog_list(Arc::new(MemoryCatalogProviderList::new()))
+            .build();
+        let table_ref = session_state.resolve_table_ref("employee").to_string();
+        session_state
+            .schema_for_ref(&table_ref)?
+            .register_table("employee".to_string(), Arc::new(table))?;
+
+        let default_catalog = session_state
+            .config
+            .options()
+            .catalog
+            .default_catalog
+            .clone();
+        let default_schema = session_state
+            .config
+            .options()
+            .catalog
+            .default_schema
+            .clone();
+        let is_exist = session_state
+            .catalog_list()
+            .catalog(default_catalog.as_str())
+            .unwrap()
+            .schema(default_schema.as_str())
+            .unwrap()
+            .table_exist("employee");
+        assert!(is_exist);
+        let new_state = SessionStateBuilder::new_from_existing(session_state).build();
+        assert!(new_state
+            .catalog_list()
+            .catalog(default_catalog.as_str())
+            .unwrap()
+            .schema(default_schema.as_str())
+            .unwrap()
+            .table_exist("employee"));
+
+        // if `with_create_default_catalog_and_schema` is disabled, the new one shouldn't create default catalog and schema
+        let disable_create_default =
+            SessionConfig::default().with_create_default_catalog_and_schema(false);
+        let without_default_state = SessionStateBuilder::new()
+            .with_config(disable_create_default)
+            .build();
+        assert!(without_default_state
+            .catalog_list()
+            .catalog(&default_catalog)
+            .is_none());
+        let new_state =
+            SessionStateBuilder::new_from_existing(without_default_state).build();
+        assert!(new_state.catalog_list().catalog(&default_catalog).is_none());
+        Ok(())
+    }
 }