blob: 470cdc3e74ba1117cfc776bc589cc91b25363a6b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.impl;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.SnapshotSystemTopicInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionLogStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
@Slf4j
public abstract class TransactionsBase extends AdminResource {
protected void internalListCoordinators(AsyncResponse asyncResponse) {
final PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException ex) {
asyncResponse.resume(new RestException(ex));
return;
}
Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>();
admin.lookups()
.lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName())
.thenAccept(map -> {
map.forEach((topicPartition, brokerServiceUrl) -> {
final int coordinatorId = TopicName.getPartitionIndex(topicPartition);
result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl));
});
asyncResponse.resume(result.values());
})
.exceptionally(ex -> {
log.error("[{}] Failed to list transaction coordinators: {}",
clientAppId(), ex.getMessage(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative,
Integer coordinatorId) {
if (coordinatorId != null) {
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
authoritative);
TransactionMetadataStore transactionMetadataStore =
pulsar().getTransactionMetadataStoreService().getStores()
.get(TransactionCoordinatorID.get(coordinatorId));
if (transactionMetadataStore == null) {
asyncResponse.resume(new RestException(NOT_FOUND,
"Transaction coordinator not found! coordinator id : " + coordinatorId));
return;
}
asyncResponse.resume(transactionMetadataStore.getCoordinatorStats());
} else {
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
"Transaction coordinator not found"));
return;
}
List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures =
new ArrayList<>();
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
transactionMetadataStoreInfoFutures
.add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
}
Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>();
FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) {
try {
stats.put(i, transactionMetadataStoreInfoFutures.get(i).get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
asyncResponse.resume(stats);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
}
protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats(
boolean authoritative, long mostSigBits, long leastSigBits, String subName) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits),
subName));
}
protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats(
boolean authoritative, long mostSigBits, long leastSigBits) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits)));
}
protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative,
boolean lowWaterMarks,
boolean segmentStats) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks, segmentStats));
}
protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
boolean authoritative, String subName, boolean lowWaterMarks) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionPendingAckStats(subName, lowWaterMarks));
}
protected void internalGetTransactionMetadata(AsyncResponse asyncResponse,
boolean authoritative, int mostSigBits, long leastSigBits) {
try {
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits),
authoritative);
CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>();
TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService()
.getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get();
getTransactionMetadata(txnMeta, transactionMetadataFuture);
asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS));
} catch (Exception e) {
if (e instanceof ExecutionException) {
if (e.getCause() instanceof CoordinatorNotFoundException
|| e.getCause() instanceof TransactionNotFoundException) {
asyncResponse.resume(new RestException(NOT_FOUND, e.getCause()));
return;
}
asyncResponse.resume(new RestException(e.getCause()));
} else {
asyncResponse.resume(new RestException(e));
}
}
}
private void getTransactionMetadata(TxnMeta txnMeta,
CompletableFuture<TransactionMetadata> transactionMetadataFuture)
throws PulsarServerException {
Transactions transactions = pulsar().getAdminClient().transactions();
TransactionMetadata transactionMetadata = new TransactionMetadata();
TxnID txnID = txnMeta.id();
transactionMetadata.txnId = txnID.toString();
transactionMetadata.status = txnMeta.status().name();
transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp();
transactionMetadata.timeoutAt = txnMeta.getTimeoutAt();
transactionMetadata.owner = txnMeta.getOwner();
List<CompletableFuture<TransactionInPendingAckStats>> ackedPartitionsFutures = new ArrayList<>();
Map<String, Map<String, CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>();
txnMeta.ackedPartitions().forEach(transactionSubscription -> {
String topic = transactionSubscription.getTopic();
String subName = transactionSubscription.getSubscription();
CompletableFuture<TransactionInPendingAckStats> future =
transactions.getTransactionInPendingAckStatsAsync(txnID, topic, subName);
ackedPartitionsFutures.add(future);
if (ackFutures.containsKey(topic)) {
ackFutures.get(topic)
.put(transactionSubscription.getSubscription(), future);
} else {
Map<String, CompletableFuture<TransactionInPendingAckStats>> pendingAckStatsMap =
new HashMap<>();
pendingAckStatsMap.put(transactionSubscription.getSubscription(), future);
ackFutures.put(topic, pendingAckStatsMap);
}
});
List<CompletableFuture<TransactionInBufferStats>> producedPartitionsFutures = new ArrayList<>();
Map<String, CompletableFuture<TransactionInBufferStats>> produceFutures = new HashMap<>();
txnMeta.producedPartitions().forEach(topic -> {
CompletableFuture<TransactionInBufferStats> future =
transactions.getTransactionInBufferStatsAsync(txnID, topic);
producedPartitionsFutures.add(future);
produceFutures.put(topic, future);
});
FutureUtil.waitForAll(ackedPartitionsFutures).whenComplete((v, e) -> {
if (e != null) {
transactionMetadataFuture.completeExceptionally(e);
return;
}
FutureUtil.waitForAll(producedPartitionsFutures).whenComplete((x, t) -> {
if (t != null) {
transactionMetadataFuture.completeExceptionally(e);
return;
}
Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions = new HashMap<>();
Map<String, TransactionInBufferStats> producedPartitions = new HashMap<>();
for (String topic : ackFutures.keySet()) {
Map<String, TransactionInPendingAckStats> subs = new HashMap<>();
for (String sub : ackFutures.get(topic).keySet()) {
try {
subs.put(sub, ackFutures.get(topic).get(sub).get());
} catch (Exception exception) {
transactionMetadataFuture.completeExceptionally(exception);
return;
}
}
ackedPartitions.put(topic, subs);
}
for (String topic : produceFutures.keySet()) {
try {
producedPartitions.put(topic, produceFutures.get(topic).get());
} catch (Exception exception) {
transactionMetadataFuture.completeExceptionally(exception);
return;
}
}
transactionMetadata.ackedPartitions = ackedPartitions;
transactionMetadata.producedPartitions = producedPartitions;
transactionMetadataFuture.complete(transactionMetadata);
});
});
}
protected void internalGetSlowTransactions(AsyncResponse asyncResponse,
boolean authoritative, long timeout, Integer coordinatorId) {
try {
if (coordinatorId != null) {
validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId),
authoritative);
TransactionMetadataStore transactionMetadataStore =
pulsar().getTransactionMetadataStoreService().getStores()
.get(TransactionCoordinatorID.get(coordinatorId));
if (transactionMetadataStore == null) {
asyncResponse.resume(new RestException(NOT_FOUND,
"Transaction coordinator not found! coordinator id : " + coordinatorId));
return;
}
List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout);
List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>();
for (TxnMeta txnMeta : transactions) {
CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>();
getTransactionMetadata(txnMeta, completableFuture);
completableFutures.add(completableFuture);
}
FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e.getCause()));
return;
}
Map<String, TransactionMetadata> transactionMetadata = new HashMap<>();
for (CompletableFuture<TransactionMetadata> future : completableFutures) {
try {
transactionMetadata.put(future.get().txnId, future.get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
asyncResponse.resume(transactionMetadata);
});
} else {
getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
false, false).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
asyncResponse.resume(new RestException(Response.Status.NOT_FOUND,
"Transaction coordinator not found"));
return;
}
List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures =
new ArrayList<>();
for (int i = 0; i < partitionMetadata.partitions; i++) {
try {
completableFutures
.add(pulsar().getAdminClient().transactions()
.getSlowTransactionsByCoordinatorIdAsync(i, timeout,
TimeUnit.MILLISECONDS));
} catch (PulsarServerException e) {
asyncResponse.resume(new RestException(e));
return;
}
}
Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>();
FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> {
if (e != null) {
asyncResponse.resume(new RestException(e));
return;
}
for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap
: completableFutures) {
try {
transactionMetadataMaps.putAll(transactionMetadataMap.get());
} catch (Exception exception) {
asyncResponse.resume(new RestException(exception.getCause()));
return;
}
}
asyncResponse.resume(transactionMetadataMaps);
});
}).exceptionally(ex -> {
log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
}
protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative,
boolean metadata, int coordinatorId) {
try {
TopicName topicName = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId);
validateTopicOwnership(topicName, authoritative);
TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(coordinatorId));
if (metadataStore == null) {
asyncResponse.resume(new RestException(NOT_FOUND,
"Transaction coordinator not found! coordinator id : " + coordinatorId));
return;
}
if (metadataStore instanceof MLTransactionMetadataStore) {
ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger();
TransactionCoordinatorInternalStats transactionCoordinatorInternalStats =
new TransactionCoordinatorInternalStats();
TransactionLogStats transactionLogStats = new TransactionLogStats();
transactionLogStats.managedLedgerName = managedLedger.getName();
transactionLogStats.managedLedgerInternalStats =
managedLedger.getManagedLedgerInternalStats(metadata).get();
transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats;
asyncResponse.resume(transactionCoordinatorInternalStats);
} else {
asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED,
"Broker don't use MLTransactionMetadataStore!"));
}
} catch (Exception e) {
resumeAsyncResponseExceptionally(asyncResponse, e);
}
}
protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats(
boolean authoritative, String subName, boolean metadata) {
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(topic -> topic.getPendingAckManagedLedger(subName))
.thenCompose(managedLedger ->
managedLedger.getManagedLedgerInternalStats(metadata)
.thenApply(internalStats -> {
TransactionLogStats pendingAckLogStats = new TransactionLogStats();
pendingAckLogStats.managedLedgerName = managedLedger.getName();
pendingAckLogStats.managedLedgerInternalStats = internalStats;
return pendingAckLogStats;
})
.thenApply(pendingAckLogStats -> {
TransactionPendingAckInternalStats stats = new TransactionPendingAckInternalStats();
stats.pendingAckLogStats = pendingAckLogStats;
return stats;
})
);
}
protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats(
boolean authoritative, boolean metadata) {
TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats();
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(topic -> {
AbortedTxnProcessor.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType();
if (snapshotType == null) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
"Transaction buffer Snapshot for the topic does not exist"));
} else if (snapshotType == AbortedTxnProcessor.SnapshotType.Segment) {
transactionBufferInternalStats.snapshotType = snapshotType.toString();
TopicName segmentTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
CompletableFuture<SnapshotSystemTopicInternalStats> segmentInternalStatsFuture =
getTxnSnapshotInternalStats(segmentTopic, metadata);
TopicName indexTopic = TopicName.get(TopicDomain.persistent.toString(),
namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
CompletableFuture<SnapshotSystemTopicInternalStats> segmentIndexInternalStatsFuture =
getTxnSnapshotInternalStats(indexTopic, metadata);
return segmentIndexInternalStatsFuture
.thenCombine(segmentInternalStatsFuture, (indexStats, segmentStats) -> {
transactionBufferInternalStats.segmentIndexInternalStats = indexStats;
transactionBufferInternalStats.segmentInternalStats = segmentStats;
return transactionBufferInternalStats;
});
} else if (snapshotType == AbortedTxnProcessor.SnapshotType.Single) {
transactionBufferInternalStats.snapshotType = snapshotType.toString();
TopicName singleSnapshotTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
return getTxnSnapshotInternalStats(singleSnapshotTopic, metadata)
.thenApply(snapshotSystemTopicInternalStats -> {
transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats =
snapshotSystemTopicInternalStats;
return transactionBufferInternalStats;
});
}
return FutureUtil.failedFuture(new RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+ snapshotType));
});
}
private CompletableFuture<SnapshotSystemTopicInternalStats> getTxnSnapshotInternalStats(TopicName topicName,
boolean metadata) {
final PulsarAdmin admin;
try {
admin = pulsar().getAdminClient();
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(new RestException(e));
}
return admin.topics().getInternalStatsAsync(topicName.toString(), metadata)
.thenApply(persistentTopicInternalStats -> {
SnapshotSystemTopicInternalStats
snapshotSystemTopicInternalStats = new SnapshotSystemTopicInternalStats();
snapshotSystemTopicInternalStats.managedLedgerInternalStats = persistentTopicInternalStats;
snapshotSystemTopicInternalStats.managedLedgerName = topicName.getEncodedLocalName();
return snapshotSystemTopicInternalStats;
});
}
protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
.getTopics().get(topicName.toString());
if (topicFuture == null) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
String.format("Topic not found %s", topicName.toString())));
}
return topicFuture.thenCompose(optionalTopic -> {
if (!optionalTopic.isPresent()) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
String.format("Topic not found %s", topicName.toString())));
}
return CompletableFuture.completedFuture((PersistentTopic) optionalTopic.get());
});
});
}
protected void checkTransactionCoordinatorEnabled() {
if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) {
throw new RestException(SERVICE_UNAVAILABLE,
"This Broker is not configured with transactionCoordinatorEnabled=true.");
}
}
protected void validateTopicName(String property, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
try {
this.namespaceName = NamespaceName.get(property, namespace);
this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, topic);
} catch (IllegalArgumentException e) {
log.warn("[{}] Failed to validate topic name {}://{}/{}/{}", clientAppId(), domain(), property, namespace,
topic, e);
throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid");
}
}
protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) {
return validateSuperUserAccessAsync()
.thenCompose((ignore) -> namespaceResources().getPartitionedTopicResources()
.updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> {
if (p.partitions >= replicas) {
throw new RestException(Response.Status.NOT_ACCEPTABLE,
"Number of transaction coordinators should "
+ "be more than the current number of transaction coordinator");
}
return new PartitionedTopicMetadata(replicas);
}));
}
protected CompletableFuture<PositionInPendingAckStats> internalGetPositionStatsPendingAckStats(
boolean authoritative, String subName, PositionImpl position, Integer batchIndex) {
CompletableFuture<PositionInPendingAckStats> completableFuture = new CompletableFuture<>();
getExistingPersistentTopicAsync(authoritative)
.thenAccept(topic -> {
PositionInPendingAckStats result = topic.getSubscription(subName)
.checkPositionInPendingAckState(position, batchIndex);
completableFuture.complete(result);
}).exceptionally(ex -> {
completableFuture.completeExceptionally(ex);
return null;
});
return completableFuture;
}
}