[SPARK-48147][SS][CONNECT] Remove client side listeners when local Spark session is deleted
### What changes were proposed in this pull request?
Add a graceful handing of the local streaming query listener bus. We need to remove all client side listeners, and as a result, a request will be made to the server to remove the server side listener.
### Why are the changes needed?
Graceful exit for spark connect client
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
The change is straightforward
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #46406 from WweiL/client-listener-close.
Authored-by: Wei Liu <wei.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py
index e0f5ad6..b688ca0 100644
--- a/python/pyspark/sql/connect/session.py
+++ b/python/pyspark/sql/connect/session.py
@@ -726,6 +726,9 @@
def __del__(self) -> None:
try:
+ # StreamingQueryManager has client states that needs to be cleaned up
+ if hasattr(self, "_sqm"):
+ self._sqm.close()
# Try its best to close.
self.client.close()
except Exception:
diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index 0624f89..98ecdc4 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -190,6 +190,9 @@
self._session = session
self._sqlb = StreamingQueryListenerBus(self)
+ def close(self) -> None:
+ self._sqlb.close()
+
@property
def active(self) -> List[StreamingQuery]:
cmd = pb2.StreamingQueryManagerCommand()
@@ -276,6 +279,10 @@
self._execution_thread: Optional[Thread] = None
self._lock = Lock()
+ def close(self) -> None:
+ for listener in self._listener_bus:
+ self.remove(listener)
+
def append(self, listener: StreamingQueryListener) -> None:
"""
Append a listener to the local listener bus. When the added listener is