[hotfix] Refactor StateBinder.bind dispatch logic to runtime
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java
index 6a0421d..5cd505f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java
@@ -44,20 +44,30 @@
}
@Override
- public void bindValue(PersistedValue<?> persistedValue) {
+ public void bind(Object stateObject) {
+ if (stateObject instanceof PersistedValue) {
+ bindValue((PersistedValue<?>) stateObject);
+ } else if (stateObject instanceof PersistedTable) {
+ bindTable((PersistedTable<?, ?>) stateObject);
+ } else if (stateObject instanceof PersistedAppendingBuffer) {
+ bindAppendingBuffer((PersistedAppendingBuffer<?>) stateObject);
+ } else {
+ throw new IllegalArgumentException("Unknown persisted state object " + stateObject);
+ }
+ }
+
+ private void bindValue(PersistedValue<?> persistedValue) {
Accessor<?> accessor = state.createFlinkStateAccessor(functionType, persistedValue);
setAccessorRaw(persistedValue, accessor);
}
- @Override
- public void bindTable(PersistedTable<?, ?> persistedTable) {
+ private void bindTable(PersistedTable<?, ?> persistedTable) {
TableAccessor<?, ?> accessor =
state.createFlinkStateTableAccessor(functionType, persistedTable);
setAccessorRaw(persistedTable, accessor);
}
- @Override
- public void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer) {
+ private void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer) {
AppendingBufferAccessor<?> accessor =
state.createFlinkStateAppendingBufferAccessor(functionType, persistedAppendingBuffer);
setAccessorRaw(persistedAppendingBuffer, accessor);
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java
index 121c791..2c78f66 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java
@@ -26,5 +26,5 @@
* API with specialized implementation
*/
@Documented
-@Target({ElementType.CONSTRUCTOR, ElementType.METHOD})
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.TYPE})
public @interface ForRuntime {}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
index bce8364..321a661 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
@@ -123,12 +123,6 @@
private static final class NonFaultTolerantStateBinder extends StateBinder {
@Override
- public void bindValue(PersistedValue<?> persistedValue) {}
-
- @Override
- public void bindTable(PersistedTable<?, ?> persistedTable) {}
-
- @Override
- public void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer) {}
+ public void bind(Object stateObject) {}
}
}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
index 01560cc..1ce33f2 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
@@ -18,22 +18,9 @@
package org.apache.flink.statefun.sdk.state;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+
+@ForRuntime
public abstract class StateBinder {
- public abstract void bindValue(PersistedValue<?> persistedValue);
-
- public abstract void bindTable(PersistedTable<?, ?> persistedTable);
-
- public abstract void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer);
-
- public final void bind(Object stateObject) {
- if (stateObject instanceof PersistedValue) {
- bindValue((PersistedValue<?>) stateObject);
- } else if (stateObject instanceof PersistedTable) {
- bindTable((PersistedTable<?, ?>) stateObject);
- } else if (stateObject instanceof PersistedAppendingBuffer) {
- bindAppendingBuffer((PersistedAppendingBuffer<?>) stateObject);
- } else {
- throw new IllegalArgumentException("Unknown persisted state object " + stateObject);
- }
- }
+ public abstract void bind(Object stateObject);
}