[hotfix] Add duration to YAML config types
diff --git a/docs/content/docs/deployment/module.md b/docs/content/docs/deployment/module.md
index d0854f7..895dd10 100644
--- a/docs/content/docs/deployment/module.md
+++ b/docs/content/docs/deployment/module.md
@@ -63,7 +63,7 @@
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
- transactionTimeoutMillis: 100000
+ transactionTimeout: 15min
```
## Endpoint Definition
@@ -281,7 +281,7 @@
An egress identifier, similar to a function type, uniquely identifies an egress.
The spec defines the details of how to connect to the external system, which is specific to each individual I/O module.
-Each identifier-spec pair is bound to the system inside an stateful function module.
+Each identifier-spec pair is bound to the system inside a stateful function module.
See [IO Modules]{{< ref "docs/io-module/overview" >}} for more information on configuring an egress.
diff --git a/docs/content/docs/io-module/apache-kafka.md b/docs/content/docs/io-module/apache-kafka.md
index e0b4653..f331dc2 100644
--- a/docs/content/docs/io-module/apache-kafka.md
+++ b/docs/content/docs/io-module/apache-kafka.md
@@ -140,7 +140,7 @@
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
- transactionTimeoutMillis: 100000
+ transactionTimeout: 15min
properties:
- foo.config: bar
```
@@ -177,7 +177,7 @@
```yaml
deliverySemantic:
type: exactly-once
- transactionTimeoutMillis: 900000 # 15 min
+ transactionTimeoutMillis: 15min
```
### Writing To Kafka
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
index c9704b1..de67146 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/resources/remote-module/module.yaml
@@ -51,4 +51,4 @@
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
- transactionTimeoutMillis: 900000
+ transactionTimeout: 15min
diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
index 755c3a9..a5b9d2c 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/Selectors.java
@@ -17,11 +17,13 @@
*/
package org.apache.flink.statefun.flink.common.json;
+import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.util.TimeUtils;
public final class Selectors {
@@ -44,6 +46,36 @@
return Optional.of(node.asText());
}
+ public static Duration durationAt(JsonNode node, JsonPointer pointer) {
+ node = dereference(node, pointer);
+ if (!node.isTextual()) {
+ throw new WrongTypeException(pointer, "not a duration");
+ }
+
+ try {
+ return TimeUtils.parseDuration(node.asText());
+ } catch (IllegalArgumentException ignore) {
+ throw new WrongTypeException(pointer, "not a duration");
+ }
+ }
+
+ public static Optional<Duration> optionalDurationAt(JsonNode node, JsonPointer pointer) {
+ node = node.at(pointer);
+ if (node.isMissingNode()) {
+ return Optional.empty();
+ }
+ if (!node.isTextual()) {
+ throw new WrongTypeException(pointer, "not a duration");
+ }
+
+ try {
+ Duration duration = TimeUtils.parseDuration(node.asText());
+ return Optional.of(duration);
+ } catch (IllegalArgumentException ignore) {
+ throw new WrongTypeException(pointer, "not a duration");
+ }
+ }
+
public static int integerAt(JsonNode node, JsonPointer pointer) {
node = dereference(node, pointer);
if (!node.isInt()) {
@@ -60,6 +92,18 @@
return node.asLong();
}
+ public static OptionalLong optionalLongAt(JsonNode node, JsonPointer pointer) {
+ node = node.at(pointer);
+ if (node.isMissingNode()) {
+ return OptionalLong.empty();
+ }
+
+ if (!node.isLong() && !node.isInt()) {
+ throw new WrongTypeException(pointer, "not a long");
+ }
+ return OptionalLong.of(node.asLong());
+ }
+
public static OptionalInt optionalIntegerAt(JsonNode node, JsonPointer pointer) {
node = node.at(pointer);
if (node.isMissingNode()) {
diff --git a/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java b/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java
index e1f7b74..ced6db4 100644
--- a/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java
+++ b/statefun-flink/statefun-flink-common/src/test/java/org/apache/flink/statefun/flink/common/json/SelectorsTest.java
@@ -23,9 +23,11 @@
import static org.hamcrest.Matchers.hasEntry;
import static org.junit.Assert.assertThat;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -79,6 +81,26 @@
}
@Test
+ public void durationAt() {
+ ObjectNode node = new ObjectNode(mapper.getNodeFactory());
+ node.put("foo", "30s");
+
+ Duration value = Selectors.durationAt(node, FOO_FIELD);
+
+ assertThat(value, is(Duration.ofSeconds(30)));
+ }
+
+ @Test
+ public void optionalDurationAt() {
+ ObjectNode node = new ObjectNode(mapper.getNodeFactory());
+ node.put("foo", "30s");
+
+ Optional<Duration> value = Selectors.optionalDurationAt(node, FOO_FIELD);
+
+ assertThat(value, is(Optional.of(Duration.ofSeconds(30))));
+ }
+
+ @Test
public void longAt() {
ObjectNode node = newObject();
node.put("foo", 100_000L);
@@ -89,6 +111,16 @@
}
@Test
+ public void optionalLongAt() {
+ ObjectNode node = newObject();
+ node.put("foo", 100_000L);
+
+ OptionalLong value = Selectors.optionalLongAt(node, FOO_FIELD);
+
+ assertThat(value, is(OptionalLong.of(100_000L)));
+ }
+
+ @Test
public void listAt() {
ObjectNode node = newObject();
node.putArray("foo").add(1).add(2).add(3);
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
index 81f2947..b357fe9 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/KafkaEgressSpecJsonParser.java
@@ -22,6 +22,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Properties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonPointer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
@@ -40,9 +41,14 @@
JsonPointer.compile("/egress/spec/deliverySemantic");
private static final JsonPointer DELIVERY_SEMANTICS_TYPE_POINTER =
JsonPointer.compile("/egress/spec/deliverySemantic/type");
+
+ /** @deprecated see {@link #DELIVERY_EXACTLY_ONCE_DURATION_TXN_TIMEOUT_POINTER}. */
private static final JsonPointer DELIVERY_EXACTLY_ONCE_TXN_TIMEOUT_POINTER =
JsonPointer.compile("/egress/spec/deliverySemantic/transactionTimeoutMillis");
+ private static final JsonPointer DELIVERY_EXACTLY_ONCE_DURATION_TXN_TIMEOUT_POINTER =
+ JsonPointer.compile("/egress/spec/deliverySemantic/transactionTimeout");
+
static String kafkaAddress(JsonNode json) {
return Selectors.textAt(json, ADDRESS_POINTER);
}
@@ -77,7 +83,14 @@
}
static Duration exactlyOnceDeliveryTxnTimeout(JsonNode json) {
- long transactionTimeout = Selectors.longAt(json, DELIVERY_EXACTLY_ONCE_TXN_TIMEOUT_POINTER);
- return Duration.ofMillis(transactionTimeout);
+ // Prefer deprecated millis based timeout for backwards compatibility
+ // then fallback to duration based configuration.
+ OptionalLong transactionTimeoutMilli =
+ Selectors.optionalLongAt(json, DELIVERY_EXACTLY_ONCE_TXN_TIMEOUT_POINTER);
+ if (transactionTimeoutMilli.isPresent()) {
+ return Duration.ofMillis(transactionTimeoutMilli.getAsLong());
+ }
+
+ return Selectors.durationAt(json, DELIVERY_EXACTLY_ONCE_DURATION_TXN_TIMEOUT_POINTER);
}
}
diff --git a/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml
index ddc9d1e..2014cb3 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml
+++ b/statefun-flink/statefun-flink-io-bundle/src/test/resources/generic-kafka-egress.yaml
@@ -21,6 +21,6 @@
address: kafka-broker:9092
deliverySemantic:
type: exactly-once
- transactionTimeoutMillis: 100000
+ transactionTimeout: 15min
properties:
- foo.config: bar