blob: 1ee7a41f5ae0b218aab02fc6944dc772311798bb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.v1.BrokerStats;
import org.apache.pulsar.broker.admin.v1.Brokers;
import org.apache.pulsar.broker.admin.v1.Clusters;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.admin.v1.Properties;
import org.apache.pulsar.broker.admin.v1.ResourceQuotas;
import org.apache.pulsar.broker.admin.v2.SchemasResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.awaitility.Awaitility;
import org.mockito.ArgumentCaptor;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
public class AdminTest extends MockedPulsarServiceBaseTest {
private final String configClusterName = "use";
private Clusters clusters;
private Properties properties;
private Namespaces namespaces;
private PersistentTopics persistentTopics;
private Brokers brokers;
private ResourceQuotas resourceQuotas;
private BrokerStats brokerStats;
private SchemasResource schemasResource;
private Field uriField;
public AdminTest() {
super();
}
/**
 * Starts the mocked Pulsar service and builds a Mockito spy for every admin REST
 * resource exercised by this class.
 *
 * <p>Each spy is wired to the running {@code pulsar} instance; identity lookups
 * ({@code clientAppId()}) and access checks are stubbed so that the tests can call the
 * resource methods directly, without going through an HTTP layer.
 *
 * @throws Exception if the mocked service fails to start or the reflective lookup of the
 *                   {@code uri} field fails
 */
@Override
@BeforeMethod
public void setup() throws Exception {
    conf.setClusterName(configClusterName);
    super.internalSetup();

    clusters = spy(Clusters.class);
    clusters.setPulsar(pulsar);
    doReturn("test").when(clusters).clientAppId();
    doNothing().when(clusters).validateSuperUserAccess();

    properties = spy(Properties.class);
    properties.setPulsar(pulsar);
    doReturn("test").when(properties).clientAppId();
    doNothing().when(properties).validateSuperUserAccess();

    namespaces = spy(Namespaces.class);
    namespaces.setServletContext(new MockServletContext());
    namespaces.setPulsar(pulsar);
    doReturn("test").when(namespaces).clientAppId();
    doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
    doNothing().when(namespaces).validateAdminAccessForTenant("my-tenant");
    doNothing().when(namespaces).validateAdminAccessForTenant("other-tenant");
    doNothing().when(namespaces).validateAdminAccessForTenant("new-property");

    brokers = spy(Brokers.class);
    brokers.setPulsar(pulsar);
    doReturn("test").when(brokers).clientAppId();
    doNothing().when(brokers).validateSuperUserAccess();

    // Reflective handle on PulsarWebResource#uri so tests can inject a mocked UriInfo.
    uriField = PulsarWebResource.class.getDeclaredField("uri");
    uriField.setAccessible(true);

    // All persistentTopics stubs are kept together; clientAppId() is stubbed exactly once.
    persistentTopics = spy(PersistentTopics.class);
    persistentTopics.setServletContext(new MockServletContext());
    persistentTopics.setPulsar(pulsar);
    doReturn("test").when(persistentTopics).clientAppId();
    doReturn("persistent").when(persistentTopics).domain();
    doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc"))).when(persistentTopics).clusters();
    doNothing().when(persistentTopics).validateAdminAccessForTenant("my-tenant");
    doNothing().when(persistentTopics).validateAdminAccessForTenant("other-tenant");
    doNothing().when(persistentTopics).validateAdminAccessForTenant("prop-xyz");
    doReturn(false).when(persistentTopics).isRequestHttps();
    doReturn(null).when(persistentTopics).originalPrincipal();
    doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();

    resourceQuotas = spy(ResourceQuotas.class);
    resourceQuotas.setServletContext(new MockServletContext());
    resourceQuotas.setPulsar(pulsar);

    brokerStats = spy(BrokerStats.class);
    brokerStats.setServletContext(new MockServletContext());
    brokerStats.setPulsar(pulsar);

    schemasResource = spy(SchemasResource.class);
    schemasResource.setServletContext(new MockServletContext());
    schemasResource.setPulsar(pulsar);
}
/**
 * Shuts the mocked Pulsar service down after every test method (even a failed one) and
 * then re-applies the cluster name used by this class to the shared configuration.
 *
 * @throws Exception if the superclass cleanup fails
 */
@Override
@AfterMethod(alwaysRun = true)
public void cleanup() throws Exception {
    super.internalCleanup();
    // Re-applied after the superclass cleanup, in the same order as before.
    conf.setClusterName(configClusterName);
}
/**
 * Checks that the brokers resource reports internal configuration data matching the
 * values held by the running service's configuration.
 */
@Test
public void internalConfiguration() throws Exception {
    ServiceConfiguration brokerConf = pulsar.getConfiguration();

    // The BookKeeper metadata URL is only expected when it is configured separately.
    String bookkeeperMetadataUrl = null;
    if (brokerConf.isBookkeeperMetadataStoreSeparated()) {
        bookkeeperMetadataUrl = brokerConf.getBookkeeperMetadataStoreUrl();
    }
    // No worker config means no state storage URL.
    String stateStorageUrl = pulsar.getWorkerConfig()
            .map(wc -> wc.getStateStorageServiceUrl())
            .orElse(null);

    InternalConfigurationData expected = new InternalConfigurationData(
            brokerConf.getMetadataStoreUrl(),
            brokerConf.getConfigurationMetadataStoreUrl(),
            new ClientConfiguration().getZkLedgersRootPath(),
            bookkeeperMetadataUrl,
            stateStorageUrl);

    assertEquals(brokers.getInternalConfigurationData(), expected);
}
/**
 * Exercises the v1 {@code Clusters} resource end to end: create / get / update / delete,
 * namespace isolation policies, metadata-store failure injection, and name validation.
 *
 * <p>The {@code verify(clusters, times(n)).validateSuperUserAccess()} checkpoints count
 * every resource call made so far, so the order and number of calls in this method must
 * not be changed without updating those counts.
 */
@Test
public void clusters() throws Exception {
    assertEquals(clusters.getClusters(), Lists.newArrayList());
    verify(clusters, never()).validateSuperUserAccess();

    clusters.createCluster("use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
    verify(clusters, times(1)).validateSuperUserAccess();
    // The freshly created cluster must be visible in the listing.
    assertEquals(clusters.getClusters(), Lists.newArrayList("use"));

    // Check creating existing cluster
    try {
        clusters.createCluster("use", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
    }

    // Check deleting non-existing cluster
    try {
        clusters.deleteCluster("usc");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
    }

    assertEquals(clusters.getCluster("use"), ClusterDataImpl.builder().serviceUrl("http://broker.messaging.use.example.com:8080").build());
    verify(clusters, times(4)).validateSuperUserAccess();

    clusters.updateCluster("use", ClusterDataImpl.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
    verify(clusters, times(5)).validateSuperUserAccess();

    assertEquals(clusters.getCluster("use"), ClusterData.builder().serviceUrl("http://new-broker.messaging.use.example.com:8080").build());
    verify(clusters, times(6)).validateSuperUserAccess();

    // No isolation policies have been stored for the cluster yet.
    try {
        clusters.getNamespaceIsolationPolicies("use");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), 404);
    }

    Map<String, String> parameters1 = new HashMap<>();
    parameters1.put("min_limit", "1");
    parameters1.put("usage_threshold", "90");

    NamespaceIsolationDataImpl policyData = NamespaceIsolationDataImpl.builder()
            .namespaces(Collections.singletonList("dummy/colo/ns"))
            // getListenPortHTTP() returns an Optional; unwrap it so the entry is
            // "localhost:<port>" rather than "localhost:Optional[<port>]".
            .primary(Collections.singletonList("localhost" + ":" + pulsar.getListenPortHTTP().get()))
            .autoFailoverPolicy(AutoFailoverPolicyData.builder()
                    .policyType(AutoFailoverPolicyType.min_available)
                    .parameters(parameters1)
                    .build())
            .build();
    AsyncResponse response = mock(AsyncResponse.class);
    clusters.setNamespaceIsolationPolicy(response, "use", "policy1", policyData);
    clusters.getNamespaceIsolationPolicies("use");

    // A cluster that still has isolation policies cannot be deleted.
    try {
        clusters.deleteCluster("use");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), 412);
    }

    clusters.deleteNamespaceIsolationPolicy("use", "policy1");
    assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());

    clusters.deleteCluster("use");
    verify(clusters, times(13)).validateSuperUserAccess();
    assertEquals(clusters.getClusters(), Lists.newArrayList());

    // Every operation on the deleted cluster must now report 404.
    try {
        clusters.getCluster("use");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), 404);
    }

    try {
        clusters.updateCluster("use", ClusterDataImpl.builder().build());
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), 404);
    }

    try {
        clusters.getNamespaceIsolationPolicies("use");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), 404);
    }

    // Test zk failures
    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET_CHILDREN
                && path.equals("/admin/clusters");
    });
    // clear caches to load data from metadata-store again
    MetadataCacheImpl<ClusterData> clusterCache = (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources()
            .getClusterResources().getCache();
    MetadataCacheImpl<?> isolationPolicyCache = (MetadataCacheImpl<?>) pulsar.getPulsarResources()
            .getNamespaceResources().getIsolationPolicies().getCache();
    AbstractMetadataStore store = (AbstractMetadataStore) clusterCache.getStore();
    clusterCache.invalidateAll();
    store.invalidateAll();
    try {
        clusters.getClusters();
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.CREATE
                && path.equals("/admin/clusters/test");
    });
    try {
        clusters.createCluster("test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com:8080").build());
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET
                && path.equals("/admin/clusters/test");
    });
    clusterCache.invalidateAll();
    store.invalidateAll();
    try {
        clusters.updateCluster("test", ClusterDataImpl.builder().serviceUrl("http://broker.messaging.test.example.com").build());
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET
                && path.equals("/admin/clusters/test");
    });
    try {
        clusters.getCluster("test");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET_CHILDREN
                && path.equals("/admin/policies");
    });
    try {
        clusters.deleteCluster("use");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET
                && path.equals("/admin/clusters/use/namespaceIsolationPolicies");
    });
    clusterCache.invalidateAll();
    isolationPolicyCache.invalidateAll();
    store.invalidateAll();
    try {
        clusters.deleteCluster("use");
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    // Check name validations
    try {
        clusters.createCluster("bf@", ClusterDataImpl.builder().serviceUrl("http://dummy.messaging.example.com").build());
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
    }

    // Check authentication and listener name
    // NOTE(review): this block also passes when createCluster is rejected with 412, in
    // which case the three field assertions never run - confirm that is intended.
    try {
        clusters.createCluster("auth", ClusterDataImpl.builder()
                .serviceUrl("http://dummy.web.example.com")
                .serviceUrlTls("")
                .brokerServiceUrl("http://dummy.messaging.example.com")
                .brokerServiceUrlTls("")
                .authenticationPlugin("authenticationPlugin")
                .authenticationParameters("authenticationParameters")
                .listenerName("listenerName")
                .build());
        ClusterData cluster = clusters.getCluster("auth");
        assertEquals(cluster.getAuthenticationPlugin(), "authenticationPlugin");
        assertEquals(cluster.getAuthenticationParameters(), "authenticationParameters");
        assertEquals(cluster.getListenerName(), "listenerName");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
    }
}
/**
 * Runs an asynchronous admin call against a fresh {@link TestAsyncResponse} and blocks
 * until the call resumes it.
 *
 * @param function invokes the resource method under test, passing it the supplied
 *                 response object
 * @return the entity the response was resumed with
 * @throws Exception             the failure the response was resumed with, rethrown
 *                               as-is when it is an {@code Exception}
 * @throws IllegalStateException if the response is not resumed within the wait limit
 */
Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception {
    // Generous upper bound so a request that is never resumed fails the test
    // instead of blocking the whole suite forever.
    final long maxWaitSeconds = 60;

    TestAsyncResponse ctx = new TestAsyncResponse();
    function.accept(ctx);
    if (!ctx.latch.await(maxWaitSeconds, TimeUnit.SECONDS)) {
        throw new IllegalStateException(
                "Async response was not resumed within " + maxWaitSeconds + " seconds");
    }

    Throwable failure = ctx.e;
    if (failure == null) {
        return ctx.response;
    }
    if (failure instanceof Exception) {
        throw (Exception) failure;
    }
    if (failure instanceof Error) {
        // Previously an Error here surfaced as a ClassCastException, hiding the real cause.
        throw (Error) failure;
    }
    throw new RuntimeException(failure);
}
/**
 * Exercises the v1 {@code Properties} (tenant) resource: create / get / update / delete,
 * metadata-store failure injection, deletion of a non-empty tenant, and validation of
 * tenant names and tenant info.
 *
 * <p>The {@code verify(properties, times(n)).validateSuperUserAccess()} checkpoints count
 * every resource call made so far, so the number of calls before each checkpoint must not
 * change.
 */
@Test
public void properties() throws Throwable {
    Object response = asynRequests(ctx -> properties.getTenants(ctx));
    assertEquals(response, Lists.newArrayList());
    verify(properties, times(1)).validateSuperUserAccess();

    // create local cluster
    clusters.createCluster(configClusterName, ClusterDataImpl.builder().build());

    Set<String> allowedClusters = Sets.newHashSet();
    allowedClusters.add(configClusterName);
    TenantInfoImpl tenantInfo = TenantInfoImpl.builder()
            .adminRoles(Sets.newHashSet("role1", "role2"))
            .allowedClusters(allowedClusters)
            .build();
    response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo));
    verify(properties, times(2)).validateSuperUserAccess();

    response = asynRequests(ctx -> properties.getTenants(ctx));
    assertEquals(response, Lists.newArrayList("test-property"));
    verify(properties, times(3)).validateSuperUserAccess();

    response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
    assertEquals(response, tenantInfo);
    verify(properties, times(4)).validateSuperUserAccess();

    final TenantInfoImpl newPropertyAdmin = TenantInfoImpl.builder()
            .adminRoles(Sets.newHashSet("role1", "other-role"))
            .allowedClusters(allowedClusters)
            .build();
    response = asynRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin));
    verify(properties, times(5)).validateSuperUserAccess();

    // Wait for updateTenant to take effect. A polling wait is not used here because
    // every extra getTenantAdmin call would change the call count verified below.
    Thread.sleep(100);
    response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
    assertEquals(response, newPropertyAdmin);
    response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
    // Compare by value: an identity check always passes for a freshly fetched object.
    Assert.assertNotEquals(response, tenantInfo);
    verify(properties, times(7)).validateSuperUserAccess();

    // Check creating existing property
    try {
        response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
    }

    // Check non-existing property
    try {
        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing"));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
    }

    try {
        response = asynRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
    }

    // Check deleting non-existing property
    try {
        response = asynRequests(ctx -> properties.deleteTenant(ctx, "non-existing", false));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
    }

    // clear caches to load data from metadata-store again
    MetadataCacheImpl<TenantInfo> cache = (MetadataCacheImpl<TenantInfo>) pulsar.getPulsarResources()
            .getTenantResources().getCache();
    AbstractMetadataStore store = (AbstractMetadataStore) cache.getStore();
    cache.invalidateAll();
    store.invalidateAll();

    // Test zk failures
    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies");
    });
    try {
        response = asynRequests(ctx -> properties.getTenants(ctx));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant");
    });
    try {
        response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant"));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant");
    });
    try {
        response = asynRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.CREATE && path.equals("/admin/policies/test");
    });
    try {
        response = asynRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies/test-property");
    });
    try {
        cache.invalidateAll();
        store.invalidateAll();
        response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    response = asynRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo));

    mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
        return op == MockZooKeeper.Op.DELETE && path.equals("/admin/policies/error-property");
    });
    try {
        response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
    }

    // With the injected failures consumed, both tenants can be removed.
    response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false));
    response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false));
    response = asynRequests(ctx -> properties.getTenants(ctx));
    assertEquals(response, Lists.newArrayList());

    // Create a namespace to test deleting a non-empty property
    TenantInfoImpl newPropertyAdmin2 = TenantInfoImpl.builder()
            .adminRoles(Sets.newHashSet("role1", "other-role"))
            .allowedClusters(Sets.newHashSet("use"))
            .build();
    response = asynRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2));

    namespaces.createNamespace("my-tenant", "use", "my-namespace", BundlesData.builder().build());

    try {
        response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false));
        fail("should have failed");
    } catch (RestException e) {
        // Ok
    }

    // Check name validation
    try {
        response = asynRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
    }

    // Check tenantInfo is null
    try {
        response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
    }

    // Check tenantInfo with empty cluster
    String blankCluster = "";
    Set<String> blankClusters = Sets.newHashSet(blankCluster);
    TenantInfoImpl tenantWithEmptyCluster = TenantInfoImpl.builder()
            .adminRoles(Sets.newHashSet("role1", "role2"))
            .allowedClusters(blankClusters)
            .build();
    try {
        response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
    }

    // Check tenantInfo contains empty cluster
    Set<String> containBlankClusters = Sets.newHashSet(blankCluster);
    containBlankClusters.add(configClusterName);
    TenantInfoImpl tenantContainEmptyCluster = TenantInfoImpl.builder()
            .adminRoles(Sets.newHashSet())
            .allowedClusters(containBlankClusters)
            .build();
    try {
        response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster));
        fail("should have failed");
    } catch (RestException e) {
        assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
    }

    // Remove the namespace first so that the tenant itself can finally be deleted.
    AsyncResponse response2 = mock(AsyncResponse.class);
    namespaces.deleteNamespace(response2, "my-tenant", "use", "my-namespace", false, false);
    ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
    verify(response2, timeout(5000).times(1)).resume(captor.capture());
    assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
    response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false));
}
/**
 * Verifies that the brokers resource lists this broker as the only active broker of
 * cluster {@code use} and reports the elected leader's service URL.
 */
@Test
public void brokers() throws Exception {
    clusters.createCluster("use", ClusterDataImpl.builder()
            .serviceUrl("http://broker.messaging.use.example.com")
            .serviceUrlTls("https://broker.messaging.use.example.com:4443")
            .build());

    URI requestUri = new URI(
            "http://broker.messaging.use.example.com:8080/admin/brokers/use");
    UriInfo mockUri = mock(UriInfo.class);
    doReturn(requestUri).when(mockUri).getRequestUri();
    // Inject the mocked UriInfo through the reflective handle prepared in setup(),
    // instead of shadowing that field with a second identical lookup.
    uriField.set(brokers, mockUri);

    Set<String> activeBrokers = brokers.getActiveBrokers("use");
    assertEquals(activeBrokers.size(), 1);
    assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get()));

    BrokerInfo leaderBroker = brokers.getLeaderBroker();
    assertEquals(leaderBroker.getServiceUrl(), pulsar.getLeaderElectionService().getCurrentLeader().map(LeaderBroker::getServiceUrl).get());
}
/**
 * Exercises the resource-quota resource: reading and updating the default quota, then
 * setting, reading and removing a quota for a single namespace bundle.
 *
 * <p>Assertions use TestNG's {@code (actual, expected)} argument order so that failure
 * messages label the two values correctly.
 */
@Test
public void resourceQuotas() throws Exception {
    // get Default Resource Quota
    ResourceQuota quota = resourceQuotas.getDefaultResourceQuota();
    assertNotNull(quota);
    assertTrue(quota.getBandwidthIn() > 0);

    // set Default Resource Quota
    double defaultBandwidth = 1000;
    quota.setBandwidthIn(defaultBandwidth);
    quota.setBandwidthOut(defaultBandwidth);
    resourceQuotas.setDefaultResourceQuota(quota);
    assertEquals(resourceQuotas.getDefaultResourceQuota().getBandwidthIn(), defaultBandwidth);
    assertEquals(resourceQuotas.getDefaultResourceQuota().getBandwidthOut(), defaultBandwidth);

    String property = "prop-xyz";
    String cluster = "use";
    String namespace = "ns";
    String bundleRange = "0x00000000_0xffffffff";
    Policies policies = new Policies();
    doReturn(policies).when(resourceQuotas).getNamespacePolicies(NamespaceName.get(property, cluster, namespace));
    doReturn("client-id").when(resourceQuotas).clientAppId();

    // fail(...) throws an AssertionError, which the catch (Exception) blocks below
    // do not intercept, so an unexpected success still fails the test.
    try {
        resourceQuotas.setNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange, quota);
        fail("setting a bundle quota should fail before the policies exist");
    } catch (Exception e) {
        // OK : should fail without creating policies
    }

    try {
        resourceQuotas.removeNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange);
        fail("removing a bundle quota should fail before the policies exist");
    } catch (Exception e) {
        // OK : should fail without creating policies
    }

    // create policies
    TenantInfoImpl admin = TenantInfoImpl.builder()
            .allowedClusters(Collections.singleton(cluster))
            .build();
    ClusterDataImpl clusterData = ClusterDataImpl.builder().serviceUrl(cluster).build();
    clusters.createCluster(cluster, clusterData);
    asynRequests(ctx -> properties.createTenant(ctx, property, admin));

    // customized bandwidth for this namespace
    double customizeBandwidth = 3000;
    quota.setBandwidthIn(customizeBandwidth);
    quota.setBandwidthOut(customizeBandwidth);

    // set and get Resource Quota
    resourceQuotas.setNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange, quota);
    ResourceQuota bundleQuota = resourceQuotas.getNamespaceBundleResourceQuota(property, cluster, namespace,
            bundleRange);
    assertEquals(bundleQuota, quota);

    // remove quota which sets to default quota
    resourceQuotas.removeNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange);
    bundleQuota = resourceQuotas.getNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange);
    assertEquals(bundleQuota.getBandwidthIn(), defaultBandwidth);
    assertEquals(bundleQuota.getBandwidthOut(), defaultBandwidth);
}
/**
 * Smoke-tests the broker-stats resource: each stats endpoint must return a usable value,
 * and the broker resource-availability lookup is expected to be rejected.
 */
@Test
public void brokerStats() throws Exception {
    doReturn("client-id").when(brokerStats).clientAppId();

    final Collection<Metrics> brokerMetrics = brokerStats.getMetrics();
    assertNotNull(brokerMetrics);

    final LocalBrokerData report = (LocalBrokerData) brokerStats.getLoadReport();
    assertNotNull(report);
    assertNotNull(report.getCpu());

    final Collection<Metrics> mBeanMetrics = brokerStats.getMBeans();
    assertFalse(mBeanMetrics.isEmpty());

    final AllocatorStats defaultAllocator = brokerStats.getAllocatorStats("default");
    assertNotNull(defaultAllocator);

    final Map<String, Map<String, PendingBookieOpsStats>> pendingOps = brokerStats.getPendingBookieOpsStats();
    assertTrue(pendingOps.isEmpty());

    final StreamingOutput topicsOutput = brokerStats.getTopics2();
    assertNotNull(topicsOutput);

    try {
        brokerStats.getBrokerResourceAvailability("prop", "use", "ns2");
        fail("should have failed as ModularLoadManager doesn't support it");
    } catch (RestException expected) {
        // Ok
    }
}
/**
 * Exercises the v1 persistent-topics resource: listing topics, creating a partitioned
 * topic, and granting / revoking a permission on it.
 *
 * <p>Results delivered through an {@link AsyncResponse} are verified with a timeout,
 * because they may arrive after the resource method has returned.
 */
@Test
public void persistentTopics() throws Exception {
    final String property = "prop-xyz";
    final String cluster = "use";
    final String namespace = "ns";
    final String topic = "ds1";

    // create policies
    TenantInfo admin = TenantInfo.builder()
            .allowedClusters(Collections.singleton(cluster))
            .build();
    pulsar.getPulsarResources().getTenantResources().createTenant(property, admin);
    pulsar.getPulsarResources().getNamespaceResources()
            .createPolicies(NamespaceName.get(property, cluster, namespace), new Policies());

    AsyncResponse response = mock(AsyncResponse.class);
    persistentTopics.getList(response, property, cluster, namespace);
    verify(response, timeout(5000).times(1)).resume(Lists.newArrayList());

    // create topic
    assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
    response = mock(AsyncResponse.class);
    ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5, false);
    verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
    assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
    assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
            .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));

    TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
    assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, true, false).partitions, 5);

    // grant permission
    final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
    final String role = "test-role";
    persistentTopics.grantPermissionsOnTopic(property, cluster, namespace, topic, role, actions);
    // verify permission
    Map<String, Set<AuthAction>> permission = persistentTopics.getPermissionsOnTopic(property, cluster,
            namespace, topic);
    assertEquals(permission.get(role), actions);
    // remove permission
    persistentTopics.revokePermissionsOnTopic(property, cluster, namespace, topic, role);
    // verify removed permission
    Awaitility.await().untilAsserted(() -> {
        Map<String, Set<AuthAction>> p = persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
        assertTrue(p.isEmpty());
    });
}
/**
 * A {@link RestException} built from a status and a message must expose that message
 * unchanged through {@code getMessage()}.
 */
@Test
public void testRestExceptionMessage() {
    final String expectedMessage = "my-message";
    final RestException restException = new RestException(Status.PRECONDITION_FAILED, expectedMessage);

    assertEquals(restException.getMessage(), expectedMessage);
}
/**
 * Creates two partitioned topics where the name of one ("special-topic") is a suffix of
 * the other ("old-special-topic"), then raises the partition count of the shorter-named
 * topic. The test passes when the update completes without throwing.
 */
@Test
public void testUpdatePartitionedTopicCoontainedInOldTopic() throws Exception {
    final String property = "prop-xyz";
    final String cluster = "use";
    final String namespace = "ns";
    final String longerTopic = "old-special-topic";
    final String shorterTopic = "special-topic";

    pulsar.getPulsarResources().getNamespaceResources()
            .createPolicies(NamespaceName.get(property, cluster, namespace), new Policies());

    // First topic: five partitions.
    final AsyncResponse longerTopicResponse = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> longerTopicCaptor = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(longerTopicResponse, property, cluster, namespace, longerTopic, 5, false);
    verify(longerTopicResponse, timeout(5000).times(1)).resume(longerTopicCaptor.capture());
    Assert.assertEquals(longerTopicCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    // Second topic: two partitions.
    final AsyncResponse shorterTopicResponse = mock(AsyncResponse.class);
    final ArgumentCaptor<Response> shorterTopicCaptor = ArgumentCaptor.forClass(Response.class);
    persistentTopics.createPartitionedTopic(shorterTopicResponse, property, cluster, namespace, shorterTopic, 2, false);
    verify(shorterTopicResponse, timeout(5000).times(1)).resume(shorterTopicCaptor.capture());
    Assert.assertEquals(shorterTopicCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());

    persistentTopics.updatePartitionedTopic(property, cluster, namespace, shorterTopic, false, false,
            false, 10);
}
/**
 * Minimal {@link AsyncResponse} stub for tests. It stores whatever it is resumed with
 * (an entity or a failure) and releases {@link #latch} so a waiting test thread can
 * continue.
 *
 * <p>Cancellation, timeouts and callback registration are not supported: those methods
 * do nothing and report {@code false} or an empty result.
 */
static class TestAsyncResponse implements AsyncResponse {
    /** Entity passed to {@link #resume(Object)}; stays {@code null} until then. */
    Object response;
    /** Failure passed to {@link #resume(Throwable)}; stays {@code null} until then. */
    Throwable e;
    /** Released on every {@code resume} call; wait on it before reading the fields above. */
    final CountDownLatch latch = new CountDownLatch(1);

    @Override
    public boolean resume(Object response) {
        this.response = response;
        latch.countDown();
        return true;
    }

    @Override
    public boolean resume(Throwable response) {
        this.e = response;
        latch.countDown();
        return true;
    }

    @Override
    public boolean cancel() {
        return false;
    }

    @Override
    public boolean cancel(int retryAfter) {
        return false;
    }

    @Override
    public boolean cancel(Date retryAfter) {
        return false;
    }

    // The state queries below are fixed stubs; they do not track resume() calls.
    @Override
    public boolean isSuspended() {
        return false;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return false;
    }

    @Override
    public boolean setTimeout(long time, TimeUnit unit) {
        return false;
    }

    @Override
    public void setTimeoutHandler(TimeoutHandler handler) {
        // Timeouts are not simulated.
    }

    // Nothing is ever registered, so the register(...) overloads report an empty
    // result instead of null; callers can iterate the result safely.
    @Override
    public Collection<Class<?>> register(Class<?> callback) {
        return Collections.emptyList();
    }

    @Override
    public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks) {
        return Collections.emptyMap();
    }

    @Override
    public Collection<Class<?>> register(Object callback) {
        return Collections.emptyList();
    }

    @Override
    public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks) {
        return Collections.emptyMap();
    }
}
}