[FLINK-18316] [sdk] Add SDK classes for PersistedStateRegistry
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
new file mode 100644
index 0000000..391bffe
--- /dev/null
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistry.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.state;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import javax.annotation.Nullable;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.ForRuntime;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+
+/**
+ * A {@link PersistedStateRegistry} can be used to register persisted state, such as a {@link
+ * PersistedValue} or {@link PersistedTable}, etc. All state that is registered via this registry is
+ * persisted and maintained by the system for fault-tolerance.
+ *
+ * <p>Created state registries must be bound to the system by using the {@link Persisted}
+ * annotation. Please see the class-level Javadoc of {@link StatefulFunction} for an example on how
+ * to do that.
+ *
+ * @see StatefulFunction
+ */
+public final class PersistedStateRegistry {
+
+  private final Map<String, Object> registeredStates = new HashMap<>();
+
+  private StateBinder stateBinder;
+
+  /**
+   * The type of the function that this registry is bound to. This is {@code NULL} if this registry
+   * is not bounded.
+   */
+  @Nullable private FunctionType functionType;
+
+  public PersistedStateRegistry() {
+    this.stateBinder = new NonFaultTolerantStateBinder();
+  }
+
+  /**
+   * Registers a {@link PersistedValue}, given a state name and the type of the values. If a
+   * registered value already exists for the given name, the previous persisted value is returned.
+   *
+   * @param name the state name to register with.
+   * @param type the type of the value.
+   * @param <T> the type of the value.
+   * @return the registered value, or the previous registered value if a registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the given state name, but
+   *     it wasn't registered as a {@link PersistedValue}.
+   */
+  public <T> PersistedValue<T> registerValue(String name, Class<T> type) {
+    return registerValue(name, type, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedValue}, given a state name and the type of the values. If a
+   * registered value already exists for the given name, the previous persisted value is returned.
+   *
+   * @param name the state name to register with.
+   * @param type the type of the value.
+   * @param expiration expiration configuration for the registered state.
+   * @param <T> the type of the value.
+   * @return the registered value, or the previous registered value if a registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the given state name, but
+   *     it wasn't registered as a {@link PersistedValue}.
+   */
+  public <T> PersistedValue<T> registerValue(String name, Class<T> type, Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedValue.class, name, stateName -> createValue(stateName, type, expiration));
+  }
+
+  /**
+   * Registers a {@link PersistedTable}, given a state name and the type of the keys and values of
+   * the table. If a registered value already exists for the given name, the previous persisted
+   * table is returned.
+   *
+   * @param name the state name to register with.
+   * @param keyType the type of the keys.
+   * @param valueType the type of the values.
+   * @param <K> the type of the keys.
+   * @param <V> the type of the values.
+   * @return the registered table, or the previous registered table if a registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the given state name, but
+   *     it wasn't registered as a {@link PersistedTable}.
+   */
+  public <K, V> PersistedTable<K, V> registerTable(
+      String name, Class<K> keyType, Class<V> valueType) {
+    return registerTable(name, keyType, valueType, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedTable}, given a state name and the type of the keys and values of
+   * the table. If a registered value already exists for the given name, the previous persisted
+   * table is returned.
+   *
+   * @param name the state name to register with.
+   * @param keyType the type of the keys.
+   * @param valueType the type of the values.
+   * @param expiration expiration configuration for the registered state.
+   * @param <K> the type of the keys.
+   * @param <V> the type of the values.
+   * @return the registered table, or the previous registered table if a registration for the state
+   *     name already exists.
+   * @throws IllegalStateException if a previous registration exists for the given state name, but
+   *     it wasn't registered as a {@link PersistedTable}.
+   */
+  public <K, V> PersistedTable<K, V> registerTable(
+      String name, Class<K> keyType, Class<V> valueType, Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedTable.class,
+        name,
+        stateName -> createTable(stateName, keyType, valueType, expiration));
+  }
+
+  /**
+   * Registers a {@link PersistedAppendingBuffer}, given a state name and the type of the buffer
+   * elements. If a registered buffer already exists for the given name, the previous persisted
+   * buffer is returned.
+   *
+   * @param name the state name to register with.
+   * @param elementType the type of the buffer elements.
+   * @param <E> the type of the buffer elements.
+   * @return the registered buffer, or the previous registered buffer if a registration for the
+   *     state name already exists.
+   * @throws IllegalStateException if a previous registration exists for the given state name, but
+   *     it wasn't registered as a {@link PersistedAppendingBuffer}.
+   */
+  public <E> PersistedAppendingBuffer<E> registerAppendingBuffer(
+      String name, Class<E> elementType) {
+    return registerAppendingBuffer(name, elementType, Expiration.none());
+  }
+
+  /**
+   * Registers a {@link PersistedAppendingBuffer}, given a state name and the type of the buffer
+   * elements. If a registered buffer already exists for the given name, the previous persisted
+   * buffer is returned.
+   *
+   * @param name the state name to register with.
+   * @param elementType the type of the buffer elements.
+   * @param expiration expiration configuration for the registered state.
+   * @param <E> the type of the buffer elements.
+   * @return the registered buffer, or the previous registered buffer if a registration for the
+   *     state name already exists.
+   * @throws IllegalStateException if a previous registration exists for the given state name, but
+   *     it wasn't registered as a {@link PersistedAppendingBuffer}.
+   */
+  public <E> PersistedAppendingBuffer<E> registerAppendingBuffer(
+      String name, Class<E> elementType, Expiration expiration) {
+    return getStateOrCreateIfAbsent(
+        PersistedAppendingBuffer.class,
+        name,
+        stateName -> createAppendingBuffer(stateName, elementType, expiration));
+  }
+
+  /**
+   * Binds this state registry to a given function. All existing registered state in this registry
+   * will also be bound to the system.
+   *
+   * @param stateBinder the new fault-tolerant state binder to use.
+   * @param functionType the type of the function that this registry is being bound to.
+   * @throws IllegalStateException if the registry was attempted to be bound more than once.
+   */
+  @ForRuntime
+  void bind(StateBinder stateBinder, FunctionType functionType) {
+    if (this.functionType != null) {
+      throw new IllegalStateException(
+          "This registry was already bound to function type: "
+              + this.functionType
+              + ", attempting to rebind to function type: "
+              + functionType);
+    }
+
+    this.stateBinder = Objects.requireNonNull(stateBinder);
+    this.functionType = Objects.requireNonNull(functionType);
+
+    registeredStates.values().forEach(state -> stateBinder.bind(state, functionType));
+  }
+
+  private <T> PersistedValue<T> createValue(String name, Class<T> type, Expiration expiration) {
+    final PersistedValue<T> value = PersistedValue.of(name, type, expiration);
+    stateBinder.bindValue(value, functionType);
+    return value;
+  }
+
+  private <K, V> PersistedTable<K, V> createTable(
+      String name, Class<K> keyType, Class<V> valueType, Expiration expiration) {
+    final PersistedTable<K, V> table = PersistedTable.of(name, keyType, valueType, expiration);
+    stateBinder.bindTable(table, functionType);
+    return table;
+  }
+
+  private <E> PersistedAppendingBuffer<E> createAppendingBuffer(
+      String name, Class<E> elementType, Expiration expiration) {
+    final PersistedAppendingBuffer<E> buffer =
+        PersistedAppendingBuffer.of(name, elementType, expiration);
+    stateBinder.bindAppendingBuffer(buffer, functionType);
+    return buffer;
+  }
+
+  @SuppressWarnings("unchecked")
+  private <ST> ST getStateOrCreateIfAbsent(
+      Class<?> statePrimitiveType, String name, Function<String, ST> createFunction) {
+    final ST state = (ST) registeredStates.computeIfAbsent(name, createFunction::apply);
+    if (state.getClass() != statePrimitiveType) {
+      throw new IllegalStateException(
+          "Unexpected state primitive type. The state was registered with type: "
+              + state.getClass()
+              + ", but was attempting to access it again as type: "
+              + statePrimitiveType);
+    }
+    return state;
+  }
+
+  private static final class NonFaultTolerantStateBinder extends StateBinder {
+    @Override
+    public void bindValue(PersistedValue<?> persistedValue, FunctionType functionType) {}
+
+    @Override
+    public void bindTable(PersistedTable<?, ?> persistedTable, FunctionType functionType) {}
+
+    @Override
+    public void bindAppendingBuffer(
+        PersistedAppendingBuffer<?> persistedAppendingBuffer, FunctionType functionType) {}
+  }
+}
diff --git a/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
new file mode 100644
index 0000000..1cfcb3e
--- /dev/null
+++ b/statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state/StateBinder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.state;
+
+import org.apache.flink.statefun.sdk.FunctionType;
+
+public abstract class StateBinder {
+  public abstract void bindValue(PersistedValue<?> persistedValue, FunctionType functionType);
+
+  public abstract void bindTable(PersistedTable<?, ?> persistedTable, FunctionType functionType);
+
+  public abstract void bindAppendingBuffer(
+      PersistedAppendingBuffer<?> persistedAppendingBuffer, FunctionType functionType);
+
+  public final void bind(Object stateObject, FunctionType functionType) {
+    if (stateObject instanceof PersistedValue) {
+      bindValue((PersistedValue<?>) stateObject, functionType);
+    } else if (stateObject instanceof PersistedTable) {
+      bindTable((PersistedTable<?, ?>) stateObject, functionType);
+    } else if (stateObject instanceof PersistedAppendingBuffer) {
+      bindAppendingBuffer((PersistedAppendingBuffer<?>) stateObject, functionType);
+    } else {
+      throw new IllegalArgumentException("Unknown persisted state object " + stateObject);
+    }
+  }
+}
diff --git a/statefun-sdk/src/test/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistryTest.java b/statefun-sdk/src/test/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistryTest.java
new file mode 100644
index 0000000..0c91178
--- /dev/null
+++ b/statefun-sdk/src/test/java/org/apache/flink/statefun/sdk/state/PersistedStateRegistryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.state;
+
+import org.junit.Test;
+
+public class PersistedStateRegistryTest {
+
+  @Test
+  public void exampleUsage() {
+    final PersistedStateRegistry registry = new PersistedStateRegistry();
+
+    registry.registerValue("value", String.class);
+    registry.registerTable("table", String.class, Integer.class);
+    registry.registerAppendingBuffer("buffer", String.class);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void reaccessAsWrongStatePrimitiveType() {
+    final PersistedStateRegistry registry = new PersistedStateRegistry();
+
+    registry.registerValue("my-state", String.class);
+    registry.registerAppendingBuffer("my-state", String.class);
+  }
+}