[FLINK-20265] [core, test] Add UT to cover user contracts of PersistedRemoteFunctionValues
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
new file mode 100644
index 0000000..f633fc2
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.flink.core.reqreply;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.Collections;
+import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
+import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.PersistedValue;
+import org.junit.Test;
+
+public class PersistedRemoteFunctionValuesTest {
+
+  @Test
+  public void exampleUsage() {
+    final PersistedRemoteFunctionValues values =
+        new PersistedRemoteFunctionValues(Collections.emptyList());
+
+    // --- register persisted states
+    values.registerStates(
+        Arrays.asList(
+            protocolPersistedValueSpec("state-1"), protocolPersistedValueSpec("state-2")));
+
+    // --- update state values
+    values.updateStateValues(
+        Arrays.asList(
+            protocolPersistedValueModifyMutation("state-1", ByteString.copyFromUtf8("data-1")),
+            protocolPersistedValueModifyMutation("state-2", ByteString.copyFromUtf8("data-2"))));
+
+    final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    values.attachStateValues(builder);
+
+    // --- registered state names and their values should be attached
+    assertThat(builder.getStateList().size(), is(2));
+    assertThat(
+        builder.getStateList(),
+        hasItems(
+            protocolPersistedValue("state-1", ByteString.copyFromUtf8("data-1")),
+            protocolPersistedValue("state-2", ByteString.copyFromUtf8("data-2"))));
+  }
+
+  @Test
+  public void zeroRegisteredStates() {
+    final PersistedRemoteFunctionValues values =
+        new PersistedRemoteFunctionValues(Collections.emptyList());
+
+    final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    values.attachStateValues(builder);
+
+    assertThat(builder.getStateList().size(), is(0));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void updatingNonRegisteredStateShouldThrow() {
+    final PersistedRemoteFunctionValues values =
+        new PersistedRemoteFunctionValues(Collections.emptyList());
+
+    values.updateStateValues(
+        Collections.singletonList(
+            protocolPersistedValueModifyMutation(
+                "non-registered-state", ByteString.copyFromUtf8("data"))));
+  }
+
+  @Test
+  public void registeredStateWithEmptyValueShouldBeAttached() {
+    final PersistedRemoteFunctionValues values =
+        new PersistedRemoteFunctionValues(Collections.emptyList());
+
+    values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
+
+    final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    values.attachStateValues(builder);
+
+    assertThat(builder.getStateList().size(), is(1));
+    assertThat(builder.getStateList(), hasItems(protocolPersistedValue("state", null)));
+  }
+
+  @Test
+  public void registeredStateWithDeletedValueShouldBeAttached() {
+    final PersistedRemoteFunctionValues values =
+        new PersistedRemoteFunctionValues(Collections.emptyList());
+
+    values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
+
+    // modify and then delete state value
+    values.updateStateValues(
+        Collections.singletonList(
+            protocolPersistedValueModifyMutation("state", ByteString.copyFromUtf8("data"))));
+    values.updateStateValues(
+        Collections.singletonList(protocolPersistedValueDeleteMutation("state")));
+
+    final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    values.attachStateValues(builder);
+
+    assertThat(builder.getStateList().size(), is(1));
+    assertThat(builder.getStateList(), hasItems(protocolPersistedValue("state", null)));
+  }
+
+  @Test
+  public void duplicateRegistrationsHasNoEffect() {
+    final PersistedRemoteFunctionValues values =
+        new PersistedRemoteFunctionValues(Collections.emptyList());
+
+    values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
+    values.updateStateValues(
+        Collections.singletonList(
+            protocolPersistedValueModifyMutation("state", ByteString.copyFromUtf8("data"))));
+
+    // duplicate registration under the same state name
+    values.registerStates(Collections.singletonList(protocolPersistedValueSpec("state")));
+
+    final InvocationBatchRequest.Builder builder = InvocationBatchRequest.newBuilder();
+    values.attachStateValues(builder);
+
+    assertThat(builder.getStateList().size(), is(1));
+    assertThat(
+        builder.getStateList(),
+        hasItems(protocolPersistedValue("state", ByteString.copyFromUtf8("data"))));
+  }
+
+  private static PersistedValueSpec protocolPersistedValueSpec(String stateName) {
+    return PersistedValueSpec.newBuilder().setStateName(stateName).build();
+  }
+
+  private static PersistedValueMutation protocolPersistedValueModifyMutation(
+      String stateName, ByteString modifyValue) {
+    return PersistedValueMutation.newBuilder()
+        .setStateName(stateName)
+        .setMutationType(PersistedValueMutation.MutationType.MODIFY)
+        .setStateValue(modifyValue)
+        .build();
+  }
+
+  private static PersistedValueMutation protocolPersistedValueDeleteMutation(String stateName) {
+    return PersistedValueMutation.newBuilder()
+        .setStateName(stateName)
+        .setMutationType(PersistedValueMutation.MutationType.DELETE)
+        .build();
+  }
+
+  private static PersistedValue protocolPersistedValue(String stateName, ByteString stateValue) {
+    final PersistedValue.Builder builder = PersistedValue.newBuilder();
+    builder.setStateName(stateName);
+
+    if (stateValue != null) {
+      builder.setStateValue(stateValue);
+    }
+    return builder.build();
+  }
+}