| |
| # PIP-393: Improve performance of Negative Acknowledgement |
| |
| # Background knowledge |
| |
| Negative Acknowledgement is a feature in Pulsar that allows consumers to trigger the redelivery |
| of a message after some time when they fail to process it. When a user calls the `negativeAcknowledge` method, |
| `NegativeAcksTracker` in `ConsumerImpl` adds an entry to the map `NegativeAcksTracker.nackedMessages`, |
| mapping the message ID to the redelivery time. When the redelivery time arrives, `NegativeAcksTracker` |
| sends a redelivery request to the broker to redeliver the message. |
| |
| # Motivation |
| |
| The current implementation of Negative Acknowledgement in Pulsar has several issues: |
| - the memory occupation is high. |
| - the code execution efficiency is low. |
| - the redelivery time is not accurate. |
| - multiple negative acks for messages in the same entry (batch) interfere with each other. |
| |
| All of these problems are severe and need to be solved. |
| |
| ## Memory occupation is high |
| After the improvement in https://github.com/apache/pulsar/pull/23582, the memory occupation of `NegativeAcksTracker` |
| was reduced by more than half by replacing `HashMap` with `ConcurrentLongLongPairHashMap`. With 1 million entries, the memory |
| occupation decreased from 178MB to 64MB. With 10 million entries, it decreased from 1132MB to 512MB. |
| The average memory occupation of each entry decreased from 1132MB/10000000=118 bytes to 512MB/10000000=53 bytes. |
| |
| But this is not enough. Assuming that we negatively acknowledge 10k messages per second with a 1h redelivery delay for each message, |
| the memory occupation of `NegativeAcksTracker` will be `3600*10000*53/1024/1024/1024=1.77GB`. If the delay is 5h, |
| the required memory is `3600*10000*53/1024/1024/1024*5=8.88GB`, which grows too fast. |
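
The estimate above can be reproduced with a few lines of Java. This is only a back-of-the-envelope sketch: the class and method names are hypothetical, and the 53 bytes per entry is the measured average quoted above, not a constant taken from the Pulsar code base.

```java
/** Hypothetical helper that reproduces the back-of-the-envelope estimate above. */
final class NackMemoryEstimate {
    /** Average bytes per tracked entry, as measured above (assumption of this sketch). */
    static final double BYTES_PER_ENTRY = 53.0;

    /**
     * Estimated tracker size in GB (1024^3 bytes) when messages are nacked at a
     * steady rate and each one stays in the tracker for the whole redelivery delay.
     */
    static double estimateGb(long nacksPerSecond, long delaySeconds) {
        double pendingEntries = (double) nacksPerSecond * delaySeconds;
        return pendingEntries * BYTES_PER_ENTRY / (1024.0 * 1024.0 * 1024.0);
    }
}
```

Calling `NackMemoryEstimate.estimateGb(10_000, 3600)` gives a little under 1.8, and a delay of `5 * 3600` seconds gives a little under 8.9, in line with the figures above. The estimate grows linearly with both the nack rate and the delay.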
| |
| ## Code execution efficiency is low |
| Currently, each time the timer task is triggered, it iterates over all the entries in `NegativeAcksTracker.nackedMessages`, |
| which is unnecessary. We can sort the entries by timestamp and only iterate over the entries that need to be redelivered. |
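
A minimal sketch of this idea, using the JDK `TreeMap` rather than the fastutil classes proposed later, could look as follows. The class name, the method name and the `String` payload are hypothetical and only serve the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch: collect only the entries whose redelivery time has passed. */
final class DueEntriesSketch {
    /**
     * Removes and returns every value whose timestamp key is <= now.
     * Entries that are not yet due are never visited.
     */
    static List<String> pollDue(TreeMap<Long, String> byRedeliveryTime, long now) {
        // headMap(now, true) is a live view of the keys <= now.
        Map<Long, String> expired = byRedeliveryTime.headMap(now, true);
        List<String> due = new ArrayList<>(expired.values());
        // Clearing the view removes those entries from the backing map.
        expired.clear();
        return due;
    }
}
```

Because the map is ordered by timestamp, the cost of one timer tick depends on the number of due entries plus a logarithmic lookup, not on the total number of tracked entries.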
| |
| ## Redelivery time is not accurate |
| Currently, the redelivery check interval is controlled by `timerIntervalNanos`, which is 1/3 of the `negativeAckRedeliveryDelay`. |
| That means that if the `negativeAckRedeliveryDelay` is 1h, the check task runs every 20min, so the redelivery time can deviate |
| by up to 20min, which is unacceptable. |
| |
| ## Multiple negative ack for messages in the same entry(batch) will interfere with each other |
| Currently, `NegativeAcksTracker#nackedMessages` maps `(ledgerId, entryId)` to `timestamp`, which means that multiple nacks for messages |
| in the same batch share a single timestamp. |
| If we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, both messages are redelivered together 20s later. |
| msg1 is not redelivered after 10s because the timestamp recorded in `NegativeAcksTracker#nackedMessages` is overwritten by the second |
| nack call. |
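
The overwrite can be illustrated with a toy model. This is not the real tracker code: the class, its methods and the string key are hypothetical, and only mimic a map keyed by `(ledgerId, entryId)`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical toy model of a tracker keyed by (ledgerId, entryId) only. */
final class SharedTimestampSketch {
    private final Map<String, Long> redeliverAtByEntry = new HashMap<>();

    void nack(long ledgerId, long entryId, int batchIndex, long redeliverAtMs) {
        // batchIndex is deliberately ignored: the key only identifies the entry,
        // so every message of the same batch lands on the same key.
        redeliverAtByEntry.put(ledgerId + ":" + entryId, redeliverAtMs);
    }

    /** Returns the recorded redelivery time; the entry must have been nacked before. */
    long redeliverAt(long ledgerId, long entryId) {
        return redeliverAtByEntry.get(ledgerId + ":" + entryId);
    }
}
```

Nacking batch index 0 with a redelivery time of 10000 and then batch index 1 with 20000 leaves a single record with the value 20000, so the first request is lost.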
| |
| |
| # Goals |
| |
| Refactor the `NegativeAcksTracker` to solve the above problems. |
| |
| To avoid iterating over all entries in `NegativeAcksTracker.nackedMessages`, we use a sorted map to store the entries. |
| To reduce memory occupation, we use utility classes provided by fastutil (https://fastutil.di.unimi.it/docs/) and design |
| a new algorithm to store the entries, which can reduce the memory occupation to less than 1% of the current implementation |
| (the actual effect depends on the configuration and the throughput). |
| |
| # Detailed Design |
| |
| ## Design & Implementation Details |
| |
| ### New Data Structure |
| Use the following data structure to store the entries: |
| ```java |
| Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> nackedMessages = new Long2ObjectAVLTreeMap<>(); |
| ``` |
| This maps `timestamp -> ledgerId -> entryId`. |
| We need to keep timestamps in ascending order, so we use a sorted map from timestamp to the `ledgerId -> entryId` map. |
| As there will be many entries in the map, we use `Long2ObjectAVLTreeMap` instead of `Long2ObjectRBTreeMap`. |
| For the inner map, we use `Long2ObjectMap` to map `ledgerId` to `entryId` because we don't need to keep the `ledgerId`s ordered; |
| `Long2ObjectOpenHashMap` is sufficient. |
| All entry ids for the same ledger id are stored in a bitmap, as we only care about the existence of the entry id. |
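
To make the nesting concrete, here is a sketch that uses only JDK classes as stand-ins. The stand-ins are assumptions of this example: `TreeMap` replaces `Long2ObjectAVLTreeMap`, `HashMap` replaces `Long2ObjectOpenHashMap`, and `BitSet` replaces `Roaring64Bitmap` (so entry ids must fit in an `int` here). The class and method names are hypothetical.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical JDK-only stand-in for the timestamp -> ledgerId -> entryId structure. */
final class NestedNackMapSketch {
    // Outer map is sorted by timestamp; inner map is unordered; entry ids are bits.
    private final TreeMap<Long, Map<Long, BitSet>> nacked = new TreeMap<>();

    void add(long timestamp, long ledgerId, int entryId) {
        nacked.computeIfAbsent(timestamp, t -> new HashMap<>())
              .computeIfAbsent(ledgerId, l -> new BitSet())
              .set(entryId);
    }

    boolean contains(long timestamp, long ledgerId, int entryId) {
        Map<Long, BitSet> ledgers = nacked.get(timestamp);
        if (ledgers == null) {
            return false;
        }
        BitSet entries = ledgers.get(ledgerId);
        return entries != null && entries.get(entryId);
    }

    /** Number of distinct timestamps, i.e. entries of the outer map. */
    int timestampCount() {
        return nacked.size();
    }
}
```

Two nacks with the same timestamp and ledger id share one outer entry and one bitmap, which is where the memory saving comes from.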
| |
| |
| ### TimeStamp Bucket |
| The timestamp in ms is used as the key of the map. As most use cases don't require the delay time to be precise to 1ms, |
| we can bucket the timestamp, that is, trim the lower bits of the timestamp so that it maps to a bucket. |
| For example, if we trim the lowest bit of the timestamp, the timestamps 0b1000 and 0b1001 are mapped to the same bucket 0b1000. |
| All messages in the same bucket are then redelivered at the same time. |
| If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower 10 bits of the timestamp, which groups many |
| entries into the same bucket and reduces the memory occupation. |
| |
| The following code snippets help illustrate the design: |
| ```java |
| static long trimLowerBit(long timestamp, int bits) { |
|     return timestamp & (-1L << bits); |
| } |
| ``` |
| |
| ```java |
| Long2ObjectSortedMap<Long2ObjectMap<Roaring64Bitmap>> map = new Long2ObjectAVLTreeMap<>(); |
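| // The inner map must hold Roaring64Bitmap values to match the outer map's declared type. |
| Long2ObjectMap<Roaring64Bitmap> ledgerMap = new Long2ObjectOpenHashMap<>(); |
| Roaring64Bitmap entrySet = new Roaring64Bitmap(); |
| entrySet.addLong(entryId); |
| ledgerMap.put(ledgerId, entrySet); |
| // `timestamp` is the bucketed value returned by trimLowerBit. |
| map.put(timestamp, ledgerMap); |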
| ``` |
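
As a worked example of the bucketing, the sketch below repeats the `trimLowerBit` helper shown above and adds a hypothetical `bucketWidthMs` helper that is not part of the proposal.

```java
/** Worked example of timestamp bucketing; bucketWidthMs is a hypothetical helper. */
final class TimestampBucketSketch {
    /** Clears the lowest `bits` bits, i.e. rounds the timestamp down to its bucket start. */
    static long trimLowerBit(long timestamp, int bits) {
        return timestamp & (-1L << bits);
    }

    /** Width of one bucket in milliseconds when `bits` lower bits are trimmed. */
    static long bucketWidthMs(int bits) {
        return 1L << bits;
    }
}
```

With 10 trimmed bits, every timestamp from 1024 to 2047 maps to the bucket 1024, and `bucketWidthMs(10)` is 1024. With 0 trimmed bits the timestamp is unchanged. Because trimming only rounds down, a bucketed timestamp is never later than the original one.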
| |
| ### Configuration |
| |
| Add a new configuration `negativeAckPrecisionBitCnt` to control the precision of the redelivery time. |
| ```java |
| @ApiModelProperty( |
|         name = "negativeAckPrecisionBitCnt", |
|         value = "The redelivery time precision bit count. The lower bits of the redelivery time will be\n" + |
|                 "trimmed to reduce the memory occupation. The default value is 8, which means the redelivery time\n" + |
|                 "will be bucketed by 256ms. In worst cases, the redelivery time will be 512ms earlier(no later)\n" + |
|                 "than the expected time. If the value is 0, the redelivery time will be accurate to ms." |
| ) |
| private long negativeAckPrecisionBitCnt = 8; |
| ``` |
| The higher the value, the more entries are grouped into the same bucket, so the memory occupation is lower but the redelivery time is less accurate. |
| The default value is 8, which means the redelivery time is bucketed by 256ms. In the worst case, the redelivery time will be 512ms earlier (never later) |
| than the expected time. |
| |
| |
| ## Space complexity analysis |
| ### Space complexity of `ConcurrentLongLongPairHashMap` |
| Before analyzing the new data structure, we need to know how much space the existing one takes before this PIP. |
| |
| We need to store 4 long fields, `(ledgerId, entryId, partitionIndex, timestamp)`, for each entry, which takes `4*8=32 bytes`. |
| As `ConcurrentLongLongPairHashMap` uses open addressing with linear probing to handle hash conflicts, some |
| redundant space is reserved to avoid a high conflict rate. Two configurations control how much redundant space to reserve: |
| `fill factor` and `idle factor`. When the space utilization rate rises to the `fill factor`, the size of the backing array is |
| doubled; when the space utilization rate drops to the `idle factor`, the size of the backing array is halved. |
| |
| The default value of `fill factor` is 0.66 and of `idle factor` is 0.15, which means the minimum space occupation of |
| `ConcurrentLongLongPairHashMap` is `32N/0.66 byte ≈ 48N byte` and the maximum space occupation is `32N/0.15 byte ≈ 213N byte`, |
| where N is the number of entries. |
| |
| In the experiment shown in the PR, there are 1 million entries in the map, whose raw data takes up `32*1000000/1024/1024 byte ≈ 30MB` |
| of the 64MB measured, so the space utilization rate is 30/64≈0.47, which is within the range `[0.15, 0.66]`. |
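
The bounds above follow from dividing the 32 bytes of payload by the utilization of the backing array. A small sketch, with hypothetical class and method names, makes the relation explicit:

```java
/** Hypothetical calculator for the per-entry footprint of an open-addressing table. */
final class OpenAddressingSpaceSketch {
    /** Four long fields per slot: ledgerId, entryId, partitionIndex, timestamp. */
    static final int BYTES_PER_SLOT = 4 * Long.BYTES;

    /** Average bytes per stored entry when `utilization` (0..1] of the slots are in use. */
    static double bytesPerEntry(double utilization) {
        return BYTES_PER_SLOT / utilization;
    }
}
```

At a utilization of 0.66 this gives about 48 bytes per entry, and at 0.15 about 213 bytes per entry.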
| |
| |
| ### Space complexity of the new data structure |
| The space used by the new data structure depends on several factors: the message rate, the time deviation the user accepts, |
| and the maximum number of entries written to one ledger. |
| - The Pulsar configuration `managedLedgerMaxEntriesPerLedger=50000` determines the maximum number of entries that can be written to one ledger; |
|   we use the default value in this analysis. |
| - The time deviation the user accepts: when the user accepts a 1024ms deviation of the delivery time, we can trim the lower 10 bits |
|   of the timestamp in ms, which groups 1024 timestamps into one bucket. |
| |
| Below we analyze the space used by one bucket and calculate the average space used by one entry. |
| |
| Assuming that the message rate is `x msg/ms` and that we trim `y` bits of the timestamp, one bucket covers `2**y` ms and contains |
| `M=2**y*x` msgs. |
| - For a single bucket, we only need to store one timestamp, which takes `8 bytes`. |
| - Then we need to store the ledgerId. When M is greater than 50000 (`managedLedgerMaxEntriesPerLedger`), the ledger switches. |
|   There are `L=ceil(M/50000)` ledgers, which take `8*L` bytes. |
| - Further, we analyze how much space the entry ids take. As there are `L=ceil(M/50000)` ledgers, there are `L` bitmaps to store, |
|   which take `L*size(bitmap)`. The total space consumed by the new data structure is `8 byte + 8L byte + L*size(bitmap)`. |
| |
| As `size(bitmap)` is far greater than `8 byte`, we can ignore the first two terms. We then get the formula for the space |
| consumed by **one bucket**: `D=L*size(bitmap)=ceil(M/50000)*size(bitmap)`. |
| |
| Entry ids are stored in a `Roaring64Bitmap`; for simplicity we can analyze it as a `RoaringBitmap`, as the max entry id is 49999, |
| which is far smaller than `4294967295 (2^32 - 1)`, the max value that can be stored in a `RoaringBitmap`. The space consumed |
| by a `RoaringBitmap` depends on how many elements it contains: when the bitmap holds fewer than 4096 elements, the space is `4N byte` |
| (N being the number of elements in the bitmap); when it holds more than 4096 elements, the consumed space is a fixed `8KB`. |
| |
| Then we get the final result: |
| - when M>50000, `D = ceil(M/50000)*size(bitmap) ~= M/50000 * 8KB = M/50000 * 8 * 1024 byte = 0.163*M byte`, |
|   so each entry takes `0.163 byte` on average. |
| - when 4096<M<50000, `D = ceil(M/50000)*size(bitmap) = 1 * 8KB = 8KB`, so each entry takes `8*1024/M=8192/M byte` on average. |
| - when M<4096, `D = ceil(M/50000)*size(bitmap) = 1 * 4M byte = 4M byte`, so each entry takes `4 byte` on average. |
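
The three cases can be written as a small calculator. This is a sketch of the estimate only: it counts bitmap space and ignores the timestamp and ledger id keys, as the derivation does. The class and method names are hypothetical, and treating exactly 4096 messages as the dense case is a choice made here, since the text leaves that boundary open.

```java
/** Hypothetical calculator for the per-bucket space estimate derived above. */
final class BucketSpaceSketch {
    static final long MAX_ENTRIES_PER_LEDGER = 50_000;   // managedLedgerMaxEntriesPerLedger default
    static final long DENSE_BITMAP_BYTES = 8 * 1024;     // fixed size assumed for a dense bitmap
    static final long DENSE_THRESHOLD = 4096;            // below this, 4 bytes per element is assumed

    /** Messages in one bucket: msgPerMs messages per ms over a bucket of 2^trimmedBits ms. */
    static long messagesPerBucket(long msgPerMs, int trimmedBits) {
        return msgPerMs << trimmedBits;
    }

    /** Approximate bytes used by the bitmaps of one bucket holding m messages. */
    static long bucketBytes(long m) {
        if (m < DENSE_THRESHOLD) {
            return 4 * m;
        }
        // ceil(m / 50000) ledgers, one dense bitmap each.
        long ledgers = (m + MAX_ENTRIES_PER_LEDGER - 1) / MAX_ENTRIES_PER_LEDGER;
        return ledgers * DENSE_BITMAP_BYTES;
    }

    /** Average bytes per entry in a bucket holding m messages (m > 0). */
    static double bytesPerEntry(long m) {
        return (double) bucketBytes(m) / m;
    }
}
```

For instance, at an assumed rate of 10 msg/ms with 10 trimmed bits, a bucket holds 10240 messages and takes about 8192 bytes, or 0.8 bytes per entry. A bucket of 100000 messages spans two ledgers and averages about 0.16 bytes per entry.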
| |
| ### Conclusion |
| Assuming N is the number of entries and M is the number of messages in one bucket: |
| - `ConcurrentLongLongPairHashMap`: `48N byte` in the best case, `213N byte` in the worst case. |
| - New data structure: |
|   - when M>50000, `0.163N byte`. |
|   - when 4096<M<50000, `8192/M * N byte`. |
|   - when M<4096, `4N byte`. |
| |
| Some experiment results are shown in the PR; we can fine-tune the configuration to get the best performance. |
| |
| ## Effect |
| |
| ### Memory occupation is high |
| With this design, we can reduce the memory occupation of `NegativeAcksTracker` to less than 1% of the current implementation, depending on the configuration and the throughput. |
| |
| ### Code execution efficiency is low |
| With the new design, we avoid iterating over all entries in `NegativeAcksTracker.nackedMessages` and only iterate over the entries |
| that need to be redelivered. |
| |
| ### Redelivery time is not accurate |
| With the new design, we avoid the fixed interval of the redelivery check. We can control the precision of the redelivery time |
| by trimming the lower bits of the timestamp. If the user can accept a 1024ms deviation of the redelivery time, we can trim the lower |
| 10 bits of the timestamp, which groups many entries into the same bucket and reduces the memory occupation. |
| |
| ### Multiple negative ack for messages in the same entry(batch) will interfere with each other |
| With the new design, if we ask for msg1 to be redelivered 10s later and then for msg2 to be redelivered 20s later, the two nacks do not interfere |
| with each other, as they are stored in different buckets. |
| |
| |
| ## High-Level Design |
| This PIP introduces a new dependency, `fastutil`, into the client. It is very large (23MB) while only a few of its classes are used, so we need to |
| reduce the size of the dependency. |
| |
| There is an alternative dependency, `fastutil-core`, which is smaller (6MB), but it is still |
| relatively large, and using `fastutil-core` would introduce another problem on the broker side, since the broker already has the `fastutil` jar, |
| which also includes the `fastutil-core` jar classes. |
| |
| The optimal solution would be to include in the shaded pulsar-client and pulsar-client-all only the classes from fastutil |
| that are actually used. This could be achieved in many ways. One possible solution is to introduce an intermediate |
| module for shaded pulsar-client and pulsar-client-all that isn't published to Maven Central at all. |
| It would be used to minimize fastutil and include only the classes that are required by pulsar-client shading. |
| |
| |
| |
| # Backward & Forward Compatibility |
| |
| ## Upgrade |
| |
| Users can upgrade to the new version without any compatibility issues. |
| |
| ## Downgrade / Rollback |
| |
| Users can downgrade to the old version without any compatibility issues. |
| |
| # Links |
| |
| <!-- |
| Updated afterwards |
| --> |
| * Mailing List discussion thread: https://lists.apache.org/thread/yojl7ylk7cyjxktq3cn8849hvmyv0fg8 |
| * Mailing List voting thread: https://lists.apache.org/thread/hyc1r2s9chowdhck53lq07tznopt50dy |