| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.admin.impl; |
| |
| import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; |
| import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED; |
| import static javax.ws.rs.core.Response.Status.NOT_FOUND; |
| import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import javax.ws.rs.container.AsyncResponse; |
| import javax.ws.rs.core.Response; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.mledger.ManagedLedger; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.admin.AdminResource; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor; |
| import org.apache.pulsar.broker.web.RestException; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.Transactions; |
| import org.apache.pulsar.client.api.transaction.TxnID; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.SystemTopicNames; |
| import org.apache.pulsar.common.naming.TopicDomain; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.partition.PartitionedTopicMetadata; |
| import org.apache.pulsar.common.policies.data.SnapshotSystemTopicInternalStats; |
| import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats; |
| import org.apache.pulsar.common.policies.data.TransactionBufferStats; |
| import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo; |
| import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats; |
| import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats; |
| import org.apache.pulsar.common.policies.data.TransactionInBufferStats; |
| import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats; |
| import org.apache.pulsar.common.policies.data.TransactionLogStats; |
| import org.apache.pulsar.common.policies.data.TransactionMetadata; |
| import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; |
| import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; |
| import org.apache.pulsar.common.stats.PositionInPendingAckStats; |
| import org.apache.pulsar.common.util.Codec; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID; |
| import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; |
| import org.apache.pulsar.transaction.coordinator.TxnMeta; |
| import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException; |
| import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException; |
| import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; |
| |
| @Slf4j |
| public abstract class TransactionsBase extends AdminResource { |
| |
| protected void internalListCoordinators(AsyncResponse asyncResponse) { |
| final PulsarAdmin admin; |
| try { |
| admin = pulsar().getAdminClient(); |
| } catch (PulsarServerException ex) { |
| asyncResponse.resume(new RestException(ex)); |
| return; |
| } |
| Map<Integer, TransactionCoordinatorInfo> result = new HashMap<>(); |
| admin.lookups() |
| .lookupPartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartitionedTopicName()) |
| .thenAccept(map -> { |
| map.forEach((topicPartition, brokerServiceUrl) -> { |
| final int coordinatorId = TopicName.getPartitionIndex(topicPartition); |
| result.put(coordinatorId, new TransactionCoordinatorInfo(coordinatorId, brokerServiceUrl)); |
| }); |
| |
| asyncResponse.resume(result.values()); |
| }) |
| .exceptionally(ex -> { |
| log.error("[{}] Failed to list transaction coordinators: {}", |
| clientAppId(), ex.getMessage(), ex); |
| resumeAsyncResponseExceptionally(asyncResponse, ex); |
| return null; |
| }); |
| } |
| |
| protected void internalGetCoordinatorStats(AsyncResponse asyncResponse, boolean authoritative, |
| Integer coordinatorId) { |
| if (coordinatorId != null) { |
| validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), |
| authoritative); |
| TransactionMetadataStore transactionMetadataStore = |
| pulsar().getTransactionMetadataStoreService().getStores() |
| .get(TransactionCoordinatorID.get(coordinatorId)); |
| if (transactionMetadataStore == null) { |
| asyncResponse.resume(new RestException(NOT_FOUND, |
| "Transaction coordinator not found! coordinator id : " + coordinatorId)); |
| return; |
| } |
| asyncResponse.resume(transactionMetadataStore.getCoordinatorStats()); |
| } else { |
| getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, |
| false, false).thenAccept(partitionMetadata -> { |
| if (partitionMetadata.partitions == 0) { |
| asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, |
| "Transaction coordinator not found")); |
| return; |
| } |
| List<CompletableFuture<TransactionCoordinatorStats>> transactionMetadataStoreInfoFutures = |
| new ArrayList<>(); |
| for (int i = 0; i < partitionMetadata.partitions; i++) { |
| try { |
| transactionMetadataStoreInfoFutures |
| .add(pulsar().getAdminClient().transactions().getCoordinatorStatsByIdAsync(i)); |
| } catch (PulsarServerException e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| } |
| Map<Integer, TransactionCoordinatorStats> stats = new HashMap<>(); |
| FutureUtil.waitForAll(transactionMetadataStoreInfoFutures).whenComplete((result, e) -> { |
| if (e != null) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| for (int i = 0; i < transactionMetadataStoreInfoFutures.size(); i++) { |
| try { |
| stats.put(i, transactionMetadataStoreInfoFutures.get(i).get()); |
| } catch (Exception exception) { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return; |
| } |
| } |
| |
| asyncResponse.resume(stats); |
| }); |
| }).exceptionally(ex -> { |
| log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex); |
| resumeAsyncResponseExceptionally(asyncResponse, ex); |
| return null; |
| }); |
| } |
| } |
| |
| protected CompletableFuture<TransactionInPendingAckStats> internalGetTransactionInPendingAckStats( |
| boolean authoritative, long mostSigBits, long leastSigBits, String subName) { |
| return getExistingPersistentTopicAsync(authoritative) |
| .thenApply(topic -> topic.getTransactionInPendingAckStats(new TxnID(mostSigBits, leastSigBits), |
| subName)); |
| } |
| |
| protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBufferStats( |
| boolean authoritative, long mostSigBits, long leastSigBits) { |
| return getExistingPersistentTopicAsync(authoritative) |
| .thenApply(topic -> topic.getTransactionInBufferStats(new TxnID(mostSigBits, leastSigBits))); |
| } |
| |
| protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative, |
| boolean lowWaterMarks, |
| boolean segmentStats) { |
| return getExistingPersistentTopicAsync(authoritative) |
| .thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks, segmentStats)); |
| } |
| |
| protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats( |
| boolean authoritative, String subName, boolean lowWaterMarks) { |
| return getExistingPersistentTopicAsync(authoritative) |
| .thenApply(topic -> topic.getTransactionPendingAckStats(subName, lowWaterMarks)); |
| } |
| |
| protected void internalGetTransactionMetadata(AsyncResponse asyncResponse, |
| boolean authoritative, int mostSigBits, long leastSigBits) { |
| try { |
| validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(mostSigBits), |
| authoritative); |
| CompletableFuture<TransactionMetadata> transactionMetadataFuture = new CompletableFuture<>(); |
| TxnMeta txnMeta = pulsar().getTransactionMetadataStoreService() |
| .getTxnMeta(new TxnID(mostSigBits, leastSigBits)).get(); |
| getTransactionMetadata(txnMeta, transactionMetadataFuture); |
| asyncResponse.resume(transactionMetadataFuture.get(10, TimeUnit.SECONDS)); |
| } catch (Exception e) { |
| if (e instanceof ExecutionException) { |
| if (e.getCause() instanceof CoordinatorNotFoundException |
| || e.getCause() instanceof TransactionNotFoundException) { |
| asyncResponse.resume(new RestException(NOT_FOUND, e.getCause())); |
| return; |
| } |
| asyncResponse.resume(new RestException(e.getCause())); |
| } else { |
| asyncResponse.resume(new RestException(e)); |
| } |
| } |
| } |
| |
| private void getTransactionMetadata(TxnMeta txnMeta, |
| CompletableFuture<TransactionMetadata> transactionMetadataFuture) |
| throws PulsarServerException { |
| Transactions transactions = pulsar().getAdminClient().transactions(); |
| TransactionMetadata transactionMetadata = new TransactionMetadata(); |
| TxnID txnID = txnMeta.id(); |
| transactionMetadata.txnId = txnID.toString(); |
| transactionMetadata.status = txnMeta.status().name(); |
| transactionMetadata.openTimestamp = txnMeta.getOpenTimestamp(); |
| transactionMetadata.timeoutAt = txnMeta.getTimeoutAt(); |
| transactionMetadata.owner = txnMeta.getOwner(); |
| |
| List<CompletableFuture<TransactionInPendingAckStats>> ackedPartitionsFutures = new ArrayList<>(); |
| Map<String, Map<String, CompletableFuture<TransactionInPendingAckStats>>> ackFutures = new HashMap<>(); |
| txnMeta.ackedPartitions().forEach(transactionSubscription -> { |
| String topic = transactionSubscription.getTopic(); |
| String subName = transactionSubscription.getSubscription(); |
| CompletableFuture<TransactionInPendingAckStats> future = |
| transactions.getTransactionInPendingAckStatsAsync(txnID, topic, subName); |
| ackedPartitionsFutures.add(future); |
| if (ackFutures.containsKey(topic)) { |
| ackFutures.get(topic) |
| .put(transactionSubscription.getSubscription(), future); |
| } else { |
| Map<String, CompletableFuture<TransactionInPendingAckStats>> pendingAckStatsMap = |
| new HashMap<>(); |
| pendingAckStatsMap.put(transactionSubscription.getSubscription(), future); |
| ackFutures.put(topic, pendingAckStatsMap); |
| } |
| }); |
| |
| List<CompletableFuture<TransactionInBufferStats>> producedPartitionsFutures = new ArrayList<>(); |
| Map<String, CompletableFuture<TransactionInBufferStats>> produceFutures = new HashMap<>(); |
| txnMeta.producedPartitions().forEach(topic -> { |
| CompletableFuture<TransactionInBufferStats> future = |
| transactions.getTransactionInBufferStatsAsync(txnID, topic); |
| producedPartitionsFutures.add(future); |
| produceFutures.put(topic, future); |
| |
| }); |
| |
| FutureUtil.waitForAll(ackedPartitionsFutures).whenComplete((v, e) -> { |
| if (e != null) { |
| transactionMetadataFuture.completeExceptionally(e); |
| return; |
| } |
| |
| FutureUtil.waitForAll(producedPartitionsFutures).whenComplete((x, t) -> { |
| if (t != null) { |
| transactionMetadataFuture.completeExceptionally(e); |
| return; |
| } |
| |
| Map<String, Map<String, TransactionInPendingAckStats>> ackedPartitions = new HashMap<>(); |
| Map<String, TransactionInBufferStats> producedPartitions = new HashMap<>(); |
| |
| for (String topic : ackFutures.keySet()) { |
| Map<String, TransactionInPendingAckStats> subs = new HashMap<>(); |
| for (String sub : ackFutures.get(topic).keySet()) { |
| try { |
| subs.put(sub, ackFutures.get(topic).get(sub).get()); |
| } catch (Exception exception) { |
| transactionMetadataFuture.completeExceptionally(exception); |
| return; |
| } |
| } |
| |
| ackedPartitions.put(topic, subs); |
| } |
| |
| for (String topic : produceFutures.keySet()) { |
| try { |
| producedPartitions.put(topic, produceFutures.get(topic).get()); |
| } catch (Exception exception) { |
| transactionMetadataFuture.completeExceptionally(exception); |
| return; |
| } |
| } |
| transactionMetadata.ackedPartitions = ackedPartitions; |
| transactionMetadata.producedPartitions = producedPartitions; |
| transactionMetadataFuture.complete(transactionMetadata); |
| }); |
| }); |
| } |
| |
| protected void internalGetSlowTransactions(AsyncResponse asyncResponse, |
| boolean authoritative, long timeout, Integer coordinatorId) { |
| try { |
| if (coordinatorId != null) { |
| validateTopicOwnership(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId), |
| authoritative); |
| TransactionMetadataStore transactionMetadataStore = |
| pulsar().getTransactionMetadataStoreService().getStores() |
| .get(TransactionCoordinatorID.get(coordinatorId)); |
| if (transactionMetadataStore == null) { |
| asyncResponse.resume(new RestException(NOT_FOUND, |
| "Transaction coordinator not found! coordinator id : " + coordinatorId)); |
| return; |
| } |
| List<TxnMeta> transactions = transactionMetadataStore.getSlowTransactions(timeout); |
| List<CompletableFuture<TransactionMetadata>> completableFutures = new ArrayList<>(); |
| for (TxnMeta txnMeta : transactions) { |
| CompletableFuture<TransactionMetadata> completableFuture = new CompletableFuture<>(); |
| getTransactionMetadata(txnMeta, completableFuture); |
| completableFutures.add(completableFuture); |
| } |
| |
| FutureUtil.waitForAll(completableFutures).whenComplete((v, e) -> { |
| if (e != null) { |
| asyncResponse.resume(new RestException(e.getCause())); |
| return; |
| } |
| |
| Map<String, TransactionMetadata> transactionMetadata = new HashMap<>(); |
| for (CompletableFuture<TransactionMetadata> future : completableFutures) { |
| try { |
| transactionMetadata.put(future.get().txnId, future.get()); |
| } catch (Exception exception) { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return; |
| } |
| } |
| asyncResponse.resume(transactionMetadata); |
| }); |
| } else { |
| getPartitionedTopicMetadataAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, |
| false, false).thenAccept(partitionMetadata -> { |
| if (partitionMetadata.partitions == 0) { |
| asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, |
| "Transaction coordinator not found")); |
| return; |
| } |
| List<CompletableFuture<Map<String, TransactionMetadata>>> completableFutures = |
| new ArrayList<>(); |
| for (int i = 0; i < partitionMetadata.partitions; i++) { |
| try { |
| completableFutures |
| .add(pulsar().getAdminClient().transactions() |
| .getSlowTransactionsByCoordinatorIdAsync(i, timeout, |
| TimeUnit.MILLISECONDS)); |
| } catch (PulsarServerException e) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| } |
| Map<String, TransactionMetadata> transactionMetadataMaps = new HashMap<>(); |
| FutureUtil.waitForAll(completableFutures).whenComplete((result, e) -> { |
| if (e != null) { |
| asyncResponse.resume(new RestException(e)); |
| return; |
| } |
| |
| for (CompletableFuture<Map<String, TransactionMetadata>> transactionMetadataMap |
| : completableFutures) { |
| try { |
| transactionMetadataMaps.putAll(transactionMetadataMap.get()); |
| } catch (Exception exception) { |
| asyncResponse.resume(new RestException(exception.getCause())); |
| return; |
| } |
| } |
| asyncResponse.resume(transactionMetadataMaps); |
| }); |
| }).exceptionally(ex -> { |
| log.error("[{}] Failed to get transaction coordinator state.", clientAppId(), ex); |
| resumeAsyncResponseExceptionally(asyncResponse, ex); |
| return null; |
| }); |
| |
| } |
| } catch (Exception e) { |
| asyncResponse.resume(new RestException(e)); |
| } |
| } |
| |
| protected void internalGetCoordinatorInternalStats(AsyncResponse asyncResponse, boolean authoritative, |
| boolean metadata, int coordinatorId) { |
| try { |
| TopicName topicName = SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.getPartition(coordinatorId); |
| validateTopicOwnership(topicName, authoritative); |
| TransactionMetadataStore metadataStore = pulsar().getTransactionMetadataStoreService() |
| .getStores().get(TransactionCoordinatorID.get(coordinatorId)); |
| if (metadataStore == null) { |
| asyncResponse.resume(new RestException(NOT_FOUND, |
| "Transaction coordinator not found! coordinator id : " + coordinatorId)); |
| return; |
| } |
| if (metadataStore instanceof MLTransactionMetadataStore) { |
| ManagedLedger managedLedger = ((MLTransactionMetadataStore) metadataStore).getManagedLedger(); |
| TransactionCoordinatorInternalStats transactionCoordinatorInternalStats = |
| new TransactionCoordinatorInternalStats(); |
| TransactionLogStats transactionLogStats = new TransactionLogStats(); |
| transactionLogStats.managedLedgerName = managedLedger.getName(); |
| transactionLogStats.managedLedgerInternalStats = |
| managedLedger.getManagedLedgerInternalStats(metadata).get(); |
| transactionCoordinatorInternalStats.transactionLogStats = transactionLogStats; |
| asyncResponse.resume(transactionCoordinatorInternalStats); |
| } else { |
| asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, |
| "Broker don't use MLTransactionMetadataStore!")); |
| } |
| } catch (Exception e) { |
| resumeAsyncResponseExceptionally(asyncResponse, e); |
| } |
| } |
| |
| protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendingAckInternalStats( |
| boolean authoritative, String subName, boolean metadata) { |
| return getExistingPersistentTopicAsync(authoritative) |
| .thenCompose(topic -> topic.getPendingAckManagedLedger(subName)) |
| .thenCompose(managedLedger -> |
| managedLedger.getManagedLedgerInternalStats(metadata) |
| .thenApply(internalStats -> { |
| TransactionLogStats pendingAckLogStats = new TransactionLogStats(); |
| pendingAckLogStats.managedLedgerName = managedLedger.getName(); |
| pendingAckLogStats.managedLedgerInternalStats = internalStats; |
| return pendingAckLogStats; |
| }) |
| .thenApply(pendingAckLogStats -> { |
| TransactionPendingAckInternalStats stats = new TransactionPendingAckInternalStats(); |
| stats.pendingAckLogStats = pendingAckLogStats; |
| return stats; |
| }) |
| ); |
| } |
| |
| protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats( |
| boolean authoritative, boolean metadata) { |
| TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats(); |
| return getExistingPersistentTopicAsync(authoritative) |
| .thenCompose(topic -> { |
| AbortedTxnProcessor.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType(); |
| if (snapshotType == null) { |
| return FutureUtil.failedFuture(new RestException(NOT_FOUND, |
| "Transaction buffer Snapshot for the topic does not exist")); |
| } else if (snapshotType == AbortedTxnProcessor.SnapshotType.Segment) { |
| transactionBufferInternalStats.snapshotType = snapshotType.toString(); |
| TopicName segmentTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName, |
| SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS); |
| CompletableFuture<SnapshotSystemTopicInternalStats> segmentInternalStatsFuture = |
| getTxnSnapshotInternalStats(segmentTopic, metadata); |
| TopicName indexTopic = TopicName.get(TopicDomain.persistent.toString(), |
| namespaceName, |
| SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES); |
| CompletableFuture<SnapshotSystemTopicInternalStats> segmentIndexInternalStatsFuture = |
| getTxnSnapshotInternalStats(indexTopic, metadata); |
| return segmentIndexInternalStatsFuture |
| .thenCombine(segmentInternalStatsFuture, (indexStats, segmentStats) -> { |
| transactionBufferInternalStats.segmentIndexInternalStats = indexStats; |
| transactionBufferInternalStats.segmentInternalStats = segmentStats; |
| return transactionBufferInternalStats; |
| }); |
| } else if (snapshotType == AbortedTxnProcessor.SnapshotType.Single) { |
| transactionBufferInternalStats.snapshotType = snapshotType.toString(); |
| TopicName singleSnapshotTopic = TopicName.get(TopicDomain.persistent.toString(), namespaceName, |
| SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT); |
| return getTxnSnapshotInternalStats(singleSnapshotTopic, metadata) |
| .thenApply(snapshotSystemTopicInternalStats -> { |
| transactionBufferInternalStats.singleSnapshotSystemTopicInternalStats = |
| snapshotSystemTopicInternalStats; |
| return transactionBufferInternalStats; |
| }); |
| } |
| return FutureUtil.failedFuture(new RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType " |
| + snapshotType)); |
| }); |
| } |
| |
| private CompletableFuture<SnapshotSystemTopicInternalStats> getTxnSnapshotInternalStats(TopicName topicName, |
| boolean metadata) { |
| final PulsarAdmin admin; |
| try { |
| admin = pulsar().getAdminClient(); |
| } catch (PulsarServerException e) { |
| return FutureUtil.failedFuture(new RestException(e)); |
| } |
| return admin.topics().getInternalStatsAsync(topicName.toString(), metadata) |
| .thenApply(persistentTopicInternalStats -> { |
| SnapshotSystemTopicInternalStats |
| snapshotSystemTopicInternalStats = new SnapshotSystemTopicInternalStats(); |
| snapshotSystemTopicInternalStats.managedLedgerInternalStats = persistentTopicInternalStats; |
| snapshotSystemTopicInternalStats.managedLedgerName = topicName.getEncodedLocalName(); |
| return snapshotSystemTopicInternalStats; |
| }); |
| } |
| |
| protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boolean authoritative) { |
| return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> { |
| CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService() |
| .getTopics().get(topicName.toString()); |
| if (topicFuture == null) { |
| return FutureUtil.failedFuture(new RestException(NOT_FOUND, |
| String.format("Topic not found %s", topicName.toString()))); |
| } |
| return topicFuture.thenCompose(optionalTopic -> { |
| if (!optionalTopic.isPresent()) { |
| return FutureUtil.failedFuture(new RestException(NOT_FOUND, |
| String.format("Topic not found %s", topicName.toString()))); |
| } |
| return CompletableFuture.completedFuture((PersistentTopic) optionalTopic.get()); |
| }); |
| }); |
| } |
| |
| protected void checkTransactionCoordinatorEnabled() { |
| if (!pulsar().getConfig().isTransactionCoordinatorEnabled()) { |
| throw new RestException(SERVICE_UNAVAILABLE, |
| "This Broker is not configured with transactionCoordinatorEnabled=true."); |
| } |
| } |
| |
| protected void validateTopicName(String property, String namespace, String encodedTopic) { |
| String topic = Codec.decode(encodedTopic); |
| try { |
| this.namespaceName = NamespaceName.get(property, namespace); |
| this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName, topic); |
| } catch (IllegalArgumentException e) { |
| log.warn("[{}] Failed to validate topic name {}://{}/{}/{}", clientAppId(), domain(), property, namespace, |
| topic, e); |
| throw new RestException(Response.Status.PRECONDITION_FAILED, "Topic name is not valid"); |
| } |
| } |
| |
| protected CompletableFuture<Void> internalScaleTransactionCoordinators(int replicas) { |
| return validateSuperUserAccessAsync() |
| .thenCompose((ignore) -> namespaceResources().getPartitionedTopicResources() |
| .updatePartitionedTopicAsync(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, p -> { |
| if (p.partitions >= replicas) { |
| throw new RestException(Response.Status.NOT_ACCEPTABLE, |
| "Number of transaction coordinators should " |
| + "be more than the current number of transaction coordinator"); |
| } |
| return new PartitionedTopicMetadata(replicas); |
| })); |
| } |
| |
| protected CompletableFuture<PositionInPendingAckStats> internalGetPositionStatsPendingAckStats( |
| boolean authoritative, String subName, PositionImpl position, Integer batchIndex) { |
| CompletableFuture<PositionInPendingAckStats> completableFuture = new CompletableFuture<>(); |
| getExistingPersistentTopicAsync(authoritative) |
| .thenAccept(topic -> { |
| PositionInPendingAckStats result = topic.getSubscription(subName) |
| .checkPositionInPendingAckState(position, batchIndex); |
| completableFuture.complete(result); |
| }).exceptionally(ex -> { |
| completableFuture.completeExceptionally(ex); |
| return null; |
| }); |
| return completableFuture; |
| } |
| } |