blob: e50a39c5dfd9108662153a862c6b298de8d8d823 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreState;
import org.apache.pulsar.transaction.coordinator.TransactionSubscription;
import org.apache.pulsar.transaction.coordinator.TxnMeta;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class TransactionMetadataStoreServiceTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
ServiceConfiguration configuration = getDefaultConf();
configuration.setTransactionCoordinatorEnabled(true);
super.baseSetup(configuration);
admin.tenants().createTenant("pulsar", new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16);
admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString());
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
@Test
public void testAddAndRemoveTransactionMetadataStore() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
Assert.assertNotNull(transactionMetadataStoreService);
admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.getPartition(0).toString());
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 1);
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 0);
}
@Test
public void testNewTransaction() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(1));
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(2));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 3);
checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)));
checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(1)));
checkTransactionMetadataStoreReady((MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(2)));
TxnID txnID0 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5).get();
TxnID txnID1 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(1), 5).get();
TxnID txnID2 = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(2), 5).get();
Assert.assertEquals(txnID0.getMostSigBits(), 0);
Assert.assertEquals(txnID1.getMostSigBits(), 1);
Assert.assertEquals(txnID2.getMostSigBits(), 2);
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(1));
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(2));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
}
@Test
public void testAddProducedPartitionToTxn() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 1);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000).get();
List<String> partitions = new ArrayList<>();
partitions.add("ptn-0");
partitions.add("ptn-1");
partitions.add("ptn-2");
transactionMetadataStoreService.addProducedPartitionToTxn(txnID, partitions);
TxnMeta txn = transactionMetadataStoreService.getTxnMeta(txnID).get();
assertEquals(txn.status(), TxnStatus.OPEN);
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
}
@Test
public void testAddAckedPartitionToTxn() throws Exception {
TransactionMetadataStoreService transactionMetadataStoreService = pulsar.getTransactionMetadataStoreService();
transactionMetadataStoreService.handleTcClientConnect(TransactionCoordinatorID.get(0)).get();
Awaitility.await().until(() ->
transactionMetadataStoreService.getStores().size() == 1);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
TxnID txnID = transactionMetadataStoreService.newTransaction(TransactionCoordinatorID.get(0), 5000).get();
List<TransactionSubscription> partitions = new ArrayList<>();
partitions.add(TransactionSubscription.builder().topic("ptn-1").subscription("sub-1").build());
partitions.add(TransactionSubscription.builder().topic("ptn-2").subscription("sub-1").build());
partitions.add(TransactionSubscription.builder().topic("ptn-3").subscription("sub-1").build());
transactionMetadataStoreService.addAckedPartitionToTxn(txnID, partitions);
TxnMeta txn = transactionMetadataStoreService.getTxnMeta(txnID).get();
assertEquals(txn.status(), TxnStatus.OPEN);
transactionMetadataStoreService.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
Assert.assertEquals(transactionMetadataStoreService.getStores().size(), 0);
}
@Test
public void testTimeoutTracker() throws Exception {
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) != null);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
int i = -1;
while (++i < 1000) {
try {
transactionMetadataStore.newTransaction(2000).get();
} catch (Exception e) {
//no operation
}
}
txnMap.forEach((txnID, txnMetaListPair) ->
Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
.until(() -> txnMap.size() == 0);
}
@Test
public void testTimeoutTrackerExpired() throws Exception {
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await().until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) != null);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
transactionMetadataStore.newTransaction(2000).get();
assertEquals(txnMap.size(), 1);
txnMap.forEach((txnID, txnMetaListPair) ->
Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
transactionMetadataStore.newTransaction(2000).get();
assertEquals(txnMap.size(), 1);
txnMap.forEach((txnID, txnMetaListPair) ->
Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
}
@Test
public void testTimeoutTrackerMultiThreading() throws Exception {
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) != null);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
new Thread(() -> {
int i = -1;
while (++i < 100) {
try {
transactionMetadataStore.newTransaction(1000);
} catch (Exception e) {
//no operation
}
}
}).start();
new Thread(() -> {
int i = -1;
while (++i < 100) {
try {
transactionMetadataStore.newTransaction(2000);
} catch (Exception e) {
//no operation
}
}
}).start();
new Thread(() -> {
int i = -1;
while (++i < 100) {
try {
transactionMetadataStore.newTransaction(3000);
} catch (Exception e) {
//no operation
}
}
}).start();
new Thread(() -> {
int i = -1;
while (++i < 100) {
try {
transactionMetadataStore.newTransaction(4000);
} catch (Exception e) {
//no operation
}
}
}).start();
checkoutTimeout(txnMap, 300);
checkoutTimeout(txnMap, 200);
checkoutTimeout(txnMap, 100);
checkoutTimeout(txnMap, 0);
}
private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap, int time) {
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
.until(() -> txnMap.size() == time);
}
@Test
public void transactionTimeoutRecoverTest() throws Exception {
int timeout = 2000;
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) != null);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
transactionMetadataStore.newTransaction(timeout);
pulsar.getTransactionMetadataStoreService()
.removeTransactionMetadataStore(TransactionCoordinatorID.get(0));
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) != null);
transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
field.setAccessible(true);
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
Awaitility.await().until(() -> txnMap.size() == 0);
}
@DataProvider(name = "txnStatus")
public Object[][] txnStatus() {
return new Object[][] { { TxnStatus.OPEN }, { TxnStatus.ABORTING }, { TxnStatus.COMMITTING } };
}
@Test(dataProvider = "txnStatus")
public void testEndTransactionOpRetry(TxnStatus txnStatus) throws Exception {
int timeOut = 3000;
pulsar.getTransactionMetadataStoreService().handleTcClientConnect(TransactionCoordinatorID.get(0));
Awaitility.await()
.until(() -> pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0)) != null);
MLTransactionMetadataStore transactionMetadataStore =
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
.getStores().get(TransactionCoordinatorID.get(0));
checkTransactionMetadataStoreReady(transactionMetadataStore);
TxnID txnID = transactionMetadataStore.newTransaction(timeOut - 2000).get();
TxnMeta txnMeta = transactionMetadataStore.getTxnMeta(txnID).get();
txnMeta.updateTxnStatus(txnStatus, TxnStatus.OPEN);
Field field = TransactionMetadataStoreState.class.getDeclaredField("state");
field.setAccessible(true);
field.set(transactionMetadataStore, TransactionMetadataStoreState.State.None);
CompletableFuture<Void> completableFuture = null;
try {
completableFuture = pulsar.getTransactionMetadataStoreService().endTransaction(txnID, TxnAction.COMMIT.getValue(),
false);
completableFuture.get(5, TimeUnit.SECONDS);
fail();
} catch (Exception e) {
if (txnStatus == TxnStatus.OPEN || txnStatus == TxnStatus.COMMITTING) {
assertTrue(e instanceof TimeoutException);
} else if (txnStatus == TxnStatus.ABORTING) {
assertTrue(e.getCause() instanceof CoordinatorException.InvalidTxnStatusException);
} else {
fail();
}
}
assertEquals(txnMeta.status(), txnStatus);
field = TransactionMetadataStoreState.class.getDeclaredField("state");
field.setAccessible(true);
field.set(transactionMetadataStore, TransactionMetadataStoreState.State.Ready);
if (txnStatus == TxnStatus.ABORTING) {
pulsar.getTransactionMetadataStoreService()
.endTransaction(txnID, TxnAction.ABORT.getValue(), false).get();
}
Awaitility.await().atMost(timeOut, TimeUnit.MILLISECONDS).until(() -> {
try {
transactionMetadataStore.getTxnMeta(txnID).get();
return false;
} catch (ExecutionException e) {
return e.getCause() instanceof CoordinatorException.TransactionNotFoundException;
}
});
}
private void checkTransactionMetadataStoreReady(MLTransactionMetadataStore transactionMetadataStore) throws NoSuchMethodException {
Method method = TransactionMetadataStoreState.class.getDeclaredMethod("checkIfReady");
method.setAccessible(true);
Awaitility.await()
.until(() -> (Boolean) method.invoke(transactionMetadataStore));
}
}