# PIP-312: Use StateStoreProvider to manage state in Pulsar Functions endpoints
# Background knowledge
State is stored as key-value pairs, where each key is a string and each value is arbitrary binary data; counters are stored as 64-bit big-endian binary values.
Keys are scoped to an individual function and shared between instances of that function.
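As a minimal illustration of the counter layout described above, the sketch below packs a `long` into 8 big-endian bytes and decodes it again with `java.nio.ByteBuffer`, whose default byte order is big-endian. `CounterCodec` is a hypothetical helper for illustration only, not a Pulsar class.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not part of Pulsar): encodes a state counter as an
// 8-byte, big-endian value and decodes it back.
final class CounterCodec {
    private CounterCodec() {}

    static byte[] encode(long value) {
        // A freshly allocated ByteBuffer uses big-endian byte order by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("counter value must be 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

For example, `CounterCodec.encode(1L)` produces seven zero bytes followed by `0x01`, so the most significant byte comes first.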
Pulsar Functions use a `StateStoreProvider` to initialize the `StateStore` that manages state, which allows them to support multiple state storage backends, such as:
- `BKStateStoreProviderImpl`: uses Apache BookKeeper as the backend
- `PulsarMetadataStateStoreProviderImpl`: uses the Pulsar metadata store as the backend
Users can also implement their own `StateStoreProvider` to support other state storage backends.
The broker also exposes two REST endpoints to query and update the value of a function's state key:
- GET /{tenant}/{namespace}/{functionName}/state/{key}
- POST /{tenant}/{namespace}/{functionName}/state/{key}
Although Pulsar Functions support multiple state storage backends, these two endpoints still use BookKeeper's storage clients (`StorageClient` and `StorageAdminClient`) directly to put and query state.
This tightly couples the Pulsar Functions state store to Apache BookKeeper.
See: [code](https://github.com/apache/pulsar/blob/1a66b640c3cd86bfca75dc9ab37bfdb37427a13f/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java#L1152-L1297)
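For reference, the sketch below builds the request path for the state endpoints listed above; GET and POST share the same path. It assumes the endpoints are mounted under the functions admin prefix `/admin/v3/functions`, which this proposal does not state, and `StatePaths` is a hypothetical helper.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds /{prefix}/{tenant}/{namespace}/{functionName}/state/{key}.
// ASSUMPTION: the state endpoints live under "/admin/v3/functions".
final class StatePaths {
    static final String ASSUMED_PREFIX = "/admin/v3/functions";

    private StatePaths() {}

    static String statePath(String tenant, String namespace, String functionName, String key) {
        return String.join("/",
                ASSUMED_PREFIX,
                encode(tenant), encode(namespace), encode(functionName),
                "state", encode(key));
    }

    private static String encode(String segment) {
        // URLEncoder performs form encoding (space -> "+"), so convert "+" to "%20"
        // for use in a path segment. A literal "+" was already encoded as "%2B".
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```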
# Motivation
This proposal aims to decouple the Pulsar Functions state store from Apache BookKeeper, so that other state storage backends can be supported.
# Goals
## In Scope
- Pulsar Functions can use state storage backends other than Apache BookKeeper.
## Out of Scope
None
# High Level Design
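- Replace the direct use of BookKeeper's storage clients in `ComponentImpl` with `StateStoreProvider` to manage state.
- Add a `cleanUp` method to the `StateStoreProvider` interface.
- Add an `init` overload without the `FunctionDetails` parameter to the `StateStoreProvider` interface.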
# Detailed Design
## Design & Implementation Details
1. In the `ComponentImpl#getFunctionState` and `ComponentImpl#queryState` methods, replace the direct use of BookKeeper's `StorageClient` with `StateStoreProvider`:
```java
String tableNs = getStateNamespace(tenant, namespace);
String tableName = functionName;
String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
if (storageClient.get() == null) {
    storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
            .withSettings(StorageClientSettings.newBuilder()
                    .serviceUri(stateStorageServiceUrl)
                    .clientName("functions-admin")
                    .build())
            .withNamespace(tableNs)
            .build());
}
...
```
This is replaced with:
```java
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name);
```
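The sketch below illustrates the effect of step 1: the endpoint logic depends only on a provider abstraction, so changing the backend does not touch the REST layer. `SimpleStateStore`, `SimpleStateStoreProvider`, `InMemoryStateStoreProvider`, and `FunctionStateHandler` are hypothetical, simplified stand-ins, not Pulsar's actual `StateStoreProvider` or `DefaultStateStore` APIs; the in-memory map plays the role of BookKeeper or any other backend.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// HYPOTHETICAL simplified interfaces; they only model the shape of the change.
interface SimpleStateStore {
    void put(String key, byte[] value);
    Optional<byte[]> get(String key);
}

interface SimpleStateStoreProvider {
    // One store per (tenant, namespace, function name), shared by all instances.
    SimpleStateStore getStateStore(String tenant, String namespace, String name);
}

// In-memory backend standing in for BookKeeper, the metadata store, or a custom backend.
final class InMemoryStateStoreProvider implements SimpleStateStoreProvider {
    private final Map<String, Map<String, byte[]>> tables = new ConcurrentHashMap<>();

    @Override
    public SimpleStateStore getStateStore(String tenant, String namespace, String name) {
        Map<String, byte[]> table = tables.computeIfAbsent(
                tenant + "/" + namespace + "/" + name, k -> new ConcurrentHashMap<>());
        return new SimpleStateStore() {
            @Override
            public void put(String key, byte[] value) {
                table.put(key, value.clone()); // defensive copy
            }

            @Override
            public Optional<byte[]> get(String key) {
                return Optional.ofNullable(table.get(key)).map(byte[]::clone);
            }
        };
    }
}

// Endpoint logic that only knows about the provider abstraction.
final class FunctionStateHandler {
    private final SimpleStateStoreProvider provider;

    FunctionStateHandler(SimpleStateStoreProvider provider) {
        this.provider = provider;
    }

    // Models POST /{tenant}/{namespace}/{functionName}/state/{key}
    void putState(String tenant, String namespace, String functionName, String key, String value) {
        provider.getStateStore(tenant, namespace, functionName)
                .put(key, value.getBytes(StandardCharsets.UTF_8));
    }

    // Models GET /{tenant}/{namespace}/{functionName}/state/{key}
    Optional<String> getState(String tenant, String namespace, String functionName, String key) {
        return provider.getStateStore(tenant, namespace, functionName)
                .get(key)
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8));
    }
}
```

Because stores are keyed by tenant, namespace, and function name, every instance of a function sees the same keys, while other functions see an empty store.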
2. Add a `cleanUp` method to the `StateStoreProvider` interface:
```java
// A default method must have a body; this default is a no-op.
default void cleanUp(String tenant, String namespace, String name) throws Exception {}
```
This is needed because when a function is deleted, its state store should be deleted as well.
Currently, this deletion also uses BookKeeper's `StorageAdminClient` directly to delete the state store table:
```java
deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);

private void deleteStatestoreTableAsync(String namespace, String table) {
    StorageAdminClient adminClient = worker().getStateStoreAdminClient();
    if (adminClient != null) {
        adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
            if ((throwable == null && res)
                    || ((throwable instanceof NamespaceNotFoundException
                    || throwable instanceof StreamNotFoundException))) {
                log.info("{}/{} table deleted successfully", namespace, table);
            } else {
                if (throwable != null) {
                    log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable);
                } else {
                    log.error("{}/{} table deletion failed but moving on", namespace, table);
                }
            }
        });
    }
}
```
This proposal therefore adds a `cleanUp` method to `StateStoreProvider` and calls it after a function is deleted:
```java
worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName);
```
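A minimal sketch of the `cleanUp` hook and of calling it during function deletion, assuming the new call keeps the current "log the failure but move on" behavior. `CleanableStateStoreProvider`, `TrackingProvider`, and `FunctionDeletion` are hypothetical names, and `java.util.logging` is used only to keep the sketch self-contained.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

// HYPOTHETICAL model of the proposed hook, not Pulsar's real interface.
interface CleanableStateStoreProvider {
    // Providers with nothing to remove can inherit this no-op.
    default void cleanUp(String tenant, String namespace, String name) throws Exception {}
}

// Example provider that tracks which per-function tables exist.
final class TrackingProvider implements CleanableStateStoreProvider {
    final Set<String> tables = new HashSet<>();

    @Override
    public void cleanUp(String tenant, String namespace, String name) {
        tables.remove(tenant + "/" + namespace + "/" + name);
    }
}

final class FunctionDeletion {
    private static final Logger LOG = Logger.getLogger(FunctionDeletion.class.getName());

    private FunctionDeletion() {}

    // Returns true if cleanup succeeded; a failure is logged but does not
    // abort the surrounding function deletion.
    static boolean cleanUpState(CleanableStateStoreProvider provider,
                                String tenant, String namespace, String name) {
        try {
            provider.cleanUp(tenant, namespace, name);
            LOG.info(() -> tenant + "/" + namespace + "/" + name + " state deleted successfully");
            return true;
        } catch (Exception e) {
            LOG.log(Level.WARNING,
                    tenant + "/" + namespace + "/" + name + " state deletion failed but moving on", e);
            return false;
        }
    }
}
```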
3. Add a new `init` method to the `StateStoreProvider` interface:
The current `init` method requires a `FunctionDetails` parameter, but `FunctionDetails` is not available in the `ComponentImpl` class, and neither `BKStateStoreProviderImpl` nor `PulsarMetadataStateStoreProviderImpl` uses this parameter.
For backward compatibility, instead of changing the existing `init` method, this proposal adds a new `init` overload without the `FunctionDetails` parameter:
```java
default void init(Map<String, Object> config) throws Exception {}
```
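The sketch below shows why adding a defaulted overload is source compatible, using hypothetical types: `InitializableProvider` models the two `init` overloads with `Object` standing in for `FunctionDetails`, and the `stateStorageServiceUrl` config key is made up for illustration.

```java
import java.util.Map;

// HYPOTHETICAL model of the two init overloads; Object replaces FunctionDetails
// so the sketch compiles without Pulsar's protobuf classes.
interface InitializableProvider {
    // Existing-style overload.
    default void init(Map<String, Object> config, Object functionDetails) throws Exception {}

    // New overload for callers that have no FunctionDetails.
    default void init(Map<String, Object> config) throws Exception {}
}

// Written before the change: overrides only the old overload and still compiles,
// because the new overload has a default body.
final class LegacyProvider implements InitializableProvider {
    String serviceUrl;

    @Override
    public void init(Map<String, Object> config, Object functionDetails) {
        serviceUrl = (String) config.get("stateStorageServiceUrl");
    }
}

// Updated for the change: both overloads share one initialization path.
final class UpdatedProvider implements InitializableProvider {
    String serviceUrl;

    @Override
    public void init(Map<String, Object> config, Object functionDetails) {
        init(config); // functionDetails is not needed by this backend
    }

    @Override
    public void init(Map<String, Object> config) {
        serviceUrl = (String) config.get("stateStorageServiceUrl");
    }
}
```

One caveat the sketch makes visible: a provider that overrides only the old overload still compiles, but calling the new `init(Map)` on it runs the empty default, so a provider used from `ComponentImpl` would need to override the new overload (or delegate between the two, as `UpdatedProvider` does).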
## Public-facing Changes
None
# Monitoring
# Security Considerations
# Backward & Forward Compatibility
## Revert
- Nothing needs to be done if users use Apache BookKeeper as the state storage backend.
- If users use another state storage backend, they need to switch back to BookKeeper.
## Upgrade
Nothing needs to be done.
# Alternatives
# General Notes
# Links
<!--
Updated afterwards
-->
* Mailing List discussion thread: https://lists.apache.org/thread/0rz29wotonmdck76pdscwbqo19t3rbds
* Mailing List voting thread: https://lists.apache.org/thread/t8vmyxovrrb5xl8jvrp1om50l6nprdjt