[fix][fn] Add missing `version` field back to `querystate` API (#21966)
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
index 8dbd7b3..d938fe0 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/ByteBufferStateStore.java
@@ -73,4 +73,31 @@
*/
CompletableFuture<ByteBuffer> getAsync(String key);
+ /**
+ * Retrieve the StateValue for the key, blocking until the asynchronous lookup completes.
+ *
+ * @param key name of the key
+ * @return the StateValue produced by {@link #getStateValueAsync(String)}, which may be null.
+ */
+ default StateValue getStateValue(String key) {
+ return getStateValueAsync(key).join();
+ }
+
+ /**
+ * Asynchronously look up the StateValue stored under the key.
+ *
+ * @param key name of the key
+ * @return a future completing with the StateValue, or with null if no value was found.
+ */
+ default CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return getAsync(key).thenApply(buffer -> {
+ if (buffer == null) {
+ return null;
+ }
+ // Only the raw bytes are known at this level, so version and isNumber stay null.
+ byte[] bytes = new byte[buffer.remaining()];
+ buffer.get(bytes);
+ return new StateValue(bytes, null, null);
+ });
+ }
}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
new file mode 100644
index 0000000..ce06b54
--- /dev/null
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/state/StateValue.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.state;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
+@Getter
+@AllArgsConstructor
+public class StateValue { // a state entry's raw bytes plus optional store metadata
+ private final byte[] value; // exposed by the generated getter without a defensive copy
+ private final Long version; // null when the store reports no version
+ private final Boolean isNumber; // null when the store does not say
+}
\ No newline at end of file
diff --git a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
index 9638cfc..d593536 100644
--- a/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
+++ b/pulsar-functions/api-java/src/main/resources/findbugsExclude.xml
@@ -30,6 +30,11 @@
<Bug pattern="EI_EXPOSE_REP"/>
</Match>
<Match>
+ <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+ <Method name="getValue"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
+ <Match>
<Class name="org.apache.pulsar.functions.api.utils.FunctionRecord$FunctionRecordBuilder"/>
<Method name="properties"/>
<Bug pattern="EI_EXPOSE_REP2"/>
@@ -39,4 +44,8 @@
<Method name="schema"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.functions.api.state.StateValue"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
</FindBugsFilter>
\ No newline at end of file
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
index bf43f18..d85e4af 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/BKStateStoreImpl.java
@@ -28,6 +28,7 @@
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.utils.FunctionCommon;
/**
@@ -190,4 +191,33 @@
throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
}
}
+
+ @Override
+ public StateValue getStateValue(String key) { // synchronous counterpart of getStateValueAsync
+ try {
+ return result(getStateValueAsync(key)); // presumably waits for the future - confirm against result()
+ } catch (Exception e) { // any failure is rethrown unchecked with the cause attached
+ throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'", e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ return table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+ kv -> {
+ try {
+ if (kv == null || kv.value() == null) {
+ return null; // no entry, or an entry without a value
+ }
+ byte[] bytes = new byte[kv.value().readableBytes()];
+ kv.value().readBytes(bytes);
+ return new StateValue(bytes, kv.version(), kv.isNumber());
+ } finally {
+ if (kv != null) {
+ ReferenceCountUtil.safeRelease(kv);
+ }
+ }
+ }
+ );
+ }
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
index 50541c4..bba3cea 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImpl.java
@@ -22,6 +22,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.functions.api.StateStoreContext;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -112,6 +113,20 @@
}
@Override
+ public StateValue getStateValue(String key) {
+ return getStateValueAsync(key).join(); // waits for the async read; the result may be null
+ }
+
+ @Override
+ public CompletableFuture<StateValue> getStateValueAsync(String key) {
+ // An absent entry maps to null; isNumber is always passed as null here.
+ return store.get(getPath(key)).thenApply(lookup ->
+ lookup.isPresent()
+ ? new StateValue(lookup.get().getValue(), lookup.get().getStat().getVersion(), null)
+ : null);
+ }
+
+ @Override
public void incrCounter(String key, long amount) {
incrCounterAsync(key, amount).join();
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
index 1d35f3d..7696c71 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/BKStateStoreImplTest.java
@@ -35,7 +35,9 @@
import org.apache.bookkeeper.api.kv.Table;
import org.apache.bookkeeper.api.kv.options.Options;
import org.apache.bookkeeper.api.kv.result.DeleteResult;
+import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -115,6 +117,24 @@
}
@Test
+ public void testGetStateValue() throws Exception {
+ KeyValue returnedKeyValue = mock(KeyValue.class);
+ ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
+ when(returnedKeyValue.value()).thenReturn(returnedValue);
+ when(returnedKeyValue.version()).thenReturn(1L);
+ when(returnedKeyValue.isNumber()).thenReturn(false);
+ when(mockTable.getKv(any(ByteBuf.class)))
+ .thenReturn(FutureUtils.value(returnedKeyValue));
+ StateValue result = stateContext.getStateValue("test-key");
+ assertEquals(new String(result.getValue(), UTF_8), "test-value"); // TestNG order: (actual, expected)
+ assertEquals(result.getVersion().longValue(), 1L);
+ assertEquals(result.getIsNumber().booleanValue(), false);
+ verify(mockTable, times(1)).getKv(
+ eq(Unpooled.copiedBuffer("test-key", UTF_8))
+ );
+ }
+
+ @Test
public void testGetAmount() throws Exception {
when(mockTable.getNumber(any(ByteBuf.class)))
.thenReturn(FutureUtils.value(10L));
@@ -132,6 +152,12 @@
assertTrue(result != null);
assertEquals(result.get(), null);
+ when(mockTable.getKv(any(ByteBuf.class)))
+ .thenReturn(FutureUtils.value(null));
+ CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
+ assertTrue(stateValueResult != null);
+ assertEquals(stateValueResult.get(), null);
+
}
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
index 3b8cb02..4d1a1f7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/PulsarMetadataStateStoreImplTest.java
@@ -24,6 +24,7 @@
import static org.testng.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
@@ -101,6 +102,10 @@
CompletableFuture<ByteBuffer> result = stateContext.getAsync("test-key");
assertTrue(result != null);
assertEquals(result.get(), null);
+
+ CompletableFuture<StateValue> stateValueResult = stateContext.getStateValueAsync("test-key");
+ assertTrue(stateValueResult != null);
+ assertEquals(stateValueResult.get(), null);
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 613158a..db31847 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -74,6 +74,7 @@
import org.apache.pulsar.common.policies.data.FunctionStatsImpl;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.RestException;
+import org.apache.pulsar.functions.api.state.StateValue;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.instance.state.DefaultStateStore;
import org.apache.pulsar.functions.proto.Function;
@@ -1151,23 +1152,29 @@
try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
- ByteBuffer buf = store.get(key);
- if (buf == null) {
+ StateValue value = store.getStateValue(key);
+ if (value == null) {
+ throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
+ }
+ byte[] data = value.getValue();
+ if (data == null) {
throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
}
- // try to parse the state as a long
- // but even if it can be parsed as a long, this number may not be the actual state,
- // so we will always return a `stringValue` or `bytesValue` with the number value
+ ByteBuffer buf = ByteBuffer.wrap(data);
+
Long number = null;
if (buf.remaining() == Long.BYTES) {
number = buf.getLong();
}
+ if (Boolean.TRUE.equals(value.getIsNumber())) {
+ return new FunctionState(key, null, null, number, value.getVersion());
+ }
- if (Utf8.isWellFormed(buf.array())) {
- return new FunctionState(key, new String(buf.array(), UTF_8), null, number, null);
+ if (Utf8.isWellFormed(data)) {
+ return new FunctionState(key, new String(data, UTF_8), null, number, value.getVersion());
} else {
- return new FunctionState(key, null, buf.array(), number, null);
+ return new FunctionState(key, null, data, number, value.getVersion());
}
} catch (RestException e) {
throw e;
@@ -1215,7 +1222,7 @@
try {
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, functionName);
ByteBuffer data;
- if (StringUtils.isNotEmpty(state.getStringValue())) {
+ if (state.getStringValue() != null) {
data = ByteBuffer.wrap(state.getStringValue().getBytes(UTF_8));
} else if (state.getByteValue() != null) {
data = ByteBuffer.wrap(state.getByteValue());
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 5e80c3e..a292e0e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -97,10 +97,10 @@
getFunctionStatus(functionName, numMessages);
// get state
- queryState(functionName, "hello", numMessages);
- queryState(functionName, "test", numMessages);
+ queryState(functionName, "hello", numMessages, numMessages - 1);
+ queryState(functionName, "test", numMessages, numMessages - 1);
for (int i = 0; i < numMessages; i++) {
- queryState(functionName, "message-" + i, 1);
+ queryState(functionName, "message-" + i, 1, 0);
}
// test put state
@@ -468,7 +468,7 @@
assertTrue(result.getStdout().contains("\"numSuccessfullyProcessed\" : " + numMessages));
}
- private void queryState(String functionName, String key, int amount)
+ private void queryState(String functionName, String key, int amount, long version)
throws Exception {
ContainerExecResult result = container.execCmd(
PulsarCluster.ADMIN_SCRIPT,
@@ -480,6 +480,9 @@
"--key", key
);
assertTrue(result.getStdout().contains("\"numberValue\": " + amount));
+ assertTrue(result.getStdout().contains("\"version\": " + version));
+ assertFalse(result.getStdout().contains("stringValue"));
+ assertFalse(result.getStdout().contains("byteValue"));
}
private void putAndQueryState(String functionName, String key, String state, String expect)