blob: 342a409c4ae51293baa786103ecab9524b76475b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.BoundType;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import java.lang.reflect.Field;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.ws.rs.client.InvocationCallback;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response.Status;
import lombok.Builder;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.testcontext.SpyConfig;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.client.admin.internal.LookupImpl;
import org.apache.pulsar.client.admin.internal.TenantsImpl;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundleFactory;
import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.PartitionedManagedLedgerInfo;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.BrokerAssignment;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.policies.data.PartitionedTopicInternalStats;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PoliciesUtil;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicHashPositions;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.PulsarCompactionServiceFactory;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Slf4j
@Test(groups = "broker-admin")
public class AdminApiTest extends MockedPulsarServiceBaseTest {
private static final Logger LOG = LoggerFactory.getLogger(AdminApiTest.class);
private MockedPulsarService mockPulsarSetup;
private PulsarService otherPulsar;
private PulsarAdmin adminTls;
private PulsarAdmin otheradmin;
private NamespaceBundleFactory bundleFactory;
@BeforeClass
@Override
public void setup() throws Exception {
setupConfigAndStart(null);
}
@Override
protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder pulsarTestContextBuilder) {
pulsarTestContextBuilder.spyConfigCustomizer(
// verify(compactor) is used in this test class
builder -> builder.compactor(SpyConfig.SpyType.SPY_ALSO_INVOCATIONS));
}
private void applyDefaultConfig() {
conf.setSystemTopicEnabled(false);
conf.setTopicLevelPoliciesEnabled(false);
conf.setLoadBalancerEnabled(true);
conf.setBrokerServicePortTls(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTlsCertificateFilePath(BROKER_CERT_FILE_PATH);
conf.setTlsKeyFilePath(BROKER_KEY_FILE_PATH);
conf.setMessageExpiryCheckIntervalInMinutes(1);
conf.setSubscriptionExpiryCheckIntervalInMinutes(1);
conf.setBrokerDeleteInactiveTopicsEnabled(false);
conf.setNumExecutorThreadPoolSize(5);
}
private void setupConfigAndStart(java.util.function.Consumer<ServiceConfiguration> configurationConsumer) throws Exception {
applyDefaultConfig();
if (configurationConsumer != null) {
configurationConsumer.accept(conf);
}
super.internalSetup();
bundleFactory = new NamespaceBundleFactory(pulsar, Hashing.crc32());
adminTls = spy(PulsarAdmin.builder().tlsTrustCertsFilePath(CA_CERT_FILE_PATH)
.serviceHttpUrl(brokerUrlTls.toString()).build());
// create otherbroker to test redirect on calls that need
// namespace ownership
mockPulsarSetup = new MockedPulsarService(this.conf);
mockPulsarSetup.setup();
otherPulsar = mockPulsarSetup.getPulsar();
otheradmin = mockPulsarSetup.getAdmin();
setupClusters();
}
@AfterMethod(alwaysRun = true)
public void resetClusters() throws Exception {
pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
for (String tenant : admin.tenants().getTenants()) {
for (String namespace : admin.namespaces().getNamespaces(tenant)) {
deleteNamespaceWithRetry(namespace, true);
}
admin.tenants().deleteTenant(tenant, true);
}
for (String cluster : admin.clusters().getClusters()) {
admin.clusters().deleteCluster(cluster);
}
pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
resetConfig();
applyDefaultConfig();
setupClusters();
}
private void setupClusters() throws PulsarAdminException {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
admin.tenants().createTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/ns1", Set.of("test"));
}
@AfterClass(alwaysRun = true)
@Override
public void cleanup() throws Exception {
adminTls.close();
super.internalCleanup();
mockPulsarSetup.cleanup();
}
@DataProvider(name = "numBundles")
public static Object[][] numBundles() {
return new Object[][] { { 1 }, { 4 } };
}
@DataProvider(name = "bundling")
public static Object[][] bundling() {
return new Object[][] { { 0 }, { 4 } };
}
@DataProvider(name = "topicName")
public Object[][] topicNamesProvider() {
return new Object[][] { { "topic_+&*%{}() \\$@#^%" }, { "simple-topicName" } };
}
@DataProvider(name = "topicType")
public Object[][] topicTypeProvider() {
return new Object[][] { { TopicDomain.persistent.value() }, { TopicDomain.non_persistent.value() } };
}
@DataProvider(name = "topicNamesForAllTypes")
public Object[][] topicNamesForAllTypesProvider() {
final List<Object[]> topicNames = new ArrayList<>();
for (int i = 0; i < topicTypeProvider().length; i++) {
for (int j = 0; j < topicNamesProvider().length; j++) {
topicNames.add(new Object[]{ topicTypeProvider()[i][0], topicNamesProvider()[j][0] });
}
}
return topicNames.toArray(new Object[topicNamesProvider().length * topicTypeProvider().length][]);
}
@Test
public void clusters() throws Exception {
admin.clusters().createCluster("usw",
ClusterData.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
// "test" cluster is part of config-default cluster and it's znode gets created when PulsarService creates
// failure-domain znode of this default cluster
assertEquals(admin.clusters().getClusters(), List.of("test", "usw"));
assertEquals(admin.clusters().getCluster("test"),
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
admin.clusters().updateCluster("usw",
ClusterData.builder().serviceUrl("http://new-broker.messaging.usw.example.com:8080").build());
assertEquals(admin.clusters().getClusters(), List.of("test", "usw"));
assertEquals(admin.clusters().getCluster("usw"),
ClusterData.builder().serviceUrl("http://new-broker.messaging.usw.example.com:8080").build());
admin.clusters().updateCluster("usw",
ClusterData.builder()
.serviceUrl("http://new-broker.messaging.usw.example.com:8080")
.serviceUrlTls("https://new-broker.messaging.usw.example.com:4443")
.build());
assertEquals(admin.clusters().getClusters(), List.of("test", "usw"));
assertEquals(admin.clusters().getCluster("usw"),
ClusterData.builder()
.serviceUrl("http://new-broker.messaging.usw.example.com:8080")
.serviceUrlTls("https://new-broker.messaging.usw.example.com:4443")
.build());
admin.clusters().deleteCluster("usw");
Awaitility.await()
.untilAsserted(() -> assertEquals(admin.clusters().getClusters(), List.of("test")));
deleteNamespaceWithRetry("prop-xyz/ns1", false);
admin.clusters().deleteCluster("test");
assertEquals(admin.clusters().getClusters(), new ArrayList<>());
// Check name validation
try {
admin.clusters().createCluster("bf!", ClusterData.builder().serviceUrl("http://dummy.messaging.example.com").build());
fail("should have failed");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
}
@Test
public void clusterNamespaceIsolationPolicies() throws PulsarAdminException {
try {
// create
String policyName1 = "policy-1";
Map<String, String> parameters1 = new HashMap<>();
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("other/use/other.*"))
.primary(Lists.newArrayList("prod1-broker[4-6].messaging.use.example.com"))
.secondary(Lists.newArrayList("prod1-broker.*.messaging.use.example.com"))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.build();
admin.clusters().createNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
String policyName2 = "policy-2";
Map<String, String> parameters2 = new HashMap<>();
parameters2.put("min_limit", "1");
parameters2.put("usage_threshold", "100");
NamespaceIsolationData nsPolicyData2 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("other/use/other.*"))
.primary(Collections.singletonList("prod1-broker[4-6].messaging.use.example.com"))
.secondary(Collections.singletonList("prod1-broker.*.messaging.use.example.com"))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.build();
admin.clusters().createNamespaceIsolationPolicy("test", policyName2, nsPolicyData2);
// verify create indirectly with get
Map<String, ? extends NamespaceIsolationData> policiesMap = admin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);
assertEquals(policiesMap.get(policyName2), nsPolicyData2);
// verify update of primary
nsPolicyData1.getPrimary().remove(0);
nsPolicyData1.getPrimary().add("prod1-broker[1-2].messaging.use.example.com");
admin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
// verify primary change
policiesMap = admin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);
// verify update of secondary
nsPolicyData1.getSecondary().remove(0);
nsPolicyData1.getSecondary().add("prod1-broker[3-4].messaging.use.example.com");
admin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
// verify secondary change
policiesMap = admin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);
// verify update of failover policy limit
nsPolicyData1.getAutoFailoverPolicy().getParameters().put("min_limit", "10");
admin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
// verify min_limit change
policiesMap = admin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);
// verify update of failover usage_threshold limit
nsPolicyData1.getAutoFailoverPolicy().getParameters().put("usage_threshold", "80");
admin.clusters().updateNamespaceIsolationPolicy("test", policyName1, nsPolicyData1);
// verify usage_threshold change
policiesMap = admin.clusters().getNamespaceIsolationPolicies("test");
assertEquals(policiesMap.get(policyName1), nsPolicyData1);
// verify single get
NamespaceIsolationDataImpl policy1Data = (NamespaceIsolationDataImpl) admin.clusters().getNamespaceIsolationPolicy("test", policyName1);
assertEquals(policy1Data, nsPolicyData1);
// verify creation of more than one policy
admin.clusters().createNamespaceIsolationPolicy("test", policyName2, nsPolicyData1);
try {
admin.clusters().getNamespaceIsolationPolicy("test", "no-such-policy");
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}
// verify delete cluster failed
try {
admin.clusters().deleteCluster("test");
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
// verify delete
admin.clusters().deleteNamespaceIsolationPolicy("test", policyName1);
admin.clusters().deleteNamespaceIsolationPolicy("test", policyName2);
try {
admin.clusters().getNamespaceIsolationPolicy("test", policyName1);
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}
try {
admin.clusters().getNamespaceIsolationPolicy("test", policyName2);
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}
try {
admin.clusters().getNamespaceIsolationPolicies("usc");
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}
try {
admin.clusters().getNamespaceIsolationPolicy("usc", "no-such-cluster");
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
try {
admin.clusters().createNamespaceIsolationPolicy("usc", "no-such-cluster", nsPolicyData1);
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
try {
admin.clusters().updateNamespaceIsolationPolicy("usc", "no-such-cluster", policy1Data);
fail("should have raised exception");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
} catch (PulsarAdminException e) {
LOG.warn("TEST FAILED [{}]", e.getMessage());
throw e;
}
// validate regex: invalid regex for primary and secondary
Map<String, String> parameters = new HashMap<>();
parameters.put("min_limit", "1");
parameters.put("usage_threshold", "100");
NamespaceIsolationData.Builder nsRegexPolicy = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList("other/use/other.*"))
.primary(List.of("prod1-broker[45-46].messaging.use.example.com"))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters)
.build());
try {
admin.clusters().createNamespaceIsolationPolicy("test", "invalid_primary", nsRegexPolicy.build());
fail("should have failed with invalid regex");
}catch (PulsarAdminException e) {
//Ok
}
nsRegexPolicy.primary(List.of("prod1-broker[45-46].messaging.use.example.com",
"prod1-broker[4-5].messaging.use.example.com"))
.secondary(Collections.singletonList("prod1-broker[45-46].messaging.use.example.com"));
try {
admin.clusters().createNamespaceIsolationPolicy("test", "invalid_primary", nsRegexPolicy.build());
fail("should have failed with invalid regex");
} catch (PulsarAdminException e) {
// Ok
}
}
@Test
public void brokers() throws Exception {
List<String> list = admin.brokers().getActiveBrokers("test");
Assert.assertNotNull(list);
Assert.assertEquals(list.size(), 1);
List<String> list1 = admin.brokers().getActiveBrokers();
Assert.assertNotNull(list1);
Assert.assertEquals(list1.size(), 1);
List<String> list2 = otheradmin.brokers().getActiveBrokers("test");
Assert.assertNotNull(list2);
Assert.assertEquals(list2.size(), 1);
BrokerInfo leaderBroker = admin.brokers().getLeaderBroker();
Assert.assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
Map<String, NamespaceOwnershipStatus> nsMap = admin.brokers().getOwnedNamespaces("test", list.get(0));
// since sla-monitor ns is not created nsMap.size() == 1 (for HeartBeat Namespace)
Assert.assertEquals(nsMap.size(), 2);
for (String ns : nsMap.keySet()) {
NamespaceOwnershipStatus nsStatus = nsMap.get(ns);
if (ns.equals(
NamespaceService.getHeartbeatNamespace(pulsar.getAdvertisedAddress(), pulsar.getConfiguration())
+ "/0x00000000_0xffffffff")) {
assertEquals(nsStatus.broker_assignment, BrokerAssignment.shared);
assertFalse(nsStatus.is_controlled);
assertTrue(nsStatus.is_active);
}
}
String[] parts = list.get(0).split(":");
Assert.assertEquals(parts.length, 2);
Map<String, NamespaceOwnershipStatus> nsMap2 = adminTls.brokers().getOwnedNamespaces("test",
String.format("%s:%d", parts[0], pulsar.getListenPortHTTPS().get()));
Assert.assertEquals(nsMap2.size(), 2);
deleteNamespaceWithRetry("prop-xyz/ns1", false);
admin.clusters().deleteCluster("test");
assertEquals(admin.clusters().getClusters(), new ArrayList<>());
}
public void testUpdateDynamicLoadBalancerSheddingIntervalMinutes() throws Exception {
// update configuration
admin.brokers().updateDynamicConfiguration("loadBalancerSheddingIntervalMinutes", "10");
// wait config to be updated
Awaitility.await().until(() -> {
return conf.getLoadBalancerSheddingIntervalMinutes() == 10;
});
// verify value is updated
assertEquals(conf.getLoadBalancerSheddingIntervalMinutes(), 10);
}
@Test
public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
// update configuration
admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB", "1");
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark", "0.8");
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis", "2000");
// wait config to be updated
Awaitility.await().until(() -> {
return pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 1024L * 1024L
&& pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark() == 0.8
&& pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == TimeUnit.MILLISECONDS
.toNanos(2000);
});
// verify value is updated
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
.toNanos(2000));
restartBroker();
// verify value again
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
.toNanos(2000));
}
/**
* <pre>
* Verifies: zk-update configuration updates service-config
* 1. create znode for dynamic-config
* 2. start pulsar service so, pulsar can set the watch on that znode
* 3. update the configuration with new value
* 4. wait and verify that new value has been updated
* </pre>
*
* @throws Exception
*/
@Test
public void testUpdateDynamicConfigurationWithZkWatch() throws Exception {
final int initValue = 30000;
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue);
// (1) try to update dynamic field
final long shutdownTime = 10;
// update configuration
admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
// sleep incrementally as zk-watch notification is async and may take some time
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != initValue) {
Thread.sleep(50 + (i * 10));
}
}
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getBrokerShutdownTimeoutMs() != shutdownTime) {
Thread.sleep(100 + (i * 10));
} else {
break;
}
}
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
// (2) try to update non-dynamic field
try {
admin.brokers().updateDynamicConfiguration("metadataStoreUrl", "zk:test-zk:1234");
} catch (Exception e) {
assertTrue(e instanceof PreconditionFailedException);
}
// (3) try to update non-existent field
try {
admin.brokers().updateDynamicConfiguration("test", Long.toString(shutdownTime));
} catch (Exception e) {
assertTrue(e instanceof PreconditionFailedException);
}
// (4) try to update dynamic-field with special char "/" and "%"
String user1 = "test/test%&$*/^";
String user2 = "user2/password";
final String configValue = user1 + "," + user2;
admin.brokers().updateDynamicConfiguration("superUserRoles", configValue);
String storedValue = admin.brokers().getAllDynamicConfigurations().get("superUserRoles");
assertEquals(configValue, storedValue);
retryStrategically((test) -> pulsar.getConfiguration().getSuperUserRoles().size() == 2, 5, 200);
assertTrue(pulsar.getConfiguration().getSuperUserRoles().contains(user1));
assertTrue(pulsar.getConfiguration().getSuperUserRoles().contains(user2));
admin.brokers().updateDynamicConfiguration("loadManagerClassName", SimpleLoadManagerImpl.class.getName());
retryStrategically((test) -> pulsar.getConfiguration().getLoadManagerClassName()
.equals(SimpleLoadManagerImpl.class.getName()), 150, 5);
assertEquals(pulsar.getConfiguration().getLoadManagerClassName(), SimpleLoadManagerImpl.class.getName());
admin.brokers().deleteDynamicConfiguration("loadManagerClassName");
assertFalse(admin.brokers().getAllDynamicConfigurations().containsKey("loadManagerClassName"));
}
/**
* Verifies broker sets watch on dynamic-configuration map even with invalid init json data
*
* <pre>
* 1. Set invalid json at dynamic-config znode
* 2. Broker fails to deserialize znode content but sets the watch on znode
* 3. Update znode with valid json map
* 4. Broker should get watch and update the dynamic-config map
* </pre>
*
* @throws Exception
*/
@Test
public void testInvalidDynamicConfigContentInMetadata() throws Exception {
final int newValue = 10;
// set invalid data into dynamic-config node so, broker startup fail to deserialize data
pulsar.getLocalMetadataStore().put("/admin/configuration", "$".getBytes(),
Optional.empty()).join();
stopBroker();
// start broker: it should have set watch even if with failure of deserialization
startBroker();
Assert.assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
// update zk with config-value which should fire watch and broker should update the config value
Map<String, String> configMap = new HashMap<>();
configMap.put("brokerShutdownTimeoutMs", Integer.toString(newValue));
pulsar.getLocalMetadataStore().put("/admin/configuration",
ObjectMapperFactory.getMapper().writer().writeValueAsBytes(configMap),
Optional.empty()).join();
// wait config to be updated
Awaitility.await().until(() -> pulsar.getConfiguration().getBrokerShutdownTimeoutMs() == newValue);
// verify value is updated
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), newValue);
}
/**
* <pre>
* verifies: that registerListener updates pulsar.config value with newly updated zk-dynamic config
* 1.start pulsar
* 2.update zk-config with admin api
* 3. trigger watch and listener
* 4. verify that config is updated
* </pre>
*
* @throws Exception
*/
@Test(timeOut = 30000)
public void testUpdateDynamicLocalConfiguration() throws Exception {
// (1) try to update dynamic field
final long initValue = 30000;
final long shutdownTime = 10;
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(initValue);
// update configuration
admin.brokers().updateDynamicConfiguration("brokerShutdownTimeoutMs", Long.toString(shutdownTime));
// verify value is updated
Awaitility.waitAtMost(30, TimeUnit.SECONDS).untilAsserted(() -> {
assertEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
});
}
@Test
public void testUpdatableConfigurationName() throws Exception {
// (1) try to update dynamic field
final String configName = "brokerShutdownTimeoutMs";
assertTrue(admin.brokers().getDynamicConfigurationNames().contains(configName));
}
@Test
public void testGetDynamicLocalConfiguration() throws Exception {
// (1) try to update dynamic field
final String configName = "brokerShutdownTimeoutMs";
final long shutdownTime = 10;
pulsar.getConfiguration().setBrokerShutdownTimeoutMs(30000);
Map<String, String> configs = admin.brokers().getAllDynamicConfigurations();
assertTrue(configs.isEmpty());
assertNotEquals(pulsar.getConfiguration().getBrokerShutdownTimeoutMs(), shutdownTime);
// update configuration
admin.brokers().updateDynamicConfiguration(configName, Long.toString(shutdownTime));
// Now, znode is created: updateConfigurationAndRegisterListeners and check if configuration updated
assertEquals(Long.parseLong(admin.brokers().getAllDynamicConfigurations().get(configName)), shutdownTime);
}
@Test
public void properties() throws Exception {
try {
admin.tenants().getTenantInfo("does-not-exist");
fail("should have failed");
} catch (PulsarAdminException e) {
assertTrue(e instanceof NotFoundException);
}
Set<String> allowedClusters = Set.of("test");
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), allowedClusters);
admin.tenants().updateTenant("prop-xyz", tenantInfo);
assertEquals(admin.tenants().getTenants(), List.of("prop-xyz"));
assertEquals(admin.tenants().getTenantInfo("prop-xyz"), tenantInfo);
TenantInfoImpl newTenantAdmin = new TenantInfoImpl(Set.of("role3", "role4"), allowedClusters);
admin.tenants().updateTenant("prop-xyz", newTenantAdmin);
assertEquals(admin.tenants().getTenantInfo("prop-xyz"), newTenantAdmin);
try {
admin.tenants().deleteTenant("prop-xyz");
fail("should have failed");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
assertEquals(e.getStatusCode(), 409);
assertEquals(e.getMessage(), "The tenant still has active namespaces");
}
deleteNamespaceWithRetry("prop-xyz/ns1", false);
admin.tenants().deleteTenant("prop-xyz");
assertEquals(admin.tenants().getTenants(), new ArrayList<>());
// Check name validation
try {
admin.tenants().createTenant("prop-xyz&", tenantInfo);
fail("should have failed");
} catch (PulsarAdminException e) {
assertTrue(e instanceof PreconditionFailedException);
}
}
@Test
public void namespaces() throws Exception {
admin.clusters().createCluster("usw", ClusterData.builder().build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
Set.of("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1").bundles, PoliciesUtil.defaultBundle());
admin.namespaces().createNamespace("prop-xyz/ns2", Set.of("test"));
admin.namespaces().createNamespace("prop-xyz/ns3", 4);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns3", Set.of("test"));
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns3").bundles.getNumBundles(), 4);
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns3").bundles.getBoundaries().size(), 5);
deleteNamespaceWithRetry("prop-xyz/ns3", false);
try {
admin.namespaces().createNamespace("non-existing/ns1");
fail("Should not have passed");
} catch (NotFoundException e) {
// Ok
}
assertEquals(admin.namespaces().getNamespaces("prop-xyz"), List.of("prop-xyz/ns1", "prop-xyz/ns2"));
assertEquals(admin.namespaces().getNamespaces("prop-xyz"), List.of("prop-xyz/ns1", "prop-xyz/ns2"));
try {
admin.namespaces().createNamespace("prop-xyz/ns4", Set.of("usc"));
fail("Should not have passed");
} catch (NotAuthorizedException e) {
// Ok, got the non authorized exception since usc cluster is not in the allowed clusters list.
}
// test with url style role.
admin.namespaces().grantPermissionOnNamespace("prop-xyz/ns1",
"spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
admin.namespaces().grantPermissionOnNamespace("prop-xyz/ns1", "my-role", EnumSet.allOf(AuthAction.class));
Policies policies = new Policies();
policies.replication_clusters = Set.of("test");
policies.bundles = PoliciesUtil.defaultBundle();
policies.auth_policies.getNamespaceAuthentication().put("spiffe://developer/passport-role", EnumSet.allOf(AuthAction.class));
policies.auth_policies.getNamespaceAuthentication().put("my-role", EnumSet.allOf(AuthAction.class));
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
assertEquals(admin.namespaces().getPermissions("prop-xyz/ns1"), policies.auth_policies.getNamespaceAuthentication());
assertEquals(admin.namespaces().getTopics("prop-xyz/ns1"), new ArrayList<>());
admin.namespaces().revokePermissionsOnNamespace("prop-xyz/ns1", "spiffe://developer/passport-role");
admin.namespaces().revokePermissionsOnNamespace("prop-xyz/ns1", "my-role");
policies.auth_policies.getNamespaceAuthentication().remove("spiffe://developer/passport-role");
policies.auth_policies.getNamespaceAuthentication().remove("my-role");
policies.is_allow_auto_update_schema = conf.isAllowAutoUpdateSchemaEnabled();
assertEquals(admin.namespaces().getPolicies("prop-xyz/ns1"), policies);
assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), null);
admin.namespaces().setPersistence("prop-xyz/ns1", new PersistencePolicies(3, 2, 1, 10.0));
assertEquals(admin.namespaces().getPersistence("prop-xyz/ns1"), new PersistencePolicies(3, 2, 1, 10.0));
// Force topic creation and namespace being loaded
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/my-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.close();
admin.topics().delete("persistent://prop-xyz/ns1/my-topic");
admin.namespaces().unloadNamespaceBundle("prop-xyz/ns1", "0x00000000_0xffffffff");
NamespaceName ns = NamespaceName.get("prop-xyz/ns1");
// Now, w/ bundle policies, we will use default bundle
NamespaceBundle defaultBundle = bundleFactory.getFullBundle(ns);
int i = 0;
for (; i < 10; i++) {
Optional<NamespaceEphemeralData> data1 = pulsar.getNamespaceService().getOwnershipCache()
.getOwnerAsync(defaultBundle).get();
if (!data1.isPresent()) {
// Already unloaded
break;
}
LOG.info("Waiting for unload namespace {} to complete. Current service unit isDisabled: {}", defaultBundle,
data1.get().isDisabled());
Thread.sleep(1000);
}
assertTrue(i < 10);
deleteNamespaceWithRetry("prop-xyz/ns1", false);
assertEquals(admin.namespaces().getNamespaces("prop-xyz"), List.of("prop-xyz/ns2"));
try {
admin.namespaces().unload("prop-xyz/ns1");
fail("should have raised exception");
} catch (Exception e) {
// OK excepted
}
// Force topic creation and namespace being loaded
producer = pulsarClient.newProducer(Schema.BYTES).topic("persistent://prop-xyz/ns2/my-topic").create();
producer.close();
admin.topics().delete("persistent://prop-xyz/ns2/my-topic");
// both unload and delete should succeed for ns2 on other broker with a redirect
// otheradmin.namespaces().unload("prop-xyz/use/ns2");
}
@Test(dataProvider = "topicName")
public void persistentTopics(String topicName) throws Exception {
final String subName = topicName;
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
List.of("persistent://prop-xyz/ns1/" + topicName));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 10);
TopicStats topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().keySet(), Set.of(subName));
assertEquals(topicStats.getSubscriptions().get(subName).getConsumers().size(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
assertEquals(topicStats.getPublishers().size(), 0);
assertEquals(topicStats.getOwnerBroker(),
pulsar.getAdvertisedAddress() + ":" + pulsar.getConfiguration().getWebServicePort().get());
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(persistentTopicName, false);
assertEquals(internalStats.cursors.keySet(), Set.of(Codec.encode(subName)));
List<Message<byte[]>> messages = admin.topics().peekMessages(persistentTopicName, subName, 3);
assertEquals(messages.size(), 3);
for (int i = 0; i < 3; i++) {
String expectedMessage = "message-" + i;
assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
}
messages = admin.topics().peekMessages(persistentTopicName, subName, 15);
assertEquals(messages.size(), 10);
for (int i = 0; i < 10; i++) {
String expectedMessage = "message-" + i;
assertEquals(messages.get(i).getData(), expectedMessage.getBytes());
}
admin.topics().skipMessages(persistentTopicName, subName, 5);
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 5);
admin.topics().skipAllMessages(persistentTopicName, subName);
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 0);
publishNullValueMessageOnPersistentTopic(persistentTopicName, 10);
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), 10);
messages = admin.topics().peekMessages(persistentTopicName, subName, 10);
assertEquals(messages.size(), 10);
for (int i = 0; i < 10; i++) {
assertNull(messages.get(i).getData());
assertNull(messages.get(i).getValue());
}
admin.topics().skipAllMessages(persistentTopicName, subName);
consumer.close();
client.close();
admin.topics().deleteSubscription(persistentTopicName, subName);
assertEquals(admin.topics().getSubscriptions(persistentTopicName), new ArrayList<>());
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().keySet(), new TreeSet<>());
assertEquals(topicStats.getPublishers().size(), 0);
try {
admin.topics().skipAllMessages(persistentTopicName, subName);
} catch (NotFoundException e) {
assertTrue(e.getMessage().contains(subName));
}
admin.topics().delete(persistentTopicName);
try {
admin.topics().delete(persistentTopicName);
fail("Should have received 404");
} catch (NotFoundException e) {
assertTrue(e.getMessage().contains(persistentTopicName));
}
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
}
@Test(dataProvider = "topicName")
public void testSkipHoleMessages(String topicName) throws Exception {
final String subName = topicName;
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
List.of("persistent://prop-xyz/ns1/" + topicName));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
AtomicInteger total = new AtomicInteger();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName)
.messageListener(new MessageListener<byte[]>() {
@SneakyThrows
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
if (total.get() %2 !=0){
// artificially created 50 hollow messages
consumer.acknowledge(msg);
}
total.incrementAndGet();
}
})
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100);
TimeUnit.SECONDS.sleep(2);
TopicStats topicStats = admin.topics().getStats(persistentTopicName);
long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog();
log.info("back={}",msgBacklog);
int skipNumber = 20;
admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
}
@Test(dataProvider = "topicNamesForAllTypes")
public void partitionedTopics(String topicType, String topicName) throws Exception {
final String namespace = "prop-xyz/ns1";
final String partitionedTopicName = topicType + "://" + namespace + "/" + topicName;
final String anotherTopic = topicType + "://" + namespace + "/ds2";
// TODO: there're some gaps between non-persistent topics and persistent topics, so some checks will be skipped
// for non-persistent topics. After the gaps were filled, we can remove this check.
final boolean isPersistent = topicType.equals(TopicDomain.persistent.value());
assertEquals(admin.topics().getPartitionedTopicList(namespace), new ArrayList<>());
try {
admin.topics().getPartitionedTopicMetadata(partitionedTopicName);
fail("getPartitionedTopicMetadata of " + partitionedTopicName + " should not succeed");
} catch (NotFoundException expected) {
}
admin.topics().createPartitionedTopic(partitionedTopicName, 4);
assertEquals(admin.topics().getPartitionedTopicList(namespace),
List.of(partitionedTopicName));
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4);
List<String> topics;
if (isPersistent) {
// TODO: for non-persistent topics getList will return 0
topics = admin.topics().getList(namespace);
assertEquals(topics.size(), 4);
}
try {
admin.topics().getPartitionedTopicMetadata(anotherTopic);
fail("getPartitionedTopicMetadata of " + anotherTopic + " should not succeed");
} catch (NotFoundException expected) {
}
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName,false);
// check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
topicStats.getMetadata().partitions);
assertEquals(topicStats.getPartitions().size(), 0);
List<String> subscriptions = admin.topics().getSubscriptions(partitionedTopicName);
assertEquals(subscriptions.size(), 0);
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), List.of("my-sub"));
try {
if (isPersistent) {
// TODO: for non-persistent topics, deleteSubscription might throw NotFoundException
admin.topics().deleteSubscription(partitionedTopicName, "my-sub");
// TODO: for non-persistent topics, deleteSubscription won't fail
fail("should have failed");
}
} catch (PreconditionFailedException e) {
// ok
} catch (Exception e) {
fail(e.getMessage());
}
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub-1")
.subscribe();
if (isPersistent) {
// TODO: for non-persistent topics, getSubscriptions will return a empty set
assertEquals(new HashSet<>(admin.topics().getSubscriptions(partitionedTopicName)),
Set.of("my-sub", "my-sub-1"));
}
consumer1.close();
if (isPersistent) {
// TODO: for non-persistent topics, deleteSubscription might throw NotFoundException
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
// TODO: for non-persistent topics, getSubscriptions will return a empty set
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), List.of("my-sub"));
}
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
assertEquals(new HashSet<>(admin.topics().getList(namespace)),
Set.of(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1",
partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3"));
// test cumulative stats for partitioned topic
topicStats = admin.topics().getPartitionedStats(partitionedTopicName,false);
if (isPersistent) {
// TODO: for non-persistent topics, the subscription doesn't exist
assertEquals(topicStats.getSubscriptions().keySet(), Set.of("my-sub"));
assertEquals(topicStats.getSubscriptions().get("my-sub").getConsumers().size(), 1);
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgBacklog(), 10);
}
assertEquals(topicStats.getPublishers().size(), 1);
assertEquals(topicStats.getPartitions(), new HashMap<>());
// test per partition stats for partitioned topic
topicStats = admin.topics().getPartitionedStats(partitionedTopicName,true);
assertEquals(topicStats.getMetadata().partitions, 4);
assertEquals(topicStats.getPartitions().keySet(),
Set.of(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1",
partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3"));
TopicStats partitionStats = topicStats.getPartitions().get(partitionedTopicName + "-partition-0");
assertEquals(partitionStats.getPublishers().size(), 1);
if (isPersistent) {
// TODO: for non-persistent topics, the subscription doesn't exist
assertEquals(partitionStats.getSubscriptions().get("my-sub").getConsumers().size(), 1);
assertEquals(partitionStats.getSubscriptions().get("my-sub").getMsgBacklog(), 3, 1);
}
try {
admin.topics().skipMessages(partitionedTopicName, "my-sub", 5);
fail("skip messages for partitioned topics should fail");
} catch (Exception e) {
// ok
}
if (isPersistent) {
// TODO: for non-persistent topics, skilAllMessages will cause 500 internal error
admin.topics().skipAllMessages(partitionedTopicName, "my-sub");
topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgBacklog(), 0);
}
producer.close();
consumer.close();
if (isPersistent) {
// TODO: for non-persistent topics, deleteSubscription might throw NotFoundException
admin.topics().deleteSubscription(partitionedTopicName, "my-sub");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), new ArrayList<>());
}
try {
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
fail("Should have failed as the partitioned topic already exists");
} catch (ConflictException ignore) {
}
producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
if (isPersistent) {
// TODO: for non-persistent topics getList will return 0
topics = admin.topics().getList(namespace);
assertEquals(topics.size(), 4);
}
try {
admin.topics().deletePartitionedTopic(partitionedTopicName);
fail("The topic is busy");
} catch (PreconditionFailedException pfe) {
// ok
}
producer.close();
client.close();
admin.topics().deletePartitionedTopic(partitionedTopicName);
try {
admin.topics().getPartitionedTopicMetadata(partitionedTopicName);
fail("getPartitionedTopicMetadata of " + partitionedTopicName + " should not succeed");
} catch (NotFoundException expected) {
}
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 32);
try {
admin.topics().deletePartitionedTopic(anotherTopic);
fail("Should have failed as the partitioned topic was not created");
} catch (NotFoundException nfe) {
assertTrue(nfe.getMessage().contains(anotherTopic));
}
admin.topics().deletePartitionedTopic(partitionedTopicName);
// delete a partitioned topic in a global namespace
admin.topics().createPartitionedTopic(partitionedTopicName, 4);
admin.topics().deletePartitionedTopic(partitionedTopicName);
}
@Test
public void testGetPartitionedInternalInfo() throws Exception {
String partitionedTopic = "my-topic" + UUID.randomUUID().toString();
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), new ArrayList<>());
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + partitionedTopic;
admin.topics().createPartitionedTopic(partitionedTopicName, 2);
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), List.of(partitionedTopicName));
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
String partitionTopic0 = partitionedTopicName + "-partition-0";
String partitionTopic1 = partitionedTopicName + "-partition-1";
String partitionTopic0InfoResponse = admin.topics().getInternalInfo(partitionTopic0);
String partitionTopic1InfoResponse = admin.topics().getInternalInfo(partitionTopic1);
// expected managed info
PartitionedManagedLedgerInfo partitionedManagedLedgerInfo = new PartitionedManagedLedgerInfo();
partitionedManagedLedgerInfo.version = 0L;
partitionedManagedLedgerInfo.partitions.put(partitionTopic0,
ObjectMapperFactory.getMapper().reader().readValue(partitionTopic0InfoResponse, ManagedLedgerInfo.class));
partitionedManagedLedgerInfo.partitions.put(partitionTopic1,
ObjectMapperFactory.getMapper().reader().readValue(partitionTopic1InfoResponse, ManagedLedgerInfo.class));
String expectedResult = ObjectMapperFactory.getMapper().writer().writeValueAsString(partitionedManagedLedgerInfo);
String partitionTopicInfoResponse = admin.topics().getInternalInfo(partitionedTopicName);
assertEquals(partitionTopicInfoResponse, expectedResult);
}
@Test
public void testGetStats() throws Exception {
final String topic = "persistent://prop-xyz/ns1/my-topic" + UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topic);
String subName = "my-sub";
// create consumer and subscription
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(subName).subscribe();
TopicStats topicStats = admin.topics().getStats(topic, false, false, true);
assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0);
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), -1);
// publish several messages
publishMessagesOnPersistentTopic(topic, 10);
Thread.sleep(1000);
topicStats = admin.topics().getStats(topic, false, true, true);
assertTrue(topicStats.getEarliestMsgPublishTimeInBacklogs() > 0);
assertTrue(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog() > 0);
assertTrue(topicStats.getSubscriptions().get(subName).getBacklogSize() > 0);
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
}
Thread.sleep(1000);
topicStats = admin.topics().getStats(topic, false, true, true);
assertEquals(topicStats.getEarliestMsgPublishTimeInBacklogs(), 0);
assertEquals(topicStats.getSubscriptions().get(subName).getEarliestMsgPublishTimeInBacklog(), 0);
assertEquals(topicStats.getSubscriptions().get(subName).getBacklogSize(), 0);
}
@Test
public void testGetPartitionedStatsContainSubscriptionType() throws Exception {
final String topic = "persistent://prop-xyz/ns1/my-topic" + UUID.randomUUID();
final int numPartitions = 4;
admin.topics().createPartitionedTopic(topic, numPartitions);
// create consumer and subscription
final String subName = "my-sub";
@Cleanup Consumer<byte[]> exclusiveConsumer = pulsarClient.newConsumer().topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
TopicStats topicStats = admin.topics().getPartitionedStats(topic, false);
assertEquals(topicStats.getSubscriptions().size(), 1);
assertEquals(topicStats.getSubscriptions().get(subName).getType(), SubscriptionType.Exclusive.toString());
}
@Test
public void testGetPartitionedStatsInternal() throws Exception {
String partitionedTopic = "my-topic" + UUID.randomUUID().toString();
String subName = "my-sub";
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), new ArrayList<>());
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + partitionedTopic;
admin.topics().createPartitionedTopic(partitionedTopicName, 2);
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), List.of(partitionedTopicName));
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 2);
// create consumer and subscription
@Cleanup
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(partitionedTopicName).subscriptionName(subName).subscribe();
// publish several messages
publishMessagesOnPersistentTopic(partitionedTopicName, 10);
String partitionTopic0 = partitionedTopicName + "-partition-0";
String partitionTopic1 = partitionedTopicName + "-partition-1";
Thread.sleep(1000);
PersistentTopicInternalStats internalStats0 = admin.topics().getInternalStats(partitionTopic0, false);
assertEquals(internalStats0.cursors.keySet(), Set.of(Codec.encode(subName)));
PersistentTopicInternalStats internalStats1 = admin.topics().getInternalStats(partitionTopic1, false);
assertEquals(internalStats1.cursors.keySet(), Set.of(Codec.encode(subName)));
// expected internal stats
PartitionedTopicMetadata partitionedTopicMetadata = new PartitionedTopicMetadata(2);
PartitionedTopicInternalStats expectedInternalStats = new PartitionedTopicInternalStats(partitionedTopicMetadata);
expectedInternalStats.partitions.put(partitionTopic0, internalStats0);
expectedInternalStats.partitions.put(partitionTopic1, internalStats1);
// partitioned internal stats
PartitionedTopicInternalStats partitionedInternalStats = admin.topics().getPartitionedInternalStats(partitionedTopicName);
String expectedResult = ObjectMapperFactory.getMapper().writer().writeValueAsString(expectedInternalStats);
String result = ObjectMapperFactory.getMapper().writer().writeValueAsString(partitionedInternalStats);
assertEquals(result, expectedResult);
}
@Test(dataProvider = "numBundles")
public void testDeleteNamespaceBundle(Integer numBundles) throws Exception {
deleteNamespaceWithRetry("prop-xyz/ns1", false);
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test"));
// since we have 2 brokers running, we try to let both of them acquire bundle ownership
admin.lookups().lookupTopic("persistent://prop-xyz/ns1-bundles/ds1");
admin.lookups().lookupTopic("persistent://prop-xyz/ns1-bundles/ds2");
admin.lookups().lookupTopic("persistent://prop-xyz/ns1-bundles/ds3");
admin.lookups().lookupTopic("persistent://prop-xyz/ns1-bundles/ds4");
assertEquals(admin.namespaces().getTopics("prop-xyz/ns1-bundles"), new ArrayList<>());
deleteNamespaceWithRetry("prop-xyz/ns1-bundles", false);
assertEquals(admin.namespaces().getNamespaces("prop-xyz", "test"), new ArrayList<>());
}
@Test
public void testDeleteTenantForcefully() throws Exception {
// allow forced deletion of tenants
pulsar.getConfiguration().setForceDeleteTenantAllowed(true);
String tenant = "my-tenant";
assertFalse(admin.tenants().getTenants().contains(tenant));
// create tenant
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
// create namespace
String namespace = tenant + "/my-ns";
admin.namespaces().createNamespace("my-tenant/my-ns", Set.of("test"));
assertEquals(admin.namespaces().getNamespaces(tenant), List.of("my-tenant/my-ns"));
// create topic
String topic = namespace + "/my-topic";
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
try {
admin.tenants().deleteTenant(tenant, false);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}
// allow forced deletion of namespaces
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
// delete tenant forcefully
admin.tenants().deleteTenant(tenant, true);
Awaitility.await().untilAsserted(() -> {
assertFalse(admin.tenants().getTenants().contains(tenant));
});
final String managedLedgerPathForTenant = "/managed-ledgers/" + tenant;
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgerPathForTenant).join());
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
// reset back to false
pulsar.getConfiguration().setForceDeleteTenantAllowed(false);
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
}
@Test
public void testDeleteNamespaceForcefully() throws Exception {
// allow forced deletion of namespaces
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(true);
String tenant = "my-tenant";
assertFalse(admin.tenants().getTenants().contains(tenant));
// create tenant
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
// create namespace
String namespace = tenant + "/my-ns";
admin.namespaces().createNamespace("my-tenant/my-ns", Set.of("test"));
assertEquals(admin.namespaces().getNamespaces(tenant), List.of("my-tenant/my-ns"));
// create topic
String topic = namespace + "/my-topic";
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}
// delete namespace forcefully
deleteNamespaceWithRetry(namespace, true);
assertFalse(admin.namespaces().getNamespaces(tenant).contains(namespace));
assertTrue(admin.namespaces().getNamespaces(tenant).isEmpty());
final String managedLedgerPath = "/managed-ledgers/" + namespace;
final String persistentDomain = managedLedgerPath + "/" + TopicDomain.persistent.value();
final String nonPersistentDomain = managedLedgerPath + "/" + TopicDomain.non_persistent.value();
assertFalse(pulsar.getLocalMetadataStore().exists(managedLedgerPath).join());
assertFalse(pulsar.getLocalMetadataStore().exists(persistentDomain).join());
assertFalse(pulsar.getLocalMetadataStore().exists(nonPersistentDomain).join());
// reset back to false
pulsar.getConfiguration().setForceDeleteNamespaceAllowed(false);
}
@Test
public void testForceDeleteTenantNotAllowed() throws Exception {
assertFalse(pulsar.getConfiguration().isForceDeleteTenantAllowed());
String tenant = "my-tenant";
assertFalse(admin.tenants().getTenants().contains(tenant));
// create tenant
admin.tenants().createTenant(tenant,
new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test")));
assertTrue(admin.tenants().getTenants().contains(tenant));
// create namespace
String namespace = tenant + "/my-ns";
admin.namespaces().createNamespace("my-tenant/my-ns", Set.of("test"));
assertEquals(admin.namespaces().getNamespaces(tenant), List.of("my-tenant/my-ns"));
// create topic
String topic = namespace + "/my-topic";
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());
try {
admin.tenants().deleteTenant(tenant, false);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: cannot delete non-empty tenant
}
try {
admin.tenants().deleteTenant(tenant, true);
fail("should have failed");
} catch (PulsarAdminException e) {
// Expected: cannot delete due to broker is not allowed
}
assertTrue(admin.tenants().getTenants().contains(tenant));
}
@Test
public void testNamespaceSplitBundle() throws Exception {
admin.namespaces().createNamespace("prop-xyz/splitBundle", Set.of("test"));
// Force to create a topic
final String namespace = "prop-xyz/splitBundle";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes());
publishMessagesOnPersistentTopic(topicName, 0);
assertEquals(admin.topics().getList(namespace), List.of(topicName));
try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null);
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception");
}
// bundle-factory cache must have updated split bundles
NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" };
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}
producer.close();
}
@Test
public void testNamespaceSplitBundleWithTopicCountEquallyDivideAlgorithm() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = List.of(
(new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(),
(new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString());
List<Producer<byte[]>> producers = new ArrayList<>(2);
for (String topicName : topicNames) {
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producers.add(producer);
producer.send("message".getBytes());
}
assertTrue(admin.topics().getList(namespace).containsAll(topicNames));
try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true,
NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE);
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception");
}
NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0)));
NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1)));
assertNotEquals(bundle1, bundle2);
String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" };
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}
for (Producer<byte[]> producer : producers) {
producer.close();
}
}
@Test
public void testNamespacesGetTopicHashPositions() throws Exception {
// Force to create a namespace with only one bundle and create a topic
final String namespace = "prop-xyz/ns-one-bundle";
final String topic = "persistent://"+ namespace + "/topic";
final int topicPartitionNumber = 4;
Policies policies = new Policies();
policies.bundles = PoliciesUtil.getBundles(1);
admin.namespaces().createNamespace(namespace, policies);
admin.topics().createPartitionedTopic(topic, topicPartitionNumber);
admin.lookups().lookupPartitionedTopic(topic);
// check bundles and bundle boundaries
BundlesData bundleData = admin.namespaces().getBundles(namespace);
assertEquals(bundleData.getNumBundles(), 1);
assertEquals(bundleData.getBoundaries().size(), 2);
assertEquals(bundleData.getBoundaries().get(0), "0x00000000");
assertEquals(bundleData.getBoundaries().get(1), "0xffffffff");
// test get topic position for partitioned-topic name
String bundleRange = "0x00000000_0xffffffff";
TopicHashPositions topicHashPositions =
admin.namespaces().getTopicHashPositions(namespace, bundleRange, Collections.singletonList(topic));
assertEquals(topicHashPositions.getNamespace(), "prop-xyz/ns-one-bundle");
assertEquals(topicHashPositions.getBundle(), "0x00000000_0xffffffff");
assertEquals(topicHashPositions.getTopicHashPositions().size(), topicPartitionNumber);
final HashFunction hashFunction = Hashing.crc32();
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"),
hashFunction.hashString(topic + "-partition-0", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"),
hashFunction.hashString(topic + "-partition-1", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"),
hashFunction.hashString(topic + "-partition-2", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"),
hashFunction.hashString(topic + "-partition-3", StandardCharsets.UTF_8).padToLong());
// test get hash position for topic partition
List<String> partitions = new ArrayList<>();
partitions.add(topic + "-partition-0");
partitions.add(topic + "-partition-1");
partitions.add(topic + "-partition-2");
partitions.add(topic + "-partition-3");
topicHashPositions = admin.namespaces().getTopicHashPositions(namespace, bundleRange, partitions);
assertEquals(topicHashPositions.getNamespace(), "prop-xyz/ns-one-bundle");
assertEquals(topicHashPositions.getBundle(), "0x00000000_0xffffffff");
assertEquals(topicHashPositions.getTopicHashPositions().size(), topicPartitionNumber);
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"),
hashFunction.hashString(topic + "-partition-0", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"),
hashFunction.hashString(topic + "-partition-1", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"),
hashFunction.hashString(topic + "-partition-2", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"),
hashFunction.hashString(topic + "-partition-3", StandardCharsets.UTF_8).padToLong());
// test non-exist topic
topicHashPositions = admin.namespaces().getTopicHashPositions(namespace,
bundleRange, Collections.singletonList(topic + "no-exist"));
assertEquals(topicHashPositions.getTopicHashPositions().size(), 0);
// test topics is null
topicHashPositions = admin.namespaces().getTopicHashPositions(namespace,
bundleRange, null);
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"),
hashFunction.hashString(topic + "-partition-0", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"),
hashFunction.hashString(topic + "-partition-1", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"),
hashFunction.hashString(topic + "-partition-2", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"),
hashFunction.hashString(topic + "-partition-3", StandardCharsets.UTF_8).padToLong());
// test topics is empty
topicHashPositions = admin.namespaces().getTopicHashPositions(namespace,
bundleRange, new ArrayList<>());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-0"),
hashFunction.hashString(topic + "-partition-0", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-1"),
hashFunction.hashString(topic + "-partition-1", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-2"),
hashFunction.hashString(topic + "-partition-2", StandardCharsets.UTF_8).padToLong());
assertEquals((long) topicHashPositions.getTopicHashPositions().get(topic + "-partition-3"),
hashFunction.hashString(topic + "-partition-3", StandardCharsets.UTF_8).padToLong());
}
@Test
public void testNamespaceSplitBundleWithSpecifiedPositionsDivideAlgorithm() throws Exception {
// 1. Force to create a topic
final String namespace = "prop-xyz/ns-one-bundle";
final String topic = "persistent://"+ namespace + "/topic";
final int topicPartitionNumber = 4;
Policies policies = new Policies();
policies.bundles = PoliciesUtil.getBundles(1);
admin.namespaces().createNamespace(namespace, policies);
admin.topics().createPartitionedTopic(topic, topicPartitionNumber);
// 2. trigger bundle loading
admin.lookups().lookupPartitionedTopic(topic);
// 3. check namespace bundle and topics exist
List<String> topics = admin.topics().getList(namespace);
assertTrue(topics.contains(topic + "-partition-0"));
assertTrue(topics.contains(topic + "-partition-1"));
assertTrue(topics.contains(topic + "-partition-2"));
assertTrue(topics.contains(topic + "-partition-3"));
// 4. check bundles and bundle boundaries
BundlesData bundleData = admin.namespaces().getBundles(namespace);
assertEquals(bundleData.getNumBundles(), 1);
assertEquals(bundleData.getBoundaries().size(), 2);
assertEquals(bundleData.getBoundaries().get(0), "0x00000000");
assertEquals(bundleData.getBoundaries().get(1), "0xffffffff");
// 5. calculate positions for split
final HashFunction hashFunction = Hashing.crc32();
List<Long> hashPositions = new ArrayList<>();
hashPositions.add(hashFunction.hashString(topic + "-partition-0", StandardCharsets.UTF_8).padToLong());
hashPositions.add(hashFunction.hashString(topic + "-partition-1", StandardCharsets.UTF_8).padToLong());
hashPositions.add(hashFunction.hashString(topic + "-partition-2", StandardCharsets.UTF_8).padToLong());
hashPositions.add(hashFunction.hashString(topic + "-partition-3", StandardCharsets.UTF_8).padToLong());
// 6. do split by SPECIFIED_POSITIONS_DIVIDE
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false,
NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE, hashPositions);
// 7. check split result
NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
assertEquals(bundles.getBundles().size(), 5);
Collections.sort(hashPositions);
String[] splitRange = {
"0x00000000_" + String.format("0x%08x",hashPositions.get(0)),
String.format("0x%08x", hashPositions.get(0)) + "_" + String.format("0x%08x", hashPositions.get(1)),
String.format("0x%08x", hashPositions.get(1)) + "_" + String.format("0x%08x", hashPositions.get(2)),
String.format("0x%08x", hashPositions.get(2)) + "_" + String.format("0x%08x", hashPositions.get(3)),
String.format("0x%08x", hashPositions.get(3)) + "_0xffffffff"
};
Set<String> bundleRanges = new HashSet<>();
bundles.getBundles().forEach(bundle -> bundleRanges.add(bundle.getBundleRange()));
Lists.newArrayList(splitRange).forEach(bundleRanges::remove);
assertEquals(bundleRanges.size(), 0);
// 8. check split result from admin cli tool
BundlesData adminBundleData = admin.namespaces().getBundles(namespace);
assertEquals(adminBundleData.getNumBundles(), 5);
String[] boundaries = {
"0x00000000",
String.format("0x%08x", hashPositions.get(0)),
String.format("0x%08x", hashPositions.get(1)),
String.format("0x%08x", hashPositions.get(2)),
String.format("0x%08x", hashPositions.get(3)),
"0xffffffff"
};
Lists.newArrayList(boundaries).forEach(adminBundleData.getBoundaries()::remove);
assertEquals(adminBundleData.getBoundaries().size(), 0);
// 9. test split at full upper and lower boundaries
List<Long> fullBoundaries =
Lists.newArrayList(NamespaceBundles.FULL_UPPER_BOUND, NamespaceBundles.FULL_UPPER_BOUND);
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_" + String.format("0x%08x",hashPositions.get(0)),
false, NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE, fullBoundaries);
admin.namespaces().splitNamespaceBundle(namespace, String.format("0x%08x", hashPositions.get(3)) + "_0xffffffff",
false, NamespaceBundleSplitAlgorithm.SPECIFIED_POSITIONS_DIVIDE, fullBoundaries);
// 10. full upper or lower boundaries does not split bundle
NamespaceBundles nbs = bundleFactory.getBundles(NamespaceName.get(namespace));
assertEquals(nbs.getBundles().size(), 5);
BundlesData adminBD = admin.namespaces().getBundles(namespace);
assertEquals(adminBD.getNumBundles(), 5);
assertEquals(adminBD.getBoundaries().size(), 6);
}
@Test
public void testNamespaceSplitBundleWithInvalidAlgorithm() {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true,
"invalid_test");
fail("unsupported namespace bundle split algorithm");
} catch (PulsarAdminException ignored) {
}
}
@Test
public void testNamespaceSplitBundleWithDefaultTopicCountEquallyDivideAlgorithm() throws Exception {
cleanup();
setupConfigAndStart(conf -> conf
.setDefaultNamespaceBundleSplitAlgorithm(NamespaceBundleSplitAlgorithm.TOPIC_COUNT_EQUALLY_DIVIDE));
// Force to create a topic
final String namespace = "prop-xyz/ns1";
List<String> topicNames = Lists.newArrayList(
(new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-1").toString(),
(new StringBuilder("persistent://")).append(namespace).append("/topicCountEquallyDivideAlgorithum-2").toString());
List<Producer<byte[]>> producers = new ArrayList<>(2);
for (String topicName : topicNames) {
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producers.add(producer);
producer.send("message".getBytes());
}
assertTrue(admin.topics().getList(namespace).containsAll(topicNames));
try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", true, null);
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception");
}
NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
NamespaceBundle bundle1 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(0)));
NamespaceBundle bundle2 = pulsar.getNamespaceService().getBundle(TopicName.get(topicNames.get(1)));
assertNotEquals(bundle1, bundle2);
String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" };
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertNotEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}
for (Producer<byte[]> producer : producers) {
producer.close();
}
}
@Test
public void testNamespaceSplitBundleConcurrent() throws Exception {
// Force to create a topic
final String namespace = "prop-xyz/ns1";
final String topicName = (new StringBuilder("persistent://")).append(namespace).append("/ds2").toString();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.send("message".getBytes());
publishMessagesOnPersistentTopic(topicName, 0);
assertEquals(admin.topics().getList(namespace), Lists.newArrayList(topicName));
try {
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0xffffffff", false, null);
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception", e);
}
// bundle-factory cache must have updated split bundles
NamespaceBundles bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
String[] splitRange = { namespace + "/0x00000000_0x7fffffff", namespace + "/0x7fffffff_0xffffffff" };
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertEquals(bundles.getBundles().get(i).toString(), splitRange[i]);
}
@Cleanup("shutdownNow")
ExecutorService executorService = Executors.newCachedThreadPool();
try {
List<Future<Void>> futures = executorService.invokeAll(Arrays.asList(() -> {
log.info("split 2 bundles at the same time. spilt: 0x00000000_0x7fffffff ");
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x7fffffff", false, null);
return null;
}, () -> {
log.info("split 2 bundles at the same time. spilt: 0x7fffffff_0xffffffff ");
admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xffffffff", false, null);
return null;
}));
for (Future<?> f : futures) {
f.get();
}
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception", e);
}
Awaitility.await().untilAsserted(() ->
assertEquals(bundleFactory.getBundles(NamespaceName.get(namespace)).getBundles().size(), 4));
String[] splitRange4 = { namespace + "/0x00000000_0x3fffffff", namespace + "/0x3fffffff_0x7fffffff",
namespace + "/0x7fffffff_0xbfffffff", namespace + "/0xbfffffff_0xffffffff" };
bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertEquals(bundles.getBundles().get(i).toString(), splitRange4[i]);
}
try {
List<Future<Void>> futures = executorService.invokeAll(Arrays.asList(() -> {
log.info("split 4 bundles at the same time. spilt: 0x00000000_0x3fffffff ");
admin.namespaces().splitNamespaceBundle(namespace, "0x00000000_0x3fffffff", false, null);
return null;
}, () -> {
log.info("split 4 bundles at the same time. spilt: 0x3fffffff_0x7fffffff ");
admin.namespaces().splitNamespaceBundle(namespace, "0x3fffffff_0x7fffffff", false, null);
return null;
}, () -> {
log.info("split 4 bundles at the same time. spilt: 0x7fffffff_0xbfffffff ");
admin.namespaces().splitNamespaceBundle(namespace, "0x7fffffff_0xbfffffff", false, null);
return null;
}, () -> {
log.info("split 4 bundles at the same time. spilt: 0xbfffffff_0xffffffff ");
admin.namespaces().splitNamespaceBundle(namespace, "0xbfffffff_0xffffffff", false, null);
return null;
}));
for (Future<?> f : futures) {
f.get();
}
} catch (Exception e) {
fail("split bundle shouldn't have thrown exception", e);
}
Awaitility.await().untilAsserted(() ->
assertEquals(bundleFactory.getBundles(NamespaceName.get(namespace)).getBundles().size(), 8));
String[] splitRange8 = { namespace + "/0x00000000_0x1fffffff", namespace + "/0x1fffffff_0x3fffffff",
namespace + "/0x3fffffff_0x5fffffff", namespace + "/0x5fffffff_0x7fffffff",
namespace + "/0x7fffffff_0x9fffffff", namespace + "/0x9fffffff_0xbfffffff",
namespace + "/0xbfffffff_0xdfffffff", namespace + "/0xdfffffff_0xffffffff" };
bundles = bundleFactory.getBundles(NamespaceName.get(namespace));
for (int i = 0; i < bundles.getBundles().size(); i++) {
assertEquals(bundles.getBundles().get(i).toString(), splitRange8[i]);
}
producer.close();
}
@Test
public void testNamespaceUnloadBundle() throws Exception {
admin.namespaces().createNamespace("prop-xyz/unloadBundle", Set.of("test"));
assertEquals(admin.topics().getList("prop-xyz/unloadBundle"), new ArrayList<>());
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/unloadBundle/ds2", 0);
assertEquals(admin.topics().getList("prop-xyz/unloadBundle"),
Lists.newArrayList("persistent://prop-xyz/unloadBundle/ds2"));
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-xyz/unloadBundle/ds2")
.subscriptionName("my-sub").subscribe();
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/unloadBundle/ds2"),
Lists.newArrayList("my-sub"));
// Create producer
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/unloadBundle/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
consumer.close();
producer.close();
try {
admin.namespaces().unloadNamespaceBundle("prop-xyz/unloadBundle", "0x00000000_0xffffffff");
} catch (Exception e) {
fail("Unload shouldn't have throw exception");
}
// check that no one owns the namespace
NamespaceBundle bundle = bundleFactory.getBundle(NamespaceName.get("prop-xyz/unloadBundle"),
Range.range(0L, BoundType.CLOSED, 0xffffffffL, BoundType.CLOSED));
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle));
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle));
pulsarClient.shutdown();
LOG.info("--- RELOAD ---");
// Force reload of namespace and wait for topic to be ready
Awaitility.await().timeout(30, TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class)
.until(() -> admin.topics().getStats("persistent://prop-xyz/unloadBundle/ds2") != null);
admin.topics().deleteSubscription("persistent://prop-xyz/unloadBundle/ds2", "my-sub");
admin.topics().delete("persistent://prop-xyz/unloadBundle/ds2");
}
@Test(dataProvider = "numBundles")
public void testNamespaceBundleUnload(Integer numBundles) throws Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test"));
assertEquals(admin.topics().getList("prop-xyz/ns1-bundles"), new ArrayList<>());
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1-bundles/ds2", 0);
assertEquals(admin.topics().getList("prop-xyz/ns1-bundles"),
Lists.newArrayList("persistent://prop-xyz/ns1-bundles/ds2"));
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2")
.subscriptionName("my-sub").subscribe();
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds2"),
Lists.newArrayList("my-sub"));
// Create producer
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
NamespaceBundle bundle = pulsar.getNamespaceService()
.getBundle(TopicName.get("persistent://prop-xyz/ns1-bundles/ds2"));
consumer.close();
producer.close();
admin.namespaces().unloadNamespaceBundle("prop-xyz/ns1-bundles", bundle.getBundleRange());
// check that no one owns the namespace bundle
assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle));
assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle));
LOG.info("--- RELOAD ---");
// Force reload of namespace and wait for topic to be ready
Awaitility.await().timeout(30, TimeUnit.SECONDS).ignoreExceptionsInstanceOf(PulsarAdminException.class)
.until(() -> admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2") != null);
admin.topics().deleteSubscription("persistent://prop-xyz/ns1-bundles/ds2", "my-sub");
admin.topics().delete("persistent://prop-xyz/ns1-bundles/ds2");
}
@Test
public void testDeleteSubscription() throws Exception {
final String subName = "test-sub";
final String persistentTopicName = "persistent://prop-xyz/ns1/test-sub-topic";
// disable auto subscription creation
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);
// create a topic and produce some messages
publishMessagesOnPersistentTopic(persistentTopicName, 5);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
List.of(persistentTopicName));
// create the subscription by PulsarAdmin
admin.topics().createSubscription(persistentTopicName, subName, MessageId.earliest);
assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName).subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
// try to delete the subscription with a connected consumer
try {
admin.topics().deleteSubscription(persistentTopicName, subName);
fail("should have failed");
} catch (PreconditionFailedException e) {
assertEquals(e.getStatusCode(), Status.PRECONDITION_FAILED.getStatusCode());
}
// failed to delete the subscription
assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));
// try to delete the subscription with a connected consumer forcefully
admin.topics().deleteSubscription(persistentTopicName, subName, true);
// delete the subscription successfully
assertEquals(admin.topics().getSubscriptions(persistentTopicName).size(), 0);
// reset to default
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
client.close();
}
@Test(dataProvider = "bundling")
public void testClearBacklogOnNamespace(Integer numBundles) throws Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test"));
// create consumer and subscription
@Cleanup
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub")
.subscribe();
@Cleanup
Consumer<byte[]> consumer2 =
pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub-1")
.subscribe();
@Cleanup
Consumer<byte[]> consumer3 =
pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2").subscriptionName("my-sub-2")
.subscribe();
@Cleanup
Consumer<byte[]> consumer4 =
pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1").subscriptionName("my-sub")
.subscribe();
@Cleanup
Consumer<byte[]> consumer5 =
pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1").subscriptionName("my-sub-1")
.subscribe();
// Create producer
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds2")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
producer.close();
// Create producer
Producer<byte[]> producer1 = pulsarClient.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1-bundles/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer1.send(message.getBytes());
}
producer1.close();
admin.namespaces().clearNamespaceBacklogForSubscription("prop-xyz/ns1-bundles", "my-sub");
long backlog = admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2").getSubscriptions()
.get("my-sub").getMsgBacklog();
assertEquals(backlog, 0);
backlog = admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds1").getSubscriptions()
.get("my-sub").getMsgBacklog();
assertEquals(backlog, 0);
backlog = admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds1").getSubscriptions()
.get("my-sub-1").getMsgBacklog();
assertEquals(backlog, 10);
admin.namespaces().clearNamespaceBacklog("prop-xyz/ns1-bundles");
backlog = admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds1").getSubscriptions()
.get("my-sub-1").getMsgBacklog();
assertEquals(backlog, 0);
backlog = admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2").getSubscriptions()
.get("my-sub-1").getMsgBacklog();
assertEquals(backlog, 0);
backlog = admin.topics().getStats("persistent://prop-xyz/ns1-bundles/ds2").getSubscriptions()
.get("my-sub-2").getMsgBacklog();
assertEquals(backlog, 0);
}
@Test(dataProvider = "bundling")
public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception {
admin.namespaces().createNamespace("prop-xyz/ns1-bundles", numBundles);
admin.namespaces().setNamespaceReplicationClusters("prop-xyz/ns1-bundles", Set.of("test"));
// create consumer and subscription
Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2")
.subscriptionName("my-sub").subscribe();
Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2")
.subscriptionName("my-sub-1").subscribe();
@Cleanup
Consumer<byte[]> consumer3 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds2")
.subscriptionName("my-sub-2").subscribe();
Consumer<byte[]> consumer4 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1")
.subscriptionName("my-sub").subscribe();
Consumer<byte[]> consumer5 = pulsarClient.newConsumer().topic("persistent://prop-xyz/ns1-bundles/ds1")
.subscriptionName("my-sub-1").subscribe();
try {
admin.namespaces().unsubscribeNamespace("prop-xyz/ns1-bundles", "my-sub");
fail("should have failed");
} catch (PreconditionFailedException e) {
// ok
}
consumer1.close();
try {
admin.namespaces().unsubscribeNamespace("prop-xyz/ns1-bundles", "my-sub");
fail("should have failed");
} catch (PreconditionFailedException e) {
// ok
}
consumer4.close();
admin.namespaces().unsubscribeNamespace("prop-xyz/ns1-bundles", "my-sub");
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds2"),
List.of("my-sub-1", "my-sub-2"));
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds1"),
List.of("my-sub-1"));
consumer2.close();
consumer5.close();
admin.namespaces().unsubscribeNamespace("prop-xyz/ns1-bundles", "my-sub-1");
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds2"),
List.of("my-sub-2"));
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1-bundles/ds1"),
new ArrayList<>());
}
private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception {
return publishMessagesOnPersistentTopic(topicName, messages, 0, false);
}
private List<MessageId> publishNullValueMessageOnPersistentTopic(String topicName, int messages) throws Exception {
return publishMessagesOnPersistentTopic(topicName, messages, 0, true);
}
private List<MessageId> publishMessagesOnPersistentTopic(String topicName, int messages, int startIdx,
boolean nullValue) throws Exception {
List<MessageId> messageIds = new ArrayList<>();
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
for (int i = startIdx; i < (messages + startIdx); i++) {
if (nullValue) {
messageIds.add(producer.send(null));
} else {
String message = "message-" + i;
messageIds.add(producer.send(message.getBytes()));
}
}
producer.close();
return messageIds;
}
@Test
public void backlogQuotas() throws Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1"),
new HashMap<>());
Map<BacklogQuotaType, BacklogQuota> quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
assertEquals(quotaMap.size(), 0);
assertNull(quotaMap.get(BacklogQuotaType.destination_storage));
admin.namespaces().setBacklogQuota("prop-xyz/ns1",
BacklogQuota.builder()
.limitSize(1 * 1024 * 1024)
.retentionPolicy(RetentionPolicy.producer_exception)
.build());
quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
assertEquals(quotaMap.size(), 1);
assertEquals(quotaMap.get(BacklogQuotaType.destination_storage),
BacklogQuota.builder()
.limitSize(1 * 1024 * 1024)
.retentionPolicy(RetentionPolicy.producer_exception)
.build());
admin.namespaces().removeBacklogQuota("prop-xyz/ns1");
quotaMap = admin.namespaces().getBacklogQuotaMap("prop-xyz/ns1");
assertEquals(quotaMap.size(), 0);
assertNull(quotaMap.get(BacklogQuotaType.destination_storage));
}
@Test
public void statsOnNonExistingTopics() throws Exception {
try {
admin.topics().getStats("persistent://prop-xyz/ns1/ghostTopic");
fail("The topic doesn't exist");
} catch (NotFoundException e) {
assertTrue(e.getMessage().contains("persistent://prop-xyz/ns1/ghostTopic"));
}
}
@Test
public void testDeleteFailedReturnCode() throws Exception {
String topicName = "persistent://prop-xyz/ns1/my-topic";
Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topicName)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
try {
admin.topics().delete(topicName);
fail("The topic is busy");
} catch (PreconditionFailedException e) {
// OK
}
producer.close();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("sub").subscribe();
try {
admin.topics().delete(topicName);
fail("The topic is busy");
} catch (PreconditionFailedException e) {
// OK
}
try {
admin.topics().deleteSubscription(topicName, "sub");
fail("The topic is busy");
} catch (PreconditionFailedException e) {
// Ok
}
consumer.close();
// Now should succeed
admin.topics().delete(topicName);
}
private static class IncompatibleTenantAdmin {
public Set<String> allowedClusters;
public int someNewIntField;
public String someNewString;
}
@Test
public void testJacksonWithTypeDifferences() throws Exception {
String expectedJson = "{\"adminRoles\":[\"role1\",\"role2\"],\"allowedClusters\":[\"usw\",\"test\"]}";
IncompatibleTenantAdmin r1 = ObjectMapperFactory.getMapper().reader().forType(IncompatibleTenantAdmin.class)
.readValue(expectedJson);
assertEquals(r1.allowedClusters, Set.of("test", "usw"));
assertEquals(r1.someNewIntField, 0);
assertNull(r1.someNewString);
}
@Test
public void testBackwardCompatibility() throws Exception {
assertEquals(admin.tenants().getTenants(), List.of("prop-xyz"));
assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAdminRoles(),
List.of("role1", "role2"));
assertEquals(admin.tenants().getTenantInfo("prop-xyz").getAllowedClusters(), Set.of("test"));
// Try to deserialize property JSON with IncompatibleTenantAdmin format
// it should succeed ignoring missing fields
TenantsImpl properties = (TenantsImpl) admin.tenants();
IncompatibleTenantAdmin result = properties.request(properties.getWebTarget().path("prop-xyz"))
.get(IncompatibleTenantAdmin.class);
assertEquals(result.allowedClusters, Set.of("test"));
assertEquals(result.someNewIntField, 0);
assertNull(result.someNewString);
deleteNamespaceWithRetry("prop-xyz/ns1", false);
admin.tenants().deleteTenant("prop-xyz");
assertEquals(admin.tenants().getTenants(), new ArrayList<>());
}
@Test(dataProvider = "topicName")
public void persistentTopicsCursorReset(String topicName) throws Exception {
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
topicName = "persistent://prop-xyz/ns1/" + topicName;
try {
admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains(topicName));
}
admin.topics().createNonPartitionedTopic(topicName);
try {
admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains("my-sub"));
}
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").startMessageIdInclusive()
.subscriptionType(SubscriptionType.Exclusive)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
assertEquals(admin.topics().getSubscriptions(topicName), List.of("my-sub"));
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long messageTimestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 5, 5, false);
List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
assertEquals(messages.size(), 10);
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
}
// messages should still be available due to retention
admin.topics().resetCursor(topicName, "my-sub", messageTimestamp);
int receivedAfterReset = 0;
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 5);
consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
assertEquals(admin.topics().getSubscriptions(topicName), new ArrayList<>());
admin.topics().delete(topicName);
}
@Test(dataProvider = "topicName")
public void persistentTopicsCursorResetAfterReset(String topicName) throws Exception {
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
topicName = "persistent://prop-xyz/ns1/" + topicName;
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").startMessageIdInclusive()
.subscriptionType(SubscriptionType.Exclusive)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
assertEquals(admin.topics().getSubscriptions(topicName), List.of("my-sub"));
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long firstTimestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 3, 5, false);
Thread.sleep(1);
long secondTimestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 2, 8, false);
List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
assertEquals(messages.size(), 10);
messages.forEach(message -> {
LOG.info("Peeked message: {}", new String(message.getData()));
});
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
}
admin.topics().resetCursor(topicName, "my-sub", firstTimestamp);
int receivedAfterReset = 0;
// Should received messages from 5-9
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 5);
// Reset at 2nd timestamp
receivedAfterReset = 0;
admin.topics().resetCursor(topicName, "my-sub", secondTimestamp);
// Should received messages from 8-9
for (int i = 8; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(new String(message.getData()), expected);
}
assertEquals(receivedAfterReset, 2);
consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
assertEquals(admin.topics().getSubscriptions(topicName), new ArrayList<>());
admin.topics().delete(topicName);
}
@Test
public void persistentTopicsCursorResetAndFailover() throws Exception {
final String namespace = "prop-xyz/ns1";
final String topicName = "persistent://" + namespace + "/reset-cursor-and-failover";
final String subName = "sub1";
admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));
// Create consumer and failover subscription
Consumer<byte[]> consumerA = pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subName).startMessageIdInclusive()
.consumerName("consumerA").subscriptionType(SubscriptionType.Failover)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
// Allow at least 1ms for messages to have different timestamps
Thread.sleep(1);
long messageTimestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 5, 5, false);
// Currently the active consumer is consumerA
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumerA.receive(5, TimeUnit.SECONDS);
consumerA.acknowledge(message);
}
admin.topics().resetCursor(topicName, subName, messageTimestamp);
// In v2.5 or later, the first connected consumer is active.
// So consumerB connected later will not be active.
// cf. https://github.com/apache/pulsar/pull/4604
Thread.sleep(1000);
Consumer<byte[]> consumerB = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
.consumerName("consumerB").subscriptionType(SubscriptionType.Failover)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
int receivedAfterReset = 0;
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumerA.receive(5, TimeUnit.SECONDS);
consumerA.acknowledge(message);
++receivedAfterReset;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterReset, 5);
// Closing consumerA activates consumerB
consumerA.close();
publishMessagesOnPersistentTopic(topicName, 5, 10, false);
int receivedAfterFailover = 0;
for (int i = 10; i < 15; i++) {
Message<byte[]> message = consumerB.receive(5, TimeUnit.SECONDS);
consumerB.acknowledge(message);
++receivedAfterFailover;
String expected = "message-" + i;
assertEquals(message.getData(), expected.getBytes());
}
assertEquals(receivedAfterFailover, 5);
consumerB.close();
admin.topics().deleteSubscription(topicName, subName);
admin.topics().delete(topicName);
}
@Test(dataProvider = "topicName")
public void partitionedTopicsCursorReset(String topicName) throws Exception {
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
topicName = "persistent://prop-xyz/ns1/" + topicName;
try {
admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains(topicName));
}
admin.topics().createPartitionedTopic(topicName, 4);
try {
admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
} catch (PulsarAdminException.NotFoundException e) {
assertTrue(e.getMessage().contains("my-sub"));
}
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").startMessageIdInclusive()
.subscriptionType(SubscriptionType.Exclusive)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS).subscribe();
List<String> topics = admin.topics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 4);
assertEquals(admin.topics().getSubscriptions(topicName), List.of("my-sub"));
publishMessagesOnPersistentTopic(topicName, 5, 0, false);
Thread.sleep(1);
long timestamp = System.currentTimeMillis();
publishMessagesOnPersistentTopic(topicName, 5, 5, false);
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
}
// messages should still be available due to retention
admin.topics().resetCursor(topicName, "my-sub", timestamp);
Set<String> expectedMessages = new HashSet<>();
Set<String> receivedMessages = new HashSet<>();
for (int i = 5; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
expectedMessages.add("message-" + i);
receivedMessages.add(new String(message.getData()));
}
receivedMessages.removeAll(expectedMessages);
assertEquals(receivedMessages.size(), 0);
consumer.close();
admin.topics().deleteSubscription(topicName, "my-sub");
admin.topics().deletePartitionedTopic(topicName);
}
@Test
public void persistentTopicsInvalidCursorReset() throws Exception {
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
String topicName = "persistent://prop-xyz/ns1/invalidcursorreset";
// Force to create a topic
publishMessagesOnPersistentTopic(topicName, 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"), List.of(topicName));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic(topicName).subscriptionName("my-sub")
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(admin.topics().getSubscriptions(topicName), List.of("my-sub"));
publishMessagesOnPersistentTopic(topicName, 10);
List<Message<byte[]>> messages = admin.topics().peekMessages(topicName, "my-sub", 10);
assertEquals(messages.size(), 10);
for (int i = 0; i < 10; i++) {
Message<byte[]> message = consumer.receive();
consumer.acknowledge(message);
}
// use invalid timestamp
try {
admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis() - 190000);
} catch (Exception e) {
// fail the test
throw e;
}
admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis() + 90000);
consumer = client.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe();
consumer.close();
client.close();
admin.topics().deleteSubscription(topicName, "my-sub");
assertEquals(admin.topics().getSubscriptions(topicName), new ArrayList<>());
admin.topics().delete(topicName);
}
@Value
@Builder
static class CustomTenantAdmin implements TenantInfo {
private final int newTenant;
private final Set<String> adminRoles;
private final Set<String> allowedClusters;
}
@Test
public void testObjectWithUnknownProperties() {
TenantInfo pa = TenantInfo.builder()
.adminRoles(Set.of("test_appid1", "test_appid2"))
.allowedClusters(Set.of("test"))
.build();
CustomTenantAdmin cpa = CustomTenantAdmin.builder()
.adminRoles(pa.getAdminRoles())
.allowedClusters(pa.getAllowedClusters())
.newTenant(100)
.build();
try {
admin.tenants().createTenant("test-property", cpa);
} catch (Exception e) {
fail("Should not happen : ", e);
}
}
/**
* <pre>
* Verify: PersistentTopicsBase.expireMessages()/expireMessagesForAllSubscriptions()
* 1. Created multiple shared subscriptions and publisher on topic
* 2. Publish messages on the topic
* 3. expire message on sub-1 : backlog for sub-1 must be 0
* 4. expire message on all subscriptions: backlog for all subscription must be 0
* </pre>
*
* @throws Exception
*/
@Test
public void testPersistentTopicsExpireMessages() throws Exception {
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
List.of("persistent://prop-xyz/ns1/ds2"));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer().topic("persistent://prop-xyz/ns1/ds2")
.subscriptionType(SubscriptionType.Shared);
Consumer<byte[]> consumer1 = consumerBuilder.clone().subscriptionName("my-sub1").subscribe();
Consumer<byte[]> consumer2 = consumerBuilder.clone().subscriptionName("my-sub2").subscribe();
Consumer<byte[]> consumer3 = consumerBuilder.clone().subscriptionName("my-sub3").subscribe();
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2").size(), 3);
List<MessageId> messageIds = publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2", 10);
TopicStats topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
assertEquals(topicStats.getSubscriptions().get("my-sub1").getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get("my-sub2").getMsgBacklog(), 10);
assertEquals(topicStats.getSubscriptions().get("my-sub3").getMsgBacklog(), 10);
Thread.sleep(1000);
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub1", 1);
// Wait at most 2 seconds for sub1's message to expire.
Awaitility.await().untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").getSubscriptions().get("my-sub1").getLastMarkDeleteAdvancedTimestamp() > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
SubscriptionStats subStats1 = topicStats.getSubscriptions().get("my-sub1");
assertEquals(subStats1.getMsgBacklog(), 0);
SubscriptionStats subStats2 = topicStats.getSubscriptions().get("my-sub2");
assertEquals(subStats2.getMsgBacklog(), 10);
assertEquals(subStats2.getLastMarkDeleteAdvancedTimestamp(), 0L);
SubscriptionStats subStats3 = topicStats.getSubscriptions().get("my-sub3");
assertEquals(subStats3.getMsgBacklog(), 10);
assertEquals(subStats3.getLastMarkDeleteAdvancedTimestamp(), 0L);
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2", "my-sub2",
messageIds.get(4), false);
// Wait at most 2 seconds for sub2's message to expire.
Awaitility.await().untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").getSubscriptions().get("my-sub2").getLastMarkDeleteAdvancedTimestamp() > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
subStats1 = topicStats.getSubscriptions().get("my-sub1");
assertEquals(subStats1.getMsgBacklog(), 0);
assertTrue(subStats1.getLastMarkDeleteAdvancedTimestamp() > 0L);
long sub2lastMarkDeleteAdvancedTimestamp = subStats1.getLastMarkDeleteAdvancedTimestamp();
subStats2 = topicStats.getSubscriptions().get("my-sub2");
assertEquals(subStats2.getMsgBacklog(), 5);
subStats3 = topicStats.getSubscriptions().get("my-sub3");
assertEquals(subStats3.getMsgBacklog(), 10);
assertEquals(subStats3.getLastMarkDeleteAdvancedTimestamp(), 0L);
try {
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds2", 1);
} catch (Exception e) {
// my-sub1 has no msg backlog, so expire message won't be issued on that subscription
assertTrue(e.getMessage().startsWith("Expire message by timestamp not issued on topic"));
}
// Wait at most 2 seconds for sub3's message to expire.
Awaitility.await().untilAsserted(() -> assertTrue(
admin.topics().getStats("persistent://prop-xyz/ns1/ds2").getSubscriptions().get("my-sub3").getLastMarkDeleteAdvancedTimestamp() > 0L));
topicStats = admin.topics().getStats("persistent://prop-xyz/ns1/ds2");
subStats1 = topicStats.getSubscriptions().get("my-sub1");
assertEquals(subStats1.getMsgBacklog(), 0);
assertEquals(subStats1.getLastMarkDeleteAdvancedTimestamp(), subStats1.getLastMarkDeleteAdvancedTimestamp());
// Wait at most 2 seconds for rest of sub2's message to expire.
subStats2 = topicStats.getSubscriptions().get("my-sub2");
assertEquals(subStats2.getMsgBacklog(), 0);
assertTrue(subStats2.getLastMarkDeleteAdvancedTimestamp() > sub2lastMarkDeleteAdvancedTimestamp);
subStats3 = topicStats.getSubscriptions().get("my-sub3");
assertEquals(subStats3.getMsgBacklog(), 0);
consumer1.close();
consumer2.close();
consumer3.close();
}
@Test
public void testPersistentTopicsExpireMessagesInvalidPartitionIndex() throws Exception {
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
List.of("persistent://prop-xyz/ns1/ds2-partition-2"));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
.topic("persistent://prop-xyz/ns1/ds2-partition-2")
.subscriptionType(SubscriptionType.Shared);
@Cleanup
Consumer<byte[]> consumer = consumerBuilder.clone().subscriptionName("my-sub").subscribe();
assertEquals(admin.topics().getSubscriptions("persistent://prop-xyz/ns1/ds2-partition-2").size(), 1);
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/ds2-partition-2", 10);
try {
admin.topics().expireMessages("persistent://prop-xyz/ns1/ds2-partition-2", "my-sub",
new MessageIdImpl(1, 1, 1), false);
} catch (Exception e) {
assertTrue(e.getMessage().contains("Invalid parameter for expire message by position"));
}
}
/**
* Verify: PersistentTopicsBase.expireMessages()/expireMessagesForAllSubscriptions() for PartitionTopic
*
* @throws Exception
*/
@Test
public void testPersistentTopicExpireMessageOnPartitionTopic() throws Exception {
admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/ds1", 4);
// create consumer and subscription
URL pulsarUrl = new URL(pulsar.getWebServiceAddress());
@Cleanup
PulsarClient client = PulsarClient.builder().serviceUrl(pulsarUrl.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = client.newConsumer().topic("persistent://prop-xyz/ns1/ds1")
.subscriptionName("my-sub").subscribe();
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic("persistent://prop-xyz/ns1/ds1")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.RoundRobinPartition)
.create();
for (int i = 0; i < 10; i++) {
String message = "message-" + i;
producer.send(message.getBytes());
}
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats("persistent://prop-xyz/ns1/ds1",
true);
assertEquals(topicStats.getSubscriptions().get("my-sub").getMsgBacklog(), 10);
TopicStats partitionStatsPartition0 = topicStats.getPartitions()
.get("persistent://prop-xyz/ns1/ds1-partition-0");
TopicStats partitionStatsPartition1 = topicStats.getPartitions()
.get("persistent://prop-xyz/ns1/ds1-partition-1");
assertEquals(partitionStatsPartition0.getSubscriptions().get("my-sub").getMsgBacklog(), 3, 1);
assertEquals(partitionStatsPartition1.getSubscriptions().get("my-sub").getMsgBacklog(), 3, 1);
Thread.sleep(1000);
admin.topics().expireMessagesForAllSubscriptions("persistent://prop-xyz/ns1/ds1", 1);
Thread.sleep(1000);
topicStats = admin.topics().getPartitionedStats("persistent://prop-xyz/ns1/ds1", true);
partitionStatsPartition0 = topicStats.getPartitions().get("persistent://prop-xyz/ns1/ds1-partition-0");
partitionStatsPartition1 = topicStats.getPartitions().get("persistent://prop-xyz/ns1/ds1-partition-1");
assertEquals(partitionStatsPartition0.getSubscriptions().get("my-sub").getMsgBacklog(), 0);
assertEquals(partitionStatsPartition1.getSubscriptions().get("my-sub").getMsgBacklog(), 0);
producer.close();
consumer.close();
client.close();
}
@Test
public void testNamespaceNotExist() {
final String nonPartitionedTopic = "persistent://prop-xyz/no-exist/non-partitioned-topic";
try {
admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
fail("should failed for namespaces not exist");
} catch (Exception e) {
assertTrue(e instanceof NotFoundException);
assertTrue(e.getMessage().equals("Namespace not found"));
}
}
@Test
public void testPersistentTopicCreation() throws Exception {
final String nonPartitionedtopic = "persistent://prop-xyz/ns1/non-partitioned-topic";
final String partitionedtopic = "persistent://prop-xyz/ns1/partitioned-topic";
admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
try {
admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
fail("should not be able to create an existed non-partitioned topic");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}
admin.topics().createPartitionedTopic(partitionedtopic, 2);
try {
admin.topics().createPartitionedTopic(partitionedtopic, 1);
fail("should not be able to create an existed partitioned topic");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}
try {
admin.topics().createPartitionedTopic(nonPartitionedtopic, 2);
fail("should not be able to create a partitioned topic with the same name");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}
try {
admin.topics().createNonPartitionedTopic(partitionedtopic);
fail("should not be able to create a non-partitioned topic with the same name");
} catch (PulsarAdminException e) {
assertTrue(e instanceof ConflictException);
}
// Check create partitioned topic with substring topic name
admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/create_substring_topic", 1);
admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/substring_topic", 1);
}
/**
* This test-case verifies that broker should support both url/uri encoding for topic-name. It calls below api with
* url-encoded and also uri-encoded topic-name in http request: a. PartitionedMetadataLookup b. TopicLookupBase c.
* Topic Stats
*
* @param topicName
* @throws Exception
*/
@Test(dataProvider = "topicName")
public void testPulsarAdminForUriAndUrlEncoding(String topicName) throws Exception {
final String ns1 = "prop-xyz/ns1";
final String topic1 = "persistent://" + ns1 + "/" + topicName;
final String urlEncodedTopic = Codec.encode(topicName);
final String uriEncodedTopic = urlEncodedTopic.replaceAll("\\+", "%20");
final int numOfPartitions = 4;
admin.topics().createPartitionedTopic(topic1, numOfPartitions);
// Create a consumer to get stats on this topic
@Cleanup
Consumer<byte[]> consumer =
pulsarClient.newConsumer().topic(topic1).subscriptionName("my-subscriber-name").subscribe();
TopicsImpl persistent = (TopicsImpl) admin.topics();
Field field = TopicsImpl.class.getDeclaredField("adminV2Topics");
field.setAccessible(true);
WebTarget persistentTopics = (WebTarget) field.get(persistent);
// (1) Get PartitionedMetadata : with Url and Uri encoding
final CompletableFuture<PartitionedTopicMetadata> urlEncodedPartitionedMetadata = new CompletableFuture<>();
// (a) Url encoding
persistent.asyncGetRequest(
persistentTopics.path("persistent").path(ns1).path(urlEncodedTopic).path("partitions"),
new InvocationCallback<PartitionedTopicMetadata>() {
@Override
public void completed(PartitionedTopicMetadata response) {
urlEncodedPartitionedMetadata.complete(response);
}
@Override
public void failed(Throwable e) {
urlEncodedPartitionedMetadata.completeExceptionally(e);
}
});
final CompletableFuture<PartitionedTopicMetadata> uriEncodedPartitionedMetadata = new CompletableFuture<>();
// (b) Uri encoding
persistent.asyncGetRequest(
persistentTopics.path("persistent").path(ns1).path(uriEncodedTopic).path("partitions"),
new InvocationCallback<PartitionedTopicMetadata>() {
@Override
public void completed(PartitionedTopicMetadata response) {
uriEncodedPartitionedMetadata.complete(response);
}
@Override
public void failed(Throwable e) {
uriEncodedPartitionedMetadata.completeExceptionally(e);
}
});
assertEquals(urlEncodedPartitionedMetadata.get().partitions, numOfPartitions);
assertEquals(urlEncodedPartitionedMetadata.get().partitions, (uriEncodedPartitionedMetadata.get().partitions));
// (2) Get Topic Lookup
LookupImpl lookup = (LookupImpl) admin.lookups();
Field field2 = LookupImpl.class.getDeclaredField("v2lookup");
field2.setAccessible(true);
WebTarget target2 = (WebTarget) field2.get(lookup);
// (a) Url encoding
LookupData urlEncodedLookupData = lookup
.request(target2.path("/topic/persistent").path(ns1 + "/" + urlEncodedTopic))
.get(LookupData.class);
// (b) Uri encoding
LookupData uriEncodedLookupData = lookup
.request(target2.path("/topic/persistent").path(ns1 + "/" + uriEncodedTopic))
.get(LookupData.class);
Assert.assertNotNull(urlEncodedLookupData.getBrokerUrl());
assertEquals(urlEncodedLookupData.getBrokerUrl(), uriEncodedLookupData.getBrokerUrl());
// partitioned topic lookup
Map<String, String> lookupDataList = lookup.lookupPartitionedTopic(topic1);
assertEquals(numOfPartitions, lookupDataList.keySet().size());
// (3) Get Topic Stats
final CompletableFuture<TopicStats> urlStats = new CompletableFuture<>();
// (a) Url encoding
persistent.asyncGetRequest(persistentTopics.path("persistent").path(ns1).path(urlEncodedTopic + "-partition-1").path("stats"),
new InvocationCallback<TopicStats>() {
@Override
public void completed(TopicStats response) {
urlStats.complete(response);
}
@Override
public void failed(Throwable e) {
urlStats.completeExceptionally(e);
}
});
// (b) Uri encoding
final CompletableFuture<TopicStats> uriStats = new CompletableFuture<>();
persistent.asyncGetRequest(
persistentTopics.path("persistent").path(ns1).path(uriEncodedTopic + "-partition-1").path("stats"),
new InvocationCallback<TopicStats>() {
@Override
public void completed(TopicStats response) {
uriStats.complete(response);
}
@Override
public void failed(Throwable e) {
uriStats.completeExceptionally(e);
}
});
assertEquals(urlStats.get().getSubscriptions().size(), 1);
assertEquals(uriStats.get().getSubscriptions().size(), 1);
}
static class MockedPulsarService extends MockedPulsarServiceBaseTest {
private ServiceConfiguration conf;
public MockedPulsarService(ServiceConfiguration conf) {
super();
this.conf = conf;
}
@Override
protected void setup() throws Exception {
super.conf.setLoadManagerClassName(conf.getLoadManagerClassName());
super.conf.setSystemTopicEnabled(conf.isSystemTopicEnabled());
super.internalSetup();
}
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
public PulsarService getPulsar() {
return pulsar;
}
public PulsarAdmin getAdmin() {
return admin;
}
}
@Test
public void testTopicBundleRangeLookup() throws PulsarAdminException, PulsarServerException, Exception {
admin.clusters().createCluster("usw", ClusterData.builder().build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
Set.of("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
admin.namespaces().createNamespace("prop-xyz/getBundleNs", 100);
assertEquals(admin.namespaces().getPolicies("prop-xyz/getBundleNs").bundles.getNumBundles(), 100);
// (1) create a topic
final String topicName = "persistent://prop-xyz/getBundleNs/topic1";
String bundleRange = admin.lookups().getBundleRange(topicName);
assertEquals(bundleRange, pulsar.getNamespaceService().getBundle(TopicName.get(topicName)).getBundleRange());
}
@Test
public void testTriggerCompaction() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
// create a topic by creating a producer
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<Long>();
Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
// verify compact called once
verify(compactor).compact(topicName);
try {
admin.topics().triggerCompaction(topicName);
fail("Shouldn't be able to run while already running");
} catch (ConflictException e) {
// expected
}
// compact shouldn't have been called again
verify(compactor).compact(topicName);
// complete first compaction, and trigger again
promise.complete(1L);
admin.topics().triggerCompaction(topicName);
// verify compact was called again
verify(compactor, times(2)).compact(topicName);
}
@Test
public void testTriggerCompactionPartitionedTopic() throws Exception {
String topicName = "persistent://prop-xyz/ns1/test-part";
int numPartitions = 2;
admin.topics().createPartitionedTopic(topicName, numPartitions);
// create a partitioned topic by creating a producer
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<>();
Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName + "-partition-0");
CompletableFuture<Long> promise1 = new CompletableFuture<>();
doReturn(promise1).when(compactor).compact(topicName + "-partition-1");
admin.topics().triggerCompaction(topicName);
// verify compact called once by each partition topic
verify(compactor).compact(topicName + "-partition-0");
verify(compactor).compact(topicName + "-partition-1");
try {
admin.topics().triggerCompaction(topicName);
fail("Shouldn't be able to run while already running");
} catch (PulsarAdminException e) {
// expected
}
// compact shouldn't have been called again
verify(compactor).compact(topicName + "-partition-0");
verify(compactor).compact(topicName + "-partition-1");
// complete first compaction, and trigger again
promise.complete(1L);
promise1.complete(1L);
admin.topics().triggerCompaction(topicName);
// verify compact was called again
verify(compactor, times(2)).compact(topicName + "-partition-0");
verify(compactor, times(2)).compact(topicName + "-partition-1");
}
@Test
public void testCompactionStatus() throws Exception {
String topicName = "persistent://prop-xyz/ns1/topic1";
// create a topic by creating a producer
pulsarClient.newProducer(Schema.BYTES).topic(topicName).create().close();
assertNotNull(pulsar.getBrokerService().getTopicReference(topicName));
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.NOT_RUN);
// mock actual compaction, we don't need to really run it
CompletableFuture<Long> promise = new CompletableFuture<Long>();
Compactor compactor = ((PulsarCompactionServiceFactory)pulsar.getCompactionServiceFactory()).getCompactor();
doReturn(promise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.RUNNING);
promise.complete(1L);
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
CompletableFuture<Long> errorPromise = new CompletableFuture<Long>();
doReturn(errorPromise).when(compactor).compact(topicName);
admin.topics().triggerCompaction(topicName);
errorPromise.completeExceptionally(new Exception("Failed at something"));
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.ERROR);
assertTrue(admin.topics().compactionStatus(topicName).lastError.contains("Failed at something"));
}
@Test(timeOut = 20000)
public void testTopicStatsLastExpireTimestampForSubscription() throws PulsarAdminException, PulsarClientException, InterruptedException {
admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 10);
final String topic = "persistent://prop-xyz/ns1/testTopicStatsLastExpireTimestampForSubscription";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
for (int i = 0; i < 10; i++) {
producer.send(new byte[1024 * i * 5]);
}
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().size(), 1);
Assert.assertEquals(admin.topics().getStats(topic).getSubscriptions().values().iterator().next().getLastExpireTimestamp(), 0L);
Thread.sleep(10000);
// Update policy to trigger message expiry check.
admin.namespaces().setNamespaceMessageTTL("prop-xyz/ns1", 5);
Awaitility.await().until(() -> admin.topics().getStats(topic).getSubscriptions().values().iterator().next().getLastExpireTimestamp() > 0L);
}
@Test(timeOut = 150000)
public void testSubscriptionExpiry() throws Exception {
final String namespace1 = "prop-xyz/sub-gc1";
final String namespace2 = "prop-xyz/sub-gc2";
final String namespace3 = "prop-xyz/sub-gc3";
final String topic1 = "persistent://" + namespace1 + "/testSubscriptionExpiry";
final String topic2 = "persistent://" + namespace2 + "/testSubscriptionExpiry";
final String topic3 = "persistent://" + namespace3 + "/testSubscriptionExpiry";
final String sub = "sub1";
admin.namespaces().createNamespace(namespace1, Set.of("test"));
admin.namespaces().createNamespace(namespace2, Set.of("test"));
admin.namespaces().createNamespace(namespace3, Set.of("test"));
admin.topics().createSubscription(topic1, sub, MessageId.latest);
admin.topics().createSubscription(topic2, sub, MessageId.latest);
admin.topics().createSubscription(topic3, sub, MessageId.latest);
admin.namespaces().setSubscriptionExpirationTime(namespace1, 0);
admin.namespaces().setSubscriptionExpirationTime(namespace2, 1);
admin.namespaces().setSubscriptionExpirationTime(namespace3, 1);
admin.namespaces().removeSubscriptionExpirationTime(namespace3);
Assert.assertEquals((int) admin.namespaces().getSubscriptionExpirationTime(namespace1), 0);
Assert.assertEquals((int) admin.namespaces().getSubscriptionExpirationTime(namespace2), 1);
Assert.assertNull(admin.namespaces().getSubscriptionExpirationTime(namespace3));
Awaitility.await().timeout(120, TimeUnit.SECONDS)
.until(() -> admin.topics().getSubscriptions(topic2).size() == 0);
Assert.assertEquals(admin.topics().getSubscriptions(topic1).size(), 1);
Assert.assertEquals(admin.topics().getSubscriptions(topic2).size(), 0);
Assert.assertEquals(admin.topics().getSubscriptions(topic3).size(), 1);
admin.topics().delete(topic1);
admin.topics().delete(topic2);
admin.topics().delete(topic3);
deleteNamespaceWithRetry(namespace1, false);
deleteNamespaceWithRetry(namespace2, false);
deleteNamespaceWithRetry(namespace3, false);
}
@Test
public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
admin.clusters().createCluster("usw", ClusterData.builder().build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"),
Set.of("test", "usw"));
admin.tenants().updateTenant("prop-xyz", tenantInfo);
String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");
admin.namespaces().createNamespace(ns, 24);
deleteNamespaceWithRetry(ns, false);
// Re-create and re-delete
admin.namespaces().createNamespace(ns, 32);
deleteNamespaceWithRetry(ns, false);
}
@Test
public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages";
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("sub-1")
.subscribe();
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();
final int messages = 33;
for (int i = 0; i < messages; i++) {
producer.send(new byte[1024 * i * 5]);
}
TopicStats topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.getSubscriptions().get("sub-1").getLastMarkDeleteAdvancedTimestamp(), 0L);
for (int i = 0; i < messages; i++) {
consumer.acknowledgeCumulative(consumer.receive());
}
// Wait ack send
Thread.sleep(1000);
topicStats = admin.topics().getStats(topic);
assertEquals(topicStats.getBacklogSize(), 0);
assertTrue(topicStats.getSubscriptions().get("sub-1").getLastMarkDeleteAdvancedTimestamp() > 0L);
}
@Test
public void testGetTtlDurationDefaultInSeconds() throws Exception {
cleanup();
setupConfigAndStart(conf -> conf.setTtlDurationDefaultInSeconds(3600));
Integer seconds = admin.namespaces().getPolicies("prop-xyz/ns1").message_ttl_in_seconds;
assertNull(seconds);
}
@Test
public void testGetReadPositionWhenJoining() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testGetReadPositionWhenJoining-" + UUID.randomUUID().toString();
final String subName = "my-sub";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
final int messages = 10;
MessageIdImpl messageId = null;
for (int i = 0; i < messages; i++) {
messageId = (MessageIdImpl) producer.send(("Hello Pulsar - " + i).getBytes());
}
List<Consumer<byte[]>> consumers = new ArrayList<>();
for (int i = 0; i < 2; i++) {
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionName(subName)
.subscribe();
consumers.add(consumer);
}
TopicStats stats = admin.topics().getStats(topic);
Assert.assertEquals(stats.getSubscriptions().size(), 1);
SubscriptionStats subStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getConsumers().size(), 2);
ConsumerStats consumerStats = subStats.getConsumers().get(0);
Assert.assertEquals(consumerStats.getReadPositionWhenJoining(),
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId() + 1).toString());
for (Consumer<byte[]> consumer : consumers) {
consumer.close();
}
}
@Test
public void testPartitionedTopicMsgDelayedAggregated() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testPartitionedTopicMsgDelayedAggregated-" + UUID.randomUUID().toString();
final String subName = "my-sub";
final int numPartitions = 2;
cleanup();
setupConfigAndStart(conf -> {
conf.setSubscriptionRedeliveryTrackerEnabled(true);
conf.setDelayedDeliveryEnabled(true);
});
admin.topics().createPartitionedTopic(topic, numPartitions);
for (int i = 0; i < 2; i++) {
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName)
.subscribe();
}
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.create();
final int messages = 100;
for (int i = 0; i < messages; i++) {
String msg = "Hello Pulsar - " + i;
producer.send(msg.getBytes());
producer.newMessage().deliverAfter(1L, TimeUnit.HOURS).value(msg.getBytes()).send();
}
PartitionedTopicStats partitionedTopicStats = admin.topics().getPartitionedStats(topic, false);
Assert.assertNotNull(partitionedTopicStats);
SubscriptionStats subStats = partitionedTopicStats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getMsgBacklog(), subStats.getMsgBacklogNoDelayed() + subStats.getMsgDelayed());
partitionedTopicStats = admin.topics().getPartitionedStats(topic, true);
Assert.assertNotNull(partitionedTopicStats);
subStats = partitionedTopicStats.getSubscriptions().get(subName);
Assert.assertNotNull(subStats);
Assert.assertEquals(subStats.getMsgBacklog(), subStats.getMsgBacklogNoDelayed() + subStats.getMsgDelayed());
Assert.assertNotNull(partitionedTopicStats.getPartitions());
Assert.assertEquals(partitionedTopicStats.getPartitions().size(), numPartitions);
long sumMsgBacklog = 0;
long sumMsgBacklogNoDelayed = 0;
long sumMsgDelayed = 0;
for(TopicStats stats: partitionedTopicStats.getPartitions().values()){
Assert.assertNotNull(stats);
SubscriptionStats partitionedSubStats = stats.getSubscriptions().get(subName);
Assert.assertNotNull(partitionedSubStats);
sumMsgBacklog += partitionedSubStats.getMsgBacklog();
sumMsgBacklogNoDelayed += partitionedSubStats.getMsgBacklogNoDelayed();
sumMsgDelayed += partitionedSubStats.getMsgDelayed();
}
Assert.assertEquals(sumMsgBacklog, sumMsgBacklogNoDelayed + sumMsgDelayed);
Assert.assertEquals(sumMsgBacklog, subStats.getMsgBacklog());
Assert.assertEquals(sumMsgBacklogNoDelayed, subStats.getMsgBacklogNoDelayed());
Assert.assertEquals(sumMsgDelayed, subStats.getMsgDelayed());
}
@Test(timeOut = 20000)
public void testPartitionedTopicTruncate() throws Exception {
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic2-" + UUID.randomUUID().toString();
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topicName,6);
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(60, 50));
List<MessageId> messageIds = publishMessagesOnPersistentTopic(topicName, 10);
admin.topics().createSubscription(topicName, subName, messageIds.get(0));
admin.topics().unload(topicName);
publishMessagesOnPersistentTopic(topicName, 10);
admin.topics().unload(topicName);
publishMessagesOnPersistentTopic(topicName, 10);
admin.topics().truncate(topicName);
PartitionedTopicInternalStats stats = admin.topics().getPartitionedInternalStats(topicName);
for (Map.Entry<String, PersistentTopicInternalStats> statsEntry : stats.partitions.entrySet()) {
assertTrue(statsEntry.getValue().ledgers.size() <= 2);
}
}
@Test(timeOut = 20000)
public void testNonPartitionedTopicTruncate() throws Exception {
final String topicName = "persistent://prop-xyz/ns1/testTruncateTopic1-" + UUID.randomUUID().toString();
final String subName = "my-sub";
cleanup();
setupConfigAndStart(conf -> {
conf.setTopicLevelPoliciesEnabled(true);
conf.setSystemTopicEnabled(true);
});
admin.topics().createNonPartitionedTopic(topicName);
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(60, 50));
List<MessageId> messageIds = publishMessagesOnPersistentTopic(topicName, 10);
admin.topics().createSubscription(topicName, subName, messageIds.get(0));
admin.topics().unload(topicName);
publishMessagesOnPersistentTopic(topicName, 10);
admin.topics().unload(topicName);
publishMessagesOnPersistentTopic(topicName, 10);
admin.topics().truncate(topicName);
PersistentTopicInternalStats stats = admin.topics().getInternalStats(topicName);
assertTrue(stats.ledgers.size() <= 1);
}
@Test(timeOut = 20000)
public void testNonPersistentTopicTruncate() throws Exception {
final String topicName = "non-persistent://prop-xyz/ns1/testTruncateTopic3-" + UUID.randomUUID().toString();
admin.topics().createNonPartitionedTopic(topicName);
assertThrows(() -> {admin.topics().truncate(topicName);});
}
@Test(timeOut = 20000)
public void testPeekEncryptedMessages() throws Exception {
final String topicName = "persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID().toString();
final String subName = "my-sub";
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subName, MessageId.latest);
final Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.enableBatching(true)
.addEncryptionKey("my-app-key")
.defaultCryptoKeyReader("file:./src/test/resources/certificate/public-key.client-rsa.pem")
.create();
for (int i = 0; i < 5; i++) {
producer.send(("message-" + i).getBytes());
}
producer.close();
final List<Message<byte[]>> peekedMessages = admin.topics().peekMessages(topicName, subName, 5);
assertEquals(peekedMessages.size(), 5);
final Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topicName)
.subscriptionName(subName)
.cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscribe();
final List<Message<byte[]>> receivedMessages = new ArrayList<>();
for (int i = 0; i < 5; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
receivedMessages.add(msg);
consumer.acknowledge(msg);
}
consumer.unsubscribe();
for (int i = 0; i < 5; i++) {
assertEquals(peekedMessages.get(i).getMessageId(), receivedMessages.get(i).getMessageId());
assertEquals(peekedMessages.get(i).getData(), receivedMessages.get(i).getData());
}
}
@Test
public void testGetPartitionStatsWithEarliestTimeInBacklog() throws PulsarAdminException, PulsarClientException {
final String topicName = "persistent://prop-xyz/ns1/testPeekEncryptedMessages-" + UUID.randomUUID();
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topicName, 3);
PartitionedTopicStats partitionedStats =
admin.topics().getPartitionedStats(topicName, true, true, true, true);
long value1 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
Assert.assertEquals(value1, 0);
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topicName)
.create();
producer.send("Test".getBytes(StandardCharsets.UTF_8));
@Cleanup
Consumer<byte[]> subscribe = pulsarClient.newConsumer()
.subscriptionName(subName)
.topic(topicName)
.subscribe();
long value2 = partitionedStats.getEarliestMsgPublishTimeInBacklogs();
Assert.assertNotEquals(value2, 0);
}
@Test
public void testRetentionAndBacklogQuotaCheck() throws PulsarAdminException {
String namespace = "prop-xyz/ns1";
//test size check.
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, 10));
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
});
//test time check
admin.namespaces().setRetention(namespace, new RetentionPolicies(10, -1));
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(11 * 60).build());
});
// test both size and time.
admin.namespaces().setRetention(namespace, new RetentionPolicies(10, 10));
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).build());
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(9 * 60).build());
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(9 * 1024 * 1024).
limitTime(9 * 60).build());
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitSize(100 * 1024 * 1024).build());
});
Assert.expectThrows(PulsarAdminException.PreconditionFailedException.class, () -> {
admin.namespaces().setBacklogQuota(namespace, BacklogQuota.builder().limitTime(100 * 60).build());
});
}
}