PIP-312: Use StateStoreProvider to manage state in Pulsar Functions endpoints

Background knowledge

States are key-value pairs: each key is a string and each value is arbitrary binary data. Counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual function and shared between instances of that function.
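
The counter encoding described above can be illustrated with a minimal sketch. The class name `CounterCodec` is hypothetical and not part of Pulsar; it only shows what a 64-bit big-endian value looks like as raw bytes, relying on the fact that `java.nio.ByteBuffer` uses big-endian byte order by default.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of Pulsar: shows the byte layout of a counter value.
final class CounterCodec {

    // Encode a counter as 8 bytes, most significant byte first.
    // ByteBuffer defaults to big-endian order, so no explicit order() call is needed.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decode 8 big-endian bytes back into a counter value.
    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("a counter must be exactly 8 bytes");
        }
        return ByteBuffer.wrap(bytes).getLong();
    }
}
```

For example, `CounterCodec.encode(1L)` yields seven zero bytes followed by `0x01`.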

Pulsar Functions use StateStoreProvider to initialize a StateStore that manages state, so multiple state storage backends can be supported, such as:

  • BKStateStoreProviderImpl: use Apache BookKeeper as the backend
  • PulsarMetadataStateStoreProviderImpl: use Pulsar Metadata as the backend

Users can also implement their own StateStoreProvider to support other state storage backends.
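
As a rough sketch of what a custom backend might look like, the example below keeps state in memory. `SketchStateStore` and `SketchStateStoreProvider` are simplified stand-ins invented for this illustration; the real StateStore and StateStoreProvider interfaces have more methods and different types. Only the `getStateStore(tenant, namespace, name)` shape is taken from this proposal.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for a state store; NOT the real Pulsar StateStore interface.
interface SketchStateStore {
    void put(String key, byte[] value);
    byte[] get(String key); // returns null when the key is absent
}

// Simplified stand-in for StateStoreProvider; NOT the real Pulsar interface.
interface SketchStateStoreProvider {
    SketchStateStore getStateStore(String tenant, String namespace, String name);
}

// Hypothetical in-memory backend: one store per fully qualified function name.
final class InMemoryStateStoreProvider implements SketchStateStoreProvider {
    private final Map<String, SketchStateStore> stores = new ConcurrentHashMap<>();

    @Override
    public SketchStateStore getStateStore(String tenant, String namespace, String name) {
        // All instances of the same function resolve to the same store.
        String functionId = tenant + "/" + namespace + "/" + name;
        return stores.computeIfAbsent(functionId, id -> new InMemoryStore());
    }

    private static final class InMemoryStore implements SketchStateStore {
        private final Map<String, byte[]> data = new ConcurrentHashMap<>();

        @Override
        public void put(String key, byte[] value) {
            data.put(key, value.clone()); // defensive copy of the binary value
        }

        @Override
        public byte[] get(String key) {
            byte[] value = data.get(key);
            return value == null ? null : value.clone();
        }
    }
}
```

Keying the map by tenant, namespace, and function name mirrors the scoping rule stated earlier: keys belong to one function and are shared between its instances.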

The Broker also exposes two endpoints to query and put a state key of a function:

  • GET /{tenant}/{namespace}/{functionName}/state/{key}
  • POST /{tenant}/{namespace}/{functionName}/state/{key}
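
To make the path template concrete, here is a small sketch that fills it in. The class `StateEndpointPaths` is hypothetical, and the `/admin/v3/functions` prefix is an assumption about where the functions admin API is mounted; the proposal itself only gives the part starting at `/{tenant}`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Pulsar: builds the state endpoint path.
final class StateEndpointPaths {

    // ASSUMPTION: base path of the functions admin API; not stated in this proposal.
    static final String BASE = "/admin/v3/functions";

    // Fill in /{tenant}/{namespace}/{functionName}/state/{key} under the assumed base.
    static String statePath(String tenant, String namespace, String functionName, String key) {
        return String.join("/", BASE,
                segment(tenant), segment(namespace), segment(functionName), "state", segment(key));
    }

    // URLEncoder targets form encoding and writes spaces as '+',
    // so rewrite them to %20 to get a valid path segment.
    private static String segment(String raw) {
        return URLEncoder.encode(raw, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```

The same path serves both operations; only the HTTP method (GET to query, POST to put) differs.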

Although Pulsar Functions support multiple state storage backends, these two endpoints still use BookKeeper's StorageAdminClient directly to put and query state. This leaves the Pulsar Functions state store tightly coupled to Apache BookKeeper.

See: code

Motivation

This proposal aims to decouple the Pulsar Functions state store from Apache BookKeeper, so that the state endpoints work with other state storage backends as well.

Goals

In Scope

  • Pulsar Functions can use state storage backends other than Apache BookKeeper.

Out of Scope

None

High Level Design

  • Replace the StorageAdminClient in ComponentImpl with StateStoreProvider to manage state.
  • Add a cleanup method to the StateStoreProvider interface.

Detailed Design

Design & Implementation Details

  1. In the ComponentImpl#getFunctionState and ComponentImpl#queryState methods, replace the StorageAdminClient with StateStoreProvider:

    String tableNs = getStateNamespace(tenant, namespace);
    String tableName = functionName;
    
    String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
    
    if (storageClient.get() == null) {
        storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
        .withSettings(StorageClientSettings.newBuilder()
        .serviceUri(stateStorageServiceUrl)
        .clientName("functions-admin")
        .build())
        .withNamespace(tableNs)
        .build());
    }
    ...
    

    This is replaced with:

    DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name);
    
  2. Add a cleanup method to the StateStoreProvider interface:

    default void cleanUp(String tenant, String namespace, String name) throws Exception {}
    

    When a function is deleted, its state store should be deleted as well. Currently, this is also done through BookKeeper's StorageAdminClient, which deletes the state store table:

    deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);
    
    
    private void deleteStatestoreTableAsync(String namespace, String table) {
        StorageAdminClient adminClient = worker().getStateStoreAdminClient();
        if (adminClient != null) {
            adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
                if ((throwable == null && res)
                    || ((throwable instanceof NamespaceNotFoundException
                    || throwable instanceof StreamNotFoundException))) {
                    log.info("{}/{} table deleted successfully", namespace, table);
                } else {
                    if (throwable != null) {
                        log.error("{}/{} table deletion failed {}  but moving on", namespace, table, throwable);
                    } else {
                        log.error("{}/{} table deletion failed but moving on", namespace, table);
                    }
                }
            });
        }
    }
    

    So this proposal will add a cleanup method to the StateStoreProvider and call it after a function is deleted:

    worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName);
    
  3. Add a new init method to StateStoreProvider interface:

    The current init method requires a FunctionDetails parameter, but FunctionDetails is not available in the ComponentImpl class, and the parameter is used by neither BKStateStoreProviderImpl nor PulsarMetadataStateStoreProviderImpl. For backward compatibility, instead of changing the existing init method, this proposal adds a new init method without the FunctionDetails parameter:

    default void init(Map<String, Object> config) throws Exception {}
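
The backward-compatibility argument behind the two new default methods can be sketched as follows. `SketchProvider` is a simplified stand-in for StateStoreProvider, with `Object` standing in for FunctionDetails, and both implementing classes are hypothetical. Making the old `init` abstract here is an assumption for illustration only.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for StateStoreProvider; Object replaces FunctionDetails.
interface SketchProvider {
    // Pre-existing method that older implementations already define.
    void init(Map<String, Object> config, Object functionDetails) throws Exception;

    // New in this proposal: a default no-op, so older implementations still compile.
    default void init(Map<String, Object> config) throws Exception {}

    // New in this proposal: also a default no-op unless a backend overrides it.
    default void cleanUp(String tenant, String namespace, String name) throws Exception {}
}

// Hypothetical provider written before the change; it needs no modification.
final class LegacyProvider implements SketchProvider {
    boolean initialized;

    @Override
    public void init(Map<String, Object> config, Object functionDetails) {
        initialized = true;
    }
}

// Hypothetical provider that opts in to the new methods.
final class CleaningProvider implements SketchProvider {
    final Set<String> tables = ConcurrentHashMap.newKeySet();

    @Override
    public void init(Map<String, Object> config, Object functionDetails) {
        init(config); // delegate to the new overload; functionDetails is unused
    }

    @Override
    public void init(Map<String, Object> config) {
        // A real backend would open its client here.
    }

    @Override
    public void cleanUp(String tenant, String namespace, String name) {
        // Drop the state that belongs to the deleted function.
        tables.remove(tenant + "/" + namespace + "/" + name);
    }
}
```

Because both additions are default methods, calling `cleanUp` on `LegacyProvider` is simply a no-op, while `CleaningProvider` removes the entry for the deleted function.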
    

Public-facing Changes

None

Monitoring

Security Considerations

Backward & Forward Compatibility

Revert

  • Nothing needs to be done if users use Apache BookKeeper as the state storage backend.
  • If users use another state storage backend, they need to switch back to BookKeeper.

Upgrade

Nothing needs to be done.

Alternatives

General Notes

Links