This hands-on tutorial provides step-by-step instructions and examples for creating and validating functions in standalone Pulsar, including stateful functions and window functions.
1. Start Pulsar locally.

```shell
bin/pulsar standalone
```

All the components of a Pulsar service (including ZooKeeper, BookKeeper, the broker, and so on) start in order. You can use the `bin/pulsar-admin brokers healthcheck` command to make sure the Pulsar service is up and running.
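Because the standalone service takes a while to start, a script that runs this tutorial unattended may want to retry the healthcheck instead of calling it once. The following is a minimal sketch; the `wait_for` helper is hypothetical (not part of Pulsar) and simply re-runs any command until it exits with status 0 or the attempt limit is reached.

```shell
# Hypothetical helper (not part of Pulsar): run a command until it succeeds.
# Usage: wait_for <max_attempts> <command> [args...]
wait_for() {
  local max_attempts="$1" attempt=1
  shift
  while [ "$attempt" -le "$max_attempts" ]; do
    # Discard the command's output; only its exit status matters here.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Pause between attempts, but not after the last one.
    if [ "$attempt" -lt "$max_attempts" ]; then
      sleep 1
    fi
    attempt=$((attempt + 1))
  done
  echo "gave up after $max_attempts attempts: $*" >&2
  return 1
}
```

For example, `wait_for 60 bin/pulsar-admin brokers healthcheck && echo "Pulsar is up"` retries for roughly one minute; the limit of 60 attempts is an arbitrary choice.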
2. Check the Pulsar binary protocol port.

```shell
telnet localhost 6650
```
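`telnet` opens an interactive session, which is awkward in scripts. As a non-interactive alternative, the sketch below relies on bash's `/dev/tcp` redirection and the coreutils `timeout` command; the `port_open` helper is hypothetical and only tests whether a TCP connection can be opened, not whether Pulsar answers correctly on that port.

```shell
# Hypothetical helper (not part of Pulsar): succeed if a TCP connection to
# host:port can be opened within 2 seconds. It relies on bash's /dev/tcp
# redirection and the coreutils `timeout` command.
port_open() {
  timeout 2 bash -c 'exec 3<>"/dev/tcp/$1/$2"' _ "$1" "$2" 2>/dev/null
}
```

For example, `port_open 127.0.0.1 6650 && echo "binary protocol port is open"`. Using `127.0.0.1` instead of `localhost` avoids depending on whether `localhost` resolves to the IPv6 address first.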
3. Check the Pulsar Functions cluster.

```shell
bin/pulsar-admin functions-worker get-cluster
```

Output

```json
[{"workerId":"c-standalone-fw-localhost-6750","workerHostname":"localhost","port":6750}]
```
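If you want to list the registered workers from a script, the `workerId` values can be pulled out of this JSON with standard text tools. This is a minimal sketch: `worker_ids` is a hypothetical helper, and it uses `grep`/`sed` pattern matching instead of a real JSON parser, so it assumes each `workerId` value is a plain quoted string.

```shell
# Hypothetical helper (not part of Pulsar): print the workerId of every entry
# in the JSON printed by `pulsar-admin functions-worker get-cluster`.
# It uses grep/sed pattern matching rather than a real JSON parser.
worker_ids() {
  grep -o '"workerId" *: *"[^"]*"' | sed 's/.*: *"\([^"]*\)"$/\1/'
}
```

For example, `bin/pulsar-admin functions-worker get-cluster | worker_ids` prints one worker ID per line.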
4. Make sure the `public` tenant exists.

```shell
bin/pulsar-admin tenants list
```

Output

```text
"public"
```
5. Make sure the `public/default` namespace exists.

```shell
bin/pulsar-admin namespaces list public
```

Output

```text
"public/default"
```
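The tenant and namespace checks can also be automated by testing whether an expected entry appears in the list output. The sketch below uses a hypothetical `list_contains` helper; it strips quotes and blanks from each line before comparing, so it works whether or not your `pulsar-admin` version prints the entries in quotes.

```shell
# Hypothetical helper (not part of Pulsar): read the output of a
# `pulsar-admin ... list` command on stdin and succeed if one of the entries
# equals the first argument. Quotes, spaces, and tabs are removed first so
# that "public" and public match alike.
list_contains() {
  tr -d '"[:blank:]' | grep -Fx -- "$1" >/dev/null
}
```

For example, `bin/pulsar-admin tenants list | list_contains public && bin/pulsar-admin namespaces list public | list_contains public/default` succeeds only if both the tenant and the namespace exist. The `-x` match requires the whole entry to match, so `pub` does not match `public`.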
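6. Make sure the table service is enabled.

```shell
telnet localhost 4181
```

Output

```text
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
```

The `Connected to localhost.` line shows that the table service is listening on port 4181; the refused IPv6 (`::1`) attempt can be ignored.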
:::note
Before starting functions, you need to start Pulsar.
:::
1. Create a tenant and a namespace.

```shell
bin/pulsar-admin tenants create test
bin/pulsar-admin namespaces create test/test-namespace
```
2. In the same terminal window as step 1, verify the tenant and the namespace.

```shell
bin/pulsar-admin namespaces list test
```

Output

```text
"test/test-namespace"
```

This output shows that both the tenant and the namespace are created successfully.
3. In the same terminal window as step 1, create a function named `example`.

:::tip
You can find both the `example-function-config.yaml` and `api-examples.jar` files in the `examples` folder of your local Pulsar directory.
:::

```shell
bin/pulsar-admin functions create \
  --function-config-file examples/example-function-config.yaml \
  --jar examples/api-examples.jar
```

Output

```text
Created Successfully
```
4. In the same terminal window as step 1, verify the function's configuration.

```shell
bin/pulsar-admin functions get \
  --tenant test \
  --namespace test-namespace \
  --name example
```

Output

```json
{
  "tenant": "test",
  "namespace": "test-namespace",
  "name": "example",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "userConfig": "{\"PublishTopic\":\"test_result\"}",
  "autoAck": true,
  "parallelism": 1,
  "source": {
    "topicsToSerDeClassName": {
      "test_src": ""
    },
    "typeClassName": "java.lang.String"
  },
  "sink": {
    "topic": "test_result",
    "typeClassName": "java.lang.String"
  },
  "resources": {}
}
```
5. In the same terminal window as step 1, verify the function's status.

```shell
bin/pulsar-admin functions status \
  --tenant test \
  --namespace test-namespace \
  --name example
```

Output

`"running": true` shows that the function is running.

```json
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}
```
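To turn this status check into a pass/fail test, you can compare `numRunning` with `numInstances`. The sketch below is a hypothetical helper, `all_instances_running`, that uses `sed` rather than `jq` so it has no extra dependencies; it assumes each of the two fields appears once and sits on a single line, which holds for both the compact and the pretty-printed form of the status output.

```shell
# Hypothetical helper (not part of Pulsar): read `pulsar-admin functions status`
# JSON on stdin and succeed only if numRunning equals numInstances (and > 0).
all_instances_running() {
  local json total running
  json=$(cat)
  # Extract the first integer following each field name.
  total=$(printf '%s\n' "$json" | sed -n 's/.*"numInstances" *: *\([0-9][0-9]*\).*/\1/p' | head -n 1)
  running=$(printf '%s\n' "$json" | sed -n 's/.*"numRunning" *: *\([0-9][0-9]*\).*/\1/p' | head -n 1)
  [ -n "$total" ] && [ "$total" -gt 0 ] && [ "$total" = "$running" ]
}
```

For example: `bin/pulsar-admin functions status --tenant test --namespace test-namespace --name example | all_instances_running && echo "all instances running"`.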
6. In the same terminal window as step 1, subscribe to the output topic `test_result`. With `-n 0`, the consumer keeps receiving messages until you stop it.

```shell
bin/pulsar-client consume -s test-sub -n 0 test_result
```
7. In a new terminal window, produce messages to the input topic `test_src`.

```shell
bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
```
8. In the same terminal window as step 1, the messages produced by the `example` function are returned.

Output

```text
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
```
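Each returned message is the input with a `!` appended by the exclamation function. If you capture the consumer output, a small awk filter can count how many received messages carry that `!`. This is a sketch: `count_exclaimed` is a hypothetical helper, and it assumes every `----- got message -----` marker is followed by exactly one line of message content.

```shell
# Hypothetical helper (not part of Pulsar): read pulsar-client consumer output
# on stdin and print how many message bodies (the line after each
# "----- got message -----" marker) end with "!".
count_exclaimed() {
  awk '
    /^----- got message -----$/ { body_next = 1; next }
    body_next { body_next = 0; if ($0 ~ /!$/) ok++ }
    END { print ok + 0 }
  '
}
```

For example, running `bin/pulsar-client consume -s test-sub -n 10 test_result | count_exclaimed` in place of the open-ended consumer should print 10 once the ten messages have been produced, because `-n 10` makes the consumer exit after ten messages.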
Standalone Pulsar enables the BookKeeper table service for stateful functions. For more information, see Configure state storage.
The following example provides instructions to validate counter functions.
:::note
Before starting stateful functions, you need to start Pulsar.
:::
1. Create a function named `word_count`.

```shell
bin/pulsar-admin functions create \
  --function-config-file examples/example-function-config.yaml \
  --jar examples/api-examples.jar \
  --name word_count \
  --className org.apache.pulsar.functions.api.examples.WordCountFunction \
  --inputs test_wordcount_src \
  --output test_wordcount_dest
```

Output

```text
Created Successfully
```
2. In the same terminal window as step 1, get the information of the `word_count` function.

```shell
bin/pulsar-admin functions get \
  --tenant test \
  --namespace test-namespace \
  --name word_count
```

Output

```json
{
  "tenant": "test",
  "namespace": "test-namespace",
  "name": "word_count",
  "className": "org.apache.pulsar.functions.api.examples.WordCountFunction",
  "inputSpecs": {
    "test_wordcount_src": {
      "isRegexPattern": false
    }
  },
  "output": "test_wordcount_dest",
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "userConfig": {
    "PublishTopic": "test_result"
  },
  "runtime": "JAVA",
  "autoAck": true,
  "parallelism": 1,
  "resources": {
    "cpu": 1.0,
    "ram": 1073741824,
    "disk": 10737418240
  },
  "cleanupSubscription": true
}
```
3. In the same terminal window as step 1, get the status of the `word_count` function.

```shell
bin/pulsar-admin functions status \
  --tenant test \
  --namespace test-namespace \
  --name word_count
```

Output

```json
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}
```
4. In the same terminal window as step 1, query the state table of the function for the key `hello`. The `-w` option keeps watching for changes to `hello`.

```shell
bin/pulsar-admin functions querystate \
  --tenant test \
  --namespace test-namespace \
  --name word_count -k hello -w
```

:::tip
For more information about the options of the `pulsar-admin functions querystate` command, including flags, descriptions, default values, and shorthands, see Admin API.
:::

Output

```text
key 'hello' doesn't exist.
key 'hello' doesn't exist.
key 'hello' doesn't exist.
...
```
5. In a new terminal window, update the value of `hello` to 10 using one of the following methods.

Method 1: produce 10 messages with `hello` to the input topic `test_wordcount_src`.

```shell
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
```
Method 2: write the state of `hello` directly.

```shell
bin/pulsar-admin functions putstate \
  --tenant test \
  --namespace test-namespace \
  --name word_count \
  --state "{\"key\":\"hello\", \"numberValue\":10}"
```
:::tip
For more information about the options of the `pulsar-admin functions putstate` command, including flags, descriptions, default values, and shorthands, see Admin API.
:::
6. In the same terminal window as step 1, check the result. The `querystate` watch shows that the value of `hello` is now 10.

Output

```json
{ "key": "hello", "numberValue": 10, "version": 9 }
```
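To check the counter from a script instead of reading the watch output, you can run `querystate` without `-w`, so that it prints the state once and exits, and extract the `numberValue` field. The sketch below uses a hypothetical `state_number_value` helper based on `sed` pattern matching; it assumes the field is an integer on a single line, as in the output shown here.

```shell
# Hypothetical helper (not part of Pulsar): print the numberValue field from
# the JSON printed by `pulsar-admin functions querystate`.
state_number_value() {
  sed -n 's/.*"numberValue" *: *\(-\{0,1\}[0-9][0-9]*\).*/\1/p' | head -n 1
}
```

For example, `bin/pulsar-admin functions querystate --tenant test --namespace test-namespace --name word_count -k hello | state_number_value` would print `10` for the state shown above.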
7. In the same terminal window as step 5, produce another 10 messages with `hello`. The value of `hello` is updated to 20.

```shell
bin/pulsar-client produce -m "hello" -n 10 test_wordcount_src
```
8. In the same terminal window as step 1, check the result. The value of `hello` is now 20.

Output

```text
value = 10
value = 20
```
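The `word_count` function keeps one counter per word in its state table, which is why another 10 `hello` messages move the value from 10 to 20. The counting logic can be mimicked locally with awk, as in the sketch below. The `word_counts` helper is hypothetical and does not use the Pulsar runtime, and splitting on whitespace is an assumption: the example `WordCountFunction` may tokenize its input differently.

```shell
# Local stand-in for a counter-based word count (not the Pulsar runtime):
# every whitespace-separated word in each input line increments that word's
# counter, similar to one counter increment per word. Prints "word count"
# pairs sorted by word.
word_counts() {
  awk '{ for (i = 1; i <= NF; i++) count[$i]++ }
       END { for (w in count) print w, count[w] }' | sort
}
```

For example, `seq 20 | sed 's/.*/hello/' | word_counts` feeds 20 `hello` lines and prints `hello 20`, matching the final value in this walkthrough.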
Window functions are a special form of Pulsar Functions. For more information, see the window function concepts.
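Unlike a regular function, a window function processes a group (window) of messages at a time. To illustrate a count-based sliding window, the following awk sketch prints, after every `slide` inputs, the sum of the most recent `len` inputs. The `sliding_window_sum` helper and the window sizes used below are illustrative assumptions: they are not read from `example-window-function-config.yaml`, and summing is only one possible window computation.

```shell
# Local stand-in for a count-based sliding window (not the Pulsar runtime).
# Reads one integer per line; after every <slide> inputs, prints the sum of
# the most recent <len> inputs (fewer if not enough have arrived yet).
# Usage: sliding_window_sum <len> <slide>
sliding_window_sum() {
  awk -v len="$1" -v slide="$2" '
    {
      buf[NR % len] = $1              # ring buffer holding the last len values
      if (NR % slide == 0) {
        n = (NR < len) ? NR : len     # number of values in the current window
        sum = 0
        for (i = NR - n + 1; i <= NR; i++) sum += buf[i % len]
        print sum
      }
    }'
}
```

For example, `seq 1 10 | sliding_window_sum 4 2` prints `3`, `10`, `18`, `26`, and `34` on separate lines: windows overlap because the slide (2) is smaller than the length (4). Setting the length equal to the slide, as in `seq 1 6 | sliding_window_sum 3 3` (which prints `6` and `15`), gives non-overlapping tumbling windows.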
:::note
Before starting window functions, you need to start Pulsar.
:::
1. Create a tenant and a namespace.

```shell
bin/pulsar-admin tenants create test
bin/pulsar-admin namespaces create test/test-namespace
```
2. In the same terminal window as step 1, verify the tenant and the namespace.

```shell
bin/pulsar-admin namespaces list test
```

Output

```text
"test/test-namespace"
```

This output shows that both the tenant and the namespace are created successfully.
3. In the same terminal window as step 1, create a function named `example`.

:::tip
You can find both the `example-window-function-config.yaml` and `api-examples.jar` files in the `examples` folder of your local Pulsar directory.
:::

```shell
bin/pulsar-admin functions create \
  --function-config-file examples/example-window-function-config.yaml \
  --jar examples/api-examples.jar
```

Output

```text
Created Successfully
```
4. In the same terminal window as step 1, verify the function's configuration.

```shell
bin/pulsar-admin functions get \
  --tenant test \
  --namespace test-namespace \
  --name example
```

Output

```json
{
  "tenant": "test",
  "namespace": "test-namespace",
  "name": "example",
  "className": "org.apache.pulsar.functions.api.examples.ExclamationFunction",
  "userConfig": "{\"PublishTopic\":\"test_result\"}",
  "autoAck": true,
  "parallelism": 1,
  "source": {
    "topicsToSerDeClassName": {
      "test_src": ""
    },
    "typeClassName": "java.lang.String"
  },
  "sink": {
    "topic": "test_result",
    "typeClassName": "java.lang.String"
  },
  "resources": {}
}
```
5. In the same terminal window as step 1, verify the function's status.

```shell
bin/pulsar-admin functions status \
  --tenant test \
  --namespace test-namespace \
  --name example
```

Output

`"running": true` shows that the function is running.

```json
{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 0,
      "numSuccessfullyProcessed" : 0,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 0.0,
      "lastInvocationTime" : 0,
      "workerId" : "c-standalone-fw-localhost-8080"
    }
  } ]
}
```
6. In the same terminal window as step 1, subscribe to the output topic `test_result`. With `-n 0`, the consumer keeps receiving messages until you stop it.

```shell
bin/pulsar-client consume -s test-sub -n 0 test_result
```
7. In a new terminal window, produce messages to the input topic `test_src`.

```shell
bin/pulsar-client produce -m "test-messages-`date`" -n 10 test_src
```
8. In the same terminal window as step 1, the messages produced by the window function `example` are returned.

Output

```text
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
----- got message -----
test-messages-Thu Jul 19 11:59:15 PDT 2021!
```