Support function with format: Function<I, CompletableFuture<O>> (#6684)
Fixes #6519
### Motivation
Currently, Pulsar Functions do not support async mode, i.e. a user cannot pass in a function of the form:
```
Function<I, CompletableFuture<O>>
```
This kind of function is useful if the function might use RPCs to call external systems.
e.g.
```java
public class AsyncFunction implements Function<String, CompletableFuture<O>> {
    public CompletableFuture<O> apply(String input) {
        CompletableFuture<O> future = new CompletableFuture<>();
        ...function compute...
        future.whenComplete((result, throwable) -> {
            ... call external system ...
        });
        return future;
    }
}
```
### Modifications
- Add support for async functions.
### Verifying this change
The existing unit tests pass.
* support func: Function<I, CompletableFuture<O>>
* add 2 examples
* add limit to the max outstanding items
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index d0b72ab..7b8a48b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -113,4 +113,7 @@
// to change behavior at runtime. Currently, this primarily used by the KubernetesManifestCustomizer
// interface
private String customRuntimeOptions;
+ // Max pending async requests per instance to avoid large number of concurrent requests.
+ // Only used in AsyncFunction. Default: 1000.
+ private Integer maxPendingAsyncRequests = 1000;
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
index 14dadac..1f34173 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceConfig.java
@@ -36,6 +36,9 @@
private Function.FunctionAuthenticationSpec functionAuthenticationSpec;
private int port;
private String clusterName;
+ // Max pending async requests per instance to avoid large number of concurrent requests.
+ // Only used in AsyncFunction. Default: 1000
+ private int maxPendingAsyncRequests = 1000;
/**
* Get the string representation of {@link #getInstanceId()}.
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 8aee702..1e18a07 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -18,6 +18,11 @@
*/
package org.apache.pulsar.functions.instance;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -40,9 +45,18 @@
private Function function;
private java.util.function.Function javaUtilFunction;
- public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
+ // Used to limit the number of outstanding (pending) requests of an async function
+ private final InstanceConfig instanceConfig;
+ private final Executor executor;
+ @Getter
+ private final LinkedBlockingQueue<CompletableFuture<Void>> pendingAsyncRequests;
+
+ public JavaInstance(ContextImpl contextImpl, Object userClassObject, InstanceConfig instanceConfig) {
this.context = contextImpl;
+ this.instanceConfig = instanceConfig;
+ this.executor = Executors.newSingleThreadExecutor();
+ this.pendingAsyncRequests = new LinkedBlockingQueue<>(this.instanceConfig.getMaxPendingAsyncRequests());
// create the functions
if (userClassObject instanceof Function) {
@@ -52,23 +66,63 @@
}
}
- public JavaExecutionResult handleMessage(Record<?> record, Object input) {
+ public CompletableFuture<JavaExecutionResult> handleMessage(Record<?> record, Object input) {
if (context != null) {
context.setCurrentMessageContext(record);
}
+
+ final CompletableFuture<JavaExecutionResult> future = new CompletableFuture<>();
JavaExecutionResult executionResult = new JavaExecutionResult();
+
+ final Object output;
+
try {
- Object output;
if (function != null) {
output = function.process(input, context);
} else {
output = javaUtilFunction.apply(input);
}
- executionResult.setResult(output);
} catch (Exception ex) {
executionResult.setUserException(ex);
+ future.complete(executionResult);
+ return future;
}
- return executionResult;
+
+ if (output instanceof CompletableFuture) {
+ // Function is in format: Function<I, CompletableFuture<O>>
+ try {
+ pendingAsyncRequests.put((CompletableFuture) output);
+ } catch (InterruptedException ie) {
+ log.warn("Exception while put Async requests", ie);
+ executionResult.setUserException(ie);
+ future.complete(executionResult);
+ return future;
+ }
+
+ ((CompletableFuture) output).whenCompleteAsync((obj, throwable) -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Got result async: object: {}, throwable: {}", obj, throwable);
+ }
+
+ if (throwable != null) {
+ executionResult.setUserException(new Exception((Throwable)throwable));
+ pendingAsyncRequests.remove(output);
+ future.complete(executionResult);
+ return;
+ }
+ executionResult.setResult(obj);
+ pendingAsyncRequests.remove(output);
+ future.complete(executionResult);
+ }, executor);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Got result: object: {}", output);
+ }
+ executionResult.setResult(output);
+ future.complete(executionResult);
+ }
+
+ return future;
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 25689a9..66cdb60 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -24,6 +24,7 @@
import com.google.gson.reflect.TypeToken;
import io.netty.buffer.ByteBuf;
import io.prometheus.client.CollectorRegistry;
+import java.util.concurrent.CompletableFuture;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -214,7 +215,7 @@
// start any log topic handler
setupLogHandler();
- return new JavaInstance(contextImpl, object);
+ return new JavaInstance(contextImpl, object, instanceConfig);
}
ContextImpl setupContext() {
@@ -254,7 +255,7 @@
}
addLogTopicHandler();
- JavaExecutionResult result;
+ CompletableFuture<JavaExecutionResult> result;
// set last invocation time
stats.setLastInvocation(System.currentTimeMillis());
@@ -272,10 +273,6 @@
removeLogTopicHandler();
- if (log.isDebugEnabled()) {
- log.debug("Got result: {}", result.getResult());
- }
-
try {
processResult(currentRecord, result);
} catch (Exception e) {
@@ -415,23 +412,27 @@
}
private void processResult(Record srcRecord,
- JavaExecutionResult result) throws Exception {
- if (result.getUserException() != null) {
- log.info("Encountered user exception when processing message {}", srcRecord, result.getUserException());
- stats.incrUserExceptions(result.getUserException());
- srcRecord.fail();
- } else {
- if (result.getResult() != null) {
- sendOutputMessage(srcRecord, result.getResult());
+ CompletableFuture<JavaExecutionResult> result) throws Exception {
+ result.whenComplete((result1, throwable) -> {
+ if (throwable != null || result1.getUserException() != null) {
+ Throwable t = throwable != null ? throwable : result1.getUserException();
+ log.warn("Encountered exception when processing message {}",
+ srcRecord, t);
+ stats.incrUserExceptions(t);
+ srcRecord.fail();
} else {
- if (instanceConfig.getFunctionDetails().getAutoAck()) {
- // the function doesn't produce any result or the user doesn't want the result.
- srcRecord.ack();
+ if (result1.getResult() != null) {
+ sendOutputMessage(srcRecord, result1.getResult());
+ } else {
+ if (instanceConfig.getFunctionDetails().getAutoAck()) {
+ // the function doesn't produce any result or the user doesn't want the result.
+ srcRecord.ack();
+ }
}
+ // increment total successfully processed
+ stats.incrTotalProcessedSuccessfully();
}
- // increment total successfully processed
- stats.incrTotalProcessedSuccessfully();
- }
+ });
}
private void sendOutputMessage(Record srcRecord, Object output) {
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 0cb361d..5061d1e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -22,10 +22,14 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.Record;
import org.testng.annotations.Test;
+@Slf4j
public class JavaInstanceTest {
/**
@@ -33,14 +37,94 @@
* @throws Exception
*/
@Test
- public void testLambda() {
+ public void testLambda() throws Exception {
JavaInstance instance = new JavaInstance(
- mock(ContextImpl.class),
- (Function<String, String>) (input, context) -> input + "-lambda");
+ mock(ContextImpl.class),
+ (Function<String, String>) (input, context) -> input + "-lambda",
+ new InstanceConfig());
String testString = "ABC123";
- JavaExecutionResult result = instance.handleMessage(mock(Record.class), testString);
- assertNotNull(result.getResult());
- assertEquals(new String(testString + "-lambda"), result.getResult());
+ CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+ assertNotNull(result.get().getResult());
+ assertEquals(new String(testString + "-lambda"), result.get().getResult());
+ instance.close();
+ }
+
+ @Test
+ public void testAsyncFunction() throws Exception {
+ InstanceConfig instanceConfig = new InstanceConfig();
+
+ Function<String, CompletableFuture<String>> function = (input, context) -> {
+ log.info("input string: {}", input);
+ CompletableFuture<String> result = new CompletableFuture<>();
+ Executors.newCachedThreadPool().submit(() -> {
+ try {
+ Thread.sleep(500);
+ result.complete(String.format("%s-lambda", input));
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+ });
+
+ return result;
+ };
+
+ JavaInstance instance = new JavaInstance(
+ mock(ContextImpl.class),
+ function,
+ instanceConfig);
+ String testString = "ABC123";
+ CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+ assertNotNull(result.get().getResult());
+ assertEquals(new String(testString + "-lambda"), result.get().getResult());
+ instance.close();
+ }
+
+ @Test
+ public void testAsyncFunctionMaxPending() throws Exception {
+ InstanceConfig instanceConfig = new InstanceConfig();
+ int pendingQueueSize = 2;
+ instanceConfig.setMaxPendingAsyncRequests(pendingQueueSize);
+
+ Function<String, CompletableFuture<String>> function = (input, context) -> {
+ log.info("input string: {}", input);
+ CompletableFuture<String> result = new CompletableFuture<>();
+ Executors.newCachedThreadPool().submit(() -> {
+ try {
+ Thread.sleep(500);
+ result.complete(String.format("%s-lambda", input));
+ } catch (Exception e) {
+ result.completeExceptionally(e);
+ }
+ });
+
+ return result;
+ };
+
+ JavaInstance instance = new JavaInstance(
+ mock(ContextImpl.class),
+ function,
+ instanceConfig);
+ String testString = "ABC123";
+
+ long startTime = System.currentTimeMillis();
+ assertEquals(pendingQueueSize, instance.getPendingAsyncRequests().remainingCapacity());
+ CompletableFuture<JavaExecutionResult> result1 = instance.handleMessage(mock(Record.class), testString);
+ assertEquals(pendingQueueSize - 1, instance.getPendingAsyncRequests().remainingCapacity());
+ CompletableFuture<JavaExecutionResult> result2 = instance.handleMessage(mock(Record.class), testString);
+ assertEquals(pendingQueueSize - 2, instance.getPendingAsyncRequests().remainingCapacity());
+ CompletableFuture<JavaExecutionResult> result3 = instance.handleMessage(mock(Record.class), testString);
+ // no space left
+ assertEquals(0, instance.getPendingAsyncRequests().remainingCapacity());
+
+ instance.getPendingAsyncRequests().remainingCapacity();
+ assertNotNull(result1.get().getResult());
+ assertNotNull(result2.get().getResult());
+ assertNotNull(result3.get().getResult());
+
+ assertEquals(new String(testString + "-lambda"), result1.get().getResult());
+ long endTime = System.currentTimeMillis();
+
+ log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
instance.close();
}
}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java
new file mode 100644
index 0000000..b70bc7c
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/AsyncContextFunction.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+import org.apache.pulsar.functions.api.Context;
+import org.apache.pulsar.functions.api.Function;
+import org.slf4j.Logger;
+
+public class AsyncContextFunction implements Function<String, CompletableFuture<Void>> {
+ @Override
+ public CompletableFuture<Void> process(String input, Context context) {
+ Logger LOG = context.getLogger();
+ CompletableFuture<Void> future = new CompletableFuture();
+
+ // This task only delays the function's execution.
+ Executors.newCachedThreadPool().submit(() -> {
+ try {
+ Thread.sleep(500);
+ } catch (Exception e) {
+ LOG.error("Exception when Thread.sleep", e);
+ future.completeExceptionally(e);
+ }
+
+ String inputTopics = context.getInputTopics().stream().collect(Collectors.joining(", "));
+ String funcName = context.getFunctionName();
+
+ String logMessage = String
+ .format("A message with value of \"%s\" has arrived on one of the following topics: %s\n",
+ input, inputTopics);
+ LOG.info(logMessage);
+
+ String metricName = String.format("function-%s-messages-received", funcName);
+ context.recordMetric(metricName, 1);
+
+ future.complete(null);
+ });
+
+ return future;
+ }
+}
diff --git a/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java
new file mode 100644
index 0000000..7cad46b
--- /dev/null
+++ b/pulsar-functions/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/JavaNativeAsyncExclamationFunction.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.api.examples;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.function.Function;
+
+public class JavaNativeAsyncExclamationFunction implements Function<String, CompletableFuture<String>> {
+ @Override
+ public CompletableFuture<String> apply(String input) {
+ CompletableFuture<String> future = new CompletableFuture();
+
+ Executors.newCachedThreadPool().submit(() -> {
+ try {
+ Thread.sleep(500);
+ future.complete(String.format("%s-!!", input));
+ } catch (Exception e) {
+ future.completeExceptionally(e);
+ }
+ });
+
+ return future;
+ }
+}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index b9ac10e..2d8b72f 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -358,6 +358,7 @@
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
+ instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
@@ -417,6 +418,7 @@
instanceConfig.setMaxBufferedTuples(1024);
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName("local");
+ instanceConfig.setMaxPendingAsyncRequests(functionConfig.getMaxPendingAsyncRequests());
RuntimeSpawner runtimeSpawner = new RuntimeSpawner(
instanceConfig,
userCodeFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index ec2e36a..970047f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -125,6 +125,9 @@
@Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
public String clusterName;
+ @Parameter(names = "--pending_async_requests", description = "Max pending async requests per instance", required = false)
+ public int maxPendingAsyncRequests = 1000;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
@@ -147,6 +150,7 @@
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
instanceConfig.setClusterName(clusterName);
+ instanceConfig.setMaxPendingAsyncRequests(maxPendingAsyncRequests);
Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 4ffffe9..7cd7186 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -409,6 +409,12 @@
)
private Map<String, Object> runtimeCustomizerConfig = Collections.emptyMap();
+ @FieldContext(
+ doc = "Max pending async requests per instance to avoid large number of concurrent requests."
+ + "Only used in AsyncFunction. Default: 1000"
+ )
+ private int maxPendingAsyncRequests = 1000;
+
public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 5c4f871..eef95a1 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -177,6 +177,7 @@
instanceConfig.setPort(FunctionCommon.findAvailablePort());
instanceConfig.setClusterName(clusterName);
instanceConfig.setFunctionAuthenticationSpec(functionAuthSpec);
+ instanceConfig.setMaxPendingAsyncRequests(workerConfig.getMaxPendingAsyncRequests());
return instanceConfig;
}