[KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable
# :mag: Description
Fix #6594.
This PR ports HIVE-26633 (https://github.com/apache/hive/pull/3674), "Make thrift client maxMessageSize configurable", to fix a regression introduced by upgrading to Thrift 0.16 in 1.9.0.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)
## Test Plan 🧪
---
# Checklist 📝
- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6631 from pan3793/thrift-max-size.
Closes #6594
e4841c88e [Cheng Pan] [KYUUBI #6594] Port HIVE-26633: Make thrift client maxMessageSize configurable
Authored-by: Cheng Pan <chengpan@apache.org>
Signed-off-by: Cheng Pan <chengpan@apache.org>
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 2b9a71f..68014a5 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -274,6 +274,7 @@
| kyuubi.frontend.thrift.binary.ssl.disallowed.protocols | SSLv2,SSLv3 | SSL versions to disallow for Kyuubi thrift binary frontend. | set | 1.7.0 |
| kyuubi.frontend.thrift.binary.ssl.enabled | false | Set this to true for using SSL encryption in thrift binary frontend server. | boolean | 1.7.0 |
| kyuubi.frontend.thrift.binary.ssl.include.ciphersuites || A comma-separated list of include SSL cipher suite names for thrift binary frontend. | seq | 1.7.0 |
+| kyuubi.frontend.thrift.client.max.message.size | 1073741824 | Maximum message size in bytes a thrift client will receive. | int | 1.9.3 |
| kyuubi.frontend.thrift.http.bind.host | <undefined> | Hostname or IP of the machine on which to run the thrift frontend service via http protocol. | string | 1.6.0 |
| kyuubi.frontend.thrift.http.bind.port | 10010 | Port of the machine on which to run the thrift frontend service via http protocol. | int | 1.6.0 |
| kyuubi.frontend.thrift.http.compression.enabled | true | Enable thrift http compression via Jetty compression support | boolean | 1.6.0 |
diff --git a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index c265f92..a88b5f6 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -665,6 +665,13 @@
.version("1.4.0")
.fallbackConf(FRONTEND_MAX_MESSAGE_SIZE)
+ val FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE: ConfigEntry[Int] =
+ buildConf("kyuubi.frontend.thrift.client.max.message.size")
+ .doc("Maximum message size in bytes a thrift client will receive.")
+ .version("1.9.3")
+ .intConf
+ .createWithDefault(1 * 1024 * 1024 * 1024) // 1 GiB, the same default value as HIVE-26633
+
val FRONTEND_THRIFT_HTTP_REQUEST_HEADER_SIZE: ConfigEntry[Int] =
buildConf("kyuubi.frontend.thrift.http.request.header.size")
.doc("Request header size in bytes, when using HTTP transport mode. Jetty defaults used.")
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
index cd9fd51..b3884c6 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/JdbcConnectionParams.java
@@ -97,6 +97,7 @@
static final String CONNECT_TIMEOUT = "connectTimeout";
static final String SOCKET_TIMEOUT = "socketTimeout";
+ static final String THRIFT_CLIENT_MAX_MESSAGE_SIZE = "thrift.client.max.message.size";
// We support ways to specify application name modeled after some existing DBs, since
// there's no standard approach.
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 4c39fb3..eaf71cf 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -67,6 +67,7 @@
import org.apache.kyuubi.jdbc.hive.cli.RowSetFactory;
import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
import org.apache.kyuubi.shaded.hive.service.rpc.thrift.*;
+import org.apache.kyuubi.shaded.thrift.TConfiguration;
import org.apache.kyuubi.shaded.thrift.TException;
import org.apache.kyuubi.shaded.thrift.protocol.TBinaryProtocol;
import org.apache.kyuubi.shaded.thrift.transport.THttpClient;
@@ -419,7 +420,13 @@
boolean useSsl = isSslConnection();
// Create an http client from the configs
httpClient = getHttpClient(useSsl);
- transport = new THttpClient(getServerHttpUrl(useSsl), httpClient);
+ int maxMessageSize = getMaxMessageSize();
+ TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+ if (maxMessageSize > 0) {
+ tConfBuilder.setMaxMessageSize(maxMessageSize);
+ }
+ TConfiguration tConf = tConfBuilder.build();
+ transport = new THttpClient(tConf, getServerHttpUrl(useSsl), httpClient);
return transport;
}
@@ -629,7 +636,8 @@
}
/** Create underlying SSL or non-SSL transport */
- private TTransport createUnderlyingTransport() throws TTransportException {
+ private TTransport createUnderlyingTransport() throws TTransportException, SQLException {
+ int maxMessageSize = getMaxMessageSize();
TTransport transport = null;
// Note: Thrift returns an SSL socket that is already bound to the specified host:port
// Therefore an open called on this would be a no-op later
@@ -643,19 +651,46 @@
Utils.getPassword(sessConfMap, JdbcConnectionParams.SSL_TRUST_STORE_PASSWORD);
if (sslTrustStore == null || sslTrustStore.isEmpty()) {
- transport = ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout);
+ transport =
+ ThriftUtils.getSSLSocket(host, port, connectTimeout, socketTimeout, maxMessageSize);
} else {
transport =
ThriftUtils.getSSLSocket(
- host, port, connectTimeout, socketTimeout, sslTrustStore, sslTrustStorePassword);
+ host,
+ port,
+ connectTimeout,
+ socketTimeout,
+ sslTrustStore,
+ sslTrustStorePassword,
+ maxMessageSize);
}
} else {
// get non-SSL socket transport
- transport = ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout);
+ transport =
+ ThriftUtils.getSocketTransport(host, port, connectTimeout, socketTimeout, maxMessageSize);
}
return transport;
}
+ /** Returns the client max Thrift message size (bytes) from sessConfMap, or -1 if unset. */
+ private int getMaxMessageSize() throws SQLException {
+ String maxMessageSize = sessConfMap.get(JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE);
+ if (maxMessageSize == null) {
+ return -1;
+ }
+ try {
+ return Integer.parseInt(maxMessageSize);
+ } catch (NumberFormatException e) {
+ String errFormat =
+ "Invalid %s configuration of '%s'. Expected an integer specifying number of bytes. "
+ + "A configuration of <= 0 uses default max message size.";
+ String errMsg =
+ String.format(
+ errFormat, JdbcConnectionParams.THRIFT_CLIENT_MAX_MESSAGE_SIZE, maxMessageSize);
+ throw new SQLException(errMsg, "42000", e);
+ }
+ }
+
/**
* Create transport per the connection options Supported transport options are: - SASL based
* transports over + Kerberos + SSL + non-SSL - Raw (non-SASL) socket
diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
index 7f0099b..331b871 100644
--- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
+++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/auth/ThriftUtils.java
@@ -24,23 +24,70 @@
import org.apache.kyuubi.shaded.thrift.transport.TSocket;
import org.apache.kyuubi.shaded.thrift.transport.TTransport;
import org.apache.kyuubi.shaded.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class helps in some aspects of authentication. It creates the proper Thrift classes for the
* given configuration as well as helps with authenticating requests.
*/
public class ThriftUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ThriftUtils.class);
+
+ /**
+ * Sets the max message size on the given transport's Thrift configuration. If the transport
+ * reports a null configuration, a warning is logged and the transport is left unchanged.
+ *
+ * @param transport Transport whose max message size should be configured.
+ * @param maxMessageSize Maximum allowed message size in bytes; a value less than or equal to 0
+ * leaves the transport's current setting untouched.
+ * @return The same transport object that was passed in.
+ */
+ public static <T extends TTransport> T configureThriftMaxMessageSize(
+ T transport, int maxMessageSize) {
+ if (maxMessageSize > 0) {
+ if (transport.getConfiguration() == null) {
+ LOG.warn(
+ "TTransport {} is returning a null Configuration, Thrift max message size is not getting configured",
+ transport.getClass().getName());
+ return transport;
+ }
+ transport.getConfiguration().setMaxMessageSize(maxMessageSize);
+ }
+ return transport;
+ }
+
+ /**
+ * Create a TSocket for the provided host and port with specified connectTimeout, socketTimeout
+ * and maxMessageSize.
+ *
+ * @param host Host to connect to.
+ * @param port Port to connect to.
+ * @param connectTimeout Socket connect timeout (0 means no timeout).
+ * @param socketTimeout Socket read/write timeout (0 means no timeout).
+ * @param maxMessageSize Size in bytes for max allowable Thrift message size, less than or equal
+ * to 0 results in using the Thrift library default.
+ * @return TTransport TSocket for host/port
+ */
public static TTransport getSocketTransport(
- String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
- return new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectTimeout);
+ String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
+ throws TTransportException {
+ TConfiguration.Builder tConfBuilder = TConfiguration.custom();
+ if (maxMessageSize > 0) {
+ tConfBuilder.setMaxMessageSize(maxMessageSize);
+ }
+ TConfiguration tConf = tConfBuilder.build();
+ return new TSocket(tConf, host, port, socketTimeout, connectTimeout);
}
public static TTransport getSSLSocket(
- String host, int port, int connectTimeout, int socketTimeout) throws TTransportException {
+ String host, int port, int connectTimeout, int socketTimeout, int maxMessageSize)
+ throws TTransportException {
// The underlying SSLSocket object is bound to host:port with the given SO_TIMEOUT
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout);
tSSLSocket.setConnectTimeout(connectTimeout);
- return getSSLSocketWithHttps(tSSLSocket);
+ return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
}
public static TTransport getSSLSocket(
@@ -49,7 +96,8 @@
int connectTimeout,
int socketTimeout,
String trustStorePath,
- String trustStorePassWord)
+ String trustStorePassWord,
+ int maxMessageSize)
throws TTransportException {
TSSLTransportFactory.TSSLTransportParameters params =
new TSSLTransportFactory.TSSLTransportParameters();
@@ -59,16 +107,18 @@
// SSLContext created with the given params
TSocket tSSLSocket = TSSLTransportFactory.getClientSocket(host, port, socketTimeout, params);
tSSLSocket.setConnectTimeout(connectTimeout);
- return getSSLSocketWithHttps(tSSLSocket);
+ return getSSLSocketWithHttps(tSSLSocket, maxMessageSize);
}
// Using endpoint identification algorithm as HTTPS enables us to do
// CNAMEs/subjectAltName verification
- private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket) throws TTransportException {
+ private static TSocket getSSLSocketWithHttps(TSocket tSSLSocket, int maxMessageSize)
+ throws TTransportException {
SSLSocket sslSocket = (SSLSocket) tSSLSocket.getSocket();
SSLParameters sslParams = sslSocket.getSSLParameters();
sslParams.setEndpointIdentificationAlgorithm("HTTPS");
sslSocket.setSSLParameters(sslParams);
- return new TSocket(sslSocket);
+ TSocket tSocket = new TSocket(sslSocket);
+ return configureThriftMaxMessageSize(tSocket, maxMessageSize);
}
}
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index d34458c..c36e9ec 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -470,8 +470,10 @@
host: String,
port: Int,
socketTimeout: Int,
- connectionTimeout: Int): TProtocol = {
- val tSocket = new TSocket(TConfiguration.DEFAULT, host, port, socketTimeout, connectionTimeout)
+ connectionTimeout: Int,
+ maxMessageSize: Int): TProtocol = {
+ val tConf = TConfiguration.custom().setMaxMessageSize(maxMessageSize).build()
+ val tSocket = new TSocket(tConf, host, port, socketTimeout, connectionTimeout)
val tTransport = PlainSASLHelper.getPlainTransport(user, passwd, tSocket)
tTransport.open()
new TBinaryProtocol(tTransport)
@@ -485,15 +487,23 @@
conf: KyuubiConf): KyuubiSyncThriftClient = {
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = conf.get(ENGINE_LOGIN_TIMEOUT).toInt
+ val maxMessageSize = conf.get(KyuubiConf.FRONTEND_THRIFT_CLIENT_MAX_MESSAGE_SIZE)
val aliveProbeEnabled = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_ENABLED)
val aliveProbeInterval = conf.get(KyuubiConf.ENGINE_ALIVE_PROBE_INTERVAL).toInt
val aliveTimeout = conf.get(KyuubiConf.ENGINE_ALIVE_TIMEOUT)
- val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout)
+ val tProtocol = createTProtocol(user, passwd, host, port, 0, loginTimeout, maxMessageSize)
val aliveProbeProtocol =
if (aliveProbeEnabled) {
- Option(createTProtocol(user, passwd, host, port, aliveProbeInterval, loginTimeout))
+ Some(createTProtocol(
+ user,
+ passwd,
+ host,
+ port,
+ aliveProbeInterval,
+ loginTimeout,
+ maxMessageSize))
} else {
None
}