discuss mail-thread: https://lists.apache.org/thread/o9k7trfmxrz89b0woybnshonpkq8ybw1
The motivation is the same as PIP-63, with a new broadcast use case of supporting 100K subscriptions in a single topic.
However, the original PIP-63 proposal is too complicated to implement: the changed code is already over 3K lines (see #11960), and there are still some problems left.
This PIP tries to come up with a simpler solution to support readonly topic ownership and solve the problems the previous PR left. The main idea of this solution is to reuse the feature of geo-replication, but instead of duplicating storage, it shares underlying bookie ledgers between different topics.
The goal is to introduce Shadow Topic as a new type of topic to support readonly topic ownership. Just as its name implies, a shadow topic is the shadow of some normal persistent topic (let's call it the source topic here). The source topic and the shadow topic must have the same number of partitions, or both be non-partitioned. Multiple shadow topics can be created from a source topic.
A shadow topic shares the underlying bookie ledgers of its source topic. Users can't produce messages to a shadow topic directly, and a shadow topic doesn't create any new ledgers for messages; all messages in a shadow topic come from its source topic.
A shadow topic has its own subscriptions and doesn't share them with its source topic. This means the shadow topic has its own cursor ledger to store persistent mark-delete info for each persistent subscription.
The message sync procedure of a shadow topic is supported by shadow replication, which is very similar to geo-replication, with these differences:
A message_id field is added to CommandSend for the replicator.
message CommandSend {
    // ... old fields.
    // Message id of this message; currently used by the replicator for shadow topics.
    optional MessageIdData message_id = 9;
}
Methods added to org.apache.pulsar.client.admin.Topics:
void createShadowTopic(String sourceTopicName, String shadowTopicName);
void deleteShadowTopic(String sourceTopicName, String shadowTopicName);
List<String> getShadowTopics(String sourceTopicName);
// And their async versions.
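As a rough illustration of the intended semantics of these three calls, the sketch below models them with an in-memory map. The class ShadowTopicRegistrySketch is hypothetical and is not part of the Pulsar admin client; only the three method names and parameter lists come from the proposal. The duplicate check and the topic names used in main are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory stand-in for the proposed admin calls; not Pulsar code.
final class ShadowTopicRegistrySketch {
    // Source topic name -> shadow topic names, kept in creation order.
    private final Map<String, List<String>> shadowsBySource = new LinkedHashMap<>();

    void createShadowTopic(String sourceTopicName, String shadowTopicName) {
        List<String> shadows =
                shadowsBySource.computeIfAbsent(sourceTopicName, k -> new ArrayList<>());
        // ASSUMPTION: creating the same shadow topic twice is rejected.
        if (shadows.contains(shadowTopicName)) {
            throw new IllegalStateException("shadow topic already exists: " + shadowTopicName);
        }
        shadows.add(shadowTopicName);
    }

    void deleteShadowTopic(String sourceTopicName, String shadowTopicName) {
        List<String> shadows = shadowsBySource.get(sourceTopicName);
        if (shadows != null) {
            shadows.remove(shadowTopicName);
        }
    }

    List<String> getShadowTopics(String sourceTopicName) {
        // Return a copy so callers cannot mutate the registry.
        return new ArrayList<>(
                shadowsBySource.getOrDefault(sourceTopicName, new ArrayList<>()));
    }

    public static void main(String[] args) {
        ShadowTopicRegistrySketch admin = new ShadowTopicRegistrySketch();
        String source = "persistent://tenant/ns/source-topic";
        admin.createShadowTopic(source, "persistent://tenant/ns-a/shadow-topic");
        admin.createShadowTopic(source, "persistent://tenant/ns-b/shadow-topic");
        System.out.println(admin.getShadowTopics(source));
    }
}
```

One source topic maps to a list of shadow topics, which mirrors the statement that multiple shadow topics can be created from a single source topic.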
This requires new REST interfaces in the admin server, where
PATH = "/{tenant}/{namespace}/{topic}/shadowTopics";
METHOD = POST/DELETE/GET;
int shadowReplicatorAutoResetBacklogEntries = 0;
Once the number of backlog entries exceeds this threshold, the shadow replicator automatically resets the cursor to LATEST.
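The threshold check can be sketched as a small predicate. This is only an illustration: the class and method names are hypothetical, and treating the default value 0 as "auto-reset disabled" is an assumption that the text above does not state.

```java
// Sketch of the auto-reset decision; hypothetical helper, not Pulsar code.
final class ShadowBacklogResetSketch {

    // ASSUMPTION: a threshold of 0 (the default shown above) disables auto-reset.
    static boolean shouldResetToLatest(long backlogEntries,
                                       int shadowReplicatorAutoResetBacklogEntries) {
        return shadowReplicatorAutoResetBacklogEntries > 0
                && backlogEntries > shadowReplicatorAutoResetBacklogEntries;
    }

    public static void main(String[] args) {
        System.out.println(shouldResetToLatest(5_000, 0));
        System.out.println(shouldResetToLatest(5_000, 1_000));
    }
}
```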
There are some key changes for implementation:
This part is mostly implemented by ShadowReplicator, which extends the PersistentReplicator introduced in geo-replication. The shadow topic list is added as a new topic policy of the source topic. The source topic manages the lifecycle of all the replicators. The key is to add message_id when producing messages to shadow topics.
This part is mostly implemented by ShadowManagedLedger, which extends the current ManagedLedgerImpl and overrides two key methods.
initialize(..)
a. Fetch the ManagedLedgerInfo of the source topic instead of the current shadow topic. The source topic name is stored in the topic policy of the shadow topic.
b. Open the last ledger and read the explicit LAC from bookie, instead of creating a new ledger. Reading the LAC here requires that the source topic enable the explicit LAC feature by setting bookkeeperExplicitLacIntervalInMills to a non-zero value in broker.conf.
c. Do not start checkLedgerRollTask, which periodically tries to roll over the ledger.
internalAsyncAddEntry()
Instead of writing entry data to bookie, it only updates ledger metadata such as currentLedger and lastConfirmedEntry, and puts the replicated message into EntryCache.
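The add-entry behaviour described for internalAsyncAddEntry() can be sketched with a toy model. ShadowLedgerSketch and its fields are hypothetical and greatly simplified; the real ShadowManagedLedger would extend ManagedLedgerImpl and use Pulsar's own position and cache types. The sketch assumes replicated messages arrive in order and carry the source (ledgerId, entryId) taken from message_id.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a shadow ledger's add path; hypothetical, not Pulsar code.
final class ShadowLedgerSketch {
    long currentLedgerId = -1;       // stands in for currentLedger
    long lastConfirmedEntryId = -1;  // stands in for lastConfirmedEntry
    private final Map<String, byte[]> entryCache = new HashMap<>();

    // Nothing is written to bookie here: only metadata and the cache change.
    void addReplicatedEntry(long ledgerId, long entryId, byte[] payload) {
        if (ledgerId != currentLedgerId) {
            // ASSUMPTION: a new ledger id means the source topic rolled over.
            currentLedgerId = ledgerId;
        }
        lastConfirmedEntryId = entryId;
        entryCache.put(key(ledgerId, entryId), payload);
    }

    // Returns the cached payload, or null if the entry is not cached.
    byte[] readFromCache(long ledgerId, long entryId) {
        return entryCache.get(key(ledgerId, entryId));
    }

    private static String key(long ledgerId, long entryId) {
        return ledgerId + ":" + entryId;
    }

    public static void main(String[] args) {
        ShadowLedgerSketch ledger = new ShadowLedgerSketch();
        ledger.addReplicatedEntry(7, 0, "hello".getBytes());
        System.out.println(ledger.currentLedgerId + ":" + ledger.lastConfirmedEntryId);
    }
}
```

Keeping the source position on every replicated message is what lets the shadow side advance its metadata without owning the ledger.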
Besides, some other problems need to be taken care of.
The LAC cached in LedgerHandle won't be updated in time, so we need to refresh the LAC when a managed cursor requests entries beyond the known LAC.
Schema is part of a topic's metadata, so a shadow topic won't have its own schema; it shares the schema info of its source topic.
For consumers, we need to support the GetSchema command for shadow topics, and there are two interfaces for this.
Binary protocol, which handles CommandGetSchema in ServerCnx#handleGetSchema. We only need to replace the requested shadow topic's schemaName with the schemaName of the source topic; the underlying read operation is supported by SchemaRegistry#getSchema(String, SchemaVersion).
HTTP protocol, which is handled in SchemasResource#getSchema(...). Similar to the approach in the binary protocol, replace the schemaId with that of the source topic in SchemasResourceBase#getSchemaId.
For admins, we can support other "read" ops besides getSchema, including getAllSchemas and getVersionBySchema, all of which can be supported in the same way as getSchema.
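The schema redirection described above amounts to resolving a shadow topic to its source topic before the lookup. The sketch below shows that idea with plain maps. All names in it are hypothetical, and it assumes for simplicity that the schema name equals the topic name; the real lookup goes through SchemaRegistry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration of schema lookup redirection; not Pulsar code.
final class ShadowSchemaLookupSketch {
    private final Map<String, String> sourceByShadow = new HashMap<>();
    // ASSUMPTION: schemas are keyed by topic name to keep the sketch short.
    private final Map<String, String> schemaByName = new HashMap<>();

    void registerShadow(String shadowTopic, String sourceTopic) {
        sourceByShadow.put(shadowTopic, sourceTopic);
    }

    void putSchema(String topic, String schemaDefinition) {
        schemaByName.put(topic, schemaDefinition);
    }

    // A shadow topic resolves to its source; any other topic resolves to itself.
    String resolveSchemaName(String requestedTopic) {
        return sourceByShadow.getOrDefault(requestedTopic, requestedTopic);
    }

    Optional<String> getSchema(String requestedTopic) {
        return Optional.ofNullable(schemaByName.get(resolveSchemaName(requestedTopic)));
    }

    public static void main(String[] args) {
        ShadowSchemaLookupSketch lookup = new ShadowSchemaLookupSketch();
        lookup.putSchema("source-topic", "{\"type\":\"string\"}");
        lookup.registerShadow("shadow-topic", "source-topic");
        System.out.println(lookup.getSchema("shadow-topic").isPresent());
    }
}
```

Because only the name is swapped, the same resolution step could serve getSchema, getAllSchemas and getVersionBySchema.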
Truncation, from the command bin/pulsar-admin topics truncate source-topic. For source topic truncation, nothing changes. It still moves all cursors to the end of the topic and deletes all inactive ledgers. As the shadow topic watches ManagedLedgerInfo in the metadata store, once it learns that ledgers have been deleted, all its cursors skip the deleted ledgers.
Deletion, from the command bin/pulsar-admin topics delete source-topic. Like geo-replication, topic deletion is forbidden if the topic has shadow replicators; users have to delete the shadow topics first.
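The deletion rule can be illustrated with a small guard. SourceTopicDeletionSketch is a hypothetical model, and the exception type is an assumption; the proposal only states that deletion is forbidden while shadow replicators exist.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the source-topic deletion guard; not Pulsar code.
final class SourceTopicDeletionSketch {
    private final Set<String> shadowTopics = new HashSet<>();
    private boolean deleted = false;

    void addShadowTopic(String name) {
        shadowTopics.add(name);
    }

    void removeShadowTopic(String name) {
        shadowTopics.remove(name);
    }

    // Deletion is refused while any shadow topic is still attached.
    void delete() {
        if (!shadowTopics.isEmpty()) {
            throw new IllegalStateException(
                    "delete shadow topics first: " + shadowTopics);
        }
        deleted = true;
    }

    boolean isDeleted() {
        return deleted;
    }

    public static void main(String[] args) {
        SourceTopicDeletionSketch topic = new SourceTopicDeletionSketch();
        topic.addShadowTopic("shadow-topic");
        try {
            topic.delete();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        topic.removeShadowTopic("shadow-topic");
        topic.delete();
        System.out.println("deleted: " + topic.isDeleted());
    }
}
```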
As shadow topic is usually in another namespace, it would have its own independent permission settings, and we can configure different permissions for source topic and shadow topic. So there would be no guarantee that you are allowed to consume shadow topic if you have permission to consume source topic.
On the other hand, we use topic policy to store shadow topic settings, so a new policy permission item needs to be added as PolicyName.SHADOW_TOPIC, and a user must have PolicyOperation.WRITE on this policy to create or delete shadow topics.
Offloading a ledger is a write operation on the topic's metadata, so a shadow topic can't offload ledgers to long-term storage. However, for ledgers that have already been offloaded by the source topic, the shadow topic is expected to support reading from the offloaded ledgers, just like reading from the source topic.
The implementation depends on the shadow topic watching ManagedLedgerInfo in the metadata store: if LedgerInfo.offloadContext is updated by the source topic's offloader, the shadow topic has all the information it needs to get a readHandle from ledgerOffload. The precondition, of course, is that the shadow topic has the same offload driver settings.
The updates on partition number will be synced to the shadow topic. A source topic or partition will be responsible for the creation and deletion of its corresponding shadow topic partitions.
For topic stats on the source topic, as the shadow replicator reuses most of the current PersistentReplicator, ReplicatorStatsImpl can also be applied to shadow replicators. We need to add a new field in TopicStatsImpl, like geo-replication:
Map<String /*shadow topic name*/, ReplicatorStatsImpl> shadowReplication;
As for topic stats on the shadow topic, the existing TopicStatsImpl still applies. I don't see any other stats that need to be added at this point.
See PIP-63.