[FLINK-19106] [datastream] Expose new timeout configs for DataStream API

This closes #138.
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 07b0ce1..58c382c 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -71,7 +71,8 @@
   }
 
   /**
-   * Set a maximum request duration.
+   * Set a maximum request duration. This duration spans the complete call, including connecting to
+   * the function endpoint, writing the request, function processing, and reading the response.
    *
    * @param duration the duration after which the request is considered failed.
    * @return this builder.
@@ -82,6 +83,39 @@
   }
 
   /**
+   * Set a timeout for connecting to function endpoints.
+   *
+   * @param duration the duration after which a connect attempt is considered failed.
+   * @return this builder.
+   */
+  public RequestReplyFunctionBuilder withConnectTimeout(Duration duration) {
+    builder.withConnectTimeoutDuration(duration);
+    return this;
+  }
+
+  /**
+   * Set a timeout for individual read IO operations during a function invocation request.
+   *
+   * @param duration the duration after which a read IO operation is considered failed.
+   * @return this builder.
+   */
+  public RequestReplyFunctionBuilder withReadTimeout(Duration duration) {
+    builder.withReadTimeoutDuration(duration);
+    return this;
+  }
+
+  /**
+   * Set a timeout for individual write IO operations during a function invocation request.
+   *
+   * @param duration the duration after which a write IO operation is considered failed.
+   * @return this builder.
+   */
+  public RequestReplyFunctionBuilder withWriteTimeout(Duration duration) {
+    builder.withWriteTimeoutDuration(duration);
+    return this;
+  }
+
+  /**
    * Sets the max messages to batch together for a specific address.
    *
    * @param maxNumBatchRequests the maximum number of requests to batch for an address.