blob: 6558eb8ca43c4567144d1196fd73286aa2e1321a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.resources.ClusterResources;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.resources.TenantResources;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStoreProvider;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
 * Verifies that the buffered (batched) writer metrics of the transaction coordinator log and of the
 * pending-ack store are exposed by the broker metrics endpoint after some transactional traffic.
 */
@Slf4j
@Test(groups = "broker")
public class TransactionBatchWriterMetricsTest extends MockedPulsarServiceBaseTest {

    public static final NamespaceName DEFAULT_NAMESPACE = NamespaceName.get("public/default");

    /** Label block appended to every metric name; the trailing space separates the key from the value. */
    private static final String METRICS_LABELS_FORMAT = "{cluster=\"%s\",broker=\"%s\"} ";

    // Transaction coordinator (transaction log) buffered writer metrics.
    private static final String METRIC_TC_RECORD_COUNT_SUM = "pulsar_txn_tc_bufferedwriter_batch_records_sum";
    private static final String METRIC_TC_MAX_DELAY = "pulsar_txn_tc_bufferedwriter_flush_trigger_max_delay_total";
    private static final String METRIC_TC_BYTES_SIZE_SUM = "pulsar_txn_tc_bufferedwriter_batch_size_bytes_sum";

    // Pending-ack store buffered writer metrics.
    private static final String METRIC_PENDING_ACK_RECORD_COUNT_SUM =
            "pulsar_txn_pending_ack_store_bufferedwriter_batch_records_sum";
    private static final String METRIC_PENDING_ACK_MAX_DELAY =
            "pulsar_txn_pending_ack_store_bufferedwriter_flush_trigger_max_delay_total";
    private static final String METRIC_PENDING_ACK_BYTES_SIZE_SUM =
            "pulsar_txn_pending_ack_store_bufferedwriter_batch_size_bytes_sum";

    private final String clusterName = "test";
    private final String topicNameSuffix = "t-rest-topic";
    private final String topicName = DEFAULT_NAMESPACE.getPersistentTopicName(topicNameSuffix);

    /**
     * Initializes the buffered writer metrics of both providers and then starts the mocked broker.
     */
    @BeforeClass
    public void setup() throws Exception {
        MLTransactionMetadataStoreProvider.initBufferedWriterMetrics("localhost");
        MLPendingAckStoreProvider.initBufferedWriterMetrics("localhost");
        super.internalSetup();
    }

    /**
     * Shuts the mocked broker down. {@code alwaysRun} makes sure resources are released even when
     * {@link #setup()} or the test method failed.
     */
    @AfterClass(alwaysRun = true)
    protected void cleanup() throws Exception {
        super.internalCleanup();
    }

    /**
     * Builds the broker configuration with transactions and batched writes enabled for both the
     * transaction log and the pending-ack store.
     */
    @Override
    protected ServiceConfiguration getDefaultConf() {
        ServiceConfiguration conf = super.getDefaultConf();
        // enable transaction.
        conf.setSystemTopicEnabled(true);
        conf.setTransactionCoordinatorEnabled(true);
        // enable batch writer.
        conf.setTransactionPendingAckBatchedWriteEnabled(true);
        conf.setTransactionPendingAckBatchedWriteMaxRecords(10);
        conf.setTransactionLogBatchedWriteEnabled(true);
        conf.setTransactionLogBatchedWriteMaxRecords(10);
        // wait for shutdown of the broker, this prevents flakiness which could be caused by metrics being
        // unregistered asynchronously. This impacts the execution of the next test method if this would be happening.
        conf.setBrokerShutdownTimeoutMs(5000L);
        return conf;
    }

    /**
     * Starts the broker and makes sure the cluster, tenant, namespaces and the partitioned
     * transaction coordinator assign topic used by the test exist.
     */
    @Override
    protected void startBroker() throws Exception {
        super.startBroker();
        ensureClusterExists(pulsar, clusterName);
        ensureTenantExists(pulsar.getPulsarResources().getTenantResources(), TopicName.PUBLIC_TENANT, clusterName);
        NamespaceResources namespaceResources = pulsar.getPulsarResources().getNamespaceResources();
        ensureNamespaceExists(namespaceResources, DEFAULT_NAMESPACE, clusterName);
        ensureNamespaceExists(namespaceResources, NamespaceName.SYSTEM_NAMESPACE, clusterName);
        ensureTopicExists(namespaceResources.getPartitionedTopicResources(),
                SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN, 16);
    }

    /**
     * Produces and acknowledges messages inside transactions, then scrapes the metrics endpoint and
     * checks that every buffered writer metric of interest is present with a value greater than zero.
     */
    @Test
    public void testTransactionMetaLogMetrics() throws Exception {
        String metricsLabelCluster = clusterName;
        String metricsLabelBroker = pulsar.getAdvertisedAddress().split(":")[0];
        admin.topics().createNonPartitionedTopic(topicName);
        sendAndAckSomeMessages();
        // call metrics
        @Cleanup
        Client client = ClientBuilder.newClient();
        WebTarget target = client.target(brokerUrl + "/metrics/get");
        // @Cleanup closes the response even if one of the assertions below fails.
        @Cleanup
        Response response = target.request(MediaType.APPLICATION_JSON_TYPE).buildGet().invoke();
        // "status / 200 == 1" would also accept 3xx codes; require a real 2xx answer instead.
        Assert.assertEquals(response.getStatusInfo().getFamily(), Response.Status.Family.SUCCESSFUL,
                "Unexpected HTTP status from metrics endpoint: " + response.getStatus());
        List<String> metricsLines = parseResponseEntityToList(response).stream()
                .filter(line -> !line.startsWith("#") && line.contains("bufferedwriter"))
                .collect(Collectors.toList());
        // verify tc.
        assertMetricPositive(metricsLines, METRIC_TC_RECORD_COUNT_SUM, metricsLabelCluster, metricsLabelBroker);
        assertMetricPositive(metricsLines, METRIC_TC_MAX_DELAY, metricsLabelCluster, metricsLabelBroker);
        assertMetricPositive(metricsLines, METRIC_TC_BYTES_SIZE_SUM, metricsLabelCluster, metricsLabelBroker);
        // verify pending ack.
        assertMetricPositive(metricsLines, METRIC_PENDING_ACK_RECORD_COUNT_SUM, metricsLabelCluster,
                metricsLabelBroker);
        assertMetricPositive(metricsLines, METRIC_PENDING_ACK_MAX_DELAY, metricsLabelCluster, metricsLabelBroker);
        assertMetricPositive(metricsLines, METRIC_PENDING_ACK_BYTES_SIZE_SUM, metricsLabelCluster,
                metricsLabelBroker);
        // cleanup.
        admin.topics().delete(topicName, true);
    }

    /**
     * Asserts that the metric {@code metricName} labelled with the given cluster and broker exists
     * in {@code metricsLines} and carries a value greater than zero.
     *
     * @param metricsLines metric lines in Prometheus text format
     * @param metricName metric name without labels
     * @param cluster expected value of the {@code cluster} label
     * @param broker expected value of the {@code broker} label
     */
    private static void assertMetricPositive(List<String> metricsLines, String metricName, String cluster,
                                             String broker) {
        String key = String.format(metricName + METRICS_LABELS_FORMAT, cluster, broker);
        Double value = searchMetricsValue(metricsLines, key);
        // Fail with the metric name rather than with a bare NullPointerException from unboxing.
        Assert.assertNotNull(value, "Metric not found: " + key.trim());
        Assert.assertTrue(value > 0, "Expected metric " + key.trim() + " to be > 0 but was " + value);
    }

    /**
     * Looks up the value of the first non-comment line that starts with {@code key}.
     *
     * @param metricsLines metric lines in Prometheus text format
     * @param key metric name including its label block and the trailing space
     * @return the parsed value, or {@code null} when no line starts with {@code key}
     */
    private static Double searchMetricsValue(List<String> metricsLines, String key) {
        for (String metricsLine : metricsLines) {
            if (metricsLine.startsWith("#")) {
                continue;
            }
            if (metricsLine.startsWith(key)) {
                // Read the value from what follows the key, so label values containing spaces
                // cannot shift the position of the value token.
                String remainder = metricsLine.substring(key.length()).trim();
                return Double.valueOf(remainder.split("\\s+")[0]);
            }
        }
        return null;
    }

    /**
     * Sends one plain message and then runs 100 transactions; each transaction receives one message,
     * publishes two messages and acknowledges the received one before committing.
     */
    private void sendAndAckSomeMessages() throws Exception {
        // Close producer and consumer when the method exits so they do not leak into later cleanup.
        @Cleanup
        Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
                .topic(topicName)
                .sendTimeout(0, TimeUnit.SECONDS)
                .enableBatching(false)
                .batchingMaxMessages(2)
                .create();
        @Cleanup
        Consumer<byte[]> consumer = pulsarClient.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .topic(topicName)
                .isAckReceiptEnabled(true)
                .acknowledgmentGroupTime(0, TimeUnit.SECONDS)
                .subscriptionName("my-subscription")
                .subscribe();
        // Seed message so that the first transaction has something to receive.
        producer.sendAsync("normal message x".getBytes(StandardCharsets.UTF_8)).get();
        for (int i = 0; i < 100; i++) {
            Transaction transaction =
                    pulsarClient.newTransaction().withTransactionTimeout(10, TimeUnit.SECONDS).build().get();
            Message<byte[]> msg = consumer.receive();
            producer.newMessage(transaction).value(("tx msg a-" + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
            producer.newMessage(transaction).value(("tx msg b-" + i).getBytes(StandardCharsets.UTF_8)).sendAsync();
            consumer.acknowledgeAsync(msg.getMessageId(), transaction);
            transaction.commit().get();
        }
    }

    /**
     * Creates the cluster {@code cluster} with the service URLs of {@code pulsar} unless it already exists.
     */
    private static void ensureClusterExists(PulsarService pulsar, String cluster) throws Exception {
        ClusterResources clusterResources = pulsar.getPulsarResources().getClusterResources();
        if (clusterResources.clusterExists(cluster)) {
            return;
        }
        ClusterData clusterData = ClusterData.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .serviceUrlTls(pulsar.getWebServiceAddress())
                .brokerServiceUrl(pulsar.getBrokerServiceUrl())
                .brokerServiceUrlTls(pulsar.getBrokerServiceUrl())
                .build();
        clusterResources.createCluster(cluster, clusterData);
    }

    /**
     * Creates the partitioned topic metadata for {@code topicName} when missing; when it exists with
     * fewer than {@code numPartitions} partitions the metadata is updated to {@code numPartitions}.
     */
    private static void ensureTopicExists(NamespaceResources.PartitionedTopicResources partitionedTopicResources,
                                          TopicName topicName, int numPartitions) throws Exception {
        Optional<PartitionedTopicMetadata> existing =
                partitionedTopicResources.getPartitionedTopicMetadataAsync(topicName).get();
        if (!existing.isPresent()) {
            partitionedTopicResources.createPartitionedTopic(topicName, new PartitionedTopicMetadata(numPartitions));
            return;
        }
        if (existing.get().partitions < numPartitions) {
            partitionedTopicResources.updatePartitionedTopicAsync(topicName,
                    __ -> new PartitionedTopicMetadata(numPartitions)).get();
        }
    }

    /**
     * Creates the namespace with 16 bundles replicated to {@code cluster} when missing; otherwise adds
     * {@code cluster} to the replication clusters of the existing policies.
     */
    private static void ensureNamespaceExists(NamespaceResources namespaceResources, NamespaceName namespaceName,
                                              String cluster) throws Exception {
        if (!namespaceResources.namespaceExists(namespaceName)) {
            Policies policies = new Policies();
            policies.bundles = getBundles(16);
            policies.replication_clusters = Collections.singleton(cluster);
            namespaceResources.createPolicies(namespaceName, policies);
            return;
        }
        // NOTE(review): assumes the stored replication_clusters set is mutable - confirm.
        namespaceResources.setPolicies(namespaceName, policies -> {
            policies.replication_clusters.add(cluster);
            return policies;
        });
    }

    /**
     * Creates the tenant with {@code cluster} as its only allowed cluster when missing; otherwise adds
     * {@code cluster} to the allowed clusters of the existing tenant.
     */
    private static void ensureTenantExists(TenantResources tenantResources, String tenant, String cluster)
            throws Exception {
        if (!tenantResources.tenantExists(tenant)) {
            TenantInfoImpl publicTenant = new TenantInfoImpl(Collections.emptySet(), Collections.singleton(cluster));
            tenantResources.createTenant(tenant, publicTenant);
            return;
        }
        // NOTE(review): assumes the stored allowed-clusters set is mutable - confirm.
        tenantResources.updateTenantAsync(tenant, ti -> {
            ti.getAllowedClusters().add(cluster);
            return ti;
        }).get();
    }

    /**
     * Reads the response entity line by line.
     *
     * @param response response whose entity is exposed as an {@link InputStream}
     * @return all lines of the entity, in order
     */
    private List<String> parseResponseEntityToList(Response response) throws Exception {
        InputStream inputStream = (InputStream) response.getEntity();
        List<String> list = new ArrayList<>();
        // Decode explicitly as UTF-8 instead of the platform default and always release the reader.
        try (BufferedReader bufferedReader =
                     new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                list.add(line);
            }
        }
        return list;
    }

    /**
     * Builds the client used by the test with transactions enabled.
     *
     * @param url service URL of the broker
     * @param intervalInSecs client stats interval in seconds
     */
    @Override
    protected PulsarClient newPulsarClient(String url, int intervalInSecs) throws PulsarClientException {
        org.apache.pulsar.client.api.ClientBuilder clientBuilder =
                PulsarClient.builder()
                        .serviceUrl(url)
                        .enableTransaction(true)
                        .statsInterval(intervalInSecs, TimeUnit.SECONDS);
        customizeNewPulsarClientBuilder(clientBuilder);
        return createNewPulsarClient(clientBuilder);
    }
}