blob: d8e16dd40156c71bda95bb8188d5798973d62260 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.pendingack;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
/**
* Handle for processing pending acks for transactions.
*/
public interface PendingAckHandle {
/**
* Acknowledge message(s) for an ongoing transaction.
* <p>
* It can be of {@link AckType#Individual}. Single messages acked by ongoing transaction will be put
* in pending_ack state and only marked as deleted after transaction is committed.
* <p>
* If transaction is aborted all messages acked by it will be put back to pending state.
* <p>
* Client will not send batch size to server, we get the batch size from consumer pending ack. When we get the Batch
* size, we can accurate batch ack of this position.
*
* @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message.
* @param positions {@link MutablePair} the pair of positions and these batch size.
* @return the future of this operation.
* @throws TransactionConflictException if the ack with transaction is conflict with pending ack.
* @throws NotAllowedException if Use this method incorrectly eg. not use
* PositionImpl or cumulative ack with a list of positions.
*/
CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl,
Integer>> positions);
/**
* Acknowledge message(s) for an ongoing transaction.
* <p>
* It can be of {@link AckType#Cumulative}. Single messages acked by ongoing transaction will be put in
* pending_ack state and only marked as deleted after transaction is committed.
* <p>
* For a moment, we only allow one transaction cumulative ack multiple times when the position is greater than the
* old one.
* <p>
* We have a transaction with cumulative ack, if other transaction want to cumulative ack, we will
* return {@link TransactionConflictException}.
* <p>
* If an ongoing transaction cumulative acked a message and then try to ack single message which is
* greater than that one it cumulative acked, it'll succeed.
*
* @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message.
* @param positions {@link MutablePair} the pair of positions and these batch size.
* @return the future of this operation.
* @throws TransactionConflictException if the ack with transaction is conflict with pending ack.
* @throws NotAllowedException if Use this method incorrectly eg. not use
* PositionImpl or cumulative ack with a list of positions.
*/
CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions);
/**
* Commit a transaction.
*
* @param txnID {@link TxnID} to identify the transaction.
* @param properties Additional user-defined properties that can be
* associated with a particular cursor position.
* @param lowWaterMark the low water mark of this transaction
* @return the future of this operation.
*/
CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark);
/**
* Abort a transaction.
*
* @param txnId {@link TxnID} to identify the transaction.
* @param consumer {@link Consumer} which aborting transaction.
* @param lowWaterMark the low water mark of this transaction
* @return the future of this operation.
*/
CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark);
/**
* Sync the position ack set, in order to clean up the cache of this position for pending ack handle.
*
* @param position {@link Position} which position need to sync and carry it batch size
*/
void syncBatchPositionAckSetForTransaction(PositionImpl position);
/**
* Judge the all ack set point have acked by normal ack and transaction pending ack.
*
* @param position {@link Position} which position need to check
*/
boolean checkIsCanDeleteConsumerPendingAck(PositionImpl position);
/**
* When the position is actually deleted, we can use this method to clear the cache for individual ack messages.
*
* @param position {@link Position} which position need to clear
*/
void clearIndividualPosition(Position position);
/**
* Pending ack recover whether ready future.
*
* @return the future of result.
*/
CompletableFuture<PendingAckHandle> pendingAckHandleFuture();
/**
* Get transaction in pending ack stats.
*
* @param txnID the txnID
* @return the stats of this transaction in pending ack.
*/
TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID);
/**
* Get pending ack handle stats.
*
* @return the stats of this pending ack handle.
*/
TransactionPendingAckStats getStats(boolean lowWaterMarks);
/**
* Close the pending ack handle.
*
* @return the future of this operation.
*/
CompletableFuture<Void> close();
/**
* Check if the PendingAckStore is init.
* @return if the PendingAckStore is init.
*/
boolean checkIfPendingAckStoreInit();
/**
* Get the stats of this message position is in pending ack.
* @param position message position.
* @param batchIndex the batch index of ths position.
* @return the stats of the message position.
*/
PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl position, Integer batchIndex);
}