[hotfix] Refactor StateBinder.bind dispatch logic to runtime
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java
index 6a0421d..5cd505f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/state/FlinkStateBinder.java
@@ -44,20 +44,30 @@
   }
 
   @Override
-  public void bindValue(PersistedValue<?> persistedValue) {
+  public void bind(Object stateObject) {
+    if (stateObject instanceof PersistedValue) {
+      bindValue((PersistedValue<?>) stateObject);
+    } else if (stateObject instanceof PersistedTable) {
+      bindTable((PersistedTable<?, ?>) stateObject);
+    } else if (stateObject instanceof PersistedAppendingBuffer) {
+      bindAppendingBuffer((PersistedAppendingBuffer<?>) stateObject);
+    } else {
+      throw new IllegalArgumentException("Unknown persisted state object " + stateObject);
+    }
+  }
+
+  private void bindValue(PersistedValue<?> persistedValue) {
     Accessor<?> accessor = state.createFlinkStateAccessor(functionType, persistedValue);
     setAccessorRaw(persistedValue, accessor);
   }
 
-  @Override
-  public void bindTable(PersistedTable<?, ?> persistedTable) {
+  private void bindTable(PersistedTable<?, ?> persistedTable) {
     TableAccessor<?, ?> accessor =
         state.createFlinkStateTableAccessor(functionType, persistedTable);
     setAccessorRaw(persistedTable, accessor);
   }
 
-  @Override
-  public void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer) {
+  private void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer) {
     AppendingBufferAccessor<?> accessor =
         state.createFlinkStateAppendingBufferAccessor(functionType, persistedAppendingBuffer);
     setAccessorRaw(persistedAppendingBuffer, accessor);
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java
index 121c791..2c78f66 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java
@@ -26,5 +26,5 @@
  * API with specialized implementation
  */
 @Documented
-@Target({ElementType.CONSTRUCTOR, ElementType.METHOD})
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.TYPE})
 public @interface ForRuntime {}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
index bce8364..321a661 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
@@ -123,12 +123,6 @@
 
   private static final class NonFaultTolerantStateBinder extends StateBinder {
     @Override
-    public void bindValue(PersistedValue<?> persistedValue) {}
-
-    @Override
-    public void bindTable(PersistedTable<?, ?> persistedTable) {}
-
-    @Override
-    public void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer) {}
+    public void bind(Object stateObject) {}
   }
 }
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
index 01560cc..1ce33f2 100644
--- a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
@@ -18,22 +18,9 @@
 
 package org.apache.flink.statefun.sdk.state;
 
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+
+@ForRuntime
 public abstract class StateBinder {
-  public abstract void bindValue(PersistedValue<?> persistedValue);
-
-  public abstract void bindTable(PersistedTable<?, ?> persistedTable);
-
-  public abstract void bindAppendingBuffer(PersistedAppendingBuffer<?> persistedAppendingBuffer);
-
-  public final void bind(Object stateObject) {
-    if (stateObject instanceof PersistedValue) {
-      bindValue((PersistedValue<?>) stateObject);
-    } else if (stateObject instanceof PersistedTable) {
-      bindTable((PersistedTable<?, ?>) stateObject);
-    } else if (stateObject instanceof PersistedAppendingBuffer) {
-      bindAppendingBuffer((PersistedAppendingBuffer<?>) stateObject);
-    } else {
-      throw new IllegalArgumentException("Unknown persisted state object " + stateObject);
-    }
-  }
+  public abstract void bind(Object stateObject);
 }