blob: 73ff714b55f8043fe9700c68cb9b4f7f92b94df4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;
import com.google.common.annotations.VisibleForTesting;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.broker.transaction.timeout.TransactionTimeoutTrackerFactoryImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TransactionBufferClientException.RequestTimeoutException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TransactionTimeoutTrackerFactory;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionNotFoundException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Keeps the {@link TransactionMetadataStore} instances currently held by this service, keyed by
 * {@link TransactionCoordinatorID}, and routes transaction operations to the matching store.
 *
 * <p>Stores are added and removed in reaction to namespace bundle load/unload events (see
 * {@link #start()}). Operations addressed to a coordinator that has no store here fail with
 * {@link CoordinatorNotFoundException}.
 */
public class TransactionMetadataStoreService {

    private static final Logger LOG = LoggerFactory.getLogger(TransactionMetadataStoreService.class);

    /** Delay, in milliseconds, before a failed end-transaction operation is attempted again. */
    private static final long END_TRANSACTION_RETRY_INTERVAL_MILLIS = 1000;

    private final Map<TransactionCoordinatorID, TransactionMetadataStore> stores;
    private final TransactionMetadataStoreProvider transactionMetadataStoreProvider;
    private final PulsarService pulsarService;
    private final TransactionBufferClient tbClient;
    private final TransactionTimeoutTrackerFactory timeoutTrackerFactory;
    private final Timer transactionOpRetryTimer;

    /**
     * Creates the service.
     *
     * @param transactionMetadataStoreProvider provider used to open a store for each coordinator
     * @param pulsarService the owning broker service
     * @param tbClient client used to commit/abort a transaction on topics and subscriptions
     * @param timer timer shared by the timeout tracker factory and the end-transaction retry logic
     */
    public TransactionMetadataStoreService(TransactionMetadataStoreProvider transactionMetadataStoreProvider,
                                           PulsarService pulsarService, TransactionBufferClient tbClient,
                                           HashedWheelTimer timer) {
        this.pulsarService = pulsarService;
        this.stores = new ConcurrentHashMap<>();
        this.transactionMetadataStoreProvider = transactionMetadataStoreProvider;
        this.tbClient = tbClient;
        this.timeoutTrackerFactory = new TransactionTimeoutTrackerFactoryImpl(this, timer);
        this.transactionOpRetryTimer = timer;
    }

    /**
     * Registers a bundle ownership listener restricted to the system namespace. When a bundle is
     * loaded, a metadata store is added for every transaction-coordinator-assign partition found in
     * the bundle's owned topic list; when a bundle is unloaded, the corresponding stores are removed.
     */
    public void start() {
        pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
            @Override
            public void onLoad(NamespaceBundle bundle) {
                pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
                        .whenComplete((topics, ex) -> {
                            if (ex != null) {
                                LOG.error("Failed to get owned topic list when triggering on-loading bundle {}.",
                                        bundle, ex);
                                return;
                            }
                            for (String topic : topics) {
                                TopicName name = TopicName.get(topic);
                                if (isTransactionCoordinatorAssignPartition(name)) {
                                    addTransactionMetadataStore(
                                            TransactionCoordinatorID.get(name.getPartitionIndex()));
                                }
                            }
                        });
            }

            @Override
            public void unLoad(NamespaceBundle bundle) {
                pulsarService.getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle)
                        .whenComplete((topics, ex) -> {
                            if (ex != null) {
                                LOG.error("Failed to get owned topic list error when triggering un-loading bundle {}.",
                                        bundle, ex);
                                return;
                            }
                            for (String topic : topics) {
                                TopicName name = TopicName.get(topic);
                                if (isTransactionCoordinatorAssignPartition(name)) {
                                    removeTransactionMetadataStore(
                                            TransactionCoordinatorID.get(name.getPartitionIndex()));
                                }
                            }
                        });
            }

            @Override
            public boolean test(NamespaceBundle namespaceBundle) {
                return namespaceBundle.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE);
            }
        });
    }

    /**
     * Tells whether {@code name} is a partition of the transaction-coordinator-assign topic, i.e. its
     * partitioned topic has the same local name as {@link TopicName#TRANSACTION_COORDINATOR_ASSIGN}
     * and the name itself is partitioned.
     */
    private static boolean isTransactionCoordinatorAssignPartition(TopicName name) {
        return TopicName.TRANSACTION_COORDINATOR_ASSIGN.getLocalName()
                .equals(TopicName.get(name.getPartitionedTopicName()).getLocalName())
                && name.isPartitioned();
    }

    /**
     * Asynchronously opens the metadata store of the given coordinator and registers it once it is
     * open. Failures are logged; nothing is registered in that case.
     *
     * @param tcId the coordinator whose store should be opened
     */
    public void addTransactionMetadataStore(TransactionCoordinatorID tcId) {
        pulsarService.getBrokerService()
                .getManagedLedgerConfig(TopicName.get(MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + tcId))
                .whenComplete((v, e) -> {
                    if (e != null) {
                        LOG.error("Add transaction metadata store with id {} error", tcId.getId(), e);
                        return;
                    }
                    transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
                            timeoutTrackerFactory.newTracker(tcId))
                            .whenComplete((store, ex) -> {
                                if (ex != null) {
                                    LOG.error("Add transaction metadata store with id {} error", tcId.getId(), ex);
                                } else {
                                    stores.put(tcId, store);
                                    LOG.info("Added new transaction meta store {}", tcId);
                                }
                            });
                });
    }

    /**
     * Unregisters the metadata store of the given coordinator, if one is registered, and closes it
     * asynchronously. The outcome of the close is only logged.
     *
     * @param tcId the coordinator whose store should be removed
     */
    public void removeTransactionMetadataStore(TransactionCoordinatorID tcId) {
        TransactionMetadataStore metadataStore = stores.remove(tcId);
        if (metadataStore == null) {
            return;
        }
        metadataStore.closeAsync().whenComplete((v, ex) -> {
            if (ex != null) {
                LOG.error("Close transaction metadata store with id {}", tcId, ex);
            } else {
                LOG.info("Removed and closed transaction meta store {}", tcId);
            }
        });
    }

    /**
     * Starts a new transaction on the given coordinator.
     *
     * @param tcId the coordinator that should own the transaction
     * @param timeoutInMills the transaction timeout handed to the store
     * @return the store's future for the new transaction id, or a future failed with
     *         {@link CoordinatorNotFoundException} when no store is registered for {@code tcId}
     */
    public CompletableFuture<TxnID> newTransaction(TransactionCoordinatorID tcId, long timeoutInMills) {
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.newTransaction(timeoutInMills);
    }

    /**
     * Records produced partitions on the transaction.
     *
     * @param txnId the transaction
     * @param partitions the partitions to add
     * @return the store's future, or a future failed with {@link CoordinatorNotFoundException} when no
     *         store is registered for the transaction's coordinator
     */
    public CompletableFuture<Void> addProducedPartitionToTxn(TxnID txnId, List<String> partitions) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.addProducedPartitionToTxn(txnId, partitions);
    }

    /**
     * Records acknowledged subscriptions on the transaction.
     *
     * @param txnId the transaction
     * @param partitions the topic/subscription pairs to add
     * @return the store's future, or a future failed with {@link CoordinatorNotFoundException} when no
     *         store is registered for the transaction's coordinator
     */
    public CompletableFuture<Void> addAckedPartitionToTxn(TxnID txnId, List<TransactionSubscription> partitions) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.addAckedPartitionToTxn(txnId, partitions);
    }

    /**
     * Looks up the metadata of a transaction.
     *
     * @param txnId the transaction
     * @return the store's future, or a future failed with {@link CoordinatorNotFoundException} when no
     *         store is registered for the transaction's coordinator
     */
    public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnId) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.getTxnMeta(txnId);
    }

    /**
     * Returns the low water mark reported by the store of the transaction's coordinator.
     *
     * @param txnID the transaction
     * @return the store's low water mark, or {@code 0} when no store is registered for the coordinator
     */
    public long getLowWaterMark(TxnID txnID) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnID);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return 0;
        }
        return store.getLowWaterMark();
    }

    /**
     * Asks the store to move a transaction from {@code expectedStatus} to {@code newStatus}.
     *
     * @param txnId the transaction
     * @param newStatus the status to move to
     * @param expectedStatus the status the transaction is expected to be in
     * @return the store's future, or a future failed with {@link CoordinatorNotFoundException} when no
     *         store is registered for the transaction's coordinator
     */
    public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus, TxnStatus expectedStatus) {
        TransactionCoordinatorID tcId = getTcIdFromTxnId(txnId);
        TransactionMetadataStore store = stores.get(tcId);
        if (store == null) {
            return FutureUtil.failedFuture(new CoordinatorNotFoundException(tcId));
        }
        return store.updateTxnStatus(txnId, newStatus, expectedStatus);
    }

    /**
     * Commits or aborts a transaction: moves it from OPEN to COMMITTING/ABORTING, applies the action
     * on every acked subscription and produced partition, then moves it to COMMITTED/ABORTED.
     *
     * <p>When the operation fails with a retryable cause (see
     * {@link #isRetryableException(Throwable)}), the whole operation is scheduled again after
     * {@link #END_TRANSACTION_RETRY_INTERVAL_MILLIS}. The returned future reflects the current
     * attempt only; it is not updated by later retries.
     *
     * @param txnID the transaction to end
     * @param txnAction {@link TxnAction#COMMIT_VALUE} or {@link TxnAction#ABORT_VALUE}
     * @return a future for the current attempt; failed with {@link UnsupportedTxnActionException} when
     *         {@code txnAction} is neither commit nor abort
     */
    public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
        final TxnStatus newStatus;
        switch (txnAction) {
            case TxnAction.COMMIT_VALUE:
                newStatus = TxnStatus.COMMITTING;
                break;
            case TxnAction.ABORT_VALUE:
                newStatus = TxnStatus.ABORTING;
                break;
            default:
                UnsupportedTxnActionException exception =
                        new UnsupportedTxnActionException(txnID, txnAction);
                LOG.error(exception.getMessage());
                return FutureUtil.failedFuture(exception);
        }

        CompletableFuture<Void> completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
                .thenCompose(ignored -> endTxnInTransactionBuffer(txnID, txnAction));

        // Side branch: the handler only decides whether to schedule a retry. Its result is discarded
        // so the caller still observes the failure of this attempt.
        completableFuture.exceptionally(e -> {
            if (isRetryableException(e.getCause())) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("End transaction op retry! TxnId : {}, TxnAction : {}", txnID, txnAction, e);
                }
                transactionOpRetryTimer.newTimeout(timeout -> endTransaction(txnID, txnAction),
                        END_TRANSACTION_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("End transaction op retry fail! TxnId : {}, TxnAction : {}", txnID, txnAction, e);
            }
            return null;
        });
        return completableFuture;
    }

    /**
     * Aborts a transaction on behalf of the timeout tracker if it is still OPEN.
     *
     * <p>If the transaction is in any other state nothing is done. If the lookup or the abort fails
     * for a reason other than {@link TransactionNotFoundException}, one more abort is attempted.
     *
     * @param txnID the transaction that timed out
     * @return a future that completes normally once the handling above has been triggered
     */
    public CompletableFuture<Void> endTransactionForTimeout(TxnID txnID) {
        return getTxnMeta(txnID).thenCompose(txnMeta -> {
            if (txnMeta.status() == TxnStatus.OPEN) {
                return endTransaction(txnID, TxnAction.ABORT_VALUE);
            }
            // thenCompose rejects a null stage with a NullPointerException, which would be routed to
            // the handler below and trigger a pointless abort; return a completed future instead.
            return CompletableFuture.<Void>completedFuture(null);
        }).exceptionally(e -> {
            // Failures propagated through thenCompose arrive wrapped in a CompletionException, so
            // the cause has to be inspected as well.
            boolean txnNotFound = e instanceof TransactionNotFoundException
                    || e.getCause() instanceof TransactionNotFoundException;
            if (!txnNotFound) {
                endTransaction(txnID, TxnAction.ABORT_VALUE);
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("Transaction have been handle complete, "
                        + "don't need to handle by transaction timeout! TxnId : {}", txnID);
            }
            return null;
        });
    }

    /**
     * Applies the commit/abort action on every acked subscription and produced partition recorded in
     * the transaction metadata and, once all of them succeed, writes the final status to the
     * metadata store.
     */
    private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction) {
        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
        this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
            if (throwable != null) {
                resultFuture.completeExceptionally(throwable);
                return;
            }
            // Everything is guarded so that a synchronous failure while dispatching the requests
            // fails resultFuture instead of leaving it incomplete forever.
            try {
                long lowWaterMark = getLowWaterMark(txnID);
                List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
                txnMeta.ackedPartitions().forEach(tbSub ->
                        completableFutureList.add(endTxnOnSubscription(tbSub, txnID, txnAction, lowWaterMark)));
                txnMeta.producedPartitions().forEach(partition ->
                        completableFutureList.add(endTxnOnTopic(partition, txnID, txnAction, lowWaterMark)));

                FutureUtil.waitForAll(completableFutureList).whenComplete((ignored, waitThrowable) -> {
                    if (waitThrowable != null) {
                        resultFuture.completeExceptionally(waitThrowable);
                        return;
                    }
                    resultFuture.complete(null);
                });
            } catch (Exception e) {
                resultFuture.completeExceptionally(e);
            }
        });
        return resultFuture.thenCompose((future) -> endTxnInTransactionMetadataStore(txnID, txnAction));
    }

    /** Sends the commit/abort request for one acked subscription through the buffer client. */
    private CompletableFuture<TxnID> endTxnOnSubscription(TransactionSubscription tbSub, TxnID txnID,
                                                          int txnAction, long lowWaterMark) {
        if (TxnAction.COMMIT_VALUE == txnAction) {
            return tbClient.commitTxnOnSubscription(
                    tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
                    txnID.getLeastSigBits(), lowWaterMark);
        }
        if (TxnAction.ABORT_VALUE == txnAction) {
            return tbClient.abortTxnOnSubscription(
                    tbSub.getTopic(), tbSub.getSubscription(), txnID.getMostSigBits(),
                    txnID.getLeastSigBits(), lowWaterMark);
        }
        return FutureUtil.failedFuture(new UnsupportedTxnActionException(txnID, txnAction));
    }

    /** Sends the commit/abort request for one produced partition through the buffer client. */
    private CompletableFuture<TxnID> endTxnOnTopic(String partition, TxnID txnID,
                                                   int txnAction, long lowWaterMark) {
        if (TxnAction.COMMIT_VALUE == txnAction) {
            return tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(),
                    txnID.getLeastSigBits(), lowWaterMark);
        }
        if (TxnAction.ABORT_VALUE == txnAction) {
            return tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(),
                    txnID.getLeastSigBits(), lowWaterMark);
        }
        return FutureUtil.failedFuture(new UnsupportedTxnActionException(txnID, txnAction));
    }

    /**
     * Tells whether an end-transaction failure with the given cause should be retried.
     *
     * @param e the failure cause; {@code null} is treated as not retryable
     */
    private static boolean isRetryableException(Throwable e) {
        return e instanceof TransactionMetadataStoreStateException
                || e instanceof RequestTimeoutException
                || e instanceof ManagedLedgerException
                || e instanceof PulsarClientException.BrokerPersistenceException;
    }

    /**
     * Writes the terminal status for the action: COMMITTING to COMMITTED for a commit, ABORTING to
     * ABORTED for an abort. Any other action yields a future failed with
     * {@link InvalidTxnStatusException}.
     */
    private CompletableFuture<Void> endTxnInTransactionMetadataStore(TxnID txnID, int txnAction) {
        if (TxnAction.COMMIT.getValue() == txnAction) {
            return updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING);
        } else if (TxnAction.ABORT.getValue() == txnAction) {
            return updateTxnStatus(txnID, TxnStatus.ABORTED, TxnStatus.ABORTING);
        } else {
            return FutureUtil.failedFuture(new InvalidTxnStatusException("Unsupported txnAction " + txnAction));
        }
    }

    /** Builds the coordinator id from the most significant bits of the transaction id. */
    private TransactionCoordinatorID getTcIdFromTxnId(TxnID txnId) {
        return new TransactionCoordinatorID(txnId.getMostSigBits());
    }

    /**
     * @return the provider used to open metadata stores
     */
    public TransactionMetadataStoreProvider getTransactionMetadataStoreProvider() {
        return transactionMetadataStoreProvider;
    }

    /**
     * @return an unmodifiable view of the currently registered stores
     */
    @VisibleForTesting
    public Map<TransactionCoordinatorID, TransactionMetadataStore> getStores() {
        return Collections.unmodifiableMap(stores);
    }
}