[FLINK-19106] [remote] Add new keys for timeouts to module.yaml
diff --git a/docs/sdk/index.md b/docs/sdk/index.md
index 42beaa3..5883dc9 100644
--- a/docs/sdk/index.md
+++ b/docs/sdk/index.md
@@ -71,7 +71,17 @@
     * Default: 1000
 * ``function.spec.timeout``
     * The maximum amount of time for the runtime to wait for the remote function to return before failing.
+      This spans the complete call, including connecting to the function endpoint, writing the request, function processing, and reading the response.
     * Default: 1 min
+* ``function.spec.connectTimeout``
+    * The maximum amount of time for the runtime to wait for connecting to the remote function endpoint.
+    * Default: 10 sec
+* ``function.spec.readTimeout``
+    * The maximum amount of time for the runtime to wait for individual read IO operations, such as reading the invocation response.
+    * Default: 10 sec
+* ``function.spec.writeTimeout``
+    * The maximum amount of time for the runtime to wait for individual write IO operations, such as writing the invocation request.
+    * Default: 10 sec
 
 ### Full Example
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
index 94e4fad..2b9c866 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionJsonEntity.java
@@ -67,7 +67,15 @@
     private static final JsonPointer ENDPOINT = JsonPointer.compile("/function/spec/endpoint");
     private static final JsonPointer PORT = JsonPointer.compile("/function/spec/port");
     private static final JsonPointer STATES = JsonPointer.compile("/function/spec/states");
+
     private static final JsonPointer TIMEOUT = JsonPointer.compile("/function/spec/timeout");
+    private static final JsonPointer CONNECT_TIMEOUT =
+        JsonPointer.compile("/function/spec/connectTimeout");
+    private static final JsonPointer READ_TIMEOUT =
+        JsonPointer.compile("/function/spec/readTimeout");
+    private static final JsonPointer WRITE_TIMEOUT =
+        JsonPointer.compile("/function/spec/writeTimeout");
+
     private static final JsonPointer MAX_NUM_BATCH_REQUESTS =
         JsonPointer.compile("/function/spec/maxNumBatchRequests");
   }
@@ -120,7 +128,14 @@
           specBuilder.withState(state);
         }
         optionalMaxNumBatchRequests(functionNode).ifPresent(specBuilder::withMaxNumBatchRequests);
-        optionalMaxRequestDuration(functionNode).ifPresent(specBuilder::withMaxRequestDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.TIMEOUT)
+            .ifPresent(specBuilder::withMaxRequestDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.CONNECT_TIMEOUT)
+            .ifPresent(specBuilder::withConnectTimeoutDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.READ_TIMEOUT)
+            .ifPresent(specBuilder::withReadTimeoutDuration);
+        optionalTimeoutDuration(functionNode, SpecPointers.WRITE_TIMEOUT)
+            .ifPresent(specBuilder::withWriteTimeoutDuration);
 
         return specBuilder.build();
       case GRPC:
@@ -165,9 +180,9 @@
     return Selectors.optionalIntegerAt(functionNode, SpecPointers.MAX_NUM_BATCH_REQUESTS);
   }
 
-  private static Optional<Duration> optionalMaxRequestDuration(JsonNode functionNode) {
-    return Selectors.optionalTextAt(functionNode, SpecPointers.TIMEOUT)
-        .map(TimeUtils::parseDuration);
+  private static Optional<Duration> optionalTimeoutDuration(
+      JsonNode functionNode, JsonPointer timeoutPointer) {
+    return Selectors.optionalTextAt(functionNode, timeoutPointer).map(TimeUtils::parseDuration);
   }
 
   private static Expiration stateTtlExpiration(JsonNode stateSpecNode) {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
index ee903e1..f93b30e 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/module-v2_0/module.yaml
@@ -37,6 +37,10 @@
               - name: seen_count
                 expireAfter: 60000millisecond
                 expireMode: after-invoke
+            timeout: 1minutes
+            connectTimeout: 10seconds
+            readTimeout: 10second
+            writeTimeout: 10seconds
             maxNumBatchRequests: 10000
       - function:
           meta: