[Improve][StarRocksSink] add http socket timeout. (#5918)
diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md
index 38893a4..03c8a93 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -37,6 +37,7 @@
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
+| http_socket_timeout_ms | int | no | 180000 | Set http socket timeout, default is 3 minutes. |
### save_mode_create_template
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
index d9a9a68..80fd226 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/HttpHelper.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
@@ -45,6 +46,14 @@
public class HttpHelper {
private static final int DEFAULT_CONNECT_TIMEOUT = 1000000;
+ private SinkConfig sinkConfig;
+
+ public HttpHelper() {}
+
+ public HttpHelper(SinkConfig sinkConfig) {
+ this.sinkConfig = sinkConfig;
+ }
+
public HttpEntity getHttpEntity(CloseableHttpResponse resp) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
@@ -133,7 +142,11 @@
}
}
httpPut.setEntity(new ByteArrayEntity(data));
- httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
+ httpPut.setConfig(
+ RequestConfig.custom()
+ .setSocketTimeout(sinkConfig.getHttpSocketTimeout())
+ .setRedirectsEnabled(true)
+ .build());
try (CloseableHttpResponse resp = httpclient.execute(httpPut)) {
int code = resp.getStatusLine().getStatusCode();
if (HttpStatus.SC_OK != code) {
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
index 41cbbf6..d004213 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java
@@ -42,7 +42,7 @@
private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
- private final HttpHelper httpHelper = new HttpHelper();
+ private final HttpHelper httpHelper;
private static final int MAX_SLEEP_TIME = 5;
private final SinkConfig sinkConfig;
@@ -61,6 +61,7 @@
public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) {
this.sinkConfig = sinkConfig;
this.fieldNames = fieldNames;
+ this.httpHelper = new HttpHelper(sinkConfig);
}
public Boolean doStreamLoad(StarRocksFlushTuple flushData) throws IOException {
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index c1709b6..6862a87 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -60,6 +60,8 @@
private DataSaveMode dataSaveMode;
+ private int httpSocketTimeout;
+
@Getter private final Map<String, Object> streamLoadProps = new HashMap<>();
public static SinkConfig of(ReadonlyConfig config) {
@@ -90,6 +92,7 @@
.ifPresent(sinkConfig::setColumnSeparator);
sinkConfig.setLoadFormat(config.get(StarRocksSinkOptions.LOAD_FORMAT));
sinkConfig.setDataSaveMode(config.get(StarRocksSinkOptions.SAVE_MODE));
+ sinkConfig.setHttpSocketTimeout(config.get(StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS));
return sinkConfig;
}
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
index d03e568..6500d14 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java
@@ -138,4 +138,10 @@
.defaultValue(DataSaveMode.APPEND_DATA)
.withDescription(
"Table structure and data processing methods that already exist on the target end");
+
+ Option<Integer> HTTP_SOCKET_TIMEOUT_MS =
+ Options.key("http_socket_timeout_ms")
+ .intType()
+ .defaultValue(3 * 60 * 1000)
+ .withDescription("Set http socket timeout, default is 3 minutes.");
}
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 7c35924..9c0a8b4 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -55,7 +55,8 @@
StarRocksSinkOptions.STARROCKS_CONFIG,
StarRocksSinkOptions.ENABLE_UPSERT_DELETE,
StarRocksSinkOptions.SAVE_MODE,
- StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE)
+ StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE,
+ StarRocksSinkOptions.HTTP_SOCKET_TIMEOUT_MS)
.build();
}