// blob: 1e5f4679492ad0af973901feec600549d372ad96 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin.v3;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.http.HttpStatus;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorStats;
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {
/**
 * Builds the broker configuration shared by the tests in this class.
 *
 * <p>Starts from the parent defaults, then enables packages management (using the mocked
 * storage provider), enables the transaction coordinator and sets the transaction buffer
 * snapshot max transaction count to 1.
 *
 * @return the customised configuration
 */
@Override
protected ServiceConfiguration getDefaultConf() {
    final ServiceConfiguration serviceConf = super.getDefaultConf();
    final String packagesStorageProvider = MockedPackagesStorageProvider.class.getName();

    // Packages management, backed by the mocked storage provider.
    serviceConf.setEnablePackagesManagement(true);
    serviceConf.setPackagesManagementStorageProvider(packagesStorageProvider);

    // Transaction-related settings.
    serviceConf.setTransactionCoordinatorEnabled(true);
    serviceConf.setTransactionBufferSnapshotMaxTransactionCount(1);
    return serviceConf;
}
/**
 * Starts the mocked broker before each test method and provisions the "test" cluster together
 * with the {@code pulsar/system} and {@code public/default} namespaces.
 *
 * @throws Exception if the broker cannot be started or an admin call fails
 */
@BeforeMethod
@Override
protected void setup() throws Exception {
    super.internalSetup();

    final String clusterName = "test";
    final ClusterData clusterData =
            ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
    admin.clusters().createCluster(clusterName, clusterData);

    // Both tenants share the same admin roles and allowed clusters.
    final Set<String> allowedClusters = Set.of(clusterName);
    final TenantInfoImpl sharedTenantInfo =
            new TenantInfoImpl(Set.of("role1", "role2"), allowedClusters);

    admin.tenants().createTenant("pulsar", sharedTenantInfo);
    admin.namespaces().createNamespace("pulsar/system", allowedClusters);

    admin.tenants().createTenant("public", sharedTenantInfo);
    admin.namespaces().createNamespace("public/default", allowedClusters);
}
/**
 * Per-method teardown: delegates to the parent's {@code internalCleanup()}.
 *
 * <p>{@code alwaysRun = true} asks TestNG to invoke this even when the test method failed.
 *
 * @throws Exception if the parent cleanup fails
 */
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
 * Lists the transaction coordinators after initialising four of them and checks that each
 * entry reports this broker's service URL.
 */
@Test(timeOut = 20000)
public void testListTransactionCoordinators() throws Exception {
    initTransaction(4);

    final List<TransactionCoordinatorInfo> coordinators =
            admin.transactions().listTransactionCoordinatorsAsync().get();
    assertEquals(coordinators.size(), 4);

    final String brokerServiceUrl = pulsar.getBrokerServiceUrl();
    // The size was asserted above, so iterating the list covers exactly indexes 0..3.
    for (TransactionCoordinatorInfo coordinator : coordinators) {
        assertEquals(coordinator.getBrokerServiceUrl(), brokerServiceUrl);
    }
}
/**
 * Commits one transaction and aborts another, then verifies the coordinator stats returned
 * both by the per-id endpoint and by the "all coordinators" endpoint.
 */
@Test(timeOut = 20000)
public void testGetTransactionCoordinatorStats() throws Exception {
    initTransaction(2);
    getTransaction().commit().get();
    getTransaction().abort().get();

    // Per-id lookups, queried in the order 1 then 0.
    for (int coordinatorId : new int[] {1, 0}) {
        TransactionCoordinatorStats statsById =
                admin.transactions().getCoordinatorStatsByIdAsync(coordinatorId).get();
        verifyCoordinatorStats(statsById.state, statsById.leastSigBits, statsById.lowWaterMark);
    }

    // Bulk lookup: one entry per coordinator, checked in the order 0 then 1.
    Map<Integer, TransactionCoordinatorStats> statsByCoordinator =
            admin.transactions().getCoordinatorStatsAsync().get();
    assertEquals(statsByCoordinator.size(), 2);
    for (int coordinatorId = 0; coordinatorId < 2; coordinatorId++) {
        TransactionCoordinatorStats fromMap = statsByCoordinator.get(coordinatorId);
        verifyCoordinatorStats(fromMap.state, fromMap.leastSigBits, fromMap.lowWaterMark);
    }
}
/**
 * Exercises the "transaction in buffer stats" endpoint: first against a topic that does not
 * exist (expecting "Topic not found"), then against a real topic before and after the
 * transaction is aborted.
 */
@Test(timeOut = 20000)
public void testGetTransactionInBufferStats() throws Exception {
    initTransaction(2);
    TransactionImpl txn = (TransactionImpl) getTransaction();
    final String topic = "persistent://public/default/testGetTransactionInBufferStats";
    final TxnID arbitraryTxnId = new TxnID(1, 1);

    // The topic has not been created yet.
    try {
        admin.transactions().getTransactionInBufferStatsAsync(arbitraryTxnId, topic).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    // Same expectation after a getTopic call with the second argument set to false.
    try {
        pulsar.getBrokerService().getTopic(topic, false);
        admin.transactions().getTransactionInBufferStatsAsync(arbitraryTxnId, topic).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    admin.topics().createNonPartitionedTopic(topic);
    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
            .topic(topic)
            .sendTimeout(0, TimeUnit.SECONDS)
            .create();
    MessageId sentId = producer.newMessage(txn).value("Hello pulsar!".getBytes()).send();

    final TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());

    // While the transaction is open the stats expose the position of its message.
    TransactionInBufferStats openStats =
            admin.transactions().getTransactionInBufferStatsAsync(txnId, topic).get();
    MessageIdImpl sentIdImpl = (MessageIdImpl) sentId;
    PositionImpl expectedStart =
            PositionImpl.get(sentIdImpl.getLedgerId(), sentIdImpl.getEntryId());
    assertEquals(openStats.startPosition, expectedStart.toString());
    assertFalse(openStats.aborted);

    // After the abort the start position is gone and the aborted flag is set.
    txn.abort().get();
    TransactionInBufferStats abortedStats =
            admin.transactions().getTransactionInBufferStatsAsync(txnId, topic).get();
    assertNull(abortedStats.startPosition);
    assertTrue(abortedStats.aborted);
}
/**
 * Exercises the "transaction in pending-ack stats" endpoint: against a missing topic
 * (expecting "Topic not found"), then before and after a cumulative ack is made inside a
 * transaction.
 */
@Test(timeOut = 20000)
public void testGetTransactionInPendingAckStats() throws Exception {
    initTransaction(2);
    final String topic = "persistent://public/default/testGetTransactionInBufferStats";
    final String subName = "test";
    final TxnID arbitraryTxnId = new TxnID(1, 2);

    // The topic has not been created yet.
    try {
        admin.transactions()
                .getTransactionInPendingAckStatsAsync(arbitraryTxnId, topic, subName).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    // Same expectation after a getTopic call with the second argument set to false.
    try {
        pulsar.getBrokerService().getTopic(topic, false);
        admin.transactions()
                .getTransactionInPendingAckStatsAsync(arbitraryTxnId, topic, subName).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    admin.topics().createNonPartitionedTopic(topic);
    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();

    // Four fire-and-forget sends; the blocking receives below wait for delivery.
    for (int sent = 0; sent < 4; sent++) {
        producer.sendAsync("Hello pulsar!".getBytes());
    }

    TransactionImpl txn = (TransactionImpl) getTransaction();
    final TxnID txnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());

    // Nothing has been acknowledged in the transaction yet.
    TransactionInPendingAckStats statsBeforeAck = admin.transactions()
            .getTransactionInPendingAckStatsAsync(txnId, topic, subName).get();
    assertNull(statsBeforeAck.cumulativeAckPosition);

    // Skip two messages and cumulatively acknowledge up to the third one.
    for (int skipped = 0; skipped < 2; skipped++) {
        consumer.receive();
    }
    Message<byte[]> thirdMessage = consumer.receive();
    BatchMessageIdImpl ackedId = (BatchMessageIdImpl) thirdMessage.getMessageId();
    consumer.acknowledgeCumulativeAsync(ackedId, txn).get();

    TransactionInPendingAckStats statsAfterAck = admin.transactions()
            .getTransactionInPendingAckStatsAsync(txnId, topic, subName).get();
    final String expectedPosition =
            ackedId.getLedgerId() + ":" + ackedId.getEntryId() + ":" + ackedId.getBatchIndex();
    assertEquals(statsAfterAck.cumulativeAckPosition, expectedPosition);
}
/**
 * Produces to two topics and cumulatively acknowledges on three subscriptions inside one
 * transaction, then checks the metadata the admin API reports for that transaction.
 */
@Test(timeOut = 20000)
public void testGetTransactionMetadata() throws Exception {
    initTransaction(2);
    final long startedAt = System.currentTimeMillis();

    final String firstTopic = "persistent://public/default/testGetTransactionMetadata-1";
    final String firstTopicSub = "test1";
    final String secondTopic = "persistent://public/default/testGetTransactionMetadata-2";
    final String secondTopicSubA = "test2";
    final String secondTopicSubB = "test3";
    admin.topics().createNonPartitionedTopic(firstTopic);
    admin.topics().createNonPartitionedTopic(secondTopic);

    TransactionImpl txn = (TransactionImpl) getTransaction();

    Producer<byte[]> firstProducer = pulsarClient.newProducer(Schema.BYTES)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(firstTopic)
            .create();
    Producer<byte[]> secondProducer = pulsarClient.newProducer(Schema.BYTES)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(secondTopic)
            .create();
    Consumer<byte[]> firstConsumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(firstTopic)
            .subscriptionName(firstTopicSub)
            .subscribe();
    Consumer<byte[]> secondConsumerA = pulsarClient.newConsumer(Schema.BYTES)
            .topic(secondTopic)
            .subscriptionName(secondTopicSubA)
            .subscribe();
    Consumer<byte[]> secondConsumerB = pulsarClient.newConsumer(Schema.BYTES)
            .topic(secondTopic)
            .subscriptionName(secondTopicSubB)
            .subscribe();

    // One plain message per topic (acknowledged below), then one transactional message each.
    MessageId plainIdOnFirst = firstProducer.send("Hello pulsar!".getBytes());
    MessageId plainIdOnSecond = secondProducer.send("Hello pulsar!".getBytes());
    MessageId txnIdOnFirst =
            firstProducer.newMessage(txn).value("Hello pulsar!".getBytes()).send();
    MessageId txnIdOnSecond =
            secondProducer.newMessage(txn).value("Hello pulsar!".getBytes()).send();

    firstConsumer.acknowledgeCumulativeAsync(plainIdOnFirst, txn).get();
    secondConsumerA.acknowledgeCumulativeAsync(plainIdOnSecond, txn).get();
    secondConsumerB.acknowledgeCumulativeAsync(plainIdOnSecond, txn).get();

    final TxnID expectedTxnId = new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits());
    TransactionMetadata metadata = admin.transactions()
            .getTransactionMetadataAsync(
                    new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()))
            .get();

    // Top-level fields.
    assertEquals(metadata.txnId, expectedTxnId.toString());
    assertTrue(metadata.openTimestamp > startedAt);
    assertEquals(metadata.timeoutAt, 5000L);
    assertEquals(metadata.status, "OPEN");

    Map<String, TransactionInBufferStats> produced = metadata.producedPartitions;
    Map<String, Map<String, TransactionInPendingAckStats>> acked = metadata.ackedPartitions;

    PositionImpl plainPosOnFirst = getPositionByMessageId(plainIdOnFirst);
    PositionImpl plainPosOnSecond = getPositionByMessageId(plainIdOnSecond);
    PositionImpl txnPosOnFirst = getPositionByMessageId(txnIdOnFirst);
    PositionImpl txnPosOnSecond = getPositionByMessageId(txnIdOnSecond);

    // Produced side: neither topic is aborted and each starts at its transactional message.
    assertFalse(produced.get(firstTopic).aborted);
    assertFalse(produced.get(secondTopic).aborted);
    assertEquals(produced.get(firstTopic).startPosition, txnPosOnFirst.toString());
    assertEquals(produced.get(secondTopic).startPosition, txnPosOnSecond.toString());

    // Acked side: one subscription on the first topic, two on the second.
    assertEquals(acked.get(firstTopic).size(), 1);
    assertEquals(acked.get(secondTopic).size(), 2);
    assertEquals(acked.get(firstTopic).get(firstTopicSub).cumulativeAckPosition,
            plainPosOnFirst.toString());
    assertEquals(acked.get(secondTopic).get(secondTopicSubA).cumulativeAckPosition,
            plainPosOnSecond.toString());
    assertEquals(acked.get(secondTopic).get(secondTopicSubB).cumulativeAckPosition,
            plainPosOnSecond.toString());
}
/**
 * Exercises the transaction buffer stats endpoint: against a missing topic (expecting
 * "Topic not found"), then after a committed transactional publish, with and without the
 * low-water-marks flag.
 */
@Test(timeOut = 20000)
public void testGetTransactionBufferStats() throws Exception {
    initTransaction(2);
    TransactionImpl publishTxn = (TransactionImpl) getTransaction();
    final String topic = "persistent://public/default/testGetTransactionBufferStats";
    final String firstSub = "test1";
    final String secondSub = "test2";

    // The topic has not been created yet.
    try {
        admin.transactions().getTransactionBufferStatsAsync(topic).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    // Same expectation after a getTopic call with the second argument set to false.
    try {
        pulsar.getBrokerService().getTopic(topic, false);
        admin.transactions().getTransactionBufferStatsAsync(topic).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    admin.topics().createNonPartitionedTopic(topic);
    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic)
            .create();
    Consumer<byte[]> firstConsumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(firstSub)
            .subscribe();
    Consumer<byte[]> secondConsumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(secondSub)
            .subscribe();

    final long beforePublish = System.currentTimeMillis();
    MessageId publishedId =
            producer.newMessage(publishTxn).value("Hello pulsar!".getBytes()).send();
    publishTxn.commit().get();

    // A second transaction acknowledges the committed message on both subscriptions.
    TransactionImpl ackTxn = (TransactionImpl) getTransaction();
    firstConsumer.acknowledgeAsync(publishedId, ackTxn).get();
    secondConsumer.acknowledgeAsync(publishedId, ackTxn).get();

    TransactionBufferStats defaultStats =
            admin.transactions().getTransactionBufferStatsAsync(topic).get();
    assertEquals(defaultStats.state, "Ready");
    MessageIdImpl publishedIdImpl = (MessageIdImpl) publishedId;
    // The expected max read position is one entry past the published message.
    assertEquals(defaultStats.maxReadPosition,
            PositionImpl.get(publishedIdImpl.getLedgerId(), publishedIdImpl.getEntryId() + 1)
                    .toString());
    assertTrue(defaultStats.lastSnapshotTimestamps > beforePublish);
    assertNull(defaultStats.lowWaterMarks);

    // Low water marks are populated only when the second argument is true.
    TransactionBufferStats statsWithLowWaterMarks =
            admin.transactions().getTransactionBufferStats(topic, true);
    assertNotNull(statsWithLowWaterMarks.lowWaterMarks);
}
/**
 * Data provider named "ackType": supplies the two acknowledgement modes, "cumulative" and
 * "individual", one per invocation.
 *
 * @return the acknowledgement mode names
 */
@DataProvider(name = "ackType")
public static Object[] ackType() {
    final Object[] ackModes = {"cumulative", "individual"};
    return ackModes;
}
/**
 * Exercises the pending-ack stats endpoint for both acknowledgement modes: against a missing
 * topic (expecting "Topic not found"), on a fresh subscription (state "None") and after a
 * transactional acknowledgement (state "Ready").
 *
 * @param ackType "individual" for an individual ack; any other value uses a cumulative ack
 */
@Test(timeOut = 20000, dataProvider = "ackType")
public void testGetPendingAckStats(String ackType) throws Exception {
    initTransaction(2);
    final String topic = "persistent://public/default/testGetPendingAckStats";
    final String subName = "test1";

    // The topic has not been created yet.
    try {
        admin.transactions().getPendingAckStatsAsync(topic, subName).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    // Same expectation after a getTopic call with the second argument set to false.
    try {
        pulsar.getBrokerService().getTopic(topic, false);
        admin.transactions().getPendingAckStatsAsync(topic, subName).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    admin.topics().createNonPartitionedTopic(topic);
    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic)
            .create();
    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();

    // No transactional ack has happened on this subscription yet.
    TransactionPendingAckStats initialStats =
            admin.transactions().getPendingAckStatsAsync(topic, subName).get();
    assertEquals(initialStats.state, "None");

    producer.newMessage().value("Hello pulsar!".getBytes()).send();
    TransactionImpl txn = (TransactionImpl) getTransaction();

    final boolean individual = ackType.equals("individual");
    final MessageId receivedId = consumer.receive().getMessageId();
    CompletableFuture<Void> ackFuture = individual
            ? consumer.acknowledgeAsync(receivedId, txn)
            : consumer.acknowledgeCumulativeAsync(receivedId, txn);
    ackFuture.get();

    TransactionPendingAckStats statsAfterAck =
            admin.transactions().getPendingAckStatsAsync(topic, subName).get();
    assertNull(statsAfterAck.lowWaterMarks);
    assertEquals(statsAfterAck.state, "Ready");
}
/**
 * Commits one transaction, leaves a second one open, and checks that the buffer, pending-ack
 * and coordinator stats each report exactly one ongoing transaction.
 */
@Test
public void testTransactionGetStats() throws Exception {
    initTransaction(1);
    final String topic = "persistent://public/default/testGetPendingAckStats";
    final String subName = "test1";

    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic)
            .create();
    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();

    Transaction committedTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build()
            .get();
    Transaction openTxn = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build()
            .get();

    // First transaction: acknowledge one plain message, publish five messages, commit.
    producer.newMessage().send();
    Message<byte[]> firstReceived = consumer.receive(5, TimeUnit.SECONDS);
    consumer.acknowledgeAsync(firstReceived.getMessageId(), committedTxn).get();
    int published = 0;
    while (published < 5) {
        producer.newMessage(committedTxn).send();
        published++;
    }
    committedTxn.commit().get();

    // Second transaction: acknowledge and publish, but stay open.
    Message<byte[]> secondReceived = consumer.receive(5, TimeUnit.SECONDS);
    consumer.acknowledgeAsync(secondReceived.getMessageId(), openTxn).get();
    producer.newMessage(openTxn).send();

    TransactionBufferStats bufferStats =
            admin.transactions().getTransactionBufferStats(topic, true);
    assertEquals(bufferStats.ongoingTxnSize, 1);
    assertNotNull(bufferStats.lowWaterMarks);

    TransactionPendingAckStats pendingAckStats =
            admin.transactions().getPendingAckStats(topic, subName, true);
    assertEquals(pendingAckStats.ongoingTxnSize, 1);
    assertNotNull(pendingAckStats.lowWaterMarks);

    assertEquals(admin.transactions().getCoordinatorStatsById(0).ongoingTxnSize, 1);
}
/**
 * Opens two transactions with a 60-second timeout and two with a 20-second timeout, then
 * checks that querying slow transactions with a 30-second threshold returns only the two
 * 60-second ones.
 */
@Test(timeOut = 20000)
public void testGetSlowTransactions() throws Exception {
    initTransaction(2);
    TransactionImpl transaction1 = (TransactionImpl) pulsarClient.newTransaction()
            .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();
    TransactionImpl transaction2 = (TransactionImpl) pulsarClient.newTransaction()
            .withTransactionTimeout(60, TimeUnit.SECONDS).build().get();

    // Negative controls: wait for both to be created. Without the get() the size assertion
    // below could run before they exist (never exercising the filter), and a failure to open
    // them would go unnoticed.
    pulsarClient.newTransaction().withTransactionTimeout(20, TimeUnit.SECONDS).build().get();
    pulsarClient.newTransaction().withTransactionTimeout(20, TimeUnit.SECONDS).build().get();

    Map<String, TransactionMetadata> transactionMetadataMap = admin.transactions()
            .getSlowTransactionsAsync(30, TimeUnit.SECONDS).get();
    assertEquals(transactionMetadataMap.size(), 2);

    TxnID txnID1 = new TxnID(transaction1.getTxnIdMostBits(), transaction1.getTxnIdLeastBits());
    TxnID txnID2 = new TxnID(transaction2.getTxnIdMostBits(), transaction2.getTxnIdLeastBits());

    // The result map is keyed by the transaction id's string form.
    TransactionMetadata transactionMetadata = transactionMetadataMap.get(txnID1.toString());
    assertNotNull(transactionMetadata);
    assertEquals(transactionMetadata.timeoutAt, 60000);

    transactionMetadata = transactionMetadataMap.get(txnID2.toString());
    assertNotNull(transactionMetadata);
    assertEquals(transactionMetadata.timeoutAt, 60000);
}
/**
 * Converts a message id into the managed-ledger position with the same ledger and entry id.
 *
 * @param messageId a message id that must be a {@link MessageIdImpl}
 * @return the position built from the id's ledger id and entry id
 */
private static PositionImpl getPositionByMessageId(MessageId messageId) {
    final MessageIdImpl idImpl = (MessageIdImpl) messageId;
    final long ledgerId = idImpl.getLedgerId();
    final long entryId = idImpl.getEntryId();
    return PositionImpl.get(ledgerId, entryId);
}
/**
 * Fetches the internal stats of coordinator 0 with and without ledger metadata and checks
 * the reported managed ledger name of the transaction log.
 */
@Test(timeOut = 20000)
public void testGetCoordinatorInternalStats() throws Exception {
    initTransaction(1);
    Transaction txn = pulsarClient.newTransaction()
            .withTransactionTimeout(60, TimeUnit.SECONDS)
            .build()
            .get();

    // Managed ledger name of the transaction log that belongs to coordinator 0.
    final String expectedLedgerName = TopicName.get(TopicDomain.persistent.toString(),
            NamespaceName.SYSTEM_NAMESPACE,
            MLTransactionLogImpl.TRANSACTION_LOG_PREFIX + "0").getPersistenceNamingEncoding();

    // Metadata requested.
    TransactionCoordinatorInternalStats statsWithMetadata =
            admin.transactions().getCoordinatorInternalStatsAsync(0, true).get();
    verifyManagedLedgerInternalStats(
            statsWithMetadata.transactionLogStats.managedLedgerInternalStats, 26);
    assertEquals(expectedLedgerName, statsWithMetadata.transactionLogStats.managedLedgerName);

    txn.commit().get();

    // Metadata not requested: the first ledger carries no metadata.
    TransactionCoordinatorInternalStats statsWithoutMetadata =
            admin.transactions().getCoordinatorInternalStatsAsync(0, false).get();
    assertNull(statsWithoutMetadata.transactionLogStats
            .managedLedgerInternalStats.ledgers.get(0).metadata);
    assertEquals(expectedLedgerName, statsWithoutMetadata.transactionLogStats.managedLedgerName);
}
/**
 * Exercises the pending-ack internal stats endpoint: against a missing topic (expecting
 * "Topic not found"), then with and without ledger metadata after a transactional ack.
 */
@Test(timeOut = 20000)
public void testGetPendingAckInternalStats() throws Exception {
    initTransaction(1);
    TransactionImpl txn = (TransactionImpl) getTransaction();
    final String topic = "persistent://public/default/testGetPendingAckInternalStats";
    final String subName = "test";

    // The topic has not been created yet.
    try {
        admin.transactions().getPendingAckInternalStatsAsync(topic, subName, true).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    // Same expectation after a getTopic call with the second argument set to false.
    try {
        pulsar.getBrokerService().getTopic(topic, false);
        admin.transactions().getPendingAckInternalStatsAsync(topic, subName, true).get();
        fail("Should failed here");
    } catch (ExecutionException ex) {
        Throwable cause = ex.getCause();
        assertTrue(cause instanceof PulsarAdminException.NotFoundException);
        assertEquals(cause.getMessage(), "Topic not found");
    }

    admin.topics().createNonPartitionedTopic(topic);
    Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES).topic(topic).create();
    Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();
    MessageId sentId = producer.send("Hello pulsar!".getBytes());
    consumer.acknowledgeAsync(sentId, txn).get();

    // Metadata requested.
    TransactionPendingAckInternalStats statsWithMetadata =
            admin.transactions().getPendingAckInternalStatsAsync(topic, subName, true).get();
    final ManagedLedgerInternalStats ledgerStatsWithMetadata =
            statsWithMetadata.pendingAckLogStats.managedLedgerInternalStats;
    assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
                    "testGetPendingAckInternalStats" + "-"
                            + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX)
                    .getPersistenceNamingEncoding(),
            statsWithMetadata.pendingAckLogStats.managedLedgerName);
    verifyManagedLedgerInternalStats(ledgerStatsWithMetadata, 16);
    // Every cursor must be the pending-ack store cursor, with its read position equal to
    // the last confirmed entry.
    ledgerStatsWithMetadata.cursors.forEach((cursorName, cursorStats) -> {
        assertEquals(cursorName, SystemTopicNames.PENDING_ACK_STORE_CURSOR_NAME);
        assertEquals(cursorStats.readPosition, ledgerStatsWithMetadata.lastConfirmedEntry);
    });

    // Metadata not requested: the first ledger carries no metadata.
    TransactionPendingAckInternalStats statsWithoutMetadata =
            admin.transactions().getPendingAckInternalStatsAsync(topic, subName, false).get();
    final ManagedLedgerInternalStats ledgerStatsWithoutMetadata =
            statsWithoutMetadata.pendingAckLogStats.managedLedgerInternalStats;
    assertEquals(TopicName.get(TopicDomain.persistent.toString(), "public", "default",
                    "testGetPendingAckInternalStats" + "-"
                            + subName + SystemTopicNames.PENDING_ACK_STORE_SUFFIX)
                    .getPersistenceNamingEncoding(),
            statsWithoutMetadata.pendingAckLogStats.managedLedgerName);
    assertNull(ledgerStatsWithoutMetadata.ledgers.get(0).metadata);
}
/**
 * Exercises the transaction buffer internal stats endpoint in three phases: coordinator
 * disabled (expects not-found), single snapshot mode, and segmented snapshot mode.
 */
@Test(timeOut = 20000)
public void testGetTransactionBufferInternalStats() throws Exception {
    initTransaction(1);
    final String disabledTopic =
            "persistent://public/default/testGetTransactionBufferInternalStats-1";
    final String singleSnapshotTopic =
            "persistent://public/default/testGetTransactionBufferInternalStats-2";
    final String segmentedTopic =
            "persistent://public/default/testGetTransactionBufferInternalStats-3";

    // Phase 1: with the coordinator disabled the endpoint reports not-found.
    pulsar.getConfig().setTransactionCoordinatorEnabled(false);
    admin.topics().createNonPartitionedTopic(disabledTopic);
    try {
        admin.transactions().getTransactionBufferInternalStatsAsync(disabledTopic, true).get();
        fail();
    } catch (ExecutionException e) {
        assertTrue(e.getCause() instanceof PulsarAdminException.NotFoundException);
    }

    // Phase 2: coordinator enabled, segmented snapshot disabled; abort one transaction.
    pulsar.getConfig().setTransactionCoordinatorEnabled(true);
    pulsar.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);
    Producer<byte[]> singleProducer =
            pulsarClient.newProducer(Schema.BYTES).topic(singleSnapshotTopic).create();
    TransactionImpl firstAbortedTxn = (TransactionImpl) getTransaction();
    singleProducer.newMessage(firstAbortedTxn).send();
    firstAbortedTxn.abort().get();

    TransactionBufferInternalStats singleStats = admin.transactions()
            .getTransactionBufferInternalStatsAsync(singleSnapshotTopic, true).get();
    assertEquals(singleStats.snapshotType, AbortedTxnProcessor.SnapshotType.Single.toString());
    assertNotNull(singleStats.singleSnapshotSystemTopicInternalStats);

    // The system topics are looked up under the namespace of the second topic throughout.
    final String systemTopicPrefix = TopicName.get(singleSnapshotTopic).getNamespace() + "/";

    // Compare against the internal stats of the snapshot system topic itself.
    PersistentTopicInternalStats snapshotTopicStats = admin.topics().getInternalStats(
            systemTopicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
    verifyManagedLedgerInternalStats(
            singleStats.singleSnapshotSystemTopicInternalStats.managedLedgerInternalStats,
            snapshotTopicStats);
    assertTrue(singleStats.singleSnapshotSystemTopicInternalStats.managedLedgerName
            .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT));
    assertNull(singleStats.segmentInternalStats);
    assertNull(singleStats.segmentIndexInternalStats);

    // Phase 3: segmented snapshot with segment size 9; abort another transaction.
    pulsar.getConfig().setTransactionBufferSnapshotSegmentSize(9);
    pulsar.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
    Producer<byte[]> segmentedProducer =
            pulsarClient.newProducer(Schema.BYTES).topic(segmentedTopic).create();
    TransactionImpl secondAbortedTxn = (TransactionImpl) getTransaction();
    segmentedProducer.newMessage(secondAbortedTxn).send();
    secondAbortedTxn.abort().get();

    TransactionBufferInternalStats segmentedStats = admin.transactions()
            .getTransactionBufferInternalStatsAsync(segmentedTopic, true).get();
    assertEquals(segmentedStats.snapshotType,
            AbortedTxnProcessor.SnapshotType.Segment.toString());
    assertNull(segmentedStats.singleSnapshotSystemTopicInternalStats);
    assertNotNull(segmentedStats.segmentInternalStats);

    // Segments system topic.
    PersistentTopicInternalStats segmentsTopicStats = admin.topics().getInternalStats(
            systemTopicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
    verifyManagedLedgerInternalStats(
            segmentedStats.segmentInternalStats.managedLedgerInternalStats, segmentsTopicStats);
    assertTrue(segmentedStats.segmentInternalStats.managedLedgerName
            .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS));

    // Indexes system topic.
    assertNotNull(segmentedStats.segmentIndexInternalStats);
    PersistentTopicInternalStats indexesTopicStats = admin.topics().getInternalStats(
            systemTopicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
    verifyManagedLedgerInternalStats(
            segmentedStats.segmentIndexInternalStats.managedLedgerInternalStats,
            indexesTopicStats);
    assertTrue(segmentedStats.segmentIndexInternalStats.managedLedgerName
            .contains(SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES));
}
/**
 * Restarts the broker with the transaction coordinator disabled and checks that the
 * transaction admin endpoints are rejected with HTTP 503 (Service Unavailable).
 */
@Test(timeOut = 20000)
public void testTransactionNotEnabled() throws Exception {
    // Tear down the broker started by setup() and bring it back with transactions off.
    cleanup();
    conf.setTransactionCoordinatorEnabled(false);
    setup();

    try {
        admin.transactions().getCoordinatorInternalStats(1, false);
        // Without this the test would pass vacuously if the call unexpectedly succeeded.
        fail("getCoordinatorInternalStats should fail when the transaction coordinator is disabled");
    } catch (PulsarAdminException ex) {
        assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
    }

    try {
        admin.transactions().scaleTransactionCoordinators(1);
        fail("scaleTransactionCoordinators should fail when the transaction coordinator is disabled");
    } catch (PulsarAdminException ex) {
        assertEquals(ex.getStatusCode(), HttpStatus.SC_SERVICE_UNAVAILABLE);
    }
}
/**
 * Exercises scaling of the transaction coordinators: rejects scaling down, rejects a
 * non-positive count, accepts scaling up, and finally expects a not-authorized error once
 * authentication/authorization are switched on.
 */
@Test
public void testUpdateTransactionCoordinatorNumber() throws Exception {
int coordinatorSize = 3;
// Register the coordinator-assign partitioned topic with 3 partitions directly through the
// broker's resources rather than through the admin API.
pulsar.getPulsarResources()
.getNamespaceResources()
.getPartitionedTopicResources()
.createPartitionedTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(coordinatorSize));
// Scaling below the current count must be refused with 406 Not Acceptable.
try {
admin.transactions().scaleTransactionCoordinators(coordinatorSize - 1);
fail();
} catch (PulsarAdminException pulsarAdminException) {
assertEquals(pulsarAdminException.getStatusCode(), HttpStatus.SC_NOT_ACCEPTABLE);
}
// A negative count must be refused; the message is checked on the exception's cause.
try {
admin.transactions().scaleTransactionCoordinators(-1);
fail();
} catch (PulsarAdminException pulsarAdminException) {
assertEquals(pulsarAdminException.getCause().getMessage(),
"Number of transaction coordinators must be more than 0");
}
// Scaling up to twice the original count is expected to succeed.
admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2);
// NOTE(review): a transaction-enabled client is installed and then closed right away;
// presumably creating it is what makes the broker load the new coordinators - confirm.
replacePulsarClient(PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true));
pulsarClient.close();
pulsarClient = null;
// Wait until the broker reports one metadata store per coordinator (6).
Awaitility.await().until(() -> pulsar.getTransactionMetadataStoreService().getStores().size() ==
coordinatorSize * 2);
// Turn on authentication and authorization on the running broker's configuration.
pulsar.getConfiguration().setAuthenticationEnabled(true);
pulsar.getConfiguration().setAuthorizationEnabled(true);
// Stubbed set whose contains(...) answers true for any argument, installed as the proxy roles.
Set<String> proxyRoles = spy(Set.class);
doReturn(true).when(proxyRoles).contains(any());
pulsar.getConfiguration().setProxyRoles(proxyRoles);
// With that configuration a further scale request must be rejected as not authorized.
try {
admin.transactions().scaleTransactionCoordinators(coordinatorSize * 2 + 1);
fail();
} catch (PulsarAdminException.NotAuthorizedException ignored) {
}
}
/**
 * Checks the recovery timestamps exposed by the transaction admin API for the
 * transaction coordinator, the transaction buffer and the pending-ack store.
 *
 * <p>The pending-ack stats are expected to report {@code 0} for both
 * timestamps before any transactional acknowledgment, and non-zero values
 * (with an end time different from {@code -1}) after one has been committed.
 */
@Test
public void testGetRecoveryTime() throws Exception {
    initTransaction(1);
    final String topic = "persistent://public/default/testGetRecoveryTime";
    final String subName = "test";
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .subscriptionName(subName)
            .topic(topic)
            .subscribe();
    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .sendTimeout(0, TimeUnit.SECONDS)
            .topic(topic)
            .create();

    // Coordinator 0 must report start and end recovery times.
    Awaitility.await().untilAsserted(() -> {
        Map<Integer, TransactionCoordinatorStats> transactionCoordinatorStatsMap =
                admin.transactions().getCoordinatorStats();
        assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverStartTime, 0L);
        assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, 0L);
        assertNotEquals(transactionCoordinatorStatsMap.get(0).recoverEndTime, -1L);
    });

    // The topic's transaction buffer must report start and end recovery times.
    Awaitility.await().untilAsserted(() -> {
        TransactionBufferStats transactionBufferStats = admin.transactions().getTransactionBufferStats(topic);
        assertNotEquals(transactionBufferStats.recoverStartTime, 0L);
        assertNotEquals(transactionBufferStats.recoverEndTime, 0L);
        assertNotEquals(transactionBufferStats.recoverEndTime, -1L);
    });

    // No transactional ack has happened yet, so both pending-ack timestamps are expected to be 0.
    TransactionPendingAckStats transactionPendingAckStats =
            admin.transactions().getPendingAckStats(topic, subName);
    assertEquals(transactionPendingAckStats.recoverStartTime, 0L);
    assertEquals(transactionPendingAckStats.recoverEndTime, 0L);

    Transaction transaction1 = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.MINUTES)
            .build()
            .get();
    producer.newMessage().send();
    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
    // receive(timeout) returns null on timeout; fail with a clear assertion instead of an NPE.
    assertNotNull(message);
    // Wait for the transactional ack so that a failure surfaces here rather than at commit.
    consumer.acknowledgeAsync(message.getMessageId(), transaction1).get();
    transaction1.commit().get();

    // After a committed transactional ack the pending-ack timestamps must be populated.
    transactionPendingAckStats =
            admin.transactions().getPendingAckStats(topic, subName);
    assertNotEquals(transactionPendingAckStats.recoverStartTime, 0L);
    assertNotEquals(transactionPendingAckStats.recoverEndTime, 0L);
    assertNotEquals(transactionPendingAckStats.recoverEndTime, -1L);
}
/**
 * Follows a single message position through the states reported by
 * {@code getPositionStatsInPendingAck}: {@code PendingAckNotReady} before any
 * transactional ack, {@code PendingAck} once it is acknowledged within an open
 * transaction, and eventually {@code MarkDelete} after the transaction commits.
 */
@Test
public void testCheckPositionInPendingAckState() throws Exception {
    String topic = "persistent://public/default/test";
    String subName = "sub";
    initTransaction(1);

    Transaction txn = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build()
            .get();

    @Cleanup
    Producer<byte[]> producer = pulsarClient.newProducer()
            .sendTimeout(5, TimeUnit.SECONDS)
            .enableBatching(false)
            .topic(topic)
            .create();
    @Cleanup
    Consumer<byte[]> consumer = pulsarClient.newConsumer()
            .topic(topic)
            .subscriptionName(subName)
            .subscribe();

    producer.newMessage().send();
    Message<byte[]> received = consumer.receive(5, TimeUnit.SECONDS);
    MessageIdImpl msgId = (MessageIdImpl) received.getMessageId();

    // Before any transactional ack on this subscription.
    PositionInPendingAckStats stats = admin.transactions().getPositionStatsInPendingAck(
            topic, subName, msgId.getLedgerId(), msgId.getEntryId(), null);
    assertEquals(stats.state, PositionInPendingAckStats.State.PendingAckNotReady);

    // Acknowledged inside the still-open transaction.
    consumer.acknowledgeAsync(msgId, txn).get();
    stats = admin.transactions().getPositionStatsInPendingAck(
            topic, subName, msgId.getLedgerId(), msgId.getEntryId(), null);
    assertEquals(stats.state, PositionInPendingAckStats.State.PendingAck);

    // After the commit the position is polled until it is reported as MarkDelete.
    txn.commit().get();
    Awaitility.await().untilAsserted(() -> {
        PositionInPendingAckStats afterCommit = admin.transactions().getPositionStatsInPendingAck(
                topic, subName, msgId.getLedgerId(), msgId.getEntryId(), null);
        assertEquals(afterCommit.state, PositionInPendingAckStats.State.MarkDelete);
    });
}
/**
 * Checks {@code getPositionStatsInPendingAck} for individual batch indexes of
 * a batched entry: the index acknowledged within an open transaction is
 * expected to be {@code PendingAck}, another index of the same entry
 * {@code NotInPendingAck}, and an index of 10 (the batch holds 3 messages)
 * {@code InvalidPosition}.
 */
@Test
public void testGetPositionStatsInPendingAckStatsFroBatch() throws Exception {
    String topic = "persistent://public/default/test";
    String subscriptionName = "my-subscription-batch";
    initTransaction(1);
    pulsar.getBrokerService()
            .getManagedLedgerConfig(TopicName.get(topic)).get()
            .setDeletionAtBatchIndexLevelEnabled(true);
    @Cleanup
    Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .enableBatching(true)
            .batchingMaxMessages(3)
            // set batch max publish delay big enough to make sure entry has 3 messages
            .batchingMaxPublishDelay(3, TimeUnit.SECONDS)
            .topic(topic).create();
    @Cleanup
    Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .subscriptionName(subscriptionName)
            .enableBatchIndexAcknowledgment(true)
            .subscriptionType(SubscriptionType.Shared)
            .isAckReceiptEnabled(true)
            .topic(topic)
            .subscribe();

    // Publish three messages asynchronously, then wait for every send to complete.
    List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
    for (int i = 0; i < 3; i++) {
        pendingSends.add(producer.sendAsync("my-message-" + i));
    }
    for (CompletableFuture<MessageId> pendingSend : pendingSends) {
        pendingSend.get();
    }

    Transaction transaction = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.DAYS)
            .build()
            .get();

    // Bounded receives: an unbounded receive() could hang this test, which declares no timeOut.
    // The first message is consumed and left unacknowledged.
    assertNotNull(consumer.receive(5, TimeUnit.SECONDS));
    Message<String> secondMessage = consumer.receive(5, TimeUnit.SECONDS);
    assertNotNull(secondMessage);

    // Only the second message is acknowledged within the transaction.
    BatchMessageIdImpl messageId = (BatchMessageIdImpl) secondMessage.getMessageId();
    consumer.acknowledgeAsync(messageId, transaction).get();

    PositionInPendingAckStats positionStatsInPendingAckStats =
            admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
                    messageId.getLedgerId(), messageId.getEntryId(), 1);
    assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.PendingAck);

    positionStatsInPendingAckStats =
            admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
                    messageId.getLedgerId(), messageId.getEntryId(), 2);
    assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.NotInPendingAck);

    positionStatsInPendingAckStats =
            admin.transactions().getPositionStatsInPendingAck(topic, subscriptionName,
                    messageId.getLedgerId(), messageId.getEntryId(), 10);
    assertEquals(positionStatsInPendingAckStats.state, PositionInPendingAckStats.State.InvalidPosition);
}
/**
 * Asserts that the given coordinator values match a coordinator that is
 * {@code "Ready"} with sequence id and low water mark both at zero.
 *
 * @param state        coordinator state string, expected to be {@code "Ready"}
 * @param sequenceId   coordinator sequence id, expected to be {@code 0}
 * @param lowWaterMark coordinator low water mark, expected to be {@code 0}
 */
private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) {
    final String expectedState = "Ready";
    final long expectedCounter = 0L;

    assertEquals(state, expectedState);
    assertEquals(sequenceId, expectedCounter);
    assertEquals(lowWaterMark, expectedCounter);
}
/**
 * Prepares the test environment for transactions: creates the transaction
 * coordinator assign topic with the requested number of partitions, looks it
 * up, waits until that many transaction metadata stores are present, and
 * leaves {@code pulsarClient} replaced by a transaction-enabled client.
 *
 * @param coordinatorSize number of partitions for the assign topic and the
 *                        number of metadata stores to wait for
 * @throws Exception if creating the topic, the lookup or a client operation fails
 */
private void initTransaction(int coordinatorSize) throws Exception {
    final String serviceUrl = lookupUrl.toString();

    pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources()
            .createPartitionedTopic(
                    SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN,
                    new PartitionedTopicMetadata(coordinatorSize));
    admin.lookups().lookupTopic(SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN.toString());

    // A first transaction-enabled client is created and closed right away.
    // NOTE(review): presumably creating it is what gets the coordinator stores loaded - confirm.
    replacePulsarClient(PulsarClient.builder().serviceUrl(serviceUrl).enableTransaction(true));
    pulsarClient.close();
    pulsarClient = null;

    Awaitility.await().until(() ->
            coordinatorSize == pulsar.getTransactionMetadataStoreService().getStores().size());

    // The client the tests actually use.
    replacePulsarClient(PulsarClient.builder().serviceUrl(serviceUrl).enableTransaction(true));
}
/**
 * Opens a new transaction with a 5 second timeout on the current
 * {@code pulsarClient} and blocks until it is available.
 *
 * @return the newly created transaction
 * @throws Exception if the transaction cannot be created
 */
private Transaction getTransaction() throws Exception {
    CompletableFuture<Transaction> pendingTransaction = pulsarClient.newTransaction()
            .withTransactionTimeout(5, TimeUnit.SECONDS)
            .build();
    return pendingTransaction.get();
}
/**
 * Asserts that the given managed-ledger stats describe a ledger holding
 * exactly one entry of {@code totalSize} bytes, with one ledger, one cursor,
 * no waiting cursors, no pending adds and no recorded ledger-creation failure.
 *
 * @param managedLedgerInternalStats stats to verify
 * @param totalSize                  expected total size and current ledger size
 */
private static void verifyManagedLedgerInternalStats(ManagedLedgerInternalStats managedLedgerInternalStats,
                                                     long totalSize) {
    final ManagedLedgerInternalStats stats = managedLedgerInternalStats;

    // Entry counts and sizes.
    assertEquals(stats.entriesAddedCounter, 1);
    assertEquals(stats.numberOfEntries, 1);
    assertEquals(stats.totalSize, totalSize);
    assertEquals(stats.currentLedgerEntries, 1);
    assertEquals(stats.currentLedgerSize, totalSize);

    // Failure timestamp and pending work.
    assertNull(stats.lastLedgerCreationFailureTimestamp);
    assertEquals(stats.waitingCursorsCount, 0);
    assertEquals(stats.pendingAddEntriesCount, 0);
    assertNotNull(stats.lastConfirmedEntry);

    // Ledger and cursor lists.
    assertEquals(stats.ledgers.size(), 1);
    assertNotNull(stats.ledgers.get(0).metadata);
    assertEquals(stats.cursors.size(), 1);
}
/**
 * Asserts that two managed-ledger stats snapshots agree on their counters,
 * sizes, failure timestamp, last confirmed entry and on the number of ledgers
 * and cursors, and that the first ledger of {@code internalStats} carries
 * metadata.
 *
 * @param internalStats        stats used as the expected side of each comparison
 * @param persistentTopicStats stats used as the actual side of each comparison
 */
private static void verifyManagedLedgerInternalStats(ManagedLedgerInternalStats internalStats,
                                                     ManagedLedgerInternalStats persistentTopicStats) {
    final ManagedLedgerInternalStats expected = internalStats;
    final ManagedLedgerInternalStats actual = persistentTopicStats;

    // Entry counts and sizes.
    assertEquals(actual.entriesAddedCounter, expected.entriesAddedCounter);
    assertEquals(actual.numberOfEntries, expected.numberOfEntries);
    assertEquals(actual.totalSize, expected.totalSize);
    assertEquals(actual.currentLedgerEntries, expected.currentLedgerEntries);
    assertEquals(actual.currentLedgerSize, expected.currentLedgerSize);

    // Failure timestamp, pending work and last confirmed entry.
    assertEquals(actual.lastLedgerCreationFailureTimestamp, expected.lastLedgerCreationFailureTimestamp);
    assertEquals(actual.waitingCursorsCount, expected.waitingCursorsCount);
    assertEquals(actual.pendingAddEntriesCount, expected.pendingAddEntriesCount);
    assertEquals(actual.lastConfirmedEntry, expected.lastConfirmedEntry);

    // Ledger and cursor lists.
    assertNotNull(expected.ledgers.get(0).metadata);
    assertEquals(actual.ledgers.size(), expected.ledgers.size());
    assertEquals(actual.cursors.size(), expected.cursors.size());
}
}