blob: e08cd911ad7eab357f977d4c6094905c96696dea [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
@Test(groups = "broker-api")
public class PulsarMultiListenersWithInternalListenerNameTest extends MockedPulsarServiceBaseTest {
private final boolean withInternalListener;
private ExecutorService executorService;
private String hostAndBrokerPort;
private String hostAndBrokerPortSsl;
private EventLoopGroup eventExecutors;
public PulsarMultiListenersWithInternalListenerNameTest() {
this(true);
}
protected PulsarMultiListenersWithInternalListenerNameTest(boolean withInternalListener) {
this.withInternalListener = withInternalListener;
// enable port forwarding from the configured advertised port to the dynamic listening port
this.enableBrokerGateway = true;
}
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
this.executorService = Executors.newFixedThreadPool(1);
this.eventExecutors = new NioEventLoopGroup();
this.isTcpLookup = true;
String host = InetAddress.getLocalHost().getHostAddress();
int brokerPort = getFreePort();
hostAndBrokerPort = host + ":" + brokerPort;
int brokerPortSsl = getFreePort();
hostAndBrokerPortSsl = host + ":" + brokerPortSsl;
super.internalSetup();
}
private static int getFreePort() {
try (ServerSocket serverSocket = new ServerSocket(0)) {
return serverSocket.getLocalPort();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
protected void doInitConf() throws Exception {
super.doInitConf();
this.conf.setClusterName("localhost");
this.conf.setAdvertisedListeners(String.format("internal:pulsar://%s,internal:pulsar+ssl://%s",
hostAndBrokerPort, hostAndBrokerPortSsl));
if (withInternalListener) {
this.conf.setInternalListenerName("internal");
}
}
@Override
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.listenerName("internal");
}
@Test
public void testFindBrokerWithListenerName() throws Exception {
admin.clusters().createCluster("localhost", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfo tenantInfo = TenantInfo.builder()
.allowedClusters(Collections.singleton("localhost"))
.build();
this.admin.tenants().createTenant("public", tenantInfo);
this.admin.namespaces().createNamespace("public/default");
doFindBrokerWithListenerName(true);
doFindBrokerWithListenerName(false);
}
private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setListenerName("internal");
conf.setServiceUrl(pulsar.getWebServiceAddress());
conf.setMaxLookupRedirects(10);
@Cleanup
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
// test request 1
{
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey().toString(), hostAndBrokerPort);
Assert.assertEquals(result.getValue().toString(), hostAndBrokerPort);
}
// test request 2
{
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey().toString(), hostAndBrokerPort);
Assert.assertEquals(result.getValue().toString(), hostAndBrokerPort);
}
}
@Test
public void testHttpLookupRedirect() throws Exception {
admin.clusters().createCluster("localhost", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfo tenantInfo = TenantInfo.builder()
.allowedClusters(Collections.singleton("localhost"))
.build();
this.admin.tenants().createTenant("public", tenantInfo);
this.admin.namespaces().createNamespace("public/default");
ClientConfigurationData conf = new ClientConfigurationData();
conf.setListenerName("internal");
conf.setServiceUrl(pulsar.getWebServiceAddress());
conf.setMaxLookupRedirects(10);
@Cleanup
HttpLookupService lookupService = new HttpLookupService(conf, eventExecutors);
NamespaceService namespaceService = pulsar.getNamespaceService();
LookupResult lookupResult = new LookupResult(pulsar.getWebServiceAddress(), null,
pulsar.getBrokerServiceUrl(), null, true);
Optional<LookupResult> optional = Optional.of(lookupResult);
String address = "192.168.0.1:8080";
String httpAddress = "192.168.0.1:8081";
NamespaceEphemeralData namespaceEphemeralData = new NamespaceEphemeralData("pulsar://" + address,
null, "http://" + httpAddress, null, false);
LookupResult lookupResult2 = new LookupResult(namespaceEphemeralData);
Optional<LookupResult> optional2 = Optional.of(lookupResult2);
doReturn(CompletableFuture.completedFuture(optional), CompletableFuture.completedFuture(optional2))
.when(namespaceService).getBrokerServiceUrlAsync(any(), any());
CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));
Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey().toString(), address);
Assert.assertEquals(result.getValue().toString(), address);
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
if (this.executorService != null) {
this.executorService.shutdownNow();
}
if (eventExecutors != null) {
eventExecutors.shutdownGracefully();
}
super.internalCleanup();
}
}