| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.transaction.buffer.impl; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.util.Timeout; |
| import io.netty.util.Timer; |
| import io.netty.util.TimerTask; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.LongAdder; |
| import lombok.SneakyThrows; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks; |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.commons.collections4.map.LinkedMap; |
| import org.apache.pulsar.broker.service.BrokerServiceException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.systopic.SystemTopicClient; |
| import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; |
| import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; |
| import org.apache.pulsar.broker.transaction.buffer.TransactionMeta; |
| import org.apache.pulsar.broker.transaction.buffer.matadata.AbortTxnMetadata; |
| import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; |
| import org.apache.pulsar.client.api.Message; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.api.transaction.TxnID; |
| import org.apache.pulsar.common.api.proto.MessageMetadata; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.TransactionBufferStats; |
| import org.apache.pulsar.common.policies.data.TransactionInBufferStats; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.protocol.Markers; |
| import org.apache.pulsar.common.util.Codec; |
| import org.apache.pulsar.common.util.RecoverTimeRecord; |
| import org.jctools.queues.MessagePassingQueue; |
| import org.jctools.queues.SpscArrayQueue; |
| |
| /** |
| * Transaction buffer based on normal persistent topic. |
| */ |
| @Slf4j |
| public class TopicTransactionBuffer extends TopicTransactionBufferState implements TransactionBuffer, TimerTask { |
| |
| private final PersistentTopic topic; |
| |
| private volatile PositionImpl maxReadPosition; |
| |
| /** |
| * Ongoing transaction, map for remove txn stable position, linked for find max read position. |
| */ |
| private final LinkedMap<TxnID, PositionImpl> ongoingTxns = new LinkedMap<>(); |
| |
| /** |
| * Aborts, map for jude message is aborted, linked for remove abort txn in memory when this |
| * position have been deleted. |
| */ |
| private final LinkedMap<TxnID, PositionImpl> aborts = new LinkedMap<>(); |
| |
| private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter; |
| |
| // when add abort or change max read position, the count will +1. Take snapshot will set 0 into it. |
| private final AtomicLong changeMaxReadPositionAndAddAbortTimes = new AtomicLong(); |
| |
| private final LongAdder txnCommittedCounter = new LongAdder(); |
| |
| private final LongAdder txnAbortedCounter = new LongAdder(); |
| |
| private final Timer timer; |
| |
| private final int takeSnapshotIntervalNumber; |
| |
| private final int takeSnapshotIntervalTime; |
| |
| private volatile long lastSnapshotTimestamps; |
| |
| private final CompletableFuture<Void> transactionBufferFuture = new CompletableFuture<>(); |
| |
| /** |
| * The map is used to store the lowWaterMarks which key is TC ID and value is lowWaterMark of the TC. |
| */ |
| private final ConcurrentHashMap<Long, Long> lowWaterMarks = new ConcurrentHashMap<>(); |
| |
| public final RecoverTimeRecord recoverTime = new RecoverTimeRecord(); |
| |
| private final Semaphore handleLowWaterMark = new Semaphore(1); |
| |
| public TopicTransactionBuffer(PersistentTopic topic) { |
| super(State.None); |
| this.topic = topic; |
| this.takeSnapshotWriter = this.topic.getBrokerService().getPulsar() |
| .getTransactionBufferSnapshotService().createWriter(TopicName.get(topic.getName())); |
| this.timer = topic.getBrokerService().getPulsar().getTransactionTimer(); |
| this.takeSnapshotIntervalNumber = topic.getBrokerService().getPulsar() |
| .getConfiguration().getTransactionBufferSnapshotMaxTransactionCount(); |
| this.takeSnapshotIntervalTime = topic.getBrokerService().getPulsar() |
| .getConfiguration().getTransactionBufferSnapshotMinTimeInMillis(); |
| this.maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); |
| this.recover(); |
| } |
| |
| private void recover() { |
| recoverTime.setRecoverStartTime(System.currentTimeMillis()); |
| this.topic.getBrokerService().getPulsar().getTransactionExecutorProvider().getExecutor(this) |
| .execute(new TopicTransactionBufferRecover(new TopicTransactionBufferRecoverCallBack() { |
| @Override |
| public void recoverComplete() { |
| synchronized (TopicTransactionBuffer.this) { |
| // sync maxReadPosition change to LAC when TopicTransaction buffer have not recover |
| // completely the normal message have been sent to broker and state is |
| // not Ready can't sync maxReadPosition when no ongoing transactions |
| if (ongoingTxns.isEmpty()) { |
| maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); |
| } |
| if (!changeToReadyState()) { |
| log.error("[{}]Transaction buffer recover fail, current state: {}", |
| topic.getName(), getState()); |
| transactionBufferFuture.completeExceptionally |
| (new BrokerServiceException.ServiceUnitNotReadyException( |
| "Transaction buffer recover failed to change the status to Ready," |
| + "current state is: " + getState())); |
| } else { |
| timer.newTimeout(TopicTransactionBuffer.this, |
| takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); |
| transactionBufferFuture.complete(null); |
| recoverTime.setRecoverEndTime(System.currentTimeMillis()); |
| } |
| } |
| } |
| |
| @Override |
| public void noNeedToRecover() { |
| synchronized (TopicTransactionBuffer.this) { |
| // sync maxReadPosition change to LAC when TopicTransaction buffer have not recover |
| // completely the normal message have been sent to broker and state is |
| // not NoSnapshot can't sync maxReadPosition |
| maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); |
| if (!changeToNoSnapshotState()) { |
| log.error("[{}]Transaction buffer recover fail", topic.getName()); |
| } else { |
| transactionBufferFuture.complete(null); |
| recoverTime.setRecoverEndTime(System.currentTimeMillis()); |
| } |
| } |
| } |
| |
| @Override |
| public void handleSnapshot(TransactionBufferSnapshot snapshot) { |
| maxReadPosition = PositionImpl.get(snapshot.getMaxReadPositionLedgerId(), |
| snapshot.getMaxReadPositionEntryId()); |
| if (snapshot.getAborts() != null) { |
| snapshot.getAborts().forEach(abortTxnMetadata -> |
| aborts.put(new TxnID(abortTxnMetadata.getTxnIdMostBits(), |
| abortTxnMetadata.getTxnIdLeastBits()), |
| PositionImpl.get(abortTxnMetadata.getLedgerId(), |
| abortTxnMetadata.getEntryId()))); |
| } |
| } |
| |
| @Override |
| public void handleTxnEntry(Entry entry) { |
| ByteBuf metadataAndPayload = entry.getDataBuffer(); |
| |
| MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, |
| TopicTransactionBufferRecover.SUBSCRIPTION_NAME, -1); |
| if (msgMetadata != null && msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) { |
| TxnID txnID = new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()); |
| PositionImpl position = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); |
| if (Markers.isTxnMarker(msgMetadata)) { |
| if (Markers.isTxnAbortMarker(msgMetadata)) { |
| aborts.put(txnID, position); |
| } |
| updateMaxReadPosition(txnID); |
| } else { |
| handleTransactionMessage(txnID, position); |
| } |
| } |
| } |
| |
| @Override |
| public void recoverExceptionally(Throwable e) { |
| |
| log.warn("Closing topic {} due to read transaction buffer snapshot while recovering the " |
| + "transaction buffer throw exception", topic.getName(), e); |
| // when create reader or writer fail throw PulsarClientException, |
| // should close this topic and then reinit this topic |
| if (e instanceof PulsarClientException) { |
| // if transaction buffer recover fail throw PulsarClientException, |
| // we need to change the PulsarClientException to ServiceUnitNotReadyException, |
| // the tc do op will retry |
| transactionBufferFuture.completeExceptionally |
| (new BrokerServiceException.ServiceUnitNotReadyException(e.getMessage(), e)); |
| } else { |
| transactionBufferFuture.completeExceptionally(e); |
| } |
| recoverTime.setRecoverEndTime(System.currentTimeMillis()); |
| topic.close(true); |
| } |
| }, this.topic, this, takeSnapshotWriter)); |
| } |
| |
| @Override |
| public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) { |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| @Override |
| public CompletableFuture<Void> checkIfTBRecoverCompletely(boolean isTxnEnabled) { |
| if (!isTxnEnabled) { |
| return CompletableFuture.completedFuture(null); |
| } else { |
| CompletableFuture<Void> completableFuture = new CompletableFuture<>(); |
| transactionBufferFuture.thenRun(() -> { |
| if (checkIfNoSnapshot()) { |
| takeSnapshot().thenRun(() -> { |
| if (changeToReadyStateFromNoSnapshot()) { |
| timer.newTimeout(TopicTransactionBuffer.this, |
| takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); |
| } |
| completableFuture.complete(null); |
| }).exceptionally(exception -> { |
| log.error("Topic {} failed to take snapshot", this.topic.getName()); |
| completableFuture.completeExceptionally(exception); |
| return null; |
| }); |
| } else { |
| completableFuture.complete(null); |
| } |
| }).exceptionally(exception -> { |
| log.error("Topic {}: TransactionBuffer recover failed", this.topic.getName(), exception.getCause()); |
| completableFuture.completeExceptionally(exception.getCause()); |
| return null; |
| }); |
| return completableFuture; |
| } |
| } |
| |
| @Override |
| public long getOngoingTxnCount() { |
| return this.ongoingTxns.size(); |
| } |
| |
| @Override |
| public long getAbortedTxnCount() { |
| return this.txnAbortedCounter.sum(); |
| } |
| |
| @Override |
| public long getCommittedTxnCount() { |
| return this.txnCommittedCounter.sum(); |
| } |
| |
| @Override |
| public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) { |
| CompletableFuture<Position> completableFuture = new CompletableFuture<>(); |
| Long lowWaterMark = lowWaterMarks.get(txnId.getMostSigBits()); |
| if (lowWaterMark != null && lowWaterMark >= txnId.getLeastSigBits()) { |
| completableFuture.completeExceptionally(new BrokerServiceException |
| .NotAllowedException("Transaction [" + txnId + "] has been ended. " |
| + "Please use a new transaction to send message.")); |
| return completableFuture; |
| } |
| topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() { |
| @Override |
| public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| synchronized (TopicTransactionBuffer.this) { |
| handleTransactionMessage(txnId, position); |
| } |
| completableFuture.complete(position); |
| } |
| |
| @Override |
| public void addFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("Failed to append buffer to txn {}", txnId, exception); |
| completableFuture.completeExceptionally(exception); |
| } |
| }, null); |
| return completableFuture; |
| } |
| |
| private void handleTransactionMessage(TxnID txnId, Position position) { |
| if (!ongoingTxns.containsKey(txnId) && !aborts.containsKey(txnId)) { |
| ongoingTxns.put(txnId, (PositionImpl) position); |
| PositionImpl firstPosition = ongoingTxns.get(ongoingTxns.firstKey()); |
| //max read position is less than first ongoing transaction message position, so entryId -1 |
| maxReadPosition = PositionImpl.get(firstPosition.getLedgerId(), firstPosition.getEntryId() - 1); |
| } |
| } |
| |
| |
| @Override |
| public CompletableFuture<TransactionBufferReader> openTransactionBufferReader(TxnID txnID, long startSequenceId) { |
| return null; |
| } |
| |
| @Override |
| public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) { |
| if (log.isDebugEnabled()) { |
| log.debug("Transaction {} commit on topic {}.", txnID.toString(), topic.getName()); |
| } |
| CompletableFuture<Void> completableFuture = new CompletableFuture<>(); |
| //Wait TB recover completely. |
| transactionBufferFuture.thenRun(() -> { |
| ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(), |
| txnID.getLeastSigBits()); |
| try { |
| topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() { |
| @Override |
| public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| synchronized (TopicTransactionBuffer.this) { |
| updateMaxReadPosition(txnID); |
| handleLowWaterMark(txnID, lowWaterMark); |
| clearAbortedTransactions(); |
| takeSnapshotByChangeTimes(); |
| } |
| txnCommittedCounter.increment(); |
| completableFuture.complete(null); |
| } |
| |
| @Override |
| public void addFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("Failed to commit for txn {}", txnID, exception); |
| checkAppendMarkerException(exception); |
| completableFuture.completeExceptionally(new PersistenceException(exception)); |
| } |
| }, null); |
| } finally { |
| commitMarker.release(); |
| } |
| }).exceptionally(exception -> { |
| log.error("Transaction {} commit on topic {}.", txnID.toString(), topic.getName(), exception.getCause()); |
| completableFuture.completeExceptionally(exception.getCause()); |
| return null; |
| }); |
| return completableFuture; |
| } |
| |
| @Override |
| public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) { |
| if (log.isDebugEnabled()) { |
| log.debug("Transaction {} abort on topic {}.", txnID.toString(), topic.getName()); |
| } |
| CompletableFuture<Void> completableFuture = new CompletableFuture<>(); |
| //Wait TB recover completely. |
| transactionBufferFuture.thenRun(() -> { |
| //no message sent, need not to add abort mark by txn timeout. |
| if (!checkIfReady()) { |
| completableFuture.complete(null); |
| return; |
| } |
| ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits()); |
| try { |
| topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() { |
| @Override |
| public void addComplete(Position position, ByteBuf entryData, Object ctx) { |
| synchronized (TopicTransactionBuffer.this) { |
| aborts.put(txnID, (PositionImpl) position); |
| updateMaxReadPosition(txnID); |
| changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); |
| clearAbortedTransactions(); |
| takeSnapshotByChangeTimes(); |
| } |
| txnAbortedCounter.increment(); |
| completableFuture.complete(null); |
| handleLowWaterMark(txnID, lowWaterMark); |
| } |
| |
| @Override |
| public void addFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("Failed to abort for txn {}", txnID, exception); |
| checkAppendMarkerException(exception); |
| completableFuture.completeExceptionally(new PersistenceException(exception)); |
| } |
| }, null); |
| } finally { |
| abortMarker.release(); |
| } |
| }).exceptionally(exception -> { |
| log.error("Transaction {} abort on topic {}.", txnID.toString(), topic.getName(), exception.getCause()); |
| completableFuture.completeExceptionally(exception.getCause()); |
| return null; |
| }); |
| return completableFuture; |
| } |
| |
| private void checkAppendMarkerException(ManagedLedgerException exception) { |
| if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) { |
| topic.getManagedLedger().readyToCreateNewLedger(); |
| } |
| } |
| |
| private void handleLowWaterMark(TxnID txnID, long lowWaterMark) { |
| lowWaterMarks.compute(txnID.getMostSigBits(), (tcId, oldLowWaterMark) -> { |
| if (oldLowWaterMark == null || oldLowWaterMark < lowWaterMark) { |
| return lowWaterMark; |
| } else { |
| return oldLowWaterMark; |
| } |
| }); |
| if (handleLowWaterMark.tryAcquire()) { |
| if (!ongoingTxns.isEmpty()) { |
| TxnID firstTxn = ongoingTxns.firstKey(); |
| long tCId = firstTxn.getMostSigBits(); |
| Long lowWaterMarkOfFirstTxnId = lowWaterMarks.get(tCId); |
| if (lowWaterMarkOfFirstTxnId != null && firstTxn.getLeastSigBits() <= lowWaterMarkOfFirstTxnId) { |
| abortTxn(firstTxn, lowWaterMarkOfFirstTxnId) |
| .thenRun(() -> { |
| log.warn("Successes to abort low water mark for txn [{}], topic [{}]," |
| + " lowWaterMark [{}]", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId); |
| handleLowWaterMark.release(); |
| }) |
| .exceptionally(ex -> { |
| log.warn("Failed to abort low water mark for txn {}, topic [{}], " |
| + "lowWaterMark [{}], ", firstTxn, topic.getName(), lowWaterMarkOfFirstTxnId, |
| ex); |
| handleLowWaterMark.release(); |
| return null; |
| }); |
| return; |
| } |
| } |
| handleLowWaterMark.release(); |
| } |
| } |
| |
| private void takeSnapshotByChangeTimes() { |
| if (changeMaxReadPositionAndAddAbortTimes.get() >= takeSnapshotIntervalNumber) { |
| takeSnapshot(); |
| } |
| } |
| |
| private void takeSnapshotByTimeout() { |
| if (changeMaxReadPositionAndAddAbortTimes.get() > 0) { |
| takeSnapshot(); |
| } |
| this.timer.newTimeout(TopicTransactionBuffer.this, |
| takeSnapshotIntervalTime, TimeUnit.MILLISECONDS); |
| } |
| |
| private CompletableFuture<Void> takeSnapshot() { |
| changeMaxReadPositionAndAddAbortTimes.set(0); |
| return takeSnapshotWriter.thenCompose(writer -> { |
| TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot(); |
| synchronized (TopicTransactionBuffer.this) { |
| snapshot.setTopicName(topic.getName()); |
| snapshot.setMaxReadPositionLedgerId(maxReadPosition.getLedgerId()); |
| snapshot.setMaxReadPositionEntryId(maxReadPosition.getEntryId()); |
| List<AbortTxnMetadata> list = new ArrayList<>(); |
| aborts.forEach((k, v) -> { |
| AbortTxnMetadata abortTxnMetadata = new AbortTxnMetadata(); |
| abortTxnMetadata.setTxnIdMostBits(k.getMostSigBits()); |
| abortTxnMetadata.setTxnIdLeastBits(k.getLeastSigBits()); |
| abortTxnMetadata.setLedgerId(v.getLedgerId()); |
| abortTxnMetadata.setEntryId(v.getEntryId()); |
| list.add(abortTxnMetadata); |
| }); |
| snapshot.setAborts(list); |
| } |
| return writer.writeAsync(snapshot).thenAccept(messageId-> { |
| this.lastSnapshotTimestamps = System.currentTimeMillis(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}]Transaction buffer take snapshot success! " |
| + "messageId : {}", topic.getName(), messageId); |
| } |
| }).exceptionally(e -> { |
| log.warn("[{}]Transaction buffer take snapshot fail! ", topic.getName(), e); |
| return null; |
| }); |
| }); |
| } |
| private void clearAbortedTransactions() { |
| while (!aborts.isEmpty() && !((ManagedLedgerImpl) topic.getManagedLedger()) |
| .ledgerExists(aborts.get(aborts.firstKey()).getLedgerId())) { |
| if (log.isDebugEnabled()) { |
| aborts.firstKey(); |
| log.debug("[{}] Topic transaction buffer clear aborted transaction, TxnId : {}, Position : {}", |
| topic.getName(), aborts.firstKey(), aborts.get(aborts.firstKey())); |
| } |
| aborts.remove(aborts.firstKey()); |
| } |
| } |
| void updateMaxReadPosition(TxnID txnID) { |
| PositionImpl preMaxReadPosition = this.maxReadPosition; |
| ongoingTxns.remove(txnID); |
| if (!ongoingTxns.isEmpty()) { |
| PositionImpl position = ongoingTxns.get(ongoingTxns.firstKey()); |
| //max read position is less than first ongoing transaction message position, so entryId -1 |
| maxReadPosition = PositionImpl.get(position.getLedgerId(), position.getEntryId() - 1); |
| } else { |
| maxReadPosition = (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); |
| } |
| if (preMaxReadPosition.compareTo(this.maxReadPosition) != 0) { |
| this.changeMaxReadPositionAndAddAbortTimes.getAndIncrement(); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<Void> purgeTxns(List<Long> dataLedgers) { |
| return null; |
| } |
| |
| @Override |
| public CompletableFuture<Void> clearSnapshot() { |
| return this.takeSnapshotWriter.thenCompose(writer -> { |
| TransactionBufferSnapshot snapshot = new TransactionBufferSnapshot(); |
| snapshot.setTopicName(topic.getName()); |
| return writer.deleteAsync(snapshot); |
| }).thenCompose(__ -> CompletableFuture.completedFuture(null)); |
| } |
| |
| @Override |
| public CompletableFuture<Void> closeAsync() { |
| changeToCloseState(); |
| return this.takeSnapshotWriter.thenCompose(SystemTopicClient.Writer::closeAsync); |
| } |
| |
| @Override |
| public boolean isTxnAborted(TxnID txnID) { |
| return aborts.containsKey(txnID); |
| } |
| |
| @Override |
| public void syncMaxReadPositionForNormalPublish(PositionImpl position) { |
| // when ongoing transaction is empty, proved that lastAddConfirm is can read max position, because callback |
| // thread is the same tread, in this time the lastAddConfirm don't content transaction message. |
| synchronized (TopicTransactionBuffer.this) { |
| if (checkIfNoSnapshot()) { |
| maxReadPosition = position; |
| } else if (checkIfReady()) { |
| if (ongoingTxns.isEmpty()) { |
| maxReadPosition = position; |
| changeMaxReadPositionAndAddAbortTimes.incrementAndGet(); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public PositionImpl getMaxReadPosition() { |
| if (checkIfReady() || checkIfNoSnapshot()) { |
| return this.maxReadPosition; |
| } else { |
| return PositionImpl.EARLIEST; |
| } |
| } |
| |
| @Override |
| public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) { |
| TransactionInBufferStats transactionInBufferStats = new TransactionInBufferStats(); |
| transactionInBufferStats.aborted = isTxnAborted(txnID); |
| if (ongoingTxns.containsKey(txnID)) { |
| transactionInBufferStats.startPosition = ongoingTxns.get(txnID).toString(); |
| } |
| return transactionInBufferStats; |
| } |
| |
| @Override |
| public TransactionBufferStats getStats(boolean lowWaterMarks) { |
| TransactionBufferStats transactionBufferStats = new TransactionBufferStats(); |
| transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps; |
| transactionBufferStats.state = this.getState().name(); |
| transactionBufferStats.maxReadPosition = this.maxReadPosition.toString(); |
| if (lowWaterMarks) { |
| transactionBufferStats.lowWaterMarks = this.lowWaterMarks; |
| } |
| transactionBufferStats.ongoingTxnSize = ongoingTxns.size(); |
| |
| transactionBufferStats.recoverStartTime = recoverTime.getRecoverStartTime(); |
| transactionBufferStats.recoverEndTime = recoverTime.getRecoverEndTime(); |
| return transactionBufferStats; |
| } |
| |
| @Override |
| public void run(Timeout timeout) { |
| if (checkIfReady()) { |
| takeSnapshotByTimeout(); |
| } |
| } |
| |
| // we store the maxReadPosition from snapshot then open the non-durable cursor by this topic's manageLedger. |
| // the non-durable cursor will read to lastConfirmedEntry. |
| @VisibleForTesting |
| public static class TopicTransactionBufferRecover implements Runnable { |
| |
| private final PersistentTopic topic; |
| |
| private final TopicTransactionBufferRecoverCallBack callBack; |
| |
| private Position startReadCursorPosition = PositionImpl.EARLIEST; |
| |
| private final SpscArrayQueue<Entry> entryQueue; |
| |
| private final AtomicLong exceptionNumber = new AtomicLong(); |
| |
| public static final String SUBSCRIPTION_NAME = "transaction-buffer-sub"; |
| |
| private final TopicTransactionBuffer topicTransactionBuffer; |
| |
| private final CompletableFuture<SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter; |
| |
| private TopicTransactionBufferRecover(TopicTransactionBufferRecoverCallBack callBack, PersistentTopic topic, |
| TopicTransactionBuffer transactionBuffer, CompletableFuture< |
| SystemTopicClient.Writer<TransactionBufferSnapshot>> takeSnapshotWriter) { |
| this.topic = topic; |
| this.callBack = callBack; |
| this.entryQueue = new SpscArrayQueue<>(2000); |
| this.topicTransactionBuffer = transactionBuffer; |
| this.takeSnapshotWriter = takeSnapshotWriter; |
| } |
| |
| @SneakyThrows |
| @Override |
| public void run() { |
| this.takeSnapshotWriter.thenRunAsync(() -> { |
| if (!this.topicTransactionBuffer.changeToInitializingState()) { |
| log.warn("TransactionBuffer {} of topic {} can not change state to Initializing", |
| this, topic.getName()); |
| return; |
| } |
| topic.getBrokerService().getPulsar().getTransactionBufferSnapshotService() |
| .createReader(TopicName.get(topic.getName())).thenAcceptAsync(reader -> { |
| try { |
| boolean hasSnapshot = false; |
| while (reader.hasMoreEvents()) { |
| Message<TransactionBufferSnapshot> message = reader.readNext(); |
| if (topic.getName().equals(message.getKey())) { |
| TransactionBufferSnapshot transactionBufferSnapshot = message.getValue(); |
| if (transactionBufferSnapshot != null) { |
| hasSnapshot = true; |
| callBack.handleSnapshot(transactionBufferSnapshot); |
| this.startReadCursorPosition = PositionImpl.get( |
| transactionBufferSnapshot.getMaxReadPositionLedgerId(), |
| transactionBufferSnapshot.getMaxReadPositionEntryId()); |
| } |
| } |
| } |
| if (!hasSnapshot) { |
| closeReader(reader); |
| callBack.noNeedToRecover(); |
| return; |
| } |
| } catch (Exception ex) { |
| log.error("[{}] Transaction buffer recover fail when read " |
| + "transactionBufferSnapshot!", topic.getName(), ex); |
| callBack.recoverExceptionally(ex); |
| closeReader(reader); |
| return; |
| } |
| closeReader(reader); |
| |
| ManagedCursor managedCursor; |
| try { |
| managedCursor = topic.getManagedLedger() |
| .newNonDurableCursor(this.startReadCursorPosition, SUBSCRIPTION_NAME); |
| } catch (ManagedLedgerException e) { |
| callBack.recoverExceptionally(e); |
| log.error("[{}]Transaction buffer recover fail when open cursor!", topic.getName(), e); |
| return; |
| } |
| PositionImpl lastConfirmedEntry = |
| (PositionImpl) topic.getManagedLedger().getLastConfirmedEntry(); |
| PositionImpl currentLoadPosition = (PositionImpl) this.startReadCursorPosition; |
| FillEntryQueueCallback fillEntryQueueCallback = new FillEntryQueueCallback(entryQueue, |
| managedCursor, TopicTransactionBufferRecover.this); |
| if (lastConfirmedEntry.getEntryId() != -1) { |
| while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 |
| && fillEntryQueueCallback.fillQueue()) { |
| Entry entry = entryQueue.poll(); |
| if (entry != null) { |
| try { |
| currentLoadPosition = PositionImpl.get(entry.getLedgerId(), |
| entry.getEntryId()); |
| callBack.handleTxnEntry(entry); |
| } finally { |
| entry.release(); |
| } |
| } else { |
| try { |
| Thread.sleep(1); |
| } catch (InterruptedException e) { |
| //no-op |
| } |
| } |
| } |
| } |
| |
| closeCursor(SUBSCRIPTION_NAME); |
| callBack.recoverComplete(); |
| }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider() |
| .getExecutor(this)).exceptionally(e -> { |
| callBack.recoverExceptionally(e.getCause()); |
| log.error("[{}]Transaction buffer new snapshot reader fail!", topic.getName(), e); |
| return null; |
| }); |
| }, topic.getBrokerService().getPulsar().getTransactionExecutorProvider() |
| .getExecutor(this)).exceptionally(e -> { |
| callBack.recoverExceptionally(e.getCause()); |
| log.error("[{}]Transaction buffer create snapshot writer fail!", |
| topic.getName(), e); |
| return null; |
| }); |
| } |
| |
| private void closeCursor(String subscriptionName) { |
| topic.getManagedLedger().asyncDeleteCursor(Codec.encode(subscriptionName), |
| new AsyncCallbacks.DeleteCursorCallback() { |
| @Override |
| public void deleteCursorComplete(Object ctx) { |
| log.info("[{}]Transaction buffer snapshot recover cursor close complete.", topic.getName()); |
| } |
| |
| @Override |
| public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { |
| log.error("[{}]Transaction buffer snapshot recover cursor close fail.", topic.getName()); |
| } |
| |
| }, null); |
| } |
| |
| private void callBackException(ManagedLedgerException e) { |
| log.error("Transaction buffer recover fail when recover transaction entry!", e); |
| this.exceptionNumber.getAndIncrement(); |
| } |
| |
| private void closeReader(SystemTopicClient.Reader<TransactionBufferSnapshot> reader) { |
| reader.closeAsync().exceptionally(e -> { |
| log.error("[{}]Transaction buffer reader close error!", topic.getName(), e); |
| return null; |
| }); |
| } |
| } |
| |
| static class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { |
| |
| private final AtomicLong outstandingReadsRequests = new AtomicLong(0); |
| |
| private final SpscArrayQueue<Entry> entryQueue; |
| |
| private final ManagedCursor cursor; |
| |
| private final TopicTransactionBufferRecover recover; |
| |
| private volatile boolean isReadable = true; |
| |
| private static final int NUMBER_OF_PER_READ_ENTRY = 100; |
| |
| private FillEntryQueueCallback(SpscArrayQueue<Entry> entryQueue, ManagedCursor cursor, |
| TopicTransactionBufferRecover recover) { |
| this.entryQueue = entryQueue; |
| this.cursor = cursor; |
| this.recover = recover; |
| } |
| boolean fillQueue() { |
| if (entryQueue.size() + NUMBER_OF_PER_READ_ENTRY < entryQueue.capacity() |
| && outstandingReadsRequests.get() == 0) { |
| if (cursor.hasMoreEntries()) { |
| outstandingReadsRequests.incrementAndGet(); |
| cursor.asyncReadEntries(NUMBER_OF_PER_READ_ENTRY, |
| this, System.nanoTime(), PositionImpl.LATEST); |
| } else { |
| if (entryQueue.size() == 0) { |
| isReadable = false; |
| } |
| } |
| } |
| return isReadable; |
| } |
| |
| @Override |
| public void readEntriesComplete(List<Entry> entries, Object ctx) { |
| entryQueue.fill(new MessagePassingQueue.Supplier<Entry>() { |
| private int i = 0; |
| @Override |
| public Entry get() { |
| Entry entry = entries.get(i); |
| i++; |
| return entry; |
| } |
| }, entries.size()); |
| |
| outstandingReadsRequests.decrementAndGet(); |
| } |
| |
| @Override |
| public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { |
| if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData() |
| && exception instanceof ManagedLedgerException.NonRecoverableLedgerException |
| || exception instanceof ManagedLedgerException.ManagedLedgerFencedException |
| || exception instanceof ManagedLedgerException.CursorAlreadyClosedException) { |
| isReadable = false; |
| } else { |
| outstandingReadsRequests.decrementAndGet(); |
| } |
| recover.callBackException(exception); |
| } |
| } |
| } |