[hotfix] Add duration to YAML config types
diff --git a/docs/content/docs/deployment/module.md b/docs/content/docs/deployment/module.md
index d0854f7..895dd10 100644
--- a/docs/content/docs/deployment/module.md
+++ b/docs/content/docs/deployment/module.md
@@ -63,7 +63,7 @@
           address: kafka-broker:9092
           deliverySemantic:
             type: exactly-once
-            transactionTimeoutMillis: 100000
+            transactionTimeout: 15min
 ```
 
 ## Endpoint Definition
@@ -281,7 +281,7 @@
 An egress identifier, similar to a function type, uniquely identifies an egress.
 
 The spec defines the details of how to connect to the external system, which is specific to each individual I/O module.
-Each identifier-spec pair is bound to the system inside an stateful function module.
+Each identifier-spec pair is bound to the system inside a stateful function module.
 
 See [IO Modules]{{< ref "docs/io-module/overview" >}} for more information on configuring an egress. 
 
diff --git a/docs/content/docs/io-module/apache-kafka.md b/docs/content/docs/io-module/apache-kafka.md
index e0b4653..f331dc2 100644
--- a/docs/content/docs/io-module/apache-kafka.md
+++ b/docs/content/docs/io-module/apache-kafka.md
@@ -140,7 +140,7 @@
          address: kafka-broker:9092
          deliverySemantic:
            type: exactly-once
-           transactionTimeoutMillis: 100000
+           transactionTimeout: 15min
          properties:
            - foo.config: bar
 ```
@@ -177,7 +177,7 @@
 ```yaml
 deliverySemantic:
   type: exactly-once
-  transactionTimeoutMillis: 900000 # 15 min
+  transactionTimeoutMillis: 15min
 ```
 
 ### Writing To Kafka
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
index c9704b1..de67146 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
@@ -51,4 +51,4 @@
             address: kafka-broker:9092
             deliverySemantic:
               type: exactly-once
-              transactionTimeoutMillis: 900000
+              transactionTimeout: 15min
diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
index 755c3a9..a5b9d2c 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
@@ -17,11 +17,13 @@
  */
 package org.apache.flink.statefun.flink.common.json;
 
+import java.time.Duration;
 import java.util.*;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.util.TimeUtils;
 
 public final class Selectors {
 
@@ -44,6 +46,36 @@
     return Optional.of(node.asText());
   }
 
+  public static Duration durationAt(JsonNode node, JsonPointer pointer) {
+    node = dereference(node, pointer);
+    if (!node.isTextual()) {
+      throw new WrongTypeException(pointer, "not a duration");
+    }
+
+    try {
+      return TimeUtils.parseDuration(node.asText());
+    } catch (IllegalArgumentException ignore) {
+      throw new WrongTypeException(pointer, "not a duration");
+    }
+  }
+
+  public static Optional<Duration> optionalDurationAt(JsonNode node, JsonPointer pointer) {
+    node = node.at(pointer);
+    if (node.isMissingNode()) {
+      return Optional.empty();
+    }
+    if (!node.isTextual()) {
+      throw new WrongTypeException(pointer, "not a duration");
+    }
+
+    try {
+      Duration duration = TimeUtils.parseDuration(node.asText());
+      return Optional.of(duration);
+    } catch (IllegalArgumentException ignore) {
+      throw new WrongTypeException(pointer, "not a duration");
+    }
+  }
+
   public static int integerAt(JsonNode node, JsonPointer pointer) {
     node = dereference(node, pointer);
     if (!node.isInt()) {
@@ -60,6 +92,18 @@
     return node.asLong();
   }
 
+  public static OptionalLong optionalLongAt(JsonNode node, JsonPointer pointer) {
+    node = node.at(pointer);
+    if (node.isMissingNode()) {
+      return OptionalLong.empty();
+    }
+
+    if (!node.isLong() && !node.isInt()) {
+      throw new WrongTypeException(pointer, "not a long");
+    }
+    return OptionalLong.of(node.asLong());
+  }
+
   public static OptionalInt optionalIntegerAt(JsonNode node, JsonPointer pointer) {
     node = node.at(pointer);
     if (node.isMissingNode()) {
diff --git a/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java b/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java
index e1f7b74..ced6db4 100644
--- a/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java
+++ b/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java
@@ -23,9 +23,11 @@
 import static org.hamcrest.Matchers.hasEntry;
 import static org.junit.Assert.assertThat;
 
+import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -79,6 +81,26 @@
   }
 
   @Test
+  public void durationAt() {
+    ObjectNode node = new ObjectNode(mapper.getNodeFactory());
+    node.put("foo", "30s");
+
+    Duration value = Selectors.durationAt(node, FOO_FIELD);
+
+    assertThat(value, is(Duration.ofSeconds(30)));
+  }
+
+  @Test
+  public void optionalDurationAt() {
+    ObjectNode node = new ObjectNode(mapper.getNodeFactory());
+    node.put("foo", "30s");
+
+    Optional<Duration> value = Selectors.optionalDurationAt(node, FOO_FIELD);
+
+    assertThat(value, is(Optional.of(Duration.ofSeconds(30))));
+  }
+
+  @Test
   public void longAt() {
     ObjectNode node = newObject();
     node.put("foo", 100_000L);
@@ -89,6 +111,16 @@
   }
 
   @Test
+  public void optionalLongAt() {
+    ObjectNode node = newObject();
+    node.put("foo", 100_000L);
+
+    OptionalLong value = Selectors.optionalLongAt(node, FOO_FIELD);
+
+    assertThat(value, is(OptionalLong.of(100_000L)));
+  }
+
+  @Test
   public void listAt() {
     ObjectNode node = newObject();
     node.putArray("foo").add(1).add(2).add(3);
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
index 81f2947..b357fe9 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
@@ -22,6 +22,7 @@
 import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Properties;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -40,9 +41,14 @@
       JsonPointer.compile("/egress/spec/deliverySemantic");
   private static final JsonPointer DELIVERY_SEMANTICS_TYPE_POINTER =
       JsonPointer.compile("/egress/spec/deliverySemantic/type");
+
+  /** @deprecated see {@link #DELIVERY_EXACTLY_ONCE_DURATION_TXN_TIMEOUT_POINTER}. */
   private static final JsonPointer DELIVERY_EXACTLY_ONCE_TXN_TIMEOUT_POINTER =
       JsonPointer.compile("/egress/spec/deliverySemantic/transactionTimeoutMillis");
 
+  private static final JsonPointer DELIVERY_EXACTLY_ONCE_DURATION_TXN_TIMEOUT_POINTER =
+      JsonPointer.compile("/egress/spec/deliverySemantic/transactionTimeout");
+
   static String kafkaAddress(JsonNode json) {
     return Selectors.textAt(json, ADDRESS_POINTER);
   }
@@ -77,7 +83,14 @@
   }
 
   static Duration exactlyOnceDeliveryTxnTimeout(JsonNode json) {
-    long transactionTimeout = Selectors.longAt(json, DELIVERY_EXACTLY_ONCE_TXN_TIMEOUT_POINTER);
-    return Duration.ofMillis(transactionTimeout);
+    // Prefer deprecated millis based timeout for backwards compatibility
+    // then fallback to duration based configuration.
+    OptionalLong transactionTimeoutMilli =
+        Selectors.optionalLongAt(json, DELIVERY_EXACTLY_ONCE_TXN_TIMEOUT_POINTER);
+    if (transactionTimeoutMilli.isPresent()) {
+      return Duration.ofMillis(transactionTimeoutMilli.getAsLong());
+    }
+
+    return Selectors.durationAt(json, DELIVERY_EXACTLY_ONCE_DURATION_TXN_TIMEOUT_POINTER);
   }
 }
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml
index ddc9d1e..2014cb3 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml
@@ -21,6 +21,6 @@
     address: kafka-broker:9092
     deliverySemantic:
       type: exactly-once
-      transactionTimeoutMillis: 100000
+      transactionTimeout: 15min
     properties:
       - foo.config: bar