blob: 9ecb2fb4590619ba670b0d3d04009783a4249931 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConnectionPool;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.netty.EventLoopUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker")
public class BrokerServiceThrottlingTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
super.baseSetup();
}
@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
super.internalCleanup();
}
/**
* Verifies: updating zk-throttling node reflects broker-maxConcurrentLookupRequest and updates semaphore.
*
* @throws Exception
*/
@Test
public void testThrottlingLookupRequestSemaphore() throws Exception {
BrokerService service = pulsar.getBrokerService();
assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0));
Thread.sleep(1000);
assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0);
}
/**
* Broker has maxConcurrentLookupRequest = 0 so, it rejects incoming lookup request and it cause consumer creation
* failure.
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByBroker0Permit() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic";
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.build();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub").subscribe();
consumer.close();
int newPermits = 0;
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits));
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) {
Thread.sleep(100 + (i * 10));
} else {
break;
}
}
try {
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub").subscribe();
consumer.close();
fail("It should fail as throttling should not receive any request");
} catch (org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException e) {
// ok as throttling set to 0
}
}
/**
* Verifies: Broker side throttling:
*
* <pre>
* 1. concurrent_consumer_creation > maxConcurrentLookupRequest at broker
* 2. few of the consumer creation must fail with TooManyLookupRequestException.
* </pre>
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByBroker() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic";
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.ioThreads(20).connectionsPerBroker(20).build();
int newPermits = 1;
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits));
// wait config to be updated
for (int i = 0; i < 5; i++) {
if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) {
Thread.sleep(100 + (i * 10));
} else {
break;
}
}
PulsarServiceNameResolver resolver = new PulsarServiceNameResolver();
resolver.updateServiceUrl(pulsar.getBrokerServiceUrl());
ClientConfigurationData conf = new ClientConfigurationData();
conf.setConnectionsPerBroker(20);
EventLoopGroup eventLoop = EventLoopUtil.newEventLoopGroup(20, false,
new DefaultThreadFactory("test-pool", Thread.currentThread().isDaemon()));
ExecutorService executor = Executors.newFixedThreadPool(10);
try (ConnectionPool pool = new ConnectionPool(conf, eventLoop)) {
final int totalConsumers = 20;
List<Future<?>> futures = new ArrayList<>();
// test for partitionMetadataRequest
for (int i = 0; i < totalConsumers; i++) {
long reqId = 0xdeadbeef + i;
Future<?> f = executor.submit(() -> {
ByteBuf request = Commands.newPartitionMetadataRequest(topicName, reqId);
pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request, reqId))
.get();
return null;
});
futures.add(f);
}
int rejects = 0;
for (Future<?> f : futures) {
try {
f.get();
} catch (ExecutionException e) {
Throwable rootCause = e;
while (rootCause instanceof ExecutionException) {
rootCause = rootCause.getCause();
}
if (rootCause instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException) {
rejects++;
} else {
throw e;
}
}
}
assertTrue(rejects > 0);
futures.clear();
// test for lookup
for (int i = 0; i < totalConsumers; i++) {
long reqId = 0xdeadfeef + i;
Future<?> f = executor.submit(() -> {
ByteBuf request = Commands.newLookup(topicName, true, reqId);
pool.getConnection(resolver.resolveHost())
.thenCompose(clientCnx -> clientCnx.newLookup(request, reqId))
.get();
return null;
});
futures.add(f);
}
rejects = 0;
for (Future<?> f : futures) {
try {
f.get();
} catch (ExecutionException e) {
Throwable rootCause = e;
while (rootCause instanceof ExecutionException) {
rootCause = rootCause.getCause();
}
if (rootCause instanceof
org.apache.pulsar.client.api.PulsarClientException.TooManyRequestsException) {
rejects++;
} else {
throw e;
}
}
}
assertTrue(rejects > 0);
} finally {
executor.shutdownNow();
eventLoop.shutdownNow();
}
}
/**
* This testcase make sure that once consumer lost connection with broker, it always reconnects with broker by
* retrying on throttling-error exception also.
*
* <pre>
* 1. all consumers get connected
* 2. broker restarts with maxConcurrentLookupRequest = 1
* 3. consumers reconnect and some get TooManyRequestException and again retries
* 4. eventually all consumers will successfully connect to broker
* </pre>
*
* @throws Exception
*/
@Test
public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception {
final String topicName = "persistent://prop/ns-abc/newTopic-" + UUID.randomUUID().toString();
@Cleanup
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsar.getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
.ioThreads(20).connectionsPerBroker(20).build();
upsertLookupPermits(100);
List<Consumer<byte[]>> consumers = Collections.synchronizedList(Lists.newArrayList());
@Cleanup("shutdownNow")
ExecutorService executor = Executors.newFixedThreadPool(10);
final int totalConsumers = 8;
CountDownLatch latch = new CountDownLatch(totalConsumers);
for (int i = 0; i < totalConsumers; i++) {
executor.execute(() -> {
try {
consumers.add(pulsarClient.newConsumer().topic(topicName).subscriptionName("mysub")
.subscriptionType(SubscriptionType.Shared).subscribe());
} catch (PulsarClientException.TooManyRequestsException e) {
// ok
} catch (Exception e) {
fail("it shouldn't failed");
}
latch.countDown();
});
}
latch.await();
admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", "1");
admin.topics().unload(topicName);
// wait strategically for all consumers to reconnect
retryStrategically((test) -> areAllConsumersConnected(consumers), 5, 500);
int totalConnectedConsumers = 0;
for (Consumer<byte[]> consumer : consumers) {
if (consumer.isConnected()) {
totalConnectedConsumers++;
}
consumer.close();
}
assertEquals(totalConnectedConsumers, totalConsumers);
}
private boolean areAllConsumersConnected(List<Consumer<byte[]>> consumers) {
for (Consumer<byte[]> consumer : consumers) {
if (!consumer.isConnected()) {
return false;
}
}
return true;
}
private void upsertLookupPermits(int permits) throws Exception {
pulsar.getPulsarResources().getDynamicConfigResources().setDynamicConfigurationWithCreate(optMap -> {
Map<String, String> map = optMap.orElse(new TreeMap<>());
map.put("maxConcurrentLookupRequest", Integer.toString(permits));
return map;
});
}
}