[FLINK-19106] [datastream] Expose new timeout configs for DataStream API
This closes #138.
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 07b0ce1..58c382c 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -71,7 +71,8 @@
}
/**
- * Set a maximum request duration.
+ * Set a maximum request duration. This duration spans the complete call, including connecting to
+ * the function endpoint, writing the request, function processing, and reading the response.
*
* @param duration the duration after which the request is considered failed.
* @return this builder.
@@ -82,6 +83,39 @@
}
/**
+ * Set a timeout for connecting to function endpoints.
+ *
+ * @param duration the duration after which a connect attempt is considered failed.
+ * @return this builder.
+ */
+ public RequestReplyFunctionBuilder withConnectTimeout(Duration duration) {
+ builder.withConnectTimeoutDuration(duration);
+ return this;
+ }
+
+ /**
+ * Set a timeout for individual read IO operations during a function invocation request.
+ *
+ * @param duration the duration after which a read IO operation is considered failed.
+ * @return this builder.
+ */
+ public RequestReplyFunctionBuilder withReadTimeout(Duration duration) {
+ builder.withReadTimeoutDuration(duration);
+ return this;
+ }
+
+ /**
+ * Set a timeout for individual write IO operations during a function invocation request.
+ *
+ * @param duration the duration after which a write IO operation is considered failed.
+ * @return this builder.
+ */
+ public RequestReplyFunctionBuilder withWriteTimeout(Duration duration) {
+ builder.withWriteTimeoutDuration(duration);
+ return this;
+ }
+
+ /**
* Sets the max messages to batch together for a specific address.
*
* @param maxNumBatchRequests the maximum number of requests to batch for an address.