| # PIP-312: Use StateStoreProvider to manage state in Pulsar Functions endpoints |
| |
| # Background knowledge |
| |
| State is stored as key-value pairs, where the key is a string and the value is arbitrary binary data. Counters are stored as 64-bit big-endian binary values. |
| Keys are scoped to an individual function and shared between instances of that function. |
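
To make the counter layout concrete, the following is a minimal sketch of encoding and decoding a counter as a 64-bit big-endian value. `CounterEncoding` and its methods are hypothetical helpers written for illustration and are not part of Pulsar.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;

// Hypothetical helper, not part of Pulsar: illustrates the 64-bit big-endian counter layout.
final class CounterEncoding {
    private CounterEncoding() {}

    // Encode a counter as 8 bytes, most significant byte first.
    static byte[] encode(long counter) {
        return ByteBuffer.allocate(Long.BYTES)
                .order(ByteOrder.BIG_ENDIAN)
                .putLong(counter)
                .array();
    }

    // Decode an 8-byte big-endian value back into a counter.
    static long decode(byte[] raw) {
        if (raw.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + raw.length);
        }
        return ByteBuffer.wrap(raw).order(ByteOrder.BIG_ENDIAN).getLong();
    }

    public static void main(String[] args) {
        byte[] raw = encode(1L);
        System.out.println(Arrays.toString(raw)); // [0, 0, 0, 0, 0, 0, 0, 1]
        System.out.println(decode(raw));          // 1
    }
}
```

A client that reads a counter through the state endpoint would need to apply the same decoding to the returned bytes.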
| |
| Pulsar Functions use a `StateStoreProvider` to initialize a `StateStore` that manages state, which allows them to support multiple state storage backends, such as: |
| - `BKStateStoreProviderImpl`: use Apache BookKeeper as the backend |
| - `PulsarMetadataStateStoreProviderImpl`: use Pulsar Metadata as the backend |
| |
| Users can also implement their own `StateStoreProvider` to support other state storage backends. |
| |
| The Broker also exposes two endpoints to put and query a state key of a function: |
| - GET /{tenant}/{namespace}/{functionName}/state/{key} |
| - POST /{tenant}/{namespace}/{functionName}/state/{key} |
| |
| Although Pulsar Functions support multiple state storage backends, these two endpoints still use BookKeeper's `StorageAdminClient` directly to put and query state, |
| which tightly couples the Pulsar Functions state store to Apache BookKeeper. |
| |
| See: [code](https://github.com/apache/pulsar/blob/1a66b640c3cd86bfca75dc9ab37bfdb37427a13f/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java#L1152-L1297) |
| |
| # Motivation |
| |
| This proposal aims to decouple the Pulsar Functions state store from Apache BookKeeper, so that the state endpoints work with other state storage backends as well. |
| |
| # Goals |
| |
| ## In Scope |
| |
| - Pulsar Functions can use state storage backends other than Apache BookKeeper. |
| |
| ## Out of Scope |
| |
| None |
| |
| # High Level Design |
| |
| - Replace the `StorageAdminClient` in `ComponentImpl` with `StateStoreProvider` to manage state. |
| - Add a `cleanUp` method to the `StateStoreProvider` interface. |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| |
| 1. In the `ComponentImpl#getFunctionState` and `ComponentImpl#queryState` methods, replace the `StorageAdminClient` with `StateStoreProvider`. The current code builds a BookKeeper storage client directly: |
| |
| ```java |
| String tableNs = getStateNamespace(tenant, namespace); |
| String tableName = functionName; |
| |
| String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl(); |
| |
| if (storageClient.get() == null) { |
|     storageClient.compareAndSet(null, StorageClientBuilder.newBuilder() |
|             .withSettings(StorageClientSettings.newBuilder() |
|                     .serviceUri(stateStorageServiceUrl) |
|                     .clientName("functions-admin") |
|                     .build()) |
|             .withNamespace(tableNs) |
|             .build()); |
| } |
| ... |
| ``` |
| |
| This is replaced with: |
| |
| ```java |
| DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name); |
| ``` |
| |
| 2. Add a `cleanUp` method to the `StateStoreProvider` interface: |
| |
| ```java |
| default void cleanUp(String tenant, String namespace, String name) throws Exception {} |
| ``` |
| |
| When a function is deleted, its state store should be deleted as well. |
| Currently, deletion also relies on BookKeeper's `StorageAdminClient` to delete the state store table: |
| |
| ```java |
| deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName); |
| |
| private void deleteStatestoreTableAsync(String namespace, String table) { |
|     StorageAdminClient adminClient = worker().getStateStoreAdminClient(); |
|     if (adminClient != null) { |
|         adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> { |
|             if ((throwable == null && res) |
|                     || ((throwable instanceof NamespaceNotFoundException |
|                     || throwable instanceof StreamNotFoundException))) { |
|                 log.info("{}/{} table deleted successfully", namespace, table); |
|             } else { |
|                 if (throwable != null) { |
|                     log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable); |
|                 } else { |
|                     log.error("{}/{} table deletion failed but moving on", namespace, table); |
|                 } |
|             } |
|         }); |
|     } |
| } |
| ``` |
| |
| This proposal therefore adds a `cleanUp` method to `StateStoreProvider` and calls it after a function is deleted: |
| |
| ```java |
| worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName); |
| ``` |
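As an illustration of the intended `cleanUp` semantics, here is a minimal sketch of a provider that drops a function's state when `cleanUp` is called. `SketchStateStoreProvider` and `InMemoryStateStoreProvider` are hypothetical, simplified stand-ins. The real `StateStoreProvider` interface has more methods and returns a `StateStore` rather than a plain `Map`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for the provider interface (not the real Pulsar API).
interface SketchStateStoreProvider {
    Map<String, byte[]> getStateStore(String tenant, String namespace, String name) throws Exception;

    // No-op default, so providers that predate cleanUp keep compiling.
    default void cleanUp(String tenant, String namespace, String name) throws Exception {}
}

// Hypothetical in-memory backend that keeps one map per function.
final class InMemoryStateStoreProvider implements SketchStateStoreProvider {
    private final Map<String, Map<String, byte[]>> stores = new ConcurrentHashMap<>();

    private static String storeKey(String tenant, String namespace, String name) {
        return tenant + "/" + namespace + "/" + name;
    }

    @Override
    public Map<String, byte[]> getStateStore(String tenant, String namespace, String name) {
        // Create the function's store lazily on first access.
        return stores.computeIfAbsent(storeKey(tenant, namespace, name), k -> new ConcurrentHashMap<>());
    }

    @Override
    public void cleanUp(String tenant, String namespace, String name) {
        // Drop all state that belongs to the deleted function.
        stores.remove(storeKey(tenant, namespace, name));
    }

    boolean hasStore(String tenant, String namespace, String name) {
        return stores.containsKey(storeKey(tenant, namespace, name));
    }
}

final class CleanUpDemo {
    public static void main(String[] args) {
        InMemoryStateStoreProvider provider = new InMemoryStateStoreProvider();
        provider.getStateStore("public", "default", "word-count").put("hello", new byte[] {1});
        provider.cleanUp("public", "default", "word-count");
        System.out.println(provider.hasStore("public", "default", "word-count")); // false
    }
}
```

Because the default body is empty, a backend that has nothing to delete does not need to override the method.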
| |
| 3. Add a new `init` method to the `StateStoreProvider` interface: |
| |
| The current `init` method requires a `FunctionDetails` parameter, but `FunctionDetails` is not available in the `ComponentImpl` class. |
| The parameter is also not used by either `BKStateStoreProviderImpl` or `PulsarMetadataStateStoreProviderImpl`. |
| For backward compatibility, instead of changing the existing `init` method, this proposal adds a new `init` method without the `FunctionDetails` parameter: |
| |
| ```java |
| default void init(Map<String, Object> config) throws Exception {} |
| ``` |
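The following sketch shows why adding the overload as a default method keeps existing providers source-compatible. All types here are hypothetical: `FunctionDetailsStub` stands in for the real `FunctionDetails`, `SketchProvider` for `StateStoreProvider`, and the `stateStorageServiceUrl` config key is assumed for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical placeholder for the real FunctionDetails type.
final class FunctionDetailsStub {}

// Hypothetical, simplified stand-in for the provider interface.
interface SketchProvider {
    // Existing method, left unchanged.
    void init(Map<String, Object> config, FunctionDetailsStub details) throws Exception;

    // New overload with an empty default body.
    default void init(Map<String, Object> config) throws Exception {}
}

// A provider written before the overload existed: it still compiles unchanged.
final class LegacyProvider implements SketchProvider {
    boolean initialized;

    @Override
    public void init(Map<String, Object> config, FunctionDetailsStub details) {
        initialized = true;
    }
}

// A provider that can be initialized without FunctionDetails.
final class UpdatedProvider implements SketchProvider {
    String serviceUrl;

    @Override
    public void init(Map<String, Object> config, FunctionDetailsStub details) {
        init(config); // details is unused, so delegate to the new overload
    }

    @Override
    public void init(Map<String, Object> config) {
        // Assumed config key, for illustration only.
        serviceUrl = (String) config.get("stateStorageServiceUrl");
    }
}

final class InitDemo {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put("stateStorageServiceUrl", "bk://localhost:4181");
        UpdatedProvider provider = new UpdatedProvider();
        provider.init(config);
        System.out.println(provider.serviceUrl); // bk://localhost:4181
    }
}
```

One consequence of the empty default is that a provider which only implements the old method is silently left uninitialized when the new overload is called, so custom providers would likely want to override it.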
| |
| ## Public-facing Changes |
| |
| None |
| |
| # Monitoring |
| |
| # Security Considerations |
| |
| # Backward & Forward Compatibility |
| |
| ## Revert |
| |
| - Nothing needs to be done if users use Apache BookKeeper as the state storage backend. |
| - If users use another state storage backend, they need to change it back to BookKeeper. |
| |
| ## Upgrade |
| |
| Nothing needs to be done. |
| |
| # Alternatives |
| |
| # General Notes |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/0rz29wotonmdck76pdscwbqo19t3rbds |
| * Mailing List voting thread: https://lists.apache.org/thread/t8vmyxovrrb5xl8jvrp1om50l6nprdjt |