[KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable

# :mag: Description

Fix #6594.

This PR ports HIVE-26633(https://github.com/apache/hive/pull/3674): Make thrift client maxMessageSize configurable to fix a regression after upgrading Thrift 0.16 in 1.9.0.

## Types of changes :bookmark:

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan ๐Ÿงช

---

# Checklist ๐Ÿ“

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6631 from pan3793/thrift-max-size.

Closes #6594

e4841c88e [Cheng Pan] [KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable

Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 2b9a71f..68014a5 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -274,6 +274,7 @@
 | kyuubi.frontend.thrift.binary.ssl.disallowed.protocols     | SSLv2,SSLv3        | SSL versions to disallow for Kyuubi thrift binary frontend.                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | set      | 1.7.0  |
 | kyuubi.frontend.thrift.binary.ssl.enabled                  | false              | Set this to true for using SSL encryption in thrift binary frontend server.                                                                                                                                                                                                                                                                                                                                                                                                                                                 | boolean  | 1.7.0  |
 | kyuubi.frontend.thrift.binary.ssl.include.ciphersuites                         || A comma-separated list of include SSL cipher suite names for thrift binary frontend.                                                                                                                                                                                                                                                                                                                                                                                                                                        | seq      | 1.7.0  |
+| kyuubi.frontend.thrift.client.max.message.size             | 1073741824         | Maximum message size in bytes a thrift client will receive.                                                                                                                                                                                                                                                                                                                                                                                                                                                                 | int      | 1.9.3  |
 | kyuubi.frontend.thrift.http.bind.host                      | &lt;undefined&gt;  | Hostname or IP of the machine on which to run the thrift frontend service via http protocol.                                                                                                                                                                                                                                                                                                                                                                                                                                | string   | 1.6.0  |
 | kyuubi.frontend.thrift.http.bind.port                      | 10010              | Port of the machine on which to run the thrift frontend service via http protocol.                                                                                                                                                                                                                                                                                                                                                                                                                                          | int      | 1.6.0  |
 | kyuubi.frontend.thrift.http.compression.enabled            | true               | Enable thrift http compression via Jetty compression support                                                                                                                                                                                                                                                                                                                                                                                                                                                                | boolean  | 1.6.0  |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index c265f92..a88b5f6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -665,6 +665,13 @@
       .version("1.4.0")
       .fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)
 
+  val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
+    buildConf("kyuubi.frontend.thrift.client.max.message.size")
+      .doc("Maximum message size in bytes a thrift client will receive.")
+      .version("1.9.3")
+      .intConf
+      .createWithDefault(1 * 1024 * 1024 * 1024) // follow HIVE-26633 to use 1g as default value
+
   val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] =
     buildConf("kyuubi.frontend.thrift.http.request.header.size")
       .doc("Request header size in bytes, when using HTTP transport mode. Jetty defaults used.")
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
index cd9fd51..b3884c6 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
@@ -97,6 +97,7 @@
 
   static final String CONNECT_TIMEOUT = "connectTimeout";
   static final String SOCKET_TIMEOUT = "socketTimeout";
+  static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = "thrift.client.max.message.size";
 
   // We support ways to specify application name modeled after some existing DBs, since
   // there's no standard approach.
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 4c39fb3..eaf71cf 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -67,6 +67,7 @@
 import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
 import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
 import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*;
+import org.apache.kyuubi.shaded.thrift.TConfiguration;
 import org.apache.kyuubi.shaded.thrift.TException;
 import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol;
 import org.apache.kyuubi.shaded.thrift.transport.THttpClient;
@@ -419,7 +420,13 @@
     boolean useSsl = isSslConnection();
     // Create an http client from the configs
     httpClient = getHttpClient(useSsl);
-    transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+    int maxMessageSize = getMaxMessageSize();
+    TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+    if (maxMessageSize > 0) {
+      tConfBuilder.setMaxMessageSize(maxMessageSize);
+    }
+    TConfiguration tConf = tConfBuilder.build();
+    transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient);
     return transport;
   }
 
@@ -629,7 +636,8 @@
   }
 
   /** Create underlying SSL or non-SSL transport */
-  private TTransport createUnderlyingTransport() throws TTransportException {
+  private TTransport createUnderlyingTransport() throws TTransportException, SQLException {
+    int maxMessageSize = getMaxMessageSize();
     TTransport transport = null;
     // Note: Thrift returns an SSL socket that is already bound to the specified host:port
     // Therefore an open called on this would be a no-op later
@@ -643,19 +651,46 @@
           Utils.getPassword(sessConfMap, JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
 
       if (sslTrustStore == null || sslTrustStore.isEmpty()) {
-        transport = ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout);
+        transport =
+            ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout, maxMessageSize);
       } else {
         transport =
             ThriftUtils.getSSLSocket(
-                host, port, connectTimeout, socketTimeout, sslTrustStore, sslTrustStorePassword);
+                host,
+                port,
+                connectTimeout,
+                socketTimeout,
+                sslTrustStore,
+                sslTrustStorePassword,
+                maxMessageSize);
       }
     } else {
       // get non-SSL socket transport
-      transport = ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout);
+      transport =
+          ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout, maxMessageSize);
     }
     return transport;
   }
 
+  private int getMaxMessageSize() throws SQLException {
+    String maxMessageSize = sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
+    if (maxMessageSize == null) {
+      return -1;
+    }
+
+    try {
+      return Integer.parseInt(maxMessageSize);
+    } catch (Exception e) {
+      String errFormat =
+          "Invalid {} configuration of '{}'. Expected an integer specifying number of bytes. "
+              + "A configuration of <= 0 uses default max message size.";
+      String errMsg =
+          String.format(
+              errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize);
+      throw new SQLException(errMsg, "42000", e);
+    }
+  }
+
   /**
    * Create transport per the connection options Supported transport options are: - SASL based
    * transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
index 7f0099b..331b871 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
@@ -24,23 +24,70 @@
 import org.apache.kyuubi.shaded.thrift.transport.TSocket;
 import org.apache.kyuubi.shaded.thrift.transport.TTransport;
 import org.apache.kyuubi.shaded.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class helps in some aspects of authentication. It creates the proper Thrift classes for the
  * given configuration as well as helps with authenticating requests.
  */
 public class ThriftUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class);
+
+  /**
+   * Configure the provided T transport's max message size.
+   *
+   * @param transport Transport to configure maxMessage for
+   * @param maxMessageSize Maximum allowed message size in bytes, less than or equal to 0 means use
+   *     the Thrift library default.
+   * @return The passed in T transport configured with desired max message size. The same object
+   *     passed in is returned.
+   */
+  public static <T extends TTransport> T configureThriftMaxMessageSize(
+      T transport, int maxMessageSize) {
+    if (maxMessageSize > 0) {
+      if (transport.getConfiguration() == null) {
+        LOG.warn(
+            "TTransport {} is returning a null Configuration, Thrift max message size is not getting configured",
+            transport.getClass().getName());
+        return transport;
+      }
+      transport.getConfiguration().setMaxMessageSize(maxMessageSize);
+    }
+    return transport;
+  }
+
+  /**
+   * Create a TSocket for the provided host and port with specified connectTimeout, loginTimeout and
+   * maxMessageSize.
+   *
+   * @param host Host to connect to.
+   * @param port Port to connect to.
+   * @param connectTimeout Socket connect timeout (0 means no timeout).
+   * @param socketTimeout Socket read/write timeout (0 means no timeout).
+   * @param maxMessageSize Size in bytes for max allowable Thrift message size, less than or equal
+   *     to 0 results in using the Thrift library default.
+   * @return TTransport TSocket for host/port
+   */
   public static TTransport getSocketTransport(
-      String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
-    return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectTimeout);
+      String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
+      throws TTransportException {
+    TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+    if (maxMessageSize > 0) {
+      tConfBuilder.setMaxMessageSize(maxMessageSize);
+    }
+    TConfiguration tConf = tConfBuilder.build();
+    return new TSocket(tConf, host, port, socketTimeout, connectTimeout);
   }
 
   public static TTransport getSSLSocket(
-      String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
+      String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
+      throws TTransportException {
     // The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT
     TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout);
     tSSLSocket.setConnectTimeout(connectTimeout);
-    return getSSLSocketWithHttps(tSSLSocket);
+    return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
   }
 
   public static TTransport getSSLSocket(
@@ -49,7 +96,8 @@
       int connectTimeout,
       int socketTimeout,
       String trustStorePath,
-      String trustStorePassWord)
+      String trustStorePassWord,
+      int maxMessageSize)
       throws TTransportException {
     TSSLTransportFactory.TSSLTransportParameters params =
         new TSSLTransportFactory.TSSLTransportParameters();
@@ -59,16 +107,18 @@
     // SSLContext created with the given params
     TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params);
     tSSLSocket.setConnectTimeout(connectTimeout);
-    return getSSLSocketWithHttps(tSSLSocket);
+    return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
   }
 
   // Using endpoint identification algorithm as HTTPS enables us to do
   // CNAMEs/subjectAltName verification
-  private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
+  private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int maxMessageSize)
+      throws TTransportException {
     SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
     SSLParameters sslParams = sslSocket.getSSLParameters();
     sslParams.setEndpointIdentificationAlgorithm("HTTPS");
     sslSocket.setSSLParameters(sslParams);
-    return new TSocket(sslSocket);
+    TSocket tSocket = new TSocket(sslSocket);
+    return configureThriftMaxMessageSize(tSocket, maxMessageSize);
   }
 }
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index d34458c..c36e9ec 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -470,8 +470,10 @@
       host: String,
       port: Int,
       socketTimeout: Int,
-      connectionTimeout: Int): TProtocol = {
-    val tSocket = new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectionTimeout)
+      connectionTimeout: Int,
+      maxMessageSize: Int): TProtocol = {
+    val tConf = TConfiguration.custom().setMaxMessageSize(maxMessageSize).build()
+    val tSocket = new TSocket(tConf, host, port, socketTimeout, connectionTimeout)
     val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
     tTransport.open()
     new TBinaryProtocol(tTransport)
@@ -485,15 +487,23 @@
       conf: KyuubiConf): KyuubiSyncThriftClient = {
     val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
     val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
+    val maxMessageSize = conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE)
     val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
     val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
     val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
 
-    val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)
+    val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout, maxMessageSize)
 
     val aliveProbeProtocol =
       if (aliveProbeEnabled) {
-        Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout))
+        Some(createTProtocol(
+          user,
+          passwd,
+          host,
+          port,
+          aliveProbeInterval,
+          loginTimeout,
+          maxMessageSize))
       } else {
         None
       }