States are key-value pairs, where a key is a string and its value is arbitrary binary data; counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual function and shared between instances of that function.
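As a minimal sketch of the counter encoding described above, the following converts a long to and from an 8-byte big-endian array. The class name CounterCodec and its methods are illustrative and not part of Pulsar.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative helper (not a Pulsar class): a counter as a 64-bit big-endian value.
class CounterCodec {
    // ByteBuffer uses big-endian byte order by default, so no order() call is needed.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long decode(byte[] bytes) {
        if (bytes.length != Long.BYTES) {
            throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
        }
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] encoded = encode(258L); // 258 = 0x0102
        System.out.println(Arrays.toString(encoded)); // [0, 0, 0, 0, 0, 0, 1, 2]
        System.out.println(decode(encoded));          // 258
    }
}
```

The most significant byte comes first, so a reader that assumes little-endian order would decode a different number from the same bytes.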
Pulsar Functions use a StateStoreProvider to initialize a StateStore that manages state, so multiple state storage backends can be supported, such as:
- BKStateStoreProviderImpl: uses Apache BookKeeper as the backend
- PulsarMetadataStateStoreProviderImpl: uses Pulsar Metadata as the backend
Users can also implement their own StateStoreProvider to support other state storage backends.
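To illustrate the provider pattern, here is a rough sketch of an in-memory backend. SimpleStateStore and SimpleStateStoreProvider are hypothetical, heavily simplified stand-ins defined inside the example. They are not Pulsar's real StateStore and StateStoreProvider interfaces, which have more methods.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class InMemoryStateDemo {
    // Hypothetical, simplified stand-ins for Pulsar's StateStore and StateStoreProvider.
    interface SimpleStateStore {
        void put(String key, byte[] value);
        byte[] get(String key);
    }

    interface SimpleStateStoreProvider {
        SimpleStateStore getStateStore(String tenant, String namespace, String name);
    }

    // A custom backend: one in-memory table per function.
    static class InMemoryProvider implements SimpleStateStoreProvider {
        private final Map<String, Map<String, byte[]>> tables = new ConcurrentHashMap<>();

        @Override
        public SimpleStateStore getStateStore(String tenant, String namespace, String name) {
            Map<String, byte[]> table = tables.computeIfAbsent(
                    tenant + "/" + namespace + "/" + name, k -> new ConcurrentHashMap<>());
            return new SimpleStateStore() {
                @Override public void put(String key, byte[] value) { table.put(key, value); }
                @Override public byte[] get(String key) { return table.get(key); }
            };
        }
    }

    public static void main(String[] args) {
        SimpleStateStoreProvider provider = new InMemoryProvider();
        // Two handles for the same function share one table, like two instances of a function.
        SimpleStateStore a = provider.getStateStore("public", "default", "word-count");
        SimpleStateStore b = provider.getStateStore("public", "default", "word-count");
        a.put("greeting", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(b.get("greeting"), StandardCharsets.UTF_8)); // hello
        // A different function has its own table and does not see the key.
        SimpleStateStore other = provider.getStateStore("public", "default", "other-fn");
        System.out.println(other.get("greeting") == null); // true
    }
}
```

Callers depend only on the provider interface, so a backend can be swapped without touching the code that reads and writes state.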
The Broker also exposes two endpoints to put and query a state key of a function:
Although Pulsar Functions support multiple state storage backends, these two endpoints still use BookKeeper's StorageAdminClient directly to put and query state, which tightly couples the Pulsar Functions state store to Apache BookKeeper.
See: code
This proposal aims to decouple the Pulsar Functions state store from Apache BookKeeper, so that it can support other state storage backends.
None
- Replace the StorageAdminClient in ComponentImpl with StateStoreProvider to manage state.
- Add a cleanUp method to the StateStoreProvider interface.
In the ComponentImpl#getFunctionState and ComponentImpl#queryState methods, replace the StorageAdminClient with StateStoreProvider:
String tableNs = getStateNamespace(tenant, namespace);
String tableName = functionName;
String stateStorageServiceUrl = worker().getWorkerConfig().getStateStorageServiceUrl();
if (storageClient.get() == null) {
    storageClient.compareAndSet(null, StorageClientBuilder.newBuilder()
            .withSettings(StorageClientSettings.newBuilder()
                    .serviceUri(stateStorageServiceUrl)
                    .clientName("functions-admin")
                    .build())
            .withNamespace(tableNs)
            .build());
}
...
This is replaced with:
DefaultStateStore store = worker().getStateStoreProvider().getStateStore(tenant, namespace, name);
Add a cleanUp method to the StateStoreProvider interface:
default void cleanUp(String tenant, String namespace, String name) throws Exception {}
When a function is deleted, its state store should be deleted as well. Currently, this step also uses BookKeeper's StorageAdminClient to delete the state store table:
deleteStatestoreTableAsync(getStateNamespace(tenant, namespace), componentName);

private void deleteStatestoreTableAsync(String namespace, String table) {
    StorageAdminClient adminClient = worker().getStateStoreAdminClient();
    if (adminClient != null) {
        adminClient.deleteStream(namespace, table).whenComplete((res, throwable) -> {
            if ((throwable == null && res)
                    || ((throwable instanceof NamespaceNotFoundException
                    || throwable instanceof StreamNotFoundException))) {
                log.info("{}/{} table deleted successfully", namespace, table);
            } else {
                if (throwable != null) {
                    log.error("{}/{} table deletion failed {} but moving on", namespace, table, throwable);
                } else {
                    log.error("{}/{} table deletion failed but moving on", namespace, table);
                }
            }
        });
    }
}
This proposal therefore adds a cleanUp method to the StateStoreProvider and calls it after a function is deleted:
worker().getStateStoreProvider().cleanUp(tenant, namespace, hashName);
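The following sketch shows how a delete flow might call cleanUp while keeping the existing "log and move on" behavior when cleanup fails. CleanableProvider, deleteFunction, and the messages list are hypothetical names used only for illustration. The real call site lives in ComponentImpl and is not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

class CleanUpOnDeleteDemo {
    // Hypothetical stand-in for StateStoreProvider; only cleanUp is modeled.
    interface CleanableProvider {
        default void cleanUp(String tenant, String namespace, String name) throws Exception {}
    }

    // Collected messages stand in for log.info / log.error in this sketch.
    static final List<String> messages = new ArrayList<>();

    // Hypothetical delete flow: remove the function, then clean up its state.
    static void deleteFunction(CleanableProvider provider, String tenant, String namespace, String name) {
        // ... the function itself would be deregistered here ...
        String id = tenant + "/" + namespace + "/" + name;
        try {
            provider.cleanUp(tenant, namespace, name);
            messages.add(id + " state cleaned up");
        } catch (Exception e) {
            // Report the failure but do not fail the delete.
            messages.add(id + " state cleanup failed but moving on: " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        // A provider that relies on the no-op default.
        deleteFunction(new CleanableProvider() {}, "public", "default", "fn-a");
        // A provider whose backend fails during cleanup.
        deleteFunction(new CleanableProvider() {
            @Override
            public void cleanUp(String tenant, String namespace, String name) throws Exception {
                throw new IllegalStateException("backend unavailable");
            }
        }, "public", "default", "fn-b");
        messages.forEach(System.out::println);
    }
}
```

Because cleanUp is a default method with an empty body, providers that keep no per-function resources do not need to override it.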
Add a new init method to the StateStoreProvider interface:
The current init method requires a FunctionDetails parameter, but FunctionDetails is not available in the ComponentImpl class, and neither BKStateStoreProviderImpl nor PulsarMetadataStateStoreProviderImpl uses the parameter. For backward compatibility, instead of changing the existing init method, this proposal adds a new init method without the FunctionDetails parameter:
default void init(Map<String, Object> config) throws Exception {}
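To show why a default overload preserves backward compatibility, here is a simplified model. Provider, FunctionDetails, LegacyProvider, NewProvider, and the "serviceUrl" key are all hypothetical placeholders defined inside the example. The existing two-argument init is modeled as abstract purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class InitOverloadDemo {
    // Hypothetical placeholder for Pulsar's FunctionDetails type.
    static class FunctionDetails {}

    // Simplified model of the provider interface, not the real one.
    interface Provider {
        // Existing method, left unchanged so current implementations still compile.
        void init(Map<String, Object> config, FunctionDetails details) throws Exception;

        // New overload from the proposal; the empty default body is a no-op.
        default void init(Map<String, Object> config) throws Exception {}
    }

    // Written before the new overload existed; needs no changes.
    static class LegacyProvider implements Provider {
        boolean initialized;

        @Override
        public void init(Map<String, Object> config, FunctionDetails details) {
            initialized = true;
        }
    }

    // Updated provider: both entry points share one code path.
    static class NewProvider implements Provider {
        String url;

        @Override
        public void init(Map<String, Object> config, FunctionDetails details) {
            init(config);
        }

        @Override
        public void init(Map<String, Object> config) {
            url = (String) config.get("serviceUrl"); // hypothetical config key
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, Object> config = new HashMap<>();
        config.put("serviceUrl", "example://state");

        LegacyProvider legacy = new LegacyProvider();
        legacy.init(config); // compiles, but only runs the no-op default
        System.out.println(legacy.initialized); // false

        NewProvider updated = new NewProvider();
        updated.init(config);
        System.out.println(updated.url); // example://state
    }
}
```

In this model, a provider that does not override the new overload is silently left uninitialized when only init(config) is called, so existing implementations would need the override before a caller switches to it.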
None
Nothing needs to be done.