[FLINK-25936][java-sdk] Set value typename correctly in MutableTypeCell.
Before that commit, MutableTypeCell was assuming incorrectly
that the backend will send a TypedValue with a type_name field set even if the
value is missing (TypedValue.has_value = false). This is not the case, and hence
The type_name needs to be set explicitly.
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
index 181ec4e..2406d7f 100644
--- a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
@@ -27,6 +27,7 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.annotations.Internal;
import org.apache.flink.statefun.sdk.java.slice.Slice;
@@ -269,13 +270,13 @@
}
lock.lock();
try {
- final TypedValue newTypedValue =
- this.typedValue
- .toBuilder()
+ ByteString typenameBytes = ApiExtension.typeNameByteString(spec.typeName());
+ this.typedValue =
+ TypedValue.newBuilder()
+ .setTypenameBytes(typenameBytes)
.setHasValue(true)
.setValue(serialize(serializer, value))
.build();
- this.typedValue = newTypedValue;
this.status = CellStatus.MODIFIED;
} finally {
lock.unlock();