Background knowledge

Apache Pulsar is a distributed messaging system that supports multiple messaging protocols and storage methods. One of its features, Pulsar Topic Compaction, is a key-based data retention mechanism: it keeps only the most recent message for each key, which reduces storage space and improves system efficiency.

Pulsar also uses topic compaction internally. The new load balancer changed the compaction strategy for its own use case so that only the first value for each key is kept. For more detail, see PIP-215.

More details on topic compaction can be found in Pulsar Topic Compaction.
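
The two retention strategies described above (latest value per key for regular compaction, first value per key for the load balancer) can be illustrated with a minimal sketch. The `CompactionStrategies` and `Message` names are hypothetical and exist only for this example; this is not how the Pulsar compactor is implemented, it only shows the retention semantics.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of the retention semantics only; not Pulsar's compactor. */
final class CompactionStrategies {

    /** A key/value pair standing in for a keyed message. */
    static final class Message {
        final String key;
        final String value;

        Message(String key, String value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    /** Regular compaction: the most recent message for each key wins. */
    static List<Message> keepLatest(List<Message> log) {
        Map<String, Message> byKey = new LinkedHashMap<>();
        for (Message m : log) {
            // Remove before re-inserting so survivors are ordered by their latest write.
            byKey.remove(m.key);
            byKey.put(m.key, m);
        }
        return new ArrayList<>(byKey.values());
    }

    /** Load-balancer style described above: the first message for each key wins. */
    static List<Message> keepFirst(List<Message> log) {
        Map<String, Message> byKey = new LinkedHashMap<>();
        for (Message m : log) {
            byKey.putIfAbsent(m.key, m);
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<Message> log = List.of(
                new Message("a", "1"),
                new Message("b", "2"),
                new Message("a", "3"));
        System.out.println(keepLatest(log)); // prints [b=2, a=3]
        System.out.println(keepFirst(log));  // prints [a=1, b=2]
    }
}
```

Both methods read the same log; only the rule for resolving duplicate keys differs, which is the kind of variation a pluggable service would need to accommodate.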

Motivation

Currently, the implementation of Pulsar Topic Compaction is fixed and does not support custom strategies, which prevents users from applying other compaction policies in their applications.

For example, the current topic compaction works with Pulsar-format data in KoP (Kafka-on-Pulsar), but it can't work with Kafka-format data, because the entries are written in Kafka format and the Pulsar compactor isn't aware of that format. It doesn't make sense to handle Kafka-format data in Pulsar itself, so we need a pluggable compactor that lets KoP handle it.

Another long-term consideration is that we may need to support writing the compacted data elsewhere, for example to S3, in a columnar format, or even partitioned.

So we need to make the whole topic compaction service (including the Write API and Read API) pluggable to support custom compaction service implementations.

Goals

In Scope

  • Abstract a topic compaction service interface and make the topic compaction service pluggable.

  • Migrate the current implementation to an implementation of the new interface.

  • Make the existing tests compatible with the new implementation.

Out of Scope

  • CompactorMetrics: keep the current implementation and don't define related methods in the topic compaction service interface. In the future, it will use the Otel interface or another metrics API instead.

  • StrategicTwoPhaseCompactor: it is not part of regular compaction and is used only by the load balancer, so it is out of scope and won't change.

High Level Design

To make the whole topic compaction service pluggable, we need to abstract a TopicCompactionService interface. It exposes the capabilities of the compactor and provides a read API for reading entries from the compacted data.

We should combine CompactedTopicImpl and TwoPhaseCompactor into the Pulsar implementation of the topic compaction service, keeping its behavior consistent with the current implementation.

Class diagram of the core classes:

classDiagram
    direction BT
    class CompactedTopic {
    <<Interface>>
    + deleteCompactedLedger(long) CompletableFuture~Void~
    + getCompactionHorizon() Optional~Position~
    + newCompactedLedger(Position, long) CompletableFuture~CompactedTopicContext~
    + asyncReadEntriesOrWait(ManagedCursor, int, boolean, ReadEntriesCallback, Consumer) void
    + readLastEntryOfCompactedLedger() CompletableFuture~Entry~
    }
    class CompactedTopicImpl {
    + newCompactedLedger(Position, long) CompletableFuture~CompactedTopicContext~
    + getCompactedTopicContext() Optional~CompactedTopicContext~
    + asyncReadEntriesOrWait(ManagedCursor, int, boolean, ReadEntriesCallback, Consumer) void
    + getCompactionHorizon() Optional~Position~
    + deleteCompactedLedger(long) CompletableFuture~Void~
    + getCompactedTopicContextFuture() CompletableFuture~CompactedTopicContext~
    + readLastEntryOfCompactedLedger() CompletableFuture~Entry~
    }
    class CompactionServiceFactory {
    <<Interface>>
    + newTopicCompactionService(String) CompletableFuture~TopicCompactionService~
    + initialize(PulsarService) CompletableFuture~Void~
    }
    class Compactor {
    + getStats() CompactorMXBean
    + compact(String) CompletableFuture~Long~
    }
    class PulsarCompactionServiceFactory {
    + getNullableCompactor() Compactor?
    + getCompactor() Compactor
    + newTopicCompactionService(String) CompletableFuture~TopicCompactionService~
    + initialize(PulsarService) CompletableFuture~Void~
    + close() void
    }
    class PulsarCompactorSubscription {
    + acknowledgeMessage(List~Position~, AckType, Map~String, Long~) void
    }
    class PulsarTopicCompactionService {
    + compact() CompletableFuture~Void~
    + readCompactedEntries(Position, int) CompletableFuture~List~Entry~~
    + getLastCompactedPosition() CompletableFuture~Position~
    + readLastCompactedEntry() CompletableFuture~Entry~
    + getCompactedTopic() CompactedTopicImpl
    + close() void
    }
    class TopicCompactionService {
    <<Interface>>
    + compact() CompletableFuture~Void~
    + readCompactedEntries(Position, int) CompletableFuture~List~Entry~~
    + getLastCompactedPosition() CompletableFuture~Position~
    + readLastCompactedEntry() CompletableFuture~Entry~
    }
    class TwoPhaseCompactor

    CompactedTopicImpl  ..>  CompactedTopic 
    PulsarCompactionServiceFactory  ..>  CompactionServiceFactory 
    PulsarCompactionServiceFactory "1" *--> "compactor 1" Compactor 
    PulsarCompactionServiceFactory  ..>  PulsarTopicCompactionService : «create»
    PulsarCompactionServiceFactory  ..>  TwoPhaseCompactor : «create»
    PulsarCompactorSubscription "1" *--> "compactedTopic 1" CompactedTopic 
    PulsarTopicCompactionService  ..>  CompactedTopicImpl : «create»
    PulsarTopicCompactionService "1" *--> "compactedTopic 1" CompactedTopicImpl 
    PulsarTopicCompactionService  ..>  TopicCompactionService 
    TwoPhaseCompactor  -->  Compactor 

Detailed Design

Design & Implementation Details

  • Define a standard TopicCompactionService interface.

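    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import javax.annotation.Nonnull;
    import org.apache.bookkeeper.mledger.Entry;
    import org.apache.bookkeeper.mledger.Position;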
    
    public interface TopicCompactionService extends AutoCloseable {
        /**
         * Compact the topic.
         * Topic Compaction is a key-based retention mechanism. It keeps the most recent value for a given key, and
         * users read the compacted data from the TopicCompactionService.
         *
         * @return a future that will be completed when the compaction is done.
         */
        CompletableFuture<Void> compact();
    
        /**
         * Read the compacted entries from the TopicCompactionService.
         *
         * @param startPosition         the position to start reading from.
         * @param numberOfEntriesToRead the maximum number of entries to read.
         * @return a future that will be completed with the list of entries, this list can be null.
         */
        CompletableFuture<List<Entry>> readCompactedEntries(@Nonnull Position startPosition, int numberOfEntriesToRead);
    
        /**
         * Read the last compacted entry from the TopicCompactionService.
         *
         * @return a future that will be completed with the compacted last entry, this entry can be null.
         */
        CompletableFuture<Entry> readLastCompactedEntry();
    
        /**
         * Get the last compacted position from the TopicCompactionService.
         *
         * @return a future that will be completed with the last compacted position, this position can be null.
         */
        CompletableFuture<Position> getLastCompactedPosition();
    }
    
  • Define a standard CompactionServiceFactory interface to manage TopicCompactionService.

    import java.util.concurrent.CompletableFuture;
    import org.apache.pulsar.broker.PulsarService;
    
    public interface CompactionServiceFactory extends AutoCloseable {
    
      /**
       * Initialize the compaction service factory.
       *
       * @param pulsarService
       *            the pulsar service instance
       * @return a future that represents the initialization result
       */
      CompletableFuture<Void> initialize(PulsarService pulsarService);
    
      /**
       * Create a new topic compaction service for the given topic.
       *
       * @param topic
       *            the topic name
       * @return a future that represents the topic compaction service
       */
      CompletableFuture<TopicCompactionService> newTopicCompactionService(String topic);
    }
    
  • Implement PulsarCompactionServiceFactory and PulsarTopicCompactionService.

  • Combine CompactedTopicImpl and TwoPhaseCompactor into PulsarTopicCompactionService.

  • Rename CompactorSubscription to PulsarCompactorSubscription, since it applies only to the Pulsar implementation.

  • For CompactorMetrics: keep the current implementation. Currently, it only supports PulsarTopicCompactionService. In the future, it will use the Otel API or another metrics API instead, and a customized TopicCompactionService should implement the Otel API or the other metrics API.

  • Fix the tests and make them compatible with the new implementation.
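
To give a feel for what a custom implementation has to provide, here is a minimal in-memory sketch shaped like the TopicCompactionService interface above. It is an assumption-laden illustration rather than a real plugin: `InMemoryCompactionService`, `KeyedEntry`, and `publish` are hypothetical names, positions are plain `long` offsets, and the class deliberately does not depend on the Pulsar `Entry` and `Position` types so that it stays self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical in-memory service shaped like TopicCompactionService.
 * Positions are plain long offsets and entries are key/value pairs; neither is a Pulsar type.
 */
final class InMemoryCompactionService implements AutoCloseable {

    /** Stand-in for an entry: its offset in the raw topic plus a key and a value. */
    static final class KeyedEntry {
        final long offset;
        final String key;
        final String value;

        KeyedEntry(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    private final List<KeyedEntry> rawTopic = new ArrayList<>();
    private volatile List<KeyedEntry> compacted = new ArrayList<>();

    /** Appends a message to the raw (uncompacted) topic. */
    synchronized void publish(String key, String value) {
        rawTopic.add(new KeyedEntry(rawTopic.size(), key, value));
    }

    /** Mirrors compact(): rebuilds the compacted view, keeping the latest entry per key. */
    synchronized CompletableFuture<Void> compact() {
        Map<String, KeyedEntry> latest = new LinkedHashMap<>();
        for (KeyedEntry e : rawTopic) {
            latest.remove(e.key); // re-insert so survivors stay in offset order
            latest.put(e.key, e);
        }
        compacted = new ArrayList<>(latest.values());
        return CompletableFuture.completedFuture(null);
    }

    /** Mirrors readCompactedEntries(): at most maxEntries entries at or after startOffset. */
    CompletableFuture<List<KeyedEntry>> readCompactedEntries(long startOffset, int maxEntries) {
        List<KeyedEntry> result = new ArrayList<>();
        for (KeyedEntry e : compacted) {
            if (e.offset >= startOffset && result.size() < maxEntries) {
                result.add(e);
            }
        }
        return CompletableFuture.completedFuture(result);
    }

    /** Mirrors readLastCompactedEntry(): completes with null if nothing is compacted yet. */
    CompletableFuture<KeyedEntry> readLastCompactedEntry() {
        List<KeyedEntry> snapshot = compacted;
        KeyedEntry last = snapshot.isEmpty() ? null : snapshot.get(snapshot.size() - 1);
        return CompletableFuture.completedFuture(last);
    }

    /** Mirrors getLastCompactedPosition(): the offset of the last compacted entry, or null. */
    CompletableFuture<Long> getLastCompactedPosition() {
        return readLastCompactedEntry().thenApply(e -> e == null ? null : Long.valueOf(e.offset));
    }

    @Override
    public void close() {
        compacted = new ArrayList<>();
    }

    public static void main(String[] args) {
        try (InMemoryCompactionService service = new InMemoryCompactionService()) {
            service.publish("a", "1");
            service.publish("b", "2");
            service.publish("a", "3");
            service.compact().join();
            // Callers must handle the nullable results documented on the interface.
            KeyedEntry last = service.readLastCompactedEntry().join();
            System.out.println(last == null ? "nothing compacted" : last.key + "=" + last.value); // prints a=3
        }
    }
}
```

The sketch keeps the write path (`compact`) and the read path (the three read methods) behind one object, which is the point of abstracting the whole service rather than only the compactor: an implementation is free to choose where and in what format the compacted data lives.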

Public-facing Changes

Configuration

broker.conf

compactionServiceFactoryClassName=org.apache.pulsar.compaction.PulsarCompactionServiceFactory
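
A class-name setting like this is typically resolved through reflection at startup. The sketch below shows one way that could work; it is an assumption, since this proposal does not describe how the broker loads the factory. `SketchCompactionServiceFactory`, `SketchDefaultFactory`, and `FactoryLoader` are hypothetical stand-ins, and `initialize` takes a plain string here instead of a PulsarService so that the example stays self-contained.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the factory interface; the real one takes a PulsarService. */
interface SketchCompactionServiceFactory extends AutoCloseable {
    CompletableFuture<Void> initialize(String brokerName);
}

/** Hypothetical default implementation used only by this sketch. */
final class SketchDefaultFactory implements SketchCompactionServiceFactory {
    volatile String initializedFor;

    @Override
    public CompletableFuture<Void> initialize(String brokerName) {
        initializedFor = brokerName;
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void close() {
        // Nothing to release in this sketch.
    }
}

/** Hypothetical loader: instantiates the configured class through its no-arg constructor. */
final class FactoryLoader {
    static SketchCompactionServiceFactory load(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            if (!SketchCompactionServiceFactory.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement SketchCompactionServiceFactory");
            }
            return (SketchCompactionServiceFactory) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing no-arg constructor, and constructor failures.
            throw new IllegalStateException("Cannot load compaction service factory " + className, e);
        }
    }

    public static void main(String[] args) throws Exception {
        // In a broker, the class name would come from the configuration value shown above.
        String configured = SketchDefaultFactory.class.getName();
        try (SketchCompactionServiceFactory factory = load(configured)) {
            factory.initialize("broker-1").join();
            System.out.println("Loaded " + factory.getClass().getName());
        }
    }
}
```

Checking the type before instantiating gives a clear error for a misconfigured class name instead of a `ClassCastException` later on.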

Backward & Forward Compatibility

Revert

Upgrade

Alternatives

  • Make only the compactor pluggable.
  • Make the compaction data serializer and deserializer pluggable in the current Pulsar implementation.

Both alternatives would introduce short-lived configurations and interfaces, so they don't fit the long-term direction of Pulsar. For a discussion of the alternatives, see PIP-274.

General Notes

Links