The transaction buffer stores the IDs of aborted transactions so that it can filter out aborted messages. To support recovery, it takes snapshots that persist these IDs in BookKeeper. However, the number of aborted transaction IDs is not limited. When they no longer fit in a single BookKeeper entry, the transaction buffer must split the snapshot across multiple entries (a multiple-snapshot).
Compression and incomplete sending make a multiple-snapshot challenging to implement.
The first snapshot is taken when a new producer sends a message, so there must always be a snapshot with the key (topicName-end) that holds the maxReadPosition needed for recovery.
As long as transaction IDs are sorted by the position of their aborted marker and none have been deleted from aborts, snapshots with the same key (excluding the key topicName-end) store the same txn IDs.
Because transaction IDs are deleted in the order of the position of their aborted markers, no message is lost when a new snapshot replaces an older one during compression, and there is always a valid maxReadPosition that can be used to recover. As the figure below shows, the ledger containing txn025 has been deleted and txn025 has been removed from aborts, but this does not affect the information in the snapshot.
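The in-order deletion described above can be illustrated with a minimal sketch. It is not the Pulsar implementation: `AbortsTrimmer`, the use of string transaction IDs, and the idea of comparing against an `oldestLiveLedgerId` are all assumptions made for illustration. It also assumes that aborts is kept in aborted-marker order and that ledger IDs grow with position.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Pulsar.
final class AbortsTrimmer {
    private AbortsTrimmer() {}

    /**
     * Removes, starting from the oldest entry, every aborted transaction whose
     * abort marker lives in a ledger older than oldestLiveLedgerId.
     * The map is assumed to iterate in aborted-marker order (insertion order).
     *
     * @return the number of transactions removed
     */
    static int trimDeletedLedgers(LinkedHashMap<String, Long> aborts, long oldestLiveLedgerId) {
        int removed = 0;
        Iterator<Map.Entry<String, Long>> it = aborts.entrySet().iterator();
        while (it.hasNext()) {
            // Entries are ordered, so the first live ledger ends the scan.
            if (it.next().getValue() >= oldestLiveLedgerId) {
                break;
            }
            it.remove();
            removed++;
        }
        return removed;
    }
}
```

Because only the oldest entries are ever removed, the transactions that remain keep their relative order.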
handleSnapshot
public void handleSnapshot(TransactionBufferSnapshot snapshot) {
    PositionImpl newMaxReadPosition = PositionImpl.get(
            snapshot.getMaxReadPositionLedgerId(),
            snapshot.getMaxReadPositionEntryId());
    // Only move maxReadPosition forward, never backward.
    if (newMaxReadPosition.compareTo(maxReadPosition) > 0) {
        maxReadPosition = newMaxReadPosition;
    }
    // Merge the aborted transactions carried by this snapshot entry.
    if (snapshot.getAborts() != null) {
        snapshot.getAborts().forEach(abortTxnMetadata ->
                aborts.put(
                        new TxnID(abortTxnMetadata.getTxnIdMostBits(),
                                abortTxnMetadata.getTxnIdLeastBits()),
                        PositionImpl.get(abortTxnMetadata.getLedgerId(),
                                abortTxnMetadata.getEntryId())));
    }
}
takeSnapshot
private CompletableFuture<Void> takeSnapshot() {
    changeMaxReadPositionAndAddAbortTimes.set(0);
    return takeSnapshotWriter.thenCompose(writer -> {
        TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot();
        ArrayList<TransactionBufferSnapshot> snapshots = new ArrayList<>();
        synchronized (TopicTransactionBuffer.this) {
            List<AbortTxnMetadata> list = new ArrayList<>();
            aborts.forEach((k, v) -> {
                AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata();
                abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits());
                abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits());
                abortTxnMetadata.setLedgerId(v.getLedgerId());
                abortTxnMetadata.setEntryId(v.getEntryId());
                list.add(abortTxnMetadata);
            });
            // Move maxSize aborts at a time into their own snapshot
            // until the remainder fits in a single snapshot.
            while (list.size() > maxSize) {
                List<AbortTxnMetadata> newList = new ArrayList<>();
                while (newList.size() < maxSize) {
                    newList.add(list.remove(0));
                }
                snapshot.setAborts(newList);
                snapshot.setTopicName(topic.getName());
                snapshots.add(snapshot);
                snapshot = new TransactionBufferSnapshot();
            }
            // The last snapshot carries the remaining aborts and maxReadPosition.
            snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId());
            snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId());
            snapshot.setAborts(list);
            snapshot.setTopicName(topic.getName());
            snapshots.add(snapshot);
        }
        List<CompletableFuture<Void>> completableFutures = new LinkedList<>();
        snapshots.forEach(snapshot_ -> {
            completableFutures.add(writer.writeAsync(snapshot_).thenAccept(messageId -> {
                this.lastSnapshotTimestamps = System.currentTimeMillis();
                if (log.isDebugEnabled()) {
                    log.debug("[{}]Transaction buffer take snapshot success! "
                            + "messageId : {}", topic.getName(), messageId);
                }
            }).exceptionally(e -> {
                log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e);
                return null;
            }));
        });
        return FutureUtil.waitForAll(completableFutures);
    });
}
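The splitting step inside takeSnapshot can be looked at in isolation. The following is a minimal sketch under stated assumptions, not the proposed implementation: `SnapshotChunker` and `chunk` are hypothetical names, and the element type is left generic instead of using AbortTxnMetadata. It produces the same chunk sizes as the nested loops (full chunks of maxSize, then a final chunk holding the remainder), but copies sublists instead of calling remove(0) repeatedly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Pulsar.
final class SnapshotChunker {
    private SnapshotChunker() {}

    /**
     * Splits items into consecutive chunks of at most maxSize elements,
     * preserving order. At least one chunk is always returned, because the
     * last chunk is the one that would carry maxReadPosition.
     */
    static <T> List<List<T>> chunk(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < items.size(); start += maxSize) {
            int end = Math.min(start + maxSize, items.size());
            // Copy the view so each chunk is independent of the source list.
            chunks.add(new ArrayList<>(items.subList(start, end)));
        }
        if (chunks.isEmpty()) {
            // No aborts at all: still emit one empty chunk.
            chunks.add(new ArrayList<>());
        }
        return chunks;
    }
}
```

For example, five items with a maxSize of 2 yield three chunks of sizes 2, 2 and 1. Since the input order is preserved, a transaction ID stays in the same chunk across snapshots as long as nothing before it has been removed.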
Add a snapshotEntryCount field to each TransactionBufferSnapshot. For a normal snapshot, snapshotEntryCount is set to 1; for a multiple-snapshot, it is set to the number of entries used to store the snapshot.
public class TransactionBufferSnapshot {
    private String topicName;
    private long maxReadPositionLedgerId;
    private long maxReadPositionEntryId;
    private long snapshotEntryCount;
    private List<AbortTxnMetadata> aborts;
}
For a multiple-snapshot, the leading entries carry only the aborts and maxReadPosition data and leave topicName unset; topicName is set only in the last entry. When the reader reads an entry with topicName == null, a multiple-snapshot has begun; an entry with topicName != null marks the end of that multiple-snapshot.
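The read side of this convention can be sketched as follows. This is an illustration only: `SnapshotEntry` is a simplified, hypothetical stand-in for TransactionBufferSnapshot with transaction IDs as strings, and `MultiSnapshotReader` is not an existing class. The sketch also assumes that a later complete group supersedes an earlier one and that a trailing group without a terminating entry (for example after incomplete sending) is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for TransactionBufferSnapshot.
final class SnapshotEntry {
    final String topicName;     // null on every entry except the last of a group
    final List<String> aborts;  // aborted transaction IDs carried by this entry

    SnapshotEntry(String topicName, List<String> aborts) {
        this.topicName = topicName;
        this.aborts = aborts;
    }
}

// Hypothetical reader, not part of Pulsar.
final class MultiSnapshotReader {
    private MultiSnapshotReader() {}

    /**
     * Returns the aborts of the last complete group found in entries,
     * or an empty list if no group was terminated by a non-null topicName.
     */
    static List<String> recoverAborts(List<SnapshotEntry> entries) {
        List<String> pending = new ArrayList<>();
        List<String> lastComplete = new ArrayList<>();
        for (SnapshotEntry entry : entries) {
            pending.addAll(entry.aborts);
            if (entry.topicName != null) {
                // Terminating entry: the group is complete and replaces the previous one.
                lastComplete = pending;
                pending = new ArrayList<>();
            }
        }
        // Whatever is left in pending belongs to an unfinished group and is dropped.
        return lastComplete;
    }
}
```

A normal single-entry snapshot is handled by the same loop, since its only entry has topicName set and therefore forms a complete group of size one.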
interface Writer<T> {
    /**
     * Write event to the system topic.
     * @param t pulsar event
     * @param topic the topicName for the pulsar event
     * @return message id
     * @throws PulsarClientException if writing the event fails
     */
    MessageId write(T t, String topic) throws PulsarClientException;

    /**
     * Async write event to the system topic.
     * @param t pulsar event
     * @param topic the topicName for the pulsar event
     * @return message id future
     */
    CompletableFuture<MessageId> writeAsync(T t, String topic);
}
TransactionBufferSnapshotWriter
public CompletableFuture<MessageId> writeAsync(TransactionBufferSnapshot transactionBufferSnapshot,
                                               String topicName) {
    // The topic name is used as the message key.
    return producer.newMessage()
            .key(topicName)
            .value(transactionBufferSnapshot)
            .sendAsync();
}