blob: 3ec536e6b2fd7f2ce5009fc9a533f6ca5a4e2a09 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.lookup.http;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriInfo;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.cache.ConfigurationCacheService;
import org.apache.pulsar.broker.lookup.NamespaceData;
import org.apache.pulsar.broker.lookup.RedirectData;
import org.apache.pulsar.broker.lookup.v1.TopicLookup;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
import org.mockito.ArgumentCaptor;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import com.google.common.collect.Sets;
/**
* HTTP lookup unit tests.
*/
@Test(groups = "broker")
public class HttpTopicLookupv2Test {
private PulsarService pulsar;
private NamespaceService ns;
private AuthorizationService auth;
private ServiceConfiguration config;
private ConfigurationCacheService mockConfigCache;
private ZooKeeperChildrenCache clustersListCache;
private ZooKeeperDataCache<ClusterData> clustersCache;
private ZooKeeperDataCache<Policies> policiesCache;
private Set<String> clusters;
@SuppressWarnings("unchecked")
@BeforeMethod
public void setUp() throws Exception {
pulsar = mock(PulsarService.class);
ns = mock(NamespaceService.class);
auth = mock(AuthorizationService.class);
mockConfigCache = mock(ConfigurationCacheService.class);
clustersListCache = mock(ZooKeeperChildrenCache.class);
clustersCache = mock(ZooKeeperDataCache.class);
policiesCache = mock(ZooKeeperDataCache.class);
config = spy(new ServiceConfiguration());
config.setClusterName("use");
clusters = new TreeSet<>();
clusters.add("use");
clusters.add("usc");
clusters.add("usw");
ClusterData useData = new ClusterData("http://broker.messaging.use.example.com:8080");
ClusterData uscData = new ClusterData("http://broker.messaging.usc.example.com:8080");
ClusterData uswData = new ClusterData("http://broker.messaging.usw.example.com:8080");
doReturn(config).when(pulsar).getConfiguration();
doReturn(mockConfigCache).when(pulsar).getConfigurationCache();
doReturn(clustersListCache).when(mockConfigCache).clustersListCache();
doReturn(clustersCache).when(mockConfigCache).clustersCache();
doReturn(policiesCache).when(mockConfigCache).policiesCache();
doReturn(Optional.of(useData)).when(clustersCache).get(AdminResource.path("clusters", "use"));
doReturn(Optional.of(uscData)).when(clustersCache).get(AdminResource.path("clusters", "usc"));
doReturn(Optional.of(uswData)).when(clustersCache).get(AdminResource.path("clusters", "usw"));
doReturn(CompletableFuture.completedFuture(Optional.of(useData))).when(clustersCache).getAsync(AdminResource.path("clusters", "use"));
doReturn(CompletableFuture.completedFuture(Optional.of(uscData))).when(clustersCache).getAsync(AdminResource.path("clusters", "usc"));
doReturn(CompletableFuture.completedFuture(Optional.of(uswData))).when(clustersCache).getAsync(AdminResource.path("clusters", "usw"));
doReturn(clusters).when(clustersListCache).get();
doReturn(ns).when(pulsar).getNamespaceService();
BrokerService brokerService = mock(BrokerService.class);
doReturn(brokerService).when(pulsar).getBrokerService();
doReturn(auth).when(brokerService).getAuthorizationService();
doReturn(new Semaphore(1000)).when(brokerService).getLookupRequestSemaphore();
}
@Test
public void crossColoLookup() throws Exception {
TopicLookup destLookup = spy(new TopicLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
asyncResponse);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse).resume(arg.capture());
assertEquals(arg.getValue().getClass(), WebApplicationException.class);
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
}
@Test
public void testNotEnoughLookupPermits() throws Exception {
BrokerService brokerService = pulsar.getBrokerService();
doReturn(new Semaphore(0)).when(brokerService).getLookupRequestSemaphore();
TopicLookup destLookup = spy(new TopicLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();
AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
asyncResponse1);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
assertEquals(arg.getValue().getClass(), WebApplicationException.class);
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.SERVICE_UNAVAILABLE.getStatusCode());
}
@Test
public void testValidateReplicationSettingsOnNamespace() throws Exception {
final String property = "my-prop";
final String cluster = "global";
final String ns1 = "ns1";
final String ns2 = "ns2";
Policies policies1 = new Policies();
doReturn(Optional.of(policies1)).when(policiesCache)
.get(AdminResource.path(POLICIES, property, cluster, ns1));
Policies policies2 = new Policies();
policies2.replication_clusters = Sets.newHashSet("invalid-localCluster");
doReturn(Optional.of(policies2)).when(policiesCache)
.get(AdminResource.path(POLICIES, property, cluster, ns2));
TopicLookup destLookup = spy(new TopicLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
doReturn(false).when(config).isAuthorizationEnabled();
AsyncResponse asyncResponse = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster",
false, asyncResponse);
ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse).resume(arg.capture());
assertEquals(arg.getValue().getClass(), RestException.class);
AsyncResponse asyncResponse2 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns2,
"invalid-localCluster", false, asyncResponse2);
ArgumentCaptor<Throwable> arg2 = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse2).resume(arg2.capture());
// Should have raised exception for invalid cluster
assertEquals(arg2.getValue().getClass(), RestException.class);
}
@Test
public void testDataPojo() {
final String url = "localhost:8080";
NamespaceData data1 = new NamespaceData(url);
assertEquals(data1.getBrokerUrl(), url);
RedirectData data2 = new RedirectData(url);
assertEquals(data2.getRedirectLookupAddress(), url);
}
}