blob: 341821892605ebc70a66c5028763c52daddd5ef0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.admin;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URI;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.proto.PendingBookieOpsStats;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.admin.v1.BrokerStats;
import org.apache.pulsar.broker.admin.v1.Brokers;
import org.apache.pulsar.broker.admin.v1.Clusters;
import org.apache.pulsar.broker.admin.v1.Namespaces;
import org.apache.pulsar.broker.admin.v1.PersistentTopics;
import org.apache.pulsar.broker.admin.v1.Properties;
import org.apache.pulsar.broker.admin.v1.ResourceQuotas;
import org.apache.pulsar.broker.admin.v2.SchemasResource;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.conf.InternalConfigurationData;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BundlesData;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.stats.AllocatorStats;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AdminTest extends MockedPulsarServiceBaseTest {
private static final Logger log = LoggerFactory.getLogger(AdminTest.class);
private final String configClusterName = "use";
private ConfigurationCacheService configurationCache;
private Clusters clusters;
private Properties properties;
private Namespaces namespaces;
private PersistentTopics persistentTopics;
private Brokers brokers;
private ResourceQuotas resourceQuotas;
private BrokerStats brokerStats;
private SchemasResource schemasResource;
private Field uriField;
private Clock mockClock = Clock.fixed(
Instant.ofEpochSecond(365248800),
ZoneId.of("-05:00")
);
public AdminTest() {
super();
}
@Override
@BeforeMethod
public void setup() throws Exception {
conf.setClusterName(configClusterName);
super.internalSetup();
configurationCache = pulsar.getConfigurationCache();
clusters = spy(new Clusters());
clusters.setPulsar(pulsar);
doReturn("test").when(clusters).clientAppId();
doNothing().when(clusters).validateSuperUserAccess();
properties = spy(new Properties());
properties.setPulsar(pulsar);
doReturn("test").when(properties).clientAppId();
doNothing().when(properties).validateSuperUserAccess();
namespaces = spy(new Namespaces());
namespaces.setServletContext(new MockServletContext());
namespaces.setPulsar(pulsar);
doReturn(mockZooKeeper).when(namespaces).localZk();
doReturn("test").when(namespaces).clientAppId();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc", "global"))).when(namespaces).clusters();
doNothing().when(namespaces).validateAdminAccessForTenant("my-tenant");
doNothing().when(namespaces).validateAdminAccessForTenant("other-tenant");
doNothing().when(namespaces).validateAdminAccessForTenant("new-property");
brokers = spy(new Brokers());
brokers.setPulsar(pulsar);
doReturn("test").when(brokers).clientAppId();
doNothing().when(brokers).validateSuperUserAccess();
uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
persistentTopics = spy(new PersistentTopics());
persistentTopics.setServletContext(new MockServletContext());
persistentTopics.setPulsar(pulsar);
doReturn(mockZooKeeper).when(persistentTopics).localZk();
doReturn("test").when(persistentTopics).clientAppId();
doReturn("persistent").when(persistentTopics).domain();
doReturn(Sets.newTreeSet(Lists.newArrayList("use", "usw", "usc"))).when(persistentTopics).clusters();
doNothing().when(persistentTopics).validateAdminAccessForTenant("my-tenant");
doNothing().when(persistentTopics).validateAdminAccessForTenant("other-tenant");
doNothing().when(persistentTopics).validateAdminAccessForTenant("prop-xyz");
resourceQuotas = spy(new ResourceQuotas());
resourceQuotas.setServletContext(new MockServletContext());
resourceQuotas.setPulsar(pulsar);
brokerStats = spy(new BrokerStats());
brokerStats.setServletContext(new MockServletContext());
brokerStats.setPulsar(pulsar);
doReturn(false).when(persistentTopics).isRequestHttps();
doReturn(null).when(persistentTopics).originalPrincipal();
doReturn("test").when(persistentTopics).clientAppId();
doReturn(mock(AuthenticationDataHttps.class)).when(persistentTopics).clientAuthData();
schemasResource = spy(new SchemasResource(mockClock));
schemasResource.setServletContext(new MockServletContext());
schemasResource.setPulsar(pulsar);
}
@Override
@AfterMethod(alwaysRun = true)
public void cleanup() throws Exception {
super.internalCleanup();
conf.setClusterName(configClusterName);
}
@Test
public void internalConfiguration() throws Exception {
InternalConfigurationData expectedData = new InternalConfigurationData(
pulsar.getConfiguration().getZookeeperServers(),
pulsar.getConfiguration().getConfigurationStoreServers(),
new ClientConfiguration().getZkLedgersRootPath(),
pulsar.getMetadataServiceUri(),
pulsar.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null));
assertEquals(brokers.getInternalConfigurationData(), expectedData);
}
@Test
public void clusters() throws Exception {
assertEquals(clusters.getClusters(), Lists.newArrayList());
verify(clusters, never()).validateSuperUserAccess();
clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
verify(clusters, times(1)).validateSuperUserAccess();
// ensure to read from ZooKeeper directly
//clusters.clustersListCache().clear();
assertEquals(clusters.getClusters(), Lists.newArrayList("use"));
// Check creating existing cluster
try {
clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
}
// Check deleting non-existing cluster
try {
clusters.deleteCluster("usc");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}
assertEquals(clusters.getCluster("use"), new ClusterData("http://broker.messaging.use.example.com"));
verify(clusters, times(4)).validateSuperUserAccess();
clusters.updateCluster("use", new ClusterData("http://new-broker.messaging.use.example.com"));
verify(clusters, times(5)).validateSuperUserAccess();
assertEquals(clusters.getCluster("use"), new ClusterData("http://new-broker.messaging.use.example.com"));
verify(clusters, times(6)).validateSuperUserAccess();
try {
clusters.getNamespaceIsolationPolicies("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
NamespaceIsolationData policyData = new NamespaceIsolationData();
policyData.namespaces = new ArrayList<String>();
policyData.namespaces.add("dummy/colo/ns");
policyData.primary = new ArrayList<String>();
policyData.primary.add("localhost" + ":" + pulsar.getListenPortHTTP());
policyData.secondary = new ArrayList<String>();
policyData.auto_failover_policy = new AutoFailoverPolicyData();
policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available;
policyData.auto_failover_policy.parameters = new HashMap<String, String>();
policyData.auto_failover_policy.parameters.put("min_limit", "1");
policyData.auto_failover_policy.parameters.put("usage_threshold", "90");
AsyncResponse response = mock(AsyncResponse.class);
clusters.setNamespaceIsolationPolicy(response,"use", "policy1", policyData);
clusters.getNamespaceIsolationPolicies("use");
try {
clusters.deleteCluster("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 412);
}
clusters.deleteNamespaceIsolationPolicy("use", "policy1");
assertTrue(clusters.getNamespaceIsolationPolicies("use").isEmpty());
clusters.deleteCluster("use");
verify(clusters, times(13)).validateSuperUserAccess();
assertEquals(clusters.getClusters(), Lists.newArrayList());
try {
clusters.getCluster("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
try {
clusters.updateCluster("use", new ClusterData());
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
try {
clusters.getNamespaceIsolationPolicies("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), 404);
}
// Test zk failures
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/clusters");
});
configurationCache.clustersListCache().clear();
// clear caches to load data from metadata-store again
MetadataCacheImpl<ClusterData> clusterCache = (MetadataCacheImpl<ClusterData>) pulsar.getPulsarResources()
.getClusterResources().getCache();
MetadataCacheImpl isolationPolicyCache = (MetadataCacheImpl) pulsar.getPulsarResources()
.getNamespaceResources().getIsolationPolicies().getCache();
AbstractMetadataStore store = (AbstractMetadataStore) clusterCache.getStore();
clusterCache.invalidateAll();
store.invalidateAll();
try {
clusters.getClusters();
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE
&& path.equals("/admin/clusters/test");
});
try {
clusters.createCluster("test", new ClusterData("http://broker.messaging.test.example.com"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/test");
});
clusterCache.invalidateAll();
store.invalidateAll();
try {
clusters.updateCluster("test", new ClusterData("http://broker.messaging.test.example.com"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/test");
});
try {
clusters.getCluster("test");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN
&& path.equals("/admin/policies");
});
try {
clusters.deleteCluster("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET
&& path.equals("/admin/clusters/use/namespaceIsolationPolicies");
});
clusterCache.invalidateAll();
isolationPolicyCache.invalidateAll();
store.invalidateAll();
try {
clusters.deleteCluster("use");
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
// Check name validations
try {
clusters.createCluster("bf@", new ClusterData("http://dummy.messaging.example.com"));
fail("should have filed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
}
Object asynRequests(Consumer<TestAsyncResponse> function) throws Exception {
TestAsyncResponse ctx = new TestAsyncResponse();
function.accept(ctx);
ctx.latch.await();
if (ctx.e != null) {
throw (Exception) ctx.e;
}
return ctx.response;
}
@Test
public void properties() throws Throwable {
Object response = asynRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList());
verify(properties, times(1)).validateSuperUserAccess();
// create local cluster
clusters.createCluster(configClusterName, new ClusterData());
Set<String> allowedClusters = Sets.newHashSet();
allowedClusters.add(configClusterName);
TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), allowedClusters);
response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo));
verify(properties, times(2)).validateSuperUserAccess();
response = asynRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList("test-property"));
verify(properties, times(3)).validateSuperUserAccess();
response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
assertEquals(response, tenantInfo);
verify(properties, times(4)).validateSuperUserAccess();
final TenantInfo newPropertyAdmin = new TenantInfo(Sets.newHashSet("role1", "other-role"), allowedClusters);
response = asynRequests(ctx -> properties.updateTenant(ctx, "test-property", newPropertyAdmin));
verify(properties, times(5)).validateSuperUserAccess();
// Wait for updateTenant to take effect
Thread.sleep(100);
response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
assertEquals(response, newPropertyAdmin);
response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "test-property"));
assertNotSame(response, tenantInfo);
verify(properties, times(7)).validateSuperUserAccess();
// Check creating existing property
try {
response = asynRequests(ctx -> properties.createTenant(ctx, "test-property", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.CONFLICT.getStatusCode());
}
// Check non-existing property
try {
response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "non-existing"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}
try {
response = asynRequests(ctx -> properties.updateTenant(ctx, "xxx-non-existing", newPropertyAdmin));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}
// Check deleting non-existing property
try {
response = asynRequests(ctx -> properties.deleteTenant(ctx, "non-existing", false));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}
// clear caches to load data from metadata-store again
MetadataCacheImpl<TenantInfo> cache = (MetadataCacheImpl<TenantInfo>) pulsar.getPulsarResources()
.getTenatResources().getCache();
AbstractMetadataStore store = (AbstractMetadataStore) cache.getStore();
cache.invalidateAll();
store.invalidateAll();
// Test zk failures
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies");
});
try {
response = asynRequests(ctx -> properties.getTenants(ctx));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant");
});
try {
response = asynRequests(ctx -> properties.getTenantAdmin(ctx, "my-tenant"));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET && path.equals("/admin/policies/my-tenant");
});
try {
response = asynRequests(ctx -> properties.updateTenant(ctx, "my-tenant", newPropertyAdmin));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.CREATE && path.equals("/admin/policies/test");
});
try {
response = asynRequests(ctx -> properties.createTenant(ctx, "test", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.GET_CHILDREN && path.equals("/admin/policies/test-property");
});
try {
cache.invalidateAll();
store.invalidateAll();
response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
response = asynRequests(ctx -> properties.createTenant(ctx, "error-property", tenantInfo));
mockZooKeeperGlobal.failConditional(Code.SESSIONEXPIRED, (op, path) -> {
return op == MockZooKeeper.Op.DELETE && path.equals("/admin/policies/error-property");
});
try {
response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.INTERNAL_SERVER_ERROR.getStatusCode());
}
response = asynRequests(ctx -> properties.deleteTenant(ctx, "test-property", false));
response = asynRequests(ctx -> properties.deleteTenant(ctx, "error-property", false));
response = Lists.newArrayList();
response = asynRequests(ctx -> properties.getTenants(ctx));
assertEquals(response, Lists.newArrayList());
// Create a namespace to test deleting a non-empty property
TenantInfo newPropertyAdmin2 = new TenantInfo(Sets.newHashSet("role1", "other-role"), Sets.newHashSet("use"));
response = asynRequests(ctx -> properties.createTenant(ctx, "my-tenant", newPropertyAdmin2));
namespaces.createNamespace("my-tenant", "use", "my-namespace", new BundlesData());
try {
response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false));
fail("should have failed");
} catch (RestException e) {
// Ok
}
// Check name validation
try {
response = asynRequests(ctx -> properties.createTenant(ctx, "test&", tenantInfo));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
// Check tenantInfo is null
try {
response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-null", null));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
// Check tenantInfo with empty cluster
String blankCluster = "";
Set<String> blankClusters = Sets.newHashSet(blankCluster);
TenantInfo tenantWithEmptyCluster = new TenantInfo(Sets.newHashSet("role1", "role2"), blankClusters);
try {
response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-is-empty", tenantWithEmptyCluster));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
// Check tenantInfo contains empty cluster
Set<String> containBlankClusters = Sets.newHashSet(blankCluster);
containBlankClusters.add(configClusterName);
TenantInfo tenantContainEmptyCluster = new TenantInfo(Sets.newHashSet(), containBlankClusters);
try {
response = asynRequests(ctx -> properties.createTenant(ctx, "tenant-config-contain-empty", tenantContainEmptyCluster));
fail("should have failed");
} catch (RestException e) {
assertEquals(e.getResponse().getStatus(), Status.PRECONDITION_FAILED.getStatusCode());
}
AsyncResponse response2 = mock(AsyncResponse.class);
namespaces.deleteNamespace(response2, "my-tenant", "use", "my-namespace", false, false);
ArgumentCaptor<Response> captor = ArgumentCaptor.forClass(Response.class);
verify(response2, timeout(5000).times(1)).resume(captor.capture());
assertEquals(captor.getValue().getStatus(), Status.NO_CONTENT.getStatusCode());
response = asynRequests(ctx -> properties.deleteTenant(ctx, "my-tenant", false));
}
@Test
public void brokers() throws Exception {
clusters.createCluster("use", new ClusterData("http://broker.messaging.use.example.com",
"https://broker.messaging.use.example.com:4443"));
URI requestUri = new URI(
"http://broker.messaging.use.example.com:8080/admin/brokers/use");
UriInfo mockUri = mock(UriInfo.class);
doReturn(requestUri).when(mockUri).getRequestUri();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
uriField.set(brokers, mockUri);
Set<String> activeBrokers = brokers.getActiveBrokers("use");
assertEquals(activeBrokers.size(), 1);
assertEquals(activeBrokers, Sets.newHashSet(pulsar.getAdvertisedAddress() + ":" + pulsar.getListenPortHTTP().get()));
}
@Test
public void resourceQuotas() throws Exception {
// get Default Resource Quota
ResourceQuota quota = resourceQuotas.getDefaultResourceQuota();
assertNotNull(quota);
assertTrue(quota.getBandwidthIn() > 0);
// set Default Resource Quota
double defaultBandwidth = 1000;
quota.setBandwidthIn(defaultBandwidth);
quota.setBandwidthOut(defaultBandwidth);
resourceQuotas.setDefaultResourceQuota(quota);
assertEquals(defaultBandwidth, resourceQuotas.getDefaultResourceQuota().getBandwidthIn());
assertEquals(defaultBandwidth, resourceQuotas.getDefaultResourceQuota().getBandwidthOut());
String property = "prop-xyz";
String cluster = "use";
String namespace = "ns";
String bundleRange = "0x00000000_0xffffffff";
Policies policies = new Policies();
doReturn(policies).when(resourceQuotas).getNamespacePolicies(NamespaceName.get(property, cluster, namespace));
doReturn("client-id").when(resourceQuotas).clientAppId();
try {
resourceQuotas.setNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange, quota);
fail();
} catch (Exception e) {
// OK : should fail without creating policies
}
try {
resourceQuotas.removeNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange);
fail();
} catch (Exception e) {
// OK : should fail without creating policies
}
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
ClusterData clusterData = new ClusterData(cluster);
clusters.createCluster(cluster, clusterData );
asynRequests(ctx -> properties.createTenant(ctx, property, admin));
// customized bandwidth for this namespace
double customizeBandwidth = 3000;
quota.setBandwidthIn(customizeBandwidth);
quota.setBandwidthOut(customizeBandwidth);
// set and get Resource Quota
resourceQuotas.setNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange, quota);
ResourceQuota bundleQuota = resourceQuotas.getNamespaceBundleResourceQuota(property, cluster, namespace,
bundleRange);
assertEquals(quota, bundleQuota);
// remove quota which sets to default quota
resourceQuotas.removeNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange);
bundleQuota = resourceQuotas.getNamespaceBundleResourceQuota(property, cluster, namespace, bundleRange);
assertEquals(defaultBandwidth, bundleQuota.getBandwidthIn());
assertEquals(defaultBandwidth, bundleQuota.getBandwidthOut());
}
@Test
public void brokerStats() throws Exception {
doReturn("client-id").when(brokerStats).clientAppId();
Collection<Metrics> metrics = brokerStats.getMetrics();
assertNotNull(metrics);
LocalBrokerData loadReport = (LocalBrokerData) brokerStats.getLoadReport();
assertNotNull(loadReport);
assertNotNull(loadReport.getCpu());
Collection<Metrics> mBeans = brokerStats.getMBeans();
assertFalse(mBeans.isEmpty());
AllocatorStats allocatorStats = brokerStats.getAllocatorStats("default");
assertNotNull(allocatorStats);
Map<String, Map<String, PendingBookieOpsStats>> bookieOpsStats = brokerStats.getPendingBookieOpsStats();
assertTrue(bookieOpsStats.isEmpty());
StreamingOutput topic = brokerStats.getTopics2();
assertNotNull(topic);
try {
brokerStats.getBrokerResourceAvailability("prop", "use", "ns2");
fail("should have failed as ModularLoadManager doesn't support it");
} catch (RestException re) {
// Ok
}
}
@Test
public void persistentTopics() throws Exception {
final String property = "prop-xyz";
final String cluster = "use";
final String namespace = "ns";
final String topic = "ds1";
Policies policies = new Policies();
doReturn(policies).when(resourceQuotas).getNamespacePolicies(NamespaceName.get(property, cluster, namespace));
doReturn("client-id").when(resourceQuotas).clientAppId();
// create policies
TenantInfo admin = new TenantInfo();
admin.getAllowedClusters().add(cluster);
ZkUtils.createFullPathOptimistic(mockZooKeeperGlobal, PulsarWebResource.path(POLICIES, property, cluster, namespace),
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(new Policies()), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
AsyncResponse response = mock(AsyncResponse.class);
persistentTopics.getList(response, property, cluster, namespace);
verify(response, times(1)).resume(Lists.newArrayList());
// create topic
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
response = mock(AsyncResponse.class);
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
.newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
TopicName topicName = TopicName.get("persistent", property, cluster, namespace, topic);
assertEquals(persistentTopics.getPartitionedTopicMetadata(topicName, true, false).partitions, 5);
CountDownLatch notificationLatch = new CountDownLatch(2);
configurationCache.policiesCache().registerListener((path, data, stat) -> {
notificationLatch.countDown();
});
// grant permission
final Set<AuthAction> actions = Sets.newHashSet(AuthAction.produce);
final String role = "test-role";
persistentTopics.grantPermissionsOnTopic(property, cluster, namespace, topic, role, actions);
// verify permission
Map<String, Set<AuthAction>> permission = persistentTopics.getPermissionsOnTopic(property, cluster,
namespace, topic);
assertEquals(permission.get(role), actions);
// remove permission
persistentTopics.revokePermissionsOnTopic(property, cluster, namespace, topic, role);
// Wait for cache to be updated
notificationLatch.await();
// verify removed permission
permission = persistentTopics.getPermissionsOnTopic(property, cluster, namespace, topic);
assertTrue(permission.isEmpty());
}
@Test
public void testRestExceptionMessage() {
String message = "my-message";
RestException exception = new RestException(Status.PRECONDITION_FAILED, message);
assertEquals(exception.getMessage(), message);
}
static class TestAsyncResponse implements AsyncResponse {
Object response;
Throwable e;
CountDownLatch latch = new CountDownLatch(1);
@Override
public boolean resume(Object response) {
this.response = response;
latch.countDown();
return true;
}
@Override
public boolean resume(Throwable response) {
this.e = response;
latch.countDown();
return true;
}
@Override
public boolean cancel() {
return false;
}
@Override
public boolean cancel(int retryAfter) {
return false;
}
@Override
public boolean cancel(Date retryAfter) {
return false;
}
@Override
public boolean isSuspended() {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public boolean setTimeout(long time, TimeUnit unit) {
return false;
}
@Override
public void setTimeoutHandler(TimeoutHandler handler) {
}
@Override
public Collection<Class<?>> register(Class<?> callback) {
return null;
}
@Override
public Map<Class<?>, Collection<Class<?>>> register(Class<?> callback, Class<?>... callbacks) {
return null;
}
@Override
public Collection<Class<?>> register(Object callback) {
return null;
}
@Override
public Map<Class<?>, Collection<Class<?>>> register(Object callback, Object... callbacks) {
return null;
}
}
}