blob: a88ccd60ae4c414f045a899372432b2e4f0f1f5b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;
import static org.apache.pulsar.common.util.PortManager.nextLockedFreePort;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import java.net.URI;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.Test;
/**
 * Checks that topic lookups on a multi-broker setup return the addresses configured through
 * {@code advertisedListeners} rather than the (deliberately unresolvable) advertised address.
 *
 * <p>Every broker is given an advertised address of the form {@code BROKER-*} and a set of
 * advertised listeners that all point at {@code localhost}.
 */
@Slf4j
@Test(groups = "broker")
public class AdvertisedListenersTest extends MultiBrokerBaseTest {

    /** Requests one broker in addition to the main one. */
    @Override
    protected int numberOfAdditionalBrokers() {
        return 1;
    }

    /** Applies the advertised-listener settings to the main broker's configuration. */
    @Override
    protected void doInitConf() throws Exception {
        super.doInitConf();
        updateConfig(conf, "BROKER-X");
    }

    /**
     * Builds the configuration for an additional broker and applies the advertised-listener
     * settings to it.
     *
     * @param additionalBrokerIndex index of the additional broker; also used as the suffix of
     *                              its advertised address
     * @return the configuration produced by the superclass with the listener settings applied
     */
    @Override
    protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
        // Named differently from the inherited "conf" so the two cannot be confused.
        ServiceConfiguration additionalConf = super.createConfForAdditionalBroker(additionalBrokerIndex);
        updateConfig(additionalConf, "BROKER-" + additionalBrokerIndex);
        return additionalConf;
    }

    /**
     * Sets the advertised address, the advertised listeners and the service ports on the given
     * configuration, using freshly reserved ports.
     *
     * @param brokerConf        configuration to modify in place
     * @param advertisedAddress value to set as the broker's advertised address
     */
    private void updateConfig(ServiceConfiguration brokerConf, String advertisedAddress) {
        int pulsarPort = nextLockedFreePort();
        int httpPort = nextLockedFreePort();
        // NOTE(review): this port is only advertised below; no TLS web service port is bound to it
        // in this method - confirm that is intentional.
        int httpsPort = nextLockedFreePort();

        // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
        brokerConf.setAdvertisedAddress(advertisedAddress);
        brokerConf.setAdvertisedListeners(
                "public:pulsar://localhost:" + pulsarPort
                        + ",public_http:http://localhost:" + httpPort
                        + ",public_https:https://localhost:" + httpsPort);
        brokerConf.setBrokerServicePort(Optional.of(pulsarPort));
        brokerConf.setWebServicePort(Optional.of(httpPort));
    }

    /**
     * Looks up a topic over HTTP and asserts that both returned URLs use {@code localhost} as
     * host, then publishes one message and checks topic stats and lookup through every admin
     * client.
     */
    @Test
    public void testLookup() throws Exception {
        final String topic = "my-topic";

        // Build the lookup URL from the same topic name that is produced to further down.
        HttpGet request = new HttpGet(
                pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/" + topic);
        request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
        request.addHeader(HttpHeaders.ACCEPT, "application/json");

        @Cleanup
        CloseableHttpClient httpClient = HttpClients.createDefault();
        @Cleanup
        CloseableHttpResponse response = httpClient.execute(request);

        HttpEntity entity = response.getEntity();
        LookupData ld = ObjectMapperFactory.getMapper().reader()
                .readValue(EntityUtils.toString(entity), LookupData.class);
        log.info("Lookup data: {}", ld);

        // The host must come from the advertised listeners, not from the advertised address.
        assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
        assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");

        // Produce data
        @Cleanup
        Producer<String> p = pulsarClient.newProducer(Schema.STRING)
                .topic(topic)
                .create();
        p.send("hello");

        // Verify we can get the correct HTTP redirect to the advertised listener
        for (PulsarAdmin a : getAllAdmins()) {
            TopicStats s = a.topics().getStats(topic);
            assertNotNull(a.lookups().lookupTopic(topic));
            assertEquals(s.getPublishers().size(), 1);
        }
    }
}