[SPARK-47941][SS][CONNECT] Propagate ForeachBatch worker initialization errors to users for PySpark

### What changes were proposed in this pull request?
This change was made to propagate errors in PySpark foreachBatch worker initialization to the user, which can happen if the UDF isn't serializable or deserializable.

### Why are the changes needed?

When the foreachBatch function isn't serializable, previously we were not propagating this error to the user, and instead were failing with an ambiguous SparkConnectGrpcException (while the real error was being written to stderr). With this change, we will send the error message to the user with a more specific error message about worker initialization failure.

### Does this PR introduce _any_ user-facing change?

Yes, we are simply propagating the error to the user instead of just writing the error to stderr.

### How was this patch tested?

Using unit tests and a notebook with the change.
Stack trace before change:
```
SparkConnectGrpcException: (java.io.EOFException)
File <command-41186175708098>, line 27
      def fcn(batch, id):
          print(obj)
       q = (df.select("value")
          .writeStream
          .foreachBatch(fcn)
--->       .start())
```

Stack trace after change:
```
SparkConnectGrpcException: (java.lang.RuntimeException) Runner initialization failed (returned -2). Error message: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 192, in _read_with_length
    return self.loads(obj)
           ^^^^^^^^^^^^^^^
  File "/databricks/spark/python/pyspark/serializers.py", line 572, in loads
    return cloudpickle.loads(obj, encoding=encoding)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/spark-f94b69d4-fdb2-409f-89cf-1a/.ipykernel/13276/command-41186175708098-95633136", line 4, in _reduce_function
ValueError: Cannot unpickle this object
```
### Was this patch authored or co-authored using generative AI tooling?

No

Closes #46125 from ericm-db/feb-error-prop.

Authored-by: Eric Marnadi <eric.marnadi@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 0a6a2fb..ead1076 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3779,6 +3779,12 @@
     ],
     "sqlState" : "42601"
   },
+  "STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE" : {
+    "message" : [
+      "Streaming Runner initialization failed, returned <resFromPython>. Cause: <msg>"
+    ],
+    "sqlState" : "XXKST"
+  },
   "STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA" : {
     "message" : [
       "Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query.",
diff --git a/common/utils/src/main/scala/org/apache/spark/SparkException.scala b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
index 6bacdd0..398cb1fa 100644
--- a/common/utils/src/main/scala/org/apache/spark/SparkException.scala
+++ b/common/utils/src/main/scala/org/apache/spark/SparkException.scala
@@ -464,6 +464,35 @@
   override def getQueryContext: Array[QueryContext] = context
 }
 
+private[spark] class SparkPythonException private(
+    message: String,
+    cause: Option[Throwable],
+    errorClass: Option[String],
+    messageParameters: Map[String, String],
+    context: Array[QueryContext])
+  extends RuntimeException(message, cause.orNull) with SparkThrowable {
+
+  def this(
+      errorClass: String,
+      messageParameters: Map[String, String],
+      cause: Throwable = null,
+      context: Array[QueryContext] = Array.empty,
+      summary: String = "") = {
+    this(
+      SparkThrowableHelper.getMessage(errorClass, messageParameters, summary),
+      Option(cause),
+      Option(errorClass),
+      messageParameters,
+      context
+    )
+  }
+
+  override def getMessageParameters: java.util.Map[String, String] = messageParameters.asJava
+
+  override def getErrorClass: String = errorClass.orNull
+  override def getQueryContext: Array[QueryContext] = context
+}
+
 /**
  * No such element exception thrown from Spark with an error class.
  */
diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index de01a70..b238e2b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -21,7 +21,7 @@
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, SparkPythonException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.BUFFER_SIZE
 import org.apache.spark.internal.config.Python.PYTHON_AUTH_SOCKET_TIMEOUT
@@ -91,11 +91,27 @@
       new BufferedInputStream(pythonWorker.get.channel.socket().getInputStream, bufferSize))
 
     val resFromPython = dataIn.readInt()
+    if (resFromPython != 0) {
+      val errMessage = PythonWorkerUtils.readUTF(dataIn)
+      throw streamingPythonRunnerInitializationFailure(resFromPython, errMessage)
+    }
     logInfo(s"Runner initialization succeeded (returned $resFromPython).")
 
     (dataOut, dataIn)
   }
 
+  def streamingPythonRunnerInitializationFailure(resFromPython: Int, errMessage: String):
+    StreamingPythonRunnerInitializationException = {
+    new StreamingPythonRunnerInitializationException(resFromPython, errMessage)
+  }
+
+  class StreamingPythonRunnerInitializationException(resFromPython: Int, errMessage: String)
+    extends SparkPythonException(
+      errorClass = "STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE",
+      messageParameters = Map(
+        "resFromPython" -> resFromPython.toString,
+        "msg" -> errMessage))
+
   /**
    * Stops the Python worker.
    */
diff --git a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
index c5730de..80cc691 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py
@@ -62,11 +62,6 @@
     assert spark_connect_session.session_id == session_id
     spark = spark_connect_session
 
-    func = worker.read_command(pickle_ser, infile)
-    write_int(0, outfile)  # Indicate successful initialization
-
-    outfile.flush()
-
     log_name = "Streaming ForeachBatch worker"
 
     def process(df_id, batch_id):  # type: ignore[no-untyped-def]
@@ -76,16 +71,21 @@
         func(batch_df, batch_id)
         print(f"{log_name} Completed batch {batch_id} with DF id {df_id}")
 
-    while True:
-        df_ref_id = utf8_deserializer.loads(infile)
-        batch_id = read_long(infile)
-        # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
-        # traceback string if error occurs.
-        try:
+    try:
+        func = worker.read_command(pickle_ser, infile)
+        write_int(0, outfile)
+        outfile.flush()
+
+        while True:
+            df_ref_id = utf8_deserializer.loads(infile)
+            batch_id = read_long(infile)
+            # Handle errors inside Python worker. Write 0 to outfile if no errors and write -2 with
+            # traceback string if error occurs.
             process(df_ref_id, int(batch_id))
             write_int(0, outfile)
-        except BaseException as e:
-            handle_worker_exception(e, outfile)
+            outfile.flush()
+    except Exception as e:
+        handle_worker_exception(e, outfile)
         outfile.flush()
 
 
diff --git a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
index 4598cbb..d79bfef 100644
--- a/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
+++ b/python/pyspark/sql/tests/connect/streaming/test_parity_foreach_batch.py
@@ -20,6 +20,7 @@
 from pyspark.sql.tests.streaming.test_streaming_foreach_batch import StreamingTestsForeachBatchMixin
 from pyspark.testing.connectutils import ReusedConnectTestCase
 from pyspark.errors import PySparkPicklingError
+from pyspark.errors.exceptions.connect import SparkConnectGrpcException
 
 
 class StreamingForeachBatchParityTests(StreamingTestsForeachBatchMixin, ReusedConnectTestCase):
@@ -66,6 +67,41 @@
             q = df.writeStream.foreachBatch(func).start()
             q.processAllAvailable()
 
+    def test_worker_initialization_error(self):
+        class SerializableButNotDeserializable:
+            @staticmethod
+            def _reduce_function():
+                raise ValueError("Cannot unpickle this object")
+
+            def __reduce__(self):
+                # Return a static method that cannot be called during unpickling
+                return self._reduce_function, ()
+
+        # Create an instance of the class
+        obj = SerializableButNotDeserializable()
+
+        df = (
+            self.spark.readStream.format("rate")
+            .option("rowsPerSecond", "10")
+            .option("numPartitions", "1")
+            .load()
+        )
+
+        obj = SerializableButNotDeserializable()
+
+        def fcn(df, _):
+            print(obj)
+
+        # Assert that an exception occurs during the initialization
+        with self.assertRaises(SparkConnectGrpcException) as error:
+            df.select("value").writeStream.foreachBatch(fcn).start()
+
+        # Assert that the error message contains the expected string
+        self.assertIn(
+            "Streaming Runner initialization failed",
+            str(error.exception),
+        )
+
     def test_accessing_spark_session(self):
         spark = self.spark