[fix][fn] Throw 404 RestException when state key not found (#21921)
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index b175a7f..613158a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1169,6 +1169,8 @@
} else {
return new FunctionState(key, null, buf.array(), number, null);
}
+ } catch (RestException e) {
+ throw e;
} catch (Throwable e) {
log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
tenant, namespace, functionName, key, e);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
index 8472ed3..5e80c3e 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarStateTest.java
@@ -25,6 +25,7 @@
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
import static org.testng.Assert.fail;
import com.google.common.base.Utf8;
import java.util.Base64;
@@ -32,6 +33,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
@@ -144,6 +146,14 @@
assertEquals(functionState.getStringValue(), "val1");
}
+ // query a non-exist key should get a 404 error
+ {
+ PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
+ admin.functions().getFunctionState("public", "default", sourceName, "non-exist");
+ });
+ assertEquals(e.getStatusCode(), 404);
+ }
+
Awaitility.await().ignoreExceptions().untilAsserted(() -> {
FunctionState functionState = admin.functions().getFunctionState("public", "default", sourceName, "now");
assertTrue(functionState.getStringValue().matches("val1-.*"));
@@ -186,6 +196,14 @@
assertEquals(functionState.getStringValue(), "val1");
}
+ // query a non-exist key should get a 404 error
+ {
+ PulsarAdminException e = expectThrows(PulsarAdminException.class, () -> {
+ admin.functions().getFunctionState("public", "default", sinkName, "non-exist");
+ });
+ assertEquals(e.getStatusCode(), 404);
+ }
+
for (int i = 0; i < numMessages; i++) {
producer.send("foo");
}