[SPARK-54194][CONNECT][FOLLOWUP] Spark Connect Proto Plan Compression - Scala Client

### What changes were proposed in this pull request?

The previous PR of Spark Connect Proto Plan Compression, https://github.com/apache/spark/pull/52894, implemented both the server-side and the PySpark client changes.

This PR implements the corresponding Scala client changes, so plan compression is now supported on the Scala client as well.
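
At a high level, the client now serializes the plan's relation (or command) and, when it is larger than the server-advertised threshold, replaces it with a ZSTD-compressed `CompressedOperation`. A minimal sketch of that path, simplified from the `tryCompressPlan` logic added in this PR (the command branch, logging, and error handling are omitted):
```scala
import com.github.luben.zstd.Zstd
import com.google.protobuf.ByteString
import org.apache.spark.connect.proto

// Sketch only: compress the relation inside a Plan when it exceeds the threshold,
// and keep the original plan when compression is not needed or not effective.
def compressRootIfLarge(plan: proto.Plan, thresholdBytes: Int): proto.Plan = {
  if (!plan.hasRoot) {
    plan // commands are handled analogously with OP_TYPE_COMMAND
  } else {
    val serialized = plan.getRoot.toByteArray
    if (serialized.length <= thresholdBytes) {
      plan
    } else {
      val compressed = Zstd.compress(serialized)
      if (compressed.length >= serialized.length) {
        plan // compression did not help, keep the original plan
      } else {
        val op = proto.Plan.CompressedOperation
          .newBuilder()
          .setData(ByteString.copyFrom(compressed))
          .setOpType(proto.Plan.CompressedOperation.OpType.OP_TYPE_RELATION)
          .setCompressionCodec(proto.CompressionCodec.COMPRESSION_CODEC_ZSTD)
          .build()
        proto.Plan.newBuilder(plan).clearRoot().setCompressedOperation(op).build()
      }
    }
  }
}
```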

To reproduce the issue addressed here, run the following code on the Spark Connect Scala client:
```scala
import scala.util.Random
import org.apache.spark.sql.DataFrame
import spark.implicits._

def randomLetters(n: Int): String = {
  Iterator.continually(Random.nextPrintableChar())
    .filter(_.isLetter)
    .take(n)
    .mkString
}

val numUniqueSmallRelations = 5
val sizePerSmallRelation = 512 * 1024
val smallDfs: Seq[DataFrame] =
  (0 until numUniqueSmallRelations).map { _ =>
    Seq(randomLetters(sizePerSmallRelation)).toDF("value")
  }

var resultDf = smallDfs.head
for (_ <- 0 until 500) {
  val idx = Random.nextInt(smallDfs.length)
  resultDf = resultDf.unionByName(smallDfs(idx))
}

resultDf.collect()
```
It fails with a RESOURCE_EXHAUSTED error, `gRPC message exceeds maximum size 134217728: 269207219`, because the server is trying to send an ExecutePlanResponse of ~260 MB to the client.

With the improvement introduced by this PR, the above code runs successfully and returns the expected result.
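
The client negotiates compression with the server: on first use it fetches `spark.connect.session.planCompression.threshold` and `spark.connect.session.planCompression.defaultAlgorithm` once, caches the result, and falls back to no compression (with a warning) if the server does not support these configs. For testing, the cached options can be inspected or overridden through the internal client API, as the new E2E test does (a sketch; these accessors are `private[sql]`):
```scala
import org.apache.spark.sql.connect.client.PlanCompressionOptions

// The options are fetched from the server once and then cached on the client.
val original: Option[PlanCompressionOptions] = spark.client.getPlanCompressionOptions

// Lower the threshold to 1000 bytes so that even small plans are compressed ...
spark.client.setPlanCompressionOptions(Some(PlanCompressionOptions(1000, "ZSTD")))
// ... or disable plan compression for this client entirely.
spark.client.setPlanCompressionOptions(None)

// Restore the server-advertised settings.
spark.client.setPlanCompressionOptions(original)
```

On the server side, setting `spark.connect.session.planCompression.threshold` to -1 disables plan compression, as noted in the config doc updated by this PR.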

### Why are the changes needed?

It improves Spark Connect stability when handling large plans.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests in `SparkConnectClientSuite` and `ClientE2ETestSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #53003 from xi-db/plan-compression-scala-client.

Authored-by: Xi Lyu <xi.lyu@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
diff --git a/python/docs/source/getting_started/install.rst b/python/docs/source/getting_started/install.rst
index 8b3c969..6b5a092 100644
--- a/python/docs/source/getting_started/install.rst
+++ b/python/docs/source/getting_started/install.rst
@@ -230,6 +230,7 @@
 `grpcio`                   >=1.76.0          Required for Spark Connect
 `grpcio-status`            >=1.76.0          Required for Spark Connect
 `googleapis-common-protos` >=1.71.0          Required for Spark Connect
+`zstandard`                >=0.25.0          Required for Spark Connect
 `graphviz`                 >=0.20            Optional for Spark Connect
 ========================== ================= ==========================
 
@@ -313,6 +314,7 @@
 `grpcio`                   >=1.76.0          Required for Spark Connect
 `grpcio-status`            >=1.76.0          Required for Spark Connect
 `googleapis-common-protos` >=1.71.0          Required for Spark Connect
+`zstandard`                >=0.25.0          Required for Spark Connect
 `pyyaml`                   >=3.11            Required for spark-pipelines command line interface
 `graphviz`                 >=0.20            Optional for Spark Connect
 ========================== ================= ===================================================
diff --git a/sql/connect/client/jvm/pom.xml b/sql/connect/client/jvm/pom.xml
index 38c1ebd..6c0c086 100644
--- a/sql/connect/client/jvm/pom.xml
+++ b/sql/connect/client/jvm/pom.xml
@@ -84,6 +84,11 @@
       <artifactId>failureaccess</artifactId>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.luben</groupId>
+      <artifactId>zstd-jni</artifactId>
+      <scope>compile</scope>
+    </dependency>
     <!--
       When upgrading ammonite, consider upgrading semanticdb-shared too.
     -->
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
index 450ff8c..33524c8 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/ClientE2ETestSuite.scala
@@ -41,7 +41,7 @@
 import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connect.ConnectConversions._
-import org.apache.spark.sql.connect.client.{RetryPolicy, SparkConnectClient, SparkResult}
+import org.apache.spark.sql.connect.client.{PlanCompressionOptions, RetryPolicy, SparkConnectClient, SparkResult}
 import org.apache.spark.sql.connect.test.{ConnectFunSuite, IntegrationTestUtils, QueryTest, RemoteSparkSession, SQLHelper}
 import org.apache.spark.sql.connect.test.SparkConnectServerUtils.{createSparkSession, port}
 import org.apache.spark.sql.functions._
@@ -2005,6 +2005,22 @@
         }
     }
   }
+
+  test("Plan compression works correctly") {
+    val originalPlanCompressionOptions = spark.client.getPlanCompressionOptions
+    assert(originalPlanCompressionOptions.nonEmpty)
+    assert(originalPlanCompressionOptions.get.thresholdBytes > 0)
+    assert(originalPlanCompressionOptions.get.algorithm == "ZSTD")
+    try {
+      spark.client.setPlanCompressionOptions(Some(PlanCompressionOptions(1000, "ZSTD")))
+      // Execution should work
+      assert(spark.sql(s"select '${"Apache Spark" * 10000}' as value").collect().length == 1)
+      // Analysis should work
+      assert(spark.sql(s"select '${"Apache Spark" * 10000}' as value").columns.length == 1)
+    } finally {
+      spark.client.setPlanCompressionOptions(originalPlanCompressionOptions)
+    }
+  }
 }
 
 private[sql] case class ClassData(a: String, b: Int)
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
index a41ea34..743112c 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala
@@ -187,6 +187,9 @@
       .builder()
       .connectionString(s"sc://localhost:${server.getPort}")
       .build()
+    // Disable plan compression to make sure there is only one RPC request in client.analyze,
+    // so the interceptor can capture the initial header.
+    client.setPlanCompressionOptions(None)
 
     val session = SparkSession.builder().client(client).create()
     val df = session.range(10)
@@ -521,6 +524,9 @@
       .connectionString(s"sc://localhost:${server.getPort}")
       .enableReattachableExecute()
       .build()
+    // Disable plan compression to make sure there is only one RPC request in client.analyze,
+    // so the interceptor can capture the initial header.
+    client.setPlanCompressionOptions(None)
 
     val plan = buildPlan("select * from range(10000000)")
     val dummyUUID = "10a4c38e-7e87-40ee-9d6f-60ff0751e63b"
@@ -533,6 +539,87 @@
       assert(resp.getOperationId == dummyUUID)
     }
   }
+
+  test("Plan compression works correctly for execution") {
+    startDummyServer(0)
+    client = SparkConnectClient
+      .builder()
+      .connectionString(s"sc://localhost:${server.getPort}")
+      .enableReattachableExecute()
+      .build()
+    // Set plan compression options for testing
+    client.setPlanCompressionOptions(Some(PlanCompressionOptions(1000, "ZSTD")))
+
+    // Small plan should not be compressed
+    val plan = buildPlan("select * from range(10)")
+    val iter = client.execute(plan)
+    val reattachableIter =
+      ExecutePlanResponseReattachableIterator.fromIterator(iter)
+    while (reattachableIter.hasNext) {
+      reattachableIter.next()
+    }
+    assert(service.getAndClearLatestInputPlan().hasRoot)
+
+    // Large plan should be compressed
+    val plan2 = buildPlan(s"select ${"Apache Spark" * 10000} as value")
+    val iter2 = client.execute(plan2)
+    val reattachableIter2 =
+      ExecutePlanResponseReattachableIterator.fromIterator(iter2)
+    while (reattachableIter2.hasNext) {
+      reattachableIter2.next()
+    }
+    assert(service.getAndClearLatestInputPlan().hasCompressedOperation)
+  }
+
+  test("Plan compression works correctly for analysis") {
+    startDummyServer(0)
+    client = SparkConnectClient
+      .builder()
+      .connectionString(s"sc://localhost:${server.getPort}")
+      .enableReattachableExecute()
+      .build()
+    // Set plan compression options for testing
+    client.setPlanCompressionOptions(Some(PlanCompressionOptions(1000, "ZSTD")))
+
+    // Small plan should not be compressed
+    val plan = buildPlan("select * from range(10)")
+    client.analyze(proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA, Some(plan))
+    assert(service.getAndClearLatestInputPlan().hasRoot)
+
+    // Large plan should be compressed
+    val plan2 = buildPlan(s"select ${"Apache Spark" * 10000} as value")
+    client.analyze(proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA, Some(plan2))
+    assert(service.getAndClearLatestInputPlan().hasCompressedOperation)
+  }
+
+  test("Plan compression will be disabled if the configs are not defined on the server") {
+    startDummyServer(0)
+    client = SparkConnectClient
+      .builder()
+      .connectionString(s"sc://localhost:${server.getPort}")
+      .enableReattachableExecute()
+      .build()
+
+    service.setErrorToThrowOnConfig(
+      "spark.connect.session.planCompression.defaultAlgorithm",
+      new StatusRuntimeException(Status.INTERNAL.withDescription("SQL_CONF_NOT_FOUND")))
+
+    // Execute a few queries to make sure the client fetches the configs only once.
+    (1 to 3).foreach { _ =>
+      val plan = buildPlan(s"select ${"Apache Spark" * 10000} as value")
+      val iter = client.execute(plan)
+      val reattachableIter =
+        ExecutePlanResponseReattachableIterator.fromIterator(iter)
+      while (reattachableIter.hasNext) {
+        reattachableIter.next()
+      }
+      assert(service.getAndClearLatestInputPlan().hasRoot)
+    }
+    // The plan compression options should be empty.
+    assert(client.getPlanCompressionOptions.isEmpty)
+    // The client should try to fetch the config only once.
+    assert(service.getAndClearLatestConfigRequests().size == 1)
+  }
 }
 
 class DummySparkConnectService() extends SparkConnectServiceGrpc.SparkConnectServiceImplBase {
@@ -540,9 +627,17 @@
   private var inputPlan: proto.Plan = _
   private val inputArtifactRequests: mutable.ListBuffer[AddArtifactsRequest] =
     mutable.ListBuffer.empty
+  private val inputConfigRequests = mutable.ListBuffer.empty[proto.ConfigRequest]
+  private val sparkConfigs = mutable.Map.empty[String, String]
 
   var errorToThrowOnExecute: Option[Throwable] = None
 
+  private var errorToThrowOnConfig: Map[String, Throwable] = Map.empty
+
+  private[sql] def setErrorToThrowOnConfig(key: String, error: Throwable): Unit = synchronized {
+    errorToThrowOnConfig = errorToThrowOnConfig + (key -> error)
+  }
+
   private[sql] def getAndClearLatestInputPlan(): proto.Plan = synchronized {
     val plan = inputPlan
     inputPlan = null
@@ -556,6 +651,13 @@
       requests
     }
 
+  private[sql] def getAndClearLatestConfigRequests(): Seq[proto.ConfigRequest] =
+    synchronized {
+      val requests = inputConfigRequests.clone().toSeq
+      inputConfigRequests.clear()
+      requests
+    }
+
   override def executePlan(
       request: ExecutePlanRequest,
       responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
@@ -666,6 +768,38 @@
     responseObserver.onCompleted()
   }
 
+  override def config(
+      request: proto.ConfigRequest,
+      responseObserver: StreamObserver[proto.ConfigResponse]): Unit = {
+    inputConfigRequests.synchronized {
+      inputConfigRequests.append(request)
+    }
+    require(
+      request.getOperation.hasGetOption,
+      "Only GetOption is supported. Other operations " +
+        "can be implemented by following the same procedure below.")
+
+    val responseBuilder = proto.ConfigResponse.newBuilder().setSessionId(request.getSessionId)
+    request.getOperation.getGetOption.getKeysList.asScala.iterator.foreach { key =>
+      if (errorToThrowOnConfig.contains(key)) {
+        val error = errorToThrowOnConfig(key)
+        responseObserver.onError(error)
+        return
+      }
+
+      val kvBuilder = proto.KeyValue.newBuilder()
+      synchronized {
+        sparkConfigs.get(key).foreach { value =>
+          kvBuilder.setKey(key)
+          kvBuilder.setValue(value)
+        }
+      }
+      responseBuilder.addPairs(kvBuilder.build())
+    }
+    responseObserver.onNext(responseBuilder.build())
+    responseObserver.onCompleted()
+  }
+
   override def interrupt(
       request: proto.InterruptRequest,
       responseObserver: StreamObserver[proto.InterruptResponse]): Unit = {
diff --git a/sql/connect/common/pom.xml b/sql/connect/common/pom.xml
index e7c4b62..550fe8b 100644
--- a/sql/connect/common/pom.xml
+++ b/sql/connect/common/pom.xml
@@ -87,6 +87,10 @@
             <artifactId>netty-transport-native-unix-common</artifactId>
             <version>${netty.version}</version>
         </dependency>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
         <!--
           This spark-tags test-dep is needed even though it isn't used in this module,
           otherwise testing-cmds that excludethem will yield errors.
diff --git a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
index ee42a87..1a7d062 100644
--- a/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
+++ b/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala
@@ -24,13 +24,20 @@
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.util.Properties
+import scala.util.control.NonFatal
 
+import com.google.protobuf
+import com.google.protobuf.ByteString
 import io.grpc._
 
 import org.apache.spark.SparkBuildInfo.{spark_version => SPARK_VERSION}
+import org.apache.spark.SparkThrowable
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.UserContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.LogKeys.{ERROR, RATIO, SIZE, TIME}
+import org.apache.spark.sql.connect.RuntimeConfig
 import org.apache.spark.sql.connect.common.ProtoUtils
 import org.apache.spark.sql.connect.common.config.ConnectCommon
 import org.apache.spark.util.SparkSystemUtils
@@ -40,7 +47,8 @@
  */
 private[sql] class SparkConnectClient(
     private[sql] val configuration: SparkConnectClient.Configuration,
-    private[sql] val channel: ManagedChannel) {
+    private[sql] val channel: ManagedChannel)
+    extends Logging {
 
   private val userContext: UserContext = configuration.userContext
 
@@ -64,6 +72,70 @@
   // a new client will create a new session ID.
   private[sql] val sessionId: String = configuration.sessionId.getOrElse(UUID.randomUUID.toString)
 
+  private val conf: RuntimeConfig = new RuntimeConfig(this)
+
+  // Cached plan compression options.
+  private var _planCompressionOptions: Option[Option[PlanCompressionOptions]] = None
+
+  // Get the plan compression options. The options are cached after the first call.
+  private[sql] def getPlanCompressionOptions: Option[PlanCompressionOptions] = {
+    _planCompressionOptions match {
+      case Some(options) => options
+      case None =>
+        val options =
+          try {
+            Some(
+              PlanCompressionOptions(
+                thresholdBytes =
+                  conf.get("spark.connect.session.planCompression.threshold").toInt,
+                algorithm = conf.get("spark.connect.session.planCompression.defaultAlgorithm")))
+          } catch {
+            // Disable plan compression if the server does not support it. Other exceptions are not
+            // swallowed.
+            case e: NoSuchElementException =>
+              logWarning(
+                log"Plan compression is disabled because the server does not support it",
+                e)
+              None
+            case e: SparkThrowable
+                if e.getCondition == "INVALID_CONF_VALUE"
+                  || e.getCondition == "SQL_CONF_NOT_FOUND"
+                  || e.getCondition == "CONFIG_NOT_AVAILABLE" =>
+              logWarning(
+                log"Plan compression is disabled because the server does not support it",
+                e)
+              None
+          }
+        _planCompressionOptions = Some(options)
+        options
+    }
+  }
+
+  // For testing and internal use only.
+  private[sql] def setPlanCompressionOptions(
+      planCompressionOptions: Option[PlanCompressionOptions]): Unit = {
+    _planCompressionOptions = Some(planCompressionOptions)
+  }
+
+  /**
+   * Handle plan compression errors.
+   */
+  private def handlePlanCompressionErrors[E](fn: => E): E = {
+    try {
+      fn
+    } catch {
+      // If the server cannot parse the compressed plan, disable plan compression for subsequent
+      // requests on the session.
+      case e: SparkThrowable if e.getCondition == "CONNECT_INVALID_PLAN.CANNOT_PARSE" =>
+        logWarning(
+          log"Disabling plan compression for the session due to " +
+            log"CONNECT_INVALID_PLAN.CANNOT_PARSE error.")
+        setPlanCompressionOptions(None)
+        // Retry the code block without plan compression.
+        fn
+    }
+  }
+
   /**
    * Hijacks the stored server side session ID with the given suffix. Used for testing to make
    * sure that server is validating the session ID.
@@ -121,6 +193,90 @@
   }
 
   /**
+   * Try to compress the plan if it exceeds the threshold defined in the planCompressionOptions.
+   * Return the original plan if compression is disabled, not needed, or not effective.
+   */
+  private def tryCompressPlan(plan: proto.Plan): proto.Plan = {
+    def tryCompressMessage(
+        message: protobuf.Message,
+        opType: proto.Plan.CompressedOperation.OpType,
+        options: PlanCompressionOptions): Option[proto.Plan.CompressedOperation] = {
+      val serialized = message.toByteArray
+      if (serialized.length > options.thresholdBytes) {
+        try {
+          import com.github.luben.zstd.Zstd
+
+          val startTime = System.nanoTime()
+          val compressed = Zstd.compress(serialized)
+          val duration = (System.nanoTime() - startTime) / 1e9
+          val savingRatio = 1 - compressed.length.toDouble / serialized.length
+          logDebug(
+            log"Plan compression: original_size=${MDC(SIZE, serialized.length)}, " +
+              log"compressed_size=${MDC(SIZE, compressed.length)}, " +
+              log"saving_ratio=${MDC(RATIO, savingRatio)}, " +
+              log"duration_s=${MDC(TIME, duration)}")
+          if (compressed.length < serialized.length) {
+            return Some(
+              proto.Plan.CompressedOperation
+                .newBuilder()
+                .setData(ByteString.copyFrom(compressed))
+                .setOpType(opType)
+                .setCompressionCodec(proto.CompressionCodec.COMPRESSION_CODEC_ZSTD)
+                .build())
+          } else {
+            logDebug(log"Plan compression not effective. Using original plan.")
+          }
+        } catch {
+          case _: NoClassDefFoundError | _: ClassNotFoundException =>
+            logInfo(log"Zstd library not available. Disabling plan compression.")
+            setPlanCompressionOptions(None)
+          case NonFatal(e) =>
+            logWarning(
+              log"Failed to compress plan: ${MDC(ERROR, e.getMessage)}. Using original " +
+                log"plan and disabling plan compression.")
+            setPlanCompressionOptions(None)
+        }
+      }
+      None
+    }
+
+    def maybeCompressPlan(
+        plan: proto.Plan,
+        message: protobuf.Message,
+        opType: proto.Plan.CompressedOperation.OpType,
+        clearFn: proto.Plan.Builder => proto.Plan.Builder,
+        options: PlanCompressionOptions): proto.Plan = {
+      tryCompressMessage(message, opType, options) match {
+        case Some(compressedOperation) =>
+          clearFn(proto.Plan.newBuilder(plan)).setCompressedOperation(compressedOperation).build()
+        case None => plan
+      }
+    }
+
+    getPlanCompressionOptions match {
+      case Some(options) if options.algorithm == "ZSTD" && options.thresholdBytes >= 0 =>
+        plan.getOpTypeCase match {
+          case proto.Plan.OpTypeCase.ROOT =>
+            maybeCompressPlan(
+              plan,
+              plan.getRoot,
+              proto.Plan.CompressedOperation.OpType.OP_TYPE_RELATION,
+              _.clearRoot(),
+              options)
+          case proto.Plan.OpTypeCase.COMMAND =>
+            maybeCompressPlan(
+              plan,
+              plan.getCommand,
+              proto.Plan.CompressedOperation.OpType.OP_TYPE_COMMAND,
+              _.clearCommand(),
+              options)
+          case _ => plan
+        }
+      case _ => plan
+    }
+  }
+
+  /**
    * Execute the plan and return response iterator.
    *
    * It returns CloseableIterator. For resource management it is better to close it once you are
@@ -131,41 +287,46 @@
       plan: proto.Plan,
       operationId: Option[String] = None): CloseableIterator[proto.ExecutePlanResponse] = {
     artifactManager.uploadAllClassFileArtifacts()
-    val request = proto.ExecutePlanRequest
-      .newBuilder()
-      .setPlan(plan)
-      .setUserContext(userContext)
-      .setSessionId(sessionId)
-      .setClientType(userAgent)
-      .addAllTags(tags.get.toSeq.asJava)
-
-    // Add request option to allow result chunking.
-    if (configuration.allowArrowBatchChunking) {
-      val chunkingOptionsBuilder = proto.ResultChunkingOptions
+    handlePlanCompressionErrors {
+      // Compress the plan if needed.
+      val maybeCompressedPlan = tryCompressPlan(plan)
+      val request = proto.ExecutePlanRequest
         .newBuilder()
-        .setAllowArrowBatchChunking(true)
-      configuration.preferredArrowChunkSize.foreach { size =>
-        chunkingOptionsBuilder.setPreferredArrowChunkSize(size)
-      }
-      request.addRequestOptions(
-        proto.ExecutePlanRequest.RequestOption
-          .newBuilder()
-          .setResultChunkingOptions(chunkingOptionsBuilder.build())
-          .build())
-    }
+        .setPlan(maybeCompressedPlan)
+        .setUserContext(userContext)
+        .setSessionId(sessionId)
+        .setClientType(userAgent)
+        .addAllTags(tags.get.toSeq.asJava)
 
-    serverSideSessionId.foreach(session => request.setClientObservedServerSideSessionId(session))
-    operationId.foreach { opId =>
-      require(
-        isValidUUID(opId),
-        s"Invalid operationId: $opId. The id must be an UUID string of " +
-          "the format `00112233-4455-6677-8899-aabbccddeeff`")
-      request.setOperationId(opId)
-    }
-    if (configuration.useReattachableExecute) {
-      bstub.executePlanReattachable(request.build())
-    } else {
-      bstub.executePlan(request.build())
+      // Add request option to allow result chunking.
+      if (configuration.allowArrowBatchChunking) {
+        val chunkingOptionsBuilder = proto.ResultChunkingOptions
+          .newBuilder()
+          .setAllowArrowBatchChunking(true)
+        configuration.preferredArrowChunkSize.foreach { size =>
+          chunkingOptionsBuilder.setPreferredArrowChunkSize(size)
+        }
+        request.addRequestOptions(
+          proto.ExecutePlanRequest.RequestOption
+            .newBuilder()
+            .setResultChunkingOptions(chunkingOptionsBuilder.build())
+            .build())
+      }
+
+      serverSideSessionId.foreach(session =>
+        request.setClientObservedServerSideSessionId(session))
+      operationId.foreach { opId =>
+        require(
+          isValidUUID(opId),
+          s"Invalid operationId: $opId. The id must be an UUID string of " +
+            "the format `00112233-4455-6677-8899-aabbccddeeff`")
+        request.setOperationId(opId)
+      }
+      if (configuration.useReattachableExecute) {
+        bstub.executePlanReattachable(request.build())
+      } else {
+        bstub.executePlan(request.build())
+      }
     }
   }
 
@@ -196,71 +357,87 @@
       plan: Option[proto.Plan] = None,
       explainMode: Option[proto.AnalyzePlanRequest.Explain.ExplainMode] = None)
       : proto.AnalyzePlanResponse = {
-    val builder = proto.AnalyzePlanRequest.newBuilder()
-    method match {
-      case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
-        assert(plan.isDefined)
-        builder.setSchema(
-          proto.AnalyzePlanRequest.Schema
-            .newBuilder()
-            .setPlan(plan.get)
-            .build())
-      case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
-        if (explainMode.isEmpty) {
-          throw new IllegalArgumentException(s"ExplainMode is required in Explain request")
-        }
-        assert(plan.isDefined)
-        builder.setExplain(
-          proto.AnalyzePlanRequest.Explain
-            .newBuilder()
-            .setPlan(plan.get)
-            .setExplainMode(explainMode.get)
-            .build())
-      case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
-        assert(plan.isDefined)
-        builder.setIsLocal(
-          proto.AnalyzePlanRequest.IsLocal
-            .newBuilder()
-            .setPlan(plan.get)
-            .build())
-      case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
-        assert(plan.isDefined)
-        builder.setIsStreaming(
-          proto.AnalyzePlanRequest.IsStreaming
-            .newBuilder()
-            .setPlan(plan.get)
-            .build())
-      case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
-        assert(plan.isDefined)
-        builder.setInputFiles(
-          proto.AnalyzePlanRequest.InputFiles
-            .newBuilder()
-            .setPlan(plan.get)
-            .build())
-      case proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION =>
-        builder.setSparkVersion(proto.AnalyzePlanRequest.SparkVersion.newBuilder().build())
-      case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
+    handlePlanCompressionErrors {
+      val builder = proto.AnalyzePlanRequest.newBuilder()
+      // Compress the plan if needed.
+      val maybeCompressedPlan = plan match {
+        case Some(p) => Some(tryCompressPlan(p))
+        case None => None
+      }
+      method match {
+        case proto.AnalyzePlanRequest.AnalyzeCase.SCHEMA =>
+          assert(maybeCompressedPlan.isDefined)
+          builder.setSchema(
+            proto.AnalyzePlanRequest.Schema
+              .newBuilder()
+              .setPlan(maybeCompressedPlan.get)
+              .build())
+        case proto.AnalyzePlanRequest.AnalyzeCase.EXPLAIN =>
+          if (explainMode.isEmpty) {
+            throw new IllegalArgumentException(s"ExplainMode is required in Explain request")
+          }
+          assert(maybeCompressedPlan.isDefined)
+          builder.setExplain(
+            proto.AnalyzePlanRequest.Explain
+              .newBuilder()
+              .setPlan(maybeCompressedPlan.get)
+              .setExplainMode(explainMode.get)
+              .build())
+        case proto.AnalyzePlanRequest.AnalyzeCase.IS_LOCAL =>
+          assert(maybeCompressedPlan.isDefined)
+          builder.setIsLocal(
+            proto.AnalyzePlanRequest.IsLocal
+              .newBuilder()
+              .setPlan(maybeCompressedPlan.get)
+              .build())
+        case proto.AnalyzePlanRequest.AnalyzeCase.IS_STREAMING =>
+          assert(maybeCompressedPlan.isDefined)
+          builder.setIsStreaming(
+            proto.AnalyzePlanRequest.IsStreaming
+              .newBuilder()
+              .setPlan(maybeCompressedPlan.get)
+              .build())
+        case proto.AnalyzePlanRequest.AnalyzeCase.INPUT_FILES =>
+          assert(maybeCompressedPlan.isDefined)
+          builder.setInputFiles(
+            proto.AnalyzePlanRequest.InputFiles
+              .newBuilder()
+              .setPlan(maybeCompressedPlan.get)
+              .build())
+        case proto.AnalyzePlanRequest.AnalyzeCase.SPARK_VERSION =>
+          builder.setSparkVersion(proto.AnalyzePlanRequest.SparkVersion.newBuilder().build())
+        case other => throw new IllegalArgumentException(s"Unknown Analyze request $other")
+      }
+      analyze(builder)
     }
-    analyze(builder)
   }
 
   def sameSemantics(plan: proto.Plan, otherPlan: proto.Plan): proto.AnalyzePlanResponse = {
-    val builder = proto.AnalyzePlanRequest.newBuilder()
-    builder.setSameSemantics(
-      proto.AnalyzePlanRequest.SameSemantics
-        .newBuilder()
-        .setTargetPlan(plan)
-        .setOtherPlan(otherPlan))
-    analyze(builder)
+    handlePlanCompressionErrors {
+      val builder = proto.AnalyzePlanRequest.newBuilder()
+      // Compress the plan if needed.
+      val maybeCompressedPlan = tryCompressPlan(plan)
+      val otherMaybeCompressedPlan = tryCompressPlan(otherPlan)
+      builder.setSameSemantics(
+        proto.AnalyzePlanRequest.SameSemantics
+          .newBuilder()
+          .setTargetPlan(maybeCompressedPlan)
+          .setOtherPlan(otherMaybeCompressedPlan))
+      analyze(builder)
+    }
   }
 
   def semanticHash(plan: proto.Plan): proto.AnalyzePlanResponse = {
-    val builder = proto.AnalyzePlanRequest.newBuilder()
-    builder.setSemanticHash(
-      proto.AnalyzePlanRequest.SemanticHash
-        .newBuilder()
-        .setPlan(plan))
-    analyze(builder)
+    handlePlanCompressionErrors {
+      val builder = proto.AnalyzePlanRequest.newBuilder()
+      // Compress the plan if needed.
+      val maybeCompressedPlan = tryCompressPlan(plan)
+      builder.setSemanticHash(
+        proto.AnalyzePlanRequest.SemanticHash
+          .newBuilder()
+          .setPlan(maybeCompressedPlan))
+      analyze(builder)
+    }
   }
 
   private[sql] def analyze(
@@ -479,6 +656,9 @@
   }
 }
 
+// Options for plan compression
+case class PlanCompressionOptions(thresholdBytes: Int, algorithm: String)
+
 object SparkConnectClient {
 
   private[sql] val SPARK_REMOTE: String = "SPARK_REMOTE"
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 1ffed71..1df97d8 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -434,7 +434,8 @@
   val CONNECT_SESSION_PLAN_COMPRESSION_THRESHOLD =
     buildConf("spark.connect.session.planCompression.threshold")
       .doc("The threshold in bytes for the size of proto plan to be compressed. " +
-        "If the size of proto plan is smaller than this threshold, it will not be compressed.")
+        "If the size of proto plan is smaller than this threshold, it will not be compressed. " +
+        "Set to -1 to disable plan compression.")
       .version("4.1.0")
       .internal()
       .intConf
diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index 91dbf41..a433534 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -182,8 +182,8 @@
     }
     withClient(sessionId = sessionId, userId = userId) { client =>
       // shall not be able to create a new session with the same id and user.
-      val query = client.execute(buildPlan("SELECT 1"))
       val queryError = intercept[SparkException] {
+        val query = client.execute(buildPlan("SELECT 1"))
         while (query.hasNext) query.next()
       }
       assert(queryError.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))