PIP-393: Improve performance of Negative Acknowledgement

Background knowledge

Negative Acknowledgement is a feature in Pulsar that allows consumers to trigger the redelivery of a message after some time when they fail to process it. When a user calls the negativeAcknowledge method, the NegativeAcksTracker in ConsumerImpl adds an entry to the map NegativeAcksTracker.nackedMessages, mapping the message ID to the redelivery time. When the redelivery time arrives, the NegativeAcksTracker sends a redelivery request to the broker.
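For context, a minimal sketch of how an application drives this path. The service URL, topic name, subscription name, and process helper are illustrative, not part of this proposal:

    import java.util.concurrent.TimeUnit;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.PulsarClient;

    PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650")
            .build();
    Consumer<byte[]> consumer = client.newConsumer()
            .topic("my-topic")
            .subscriptionName("my-sub")
            // redeliver negatively acknowledged messages after 60s
            .negativeAckRedeliveryDelay(60, TimeUnit.SECONDS)
            .subscribe();

    Message<byte[]> msg = consumer.receive();
    try {
        process(msg); // application-specific processing (hypothetical helper)
        consumer.acknowledge(msg);
    } catch (Exception e) {
        // records the message ID in NegativeAcksTracker and schedules redelivery
        consumer.negativeAcknowledge(msg);
    }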

Motivation

There are many issues with the current implementation of Negative Acknowledgement in Pulsar:

  • the memory occupation is high.
  • the code execution efficiency is low.
  • the redelivery time is not accurate.
  • multiple negative acks for messages in the same entry (batch) interfere with each other.

All of these problems are severe and need to be solved.

Memory occupation is high

After the improvement in https://github.com/apache/pulsar/pull/23582, the memory occupation of NegativeAcksTracker was cut by more than half by replacing the HashMap with a ConcurrentLongLongPairHashMap. With 1 million entries, the memory occupation decreases from 178MB to 64MB; with 10 million entries, it decreases from 1132MB to 512MB. The average memory occupation per entry drops from 1132MB/10000000=118 bytes to 512MB/10000000=53 bytes.

But this is still not enough. Assuming we negatively acknowledge messages at 10k/s with a 1h redelivery delay for each message, the memory occupation of NegativeAcksTracker will be 3600*10000*53/1024/1024/1024=1.77GB; if the delay is 5h, the required memory is 3600*10000*53/1024/1024/1024*5=8.88GB, which grows too fast.

Code execution efficiency is low

Currently, each time the timer task is triggered, it iterates over all entries in NegativeAcksTracker.nackedMessages, which is unnecessary. We can sort entries by timestamp and iterate only the entries that are due for redelivery.

Redelivery time is not accurate

Currently, the redelivery check interval is controlled by timerIntervalNanos, which is 1/3 of negativeAckRedeliveryDelay. That means that if negativeAckRedeliveryDelay is 1h, the check task runs every 20min, so the deviation of the redelivery time can be up to 20min, which is unacceptable.

Multiple negative ack for messages in the same entry(batch) will interfere with each other

Currently, NegativeAcksTracker#nackedMessages maps (ledgerId, entryId) to a timestamp, which means multiple nacks for messages in the same batch share a single timestamp. If we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, both messages are redelivered together after 20s. msg1 is not redelivered after 10s because the timestamp recorded in NegativeAcksTracker#nackedMessages is overwritten by the second nack call.

Goals

Refactor the NegativeAcksTracker to solve the above problems.

To avoid iterating over all entries in NegativeAcksTracker.nackedMessages, we use a sorted map to store the entries. To reduce memory occupation, we use the utility classes provided by fastutil (https://fastutil.di.unimi.it/docs/) and design a new algorithm to store the entries, reducing the memory occupation to as little as 1% of the current implementation (the actual effect depends on the configuration and the throughput).

Detailed Design

Design & Implementation Details

New Data Structure

Use the following data structure to store the entries:

    Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>();

mapping timestamp -> ledgerId -> entryIds. We need to iterate timestamps in ascending order, so we use a sorted map from timestamp to the ledgerId -> entryIds map. As there will be many entries in the map, we use Long2ObjectAVLTreeMap instead of Long2ObjectRBTreeMap. For the inner map, we don't need to keep ledgerIds ordered, so a Long2ObjectOpenHashMap mapping ledgerId to entryIds is sufficient. All entry ids for the same ledger id are stored in a bitmap, as we only care about the existence of each entry id.

Timestamp Bucket

The timestamp in ms is used as the key of the map. As most use cases don't require 1ms precision of the delay time, we can bucket the timestamps, that is, trim the lower bits of the timestamp to map it to a bucket. For example, if we trim the lowest bit, the timestamps 0b1000 and 0b1001 map to the same bucket 0b1000, and all messages in the same bucket are redelivered at the same time. If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower 10 bits of the timestamp, which groups a lot of entries into the same bucket and reduces the memory occupation.

The following code snippet illustrates the design:

    // Trim the lower `bits` bits of a timestamp so that nearby
    // redelivery deadlines fall into the same bucket.
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>();
    Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectOpenHashMap<>();
    Roaring64Bitmap entrySet = new Roaring64Bitmap();
    entrySet.addLong(entryId);
    ledgerMap.put(ledgerId, entrySet);
    // the bucketed timestamp is the key, so one bucket holds all messages
    // whose deadlines share the trimmed prefix
    map.put(trimLowerBit(timestamp, bits), ledgerMap);
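With this layout, the redelivery timer no longer scans the whole map: it pops leading buckets until it meets one whose deadline has not passed yet. A minimal sketch against the `map` declared above (`redeliver` is a hypothetical callback standing in for the real redelivery request):

    long now = System.currentTimeMillis();
    // buckets are sorted by deadline, so we can stop at the first future one
    while (!map.isEmpty() && map.firstLongKey() <= now) {
        Long2ObjectMap<Roaring64Bitmap> ledgers = map.remove(map.firstLongKey());
        // hand each (ledgerId, entryIds) pair of the due bucket to the broker request
        ledgers.forEach((ledgerId, entryIds) -> redeliver(ledgerId, entryIds));
    }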

Configuration

Add a new configuration negativeAckPrecisionBitCnt to control the precision of the redelivery time.

    @ApiModelProperty(
            name = "negativeAckPrecisionBitCnt",
            value = "The redelivery time precision bit count. The lower bits of the redelivery time will be\n"
                    + "trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time\n"
                    + "will be bucketed by 256ms. In the worst case, the redelivery time will be 512ms earlier (never later)\n"
                    + "than the expected time. If the value is 0, the redelivery time will be accurate to the millisecond."
    )
    private long negativeAckPrecisionBitCnt = 8;

The higher the value, the more entries are grouped into the same bucket, the lower the memory occupation, and the less accurate the redelivery time. The default value is 8, which means the redelivery time is bucketed by 256ms. In the worst case, the redelivery time will be 512ms earlier (never later) than the expected time.
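To make the bucketing concrete, a short worked example of the default 8-bit trim, using the trimLowerBit helper above (the timestamps are arbitrary):

    // With negativeAckPrecisionBitCnt = 8, buckets are 2^8 = 256ms wide:
    // every timestamp in [1_000_192, 1_000_447] rounds down to 1_000_192.
    long a = trimLowerBit(1_000_300L, 8); // = 1_000_192
    long b = trimLowerBit(1_000_447L, 8); // = 1_000_192, same bucket as `a`
    long c = trimLowerBit(1_000_448L, 8); // = 1_000_448, the next bucket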

Space complexity analysis

Space complexity of ConcurrentLongLongPairHashMap

Before analyzing the new data structure, we need to know how much space the tracker takes before this PIP.

We need to store four long fields (ledgerId, entryId, partitionIndex, timestamp) for each entry, which takes 4*8=32 bytes. As ConcurrentLongLongPairHashMap uses open addressing with linear probing to handle hash conflicts, some redundant space is reserved to keep the conflict rate low. Two configurations control how much redundant space is reserved: the fill factor and the idle factor. When the space utilization rises to the fill factor, the size of the backing array is doubled; when it falls to the idle factor, the backing array is halved.

The default fill factor is 0.66 and the default idle factor is 0.15, which means the minimum space occupation of ConcurrentLongLongPairHashMap is 32/0.66*N bytes = 48N bytes and the maximum is 32/0.15*N bytes = 213N bytes, where N is the number of entries.

In the experiment shown in the PR, there are 1 million entries in the map, taking up 32*1000000/1024/1024 bytes = 30MB; the space utilization rate is 30/64 = 0.46, within the range [0.15, 0.66].
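A quick sanity check of these bounds (plain arithmetic, not Pulsar code):

    long n = 1_000_000L;                        // entries in the experiment above
    long payload = 4 * 8 * n;                   // 4 longs per entry = 32N bytes (~30MB)
    long minMB = (long) (payload / 0.66) >> 20; // ~46MB at the fill factor bound
    long maxMB = (long) (payload / 0.15) >> 20; // ~203MB at the idle factor bound
    // the observed 64MB lies between the two bounds, matching a 0.46 utilization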

Space complexity of the new data structure

The space used by the new data structure depends on several factors: the message rate, the time deviation the user accepts, and the max number of entries written in one ledger.

  • The Pulsar conf managedLedgerMaxEntriesPerLedger=50000 determines the max number of entries that can be written into one ledger; we use the default value in this analysis.
  • The time deviation the user accepts: when the user accepts a 1024ms delivery time deviation, we can trim the lower 10 bits of the millisecond timestamp, which buckets 1024 timestamps together.

Below we analyze the space used by one bucket and calculate the average space used by one entry.

Assuming the message rate is x msg/ms and we trim y bits of the timestamp, one bucket spans 2**y ms and contains M = x * 2**y msgs.

  • For one single bucket, we only need to store one timestamp, which takes 8 bytes.
  • Then we need to store the ledgerIds. When M is greater than 50000 (managedLedgerMaxEntriesPerLedger), the ledger rolls over, so there are L=ceil(M/50000) ledgers, which take 8*L bytes.
  • Finally, we analyze how much space the entry ids take. As there are L=ceil(M/50000) ledgers, there are L bitmaps to store, which take L*size(bitmap). The total space consumed by one bucket is therefore 8 bytes + 8L bytes + L*size(bitmap).

As size(bitmap) is far greater than 8 bytes, we can ignore the first two terms. This gives the formula for the space consumed by one bucket: D = L*size(bitmap) = ceil(M/50000)*size(bitmap).

Entry ids are stored in a Roaring64Bitmap; for simplicity we can reason about a RoaringBitmap instead, as the max entry id is 49999, which is smaller than 4294967296 (2^32, the largest value range a RoaringBitmap can cover). The space consumed by a RoaringBitmap depends on how many elements it contains: when the cardinality is below 4096, the space is 4N bytes; above 4096, the consumed space is a fixed 8KB (a bitmap container).
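This container behavior is easy to observe directly (the printed sizes are estimates and may differ by a few header bytes):

    import org.roaringbitmap.RoaringBitmap;

    RoaringBitmap sparse = new RoaringBitmap();
    for (int i = 0; i < 1000; i++) {
        sparse.add(i); // cardinality < 4096: stored as an array container
    }
    System.out.println(sparse.getSizeInBytes()); // grows with the cardinality

    RoaringBitmap dense = new RoaringBitmap();
    for (int i = 0; i < 50000; i++) {
        dense.add(i); // cardinality > 4096: a fixed-size 8KB bitmap container
    }
    System.out.println(dense.getSizeInBytes()); // ~8KB, independent of cardinality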

Then we get the final result:

  • when M>50000: D = ceil(M/50000)*size(bitmap) ~= M/50000 * 8KB = M/50000 * 8 * 1024 bytes = 0.163M bytes, i.e. each entry takes 0.163 bytes on average.
  • when 4096<M<50000: D = ceil(M/50000)*size(bitmap) = 1 * 8KB = 8KB, i.e. each entry takes 8*1024/M = 8192/M bytes on average.
  • when M<4096: D = ceil(M/50000)*size(bitmap) = 1 * 4M bytes = 4M bytes, i.e. each entry takes 4 bytes on average.
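To make the cases concrete, a worked example under assumed traffic (x = 10 msg/ms and y = 10 trimmed bits; both numbers are illustrative):

    long msgsPerMs = 10;                        // x: assumed message rate
    int trimmedBits = 10;                       // y: accepted deviation is 1024ms
    long bucketSpanMs = 1L << trimmedBits;      // one bucket spans 2**y = 1024ms
    long m = msgsPerMs * bucketSpanMs;          // M = x * 2**y = 10240 msgs per bucket
    long ledgers = (m + 50000 - 1) / 50000;     // L = ceil(M/50000) = 1
    long bucketBytes = ledgers * 8 * 1024;      // 4096 < M < 50000: one 8KB bitmap
    double perEntry = (double) bucketBytes / m; // 8192/10240 = 0.8 bytes per entry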

Conclusion

Assuming N is the number of entries, M is the number of messages in one bucket.

  • ConcurrentLongLongPairHashMap: 48N bytes in the best case, 213N bytes in the worst case.
  • New data structure:
    • when M>50000: 0.163N bytes.
    • when 4096<M<50000: 8192/M * N bytes.
    • when M<4096: 4N bytes.

Some experiment results are shown in the PR; we can fine-tune the configuration to get the best performance.

Effect

Memory occupation is high

With this design, we can reduce the memory occupation of NegativeAcksTracker to as little as 1% of the current implementation.

Code execution efficiency is low

With the new design, we avoid iterating over all entries in NegativeAcksTracker.nackedMessages and only iterate the entries that are due for redelivery, as sketched in the timer-task snippet above.

Redelivery time is not accurate

With the new design, we avoid the fixed interval of the redelivery check and control the precision of the redelivery time by trimming the lower bits of the timestamp. If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower 10 bits of the timestamp, which groups many entries into the same bucket while keeping the redelivery time within the accepted deviation.

Multiple negative ack for messages in the same entry(batch) will interfere with each other

With the new design, if we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, the two nacks will not interfere with each other, as they are stored in different buckets.

High-Level Design

As this PIP introduces the new dependency fastutil into the client, which is very large (23MB) while only a few of its classes are used, we need to reduce the size of the dependency.

There is an alternative dependency, fastutil-core, which is smaller (6MB), but it is still relatively large, and using fastutil-core would introduce another problem on the broker side, since the broker already ships the full fastutil jar, which also includes the fastutil-core classes.

The optimal solution is to include in the shaded pulsar-client and pulsar-client-all only the fastutil classes that are actually used and needed. This could be achieved in several ways. One possible solution is to introduce an intermediate module for the shaded pulsar-client and pulsar-client-all that isn't published to Maven Central at all; it would be used to minimize and include only the fastutil classes required by pulsar-client shading.

Backward & Forward Compatibility

Upgrade

Users can upgrade to the new version without any compatibility issues.

Downgrade / Rollback

Users can downgrade to the old version without any compatibility issues.

Links