[fix][fn] Add missing `version` field back to `querystate` API (#21966)

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
index 8dbd7b3..d938fe0 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -73,4 +73,31 @@
      */
     CompletableFuture<ByteBuffer> getAsync(String key);
 
+    /**
+     * Retrieve the StateValue for the key.
+     *
+     * @param key name of the key
+     * @return the StateValue.
+     */
+    default StateValue getStateValue(String key) {
+        return getStateValueAsync(key).join();
+    }
+
+    /**
+     * Retrieve the StateValue for the key, but don't wait for the operation to be completed.
+     *
+     * @param key name of the key
+     * @return the StateValue.
+     */
+    default CompletableFuture<StateValue> getStateValueAsync(String key) {
+        return getAsync(key).thenApply(val -> {
+            if (val != null && val.remaining() >= 0) {
+                byte[] data = new byte[val.remaining()];
+                val.get(data);
+                return new StateValue(data, null, null);
+            } else {
+                return null;
+            }
+        });
+    }
 }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
new file mode 100644
index 0000000..ce06b54
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public class StateValue {
+    private final byte[] value;
+    private final Long version;
+    private final Boolean isNumber;
+}
\ No newline at end of file
diff --git a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
index 9638cfc..d593536 100644
--- a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
@@ -30,6 +30,11 @@
     <Bug pattern="EI_EXPOSE_REP"/>
   </Match>
   <Match>
+    <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+    <Method name="getValue"/>
+    <Bug pattern="EI_EXPOSE_REP"/>
+  </Match>
+  <Match>
     <Class name="org.apache.pulsar.functions.api.utils.FunctionRecord$FunctionRecordBuilder"/>
     <Method name="properties"/>
     <Bug pattern="EI_EXPOSE_REP2"/>
@@ -39,4 +44,8 @@
     <Method name="schema"/>
     <Bug pattern="EI_EXPOSE_REP2"/>
   </Match>
+  <Match>
+    <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+    <Bug pattern="EI_EXPOSE_REP2"/>
+  </Match>
 </FindBugsFilter>
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
index bf43f18..d85e4af 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
@@ -28,6 +28,7 @@
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.options.Options;
 import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
 /**
@@ -190,4 +191,33 @@
             throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
         }
     }
+
+    @Override
+    public StateValue getStateValue(String key) {
+        try {
+            return result(getStateValueAsync(key));
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<StateValue> getStateValueAsync(String key) {
+        return table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+                data -> {
+                    try {
+                        if (data != null && data.value() != null && data.value().readableBytes() >= 0) {
+                            byte[] result = new byte[data.value().readableBytes()];
+                            data.value().readBytes(result);
+                            return new StateValue(result, data.version(), data.isNumber());
+                        }
+                        return null;
+                    } finally {
+                        if (data != null) {
+                            ReferenceCountUtil.safeRelease(data);
+                        }
+                    }
+                }
+        );
+    }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
index 50541c4..bba3cea 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
@@ -22,6 +22,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 
@@ -112,6 +113,20 @@
     }
 
     @Override
+    public StateValue getStateValue(String key) {
+        return getStateValueAsync(key).join();
+    }
+
+    @Override
+    public CompletableFuture<StateValue> getStateValueAsync(String key) {
+        return store.get(getPath(key))
+                .thenApply(optRes ->
+                        optRes.map(x ->
+                            new StateValue(x.getValue(), x.getStat().getVersion(), null))
+                                .orElse(null));
+    }
+
+    @Override
     public void incrCounter(String key, long amount) {
         incrCounterAsync(key, amount).join();
     }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
index 1d35f3d..7696c71 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
@@ -35,7 +35,9 @@
 import org.apache.bookkeeper.api.kv.Table;
 import org.apache.bookkeeper.api.kv.options.Options;
 import org.apache.bookkeeper.api.kv.result.DeleteResult;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
 import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -115,6 +117,24 @@
     }
 
     @Test
+    public void testGetStateValue() throws Exception {
+        KeyValue returnedKeyValue = mock(KeyValue.class);
+        ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
+        when(returnedKeyValue.value()).thenReturn(returnedValue);
+        when(returnedKeyValue.version()).thenReturn(1l);
+        when(returnedKeyValue.isNumber()).thenReturn(false);
+        when(mockTable.getKv(any(ByteBuf.class)))
+            .thenReturn(FutureUtils.value(returnedKeyValue));
+        StateValue result = stateContext.getStateValue("test-key");
+        assertEquals("test-value", new String(result.getValue(), UTF_8));
+        assertEquals(1l, result.getVersion().longValue());
+        assertEquals(false, result.getIsNumber().booleanValue());
+        verify(mockTable, times(1)).getKv(
+            eq(Unpooled.copiedBuffer("test-key", UTF_8))
+        );
+    }
+
+    @Test
     public void testGetAmount() throws Exception {
         when(mockTable.getNumber(any(ByteBuf.class)))
             .thenReturn(FutureUtils.value(10L));
@@ -132,6 +152,12 @@
         assertTrue(result != null);
         assertEquals(result.get(), null);
 
+        when(mockTable.getKv(any(ByteBuf.class)))
+                .thenReturn(FutureUtils.value(null));
+        CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
+        assertTrue(stateValueResult != null);
+        assertEquals(stateValueResult.get(), null);
+
     }
 
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
index 3b8cb02..4d1a1f7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
@@ -24,6 +24,7 @@
 import static org.testng.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataStore;
 import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -101,6 +102,10 @@
         CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
         assertTrue(result != null);
         assertEquals(result.get(), null);
+
+        CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
+        assertTrue(stateValueResult != null);
+        assertEquals(stateValueResult.get(), null);
     }
 
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 613158a..db31847 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -74,6 +74,7 @@
 import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.functions.api.state.StateValue;
 import org.apache.pulsar.functions.instance.InstanceUtils;
 import org.apache.pulsar.functions.instance.state.DefaultStateStore;
 import org.apache.pulsar.functions.proto.Function;
@@ -1151,23 +1152,29 @@
 
         try {
             DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
-            ByteBuffer buf = store.get(key);
-            if (buf == null) {
+            StateValue value = store.getStateValue(key);
+            if (value == null) {
+                throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
+            }
+            byte[] data = value.getValue();
+            if (data == null) {
                 throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
             }
 
-            // try to parse the state as a long
-            // but even if it can be parsed as a long, this number may not be the actual state,
-            // so we will always return a `stringValue` or `bytesValue` with the number value
+            ByteBuffer buf = ByteBuffer.wrap(data);
+
             Long number = null;
             if (buf.remaining() == Long.BYTES) {
                 number = buf.getLong();
             }
+            if (Boolean.TRUE.equals(value.getIsNumber())) {
+                return new FunctionState(key, null, null, number, value.getVersion());
+            }
 
-            if (Utf8.isWellFormed(buf.array())) {
-                return new FunctionState(key, new String(buf.array(), UTF_8), null, number, null);
+            if (Utf8.isWellFormed(data)) {
+                return new FunctionState(key, new String(data, UTF_8), null, number, value.getVersion());
             } else {
-                return new FunctionState(key, null, buf.array(), number, null);
+                return new FunctionState(key, null, data, number, value.getVersion());
             }
         } catch (RestException e) {
             throw e;
@@ -1215,7 +1222,7 @@
         try {
             DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
             ByteBuffer data;
-            if (StringUtils.isNotEmpty(state.getStringValue())) {
+            if (state.getStringValue() != null) {
                 data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
             } else if (state.getByteValue() != null) {
                 data = ByteBuffer.wrap(state.getByteValue());
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 5e80c3e..a292e0e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -97,10 +97,10 @@
         getFunctionStatus(functionName, numMessages);
 
         // get state
-        queryState(functionName, "hello", numMessages);
-        queryState(functionName, "test", numMessages);
+        queryState(functionName, "hello", numMessages, numMessages - 1);
+        queryState(functionName, "test", numMessages, numMessages - 1);
         for (int i = 0; i < numMessages; i++) {
-            queryState(functionName, "message-" + i, 1);
+            queryState(functionName, "message-" + i, 1, 0);
         }
 
         // test put state
@@ -468,7 +468,7 @@
         assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : " + numMessages));
     }
 
-    private void queryState(String functionName, String key, int amount)
+    private void queryState(String functionName, String key, int amount, long version)
         throws Exception {
         ContainerExecResult result = container.execCmd(
             PulsarCluster.ADMIN_SCRIPT,
@@ -480,6 +480,9 @@
             "--key", key
         );
         assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
+        assertTrue(result.getStdout().contains("\"version\": " + version));
+        assertFalse(result.getStdout().contains("stringValue"));
+        assertFalse(result.getStdout().contains("byteValue"));
     }
 
     private void putAndQueryState(String functionName, String key, String state, String expect)