| /** |
| * Copyright 2016 Yahoo Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.yahoo.pulsar.broker.service; |
| |
| import static com.yahoo.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONFIGURATION_PATH; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotEquals; |
| import static org.testng.Assert.fail; |
| |
| import java.lang.reflect.Method; |
| import java.net.URI; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.bookkeeper.util.ZkUtils; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.ZooDefs; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.yahoo.pulsar.client.api.Consumer; |
| import com.yahoo.pulsar.client.api.ConsumerConfiguration; |
| import com.yahoo.pulsar.client.api.PulsarClient; |
| import com.yahoo.pulsar.client.api.PulsarClientException; |
| import com.yahoo.pulsar.client.api.SubscriptionType; |
| import com.yahoo.pulsar.client.impl.ConsumerImpl; |
| import com.yahoo.pulsar.common.util.ObjectMapperFactory; |
| |
| /** |
| */ |
| public class BrokerServiceThrottlingTest extends BrokerTestBase { |
| |
| @BeforeMethod |
| @Override |
| protected void setup() throws Exception { |
| super.baseSetup(); |
| } |
| |
| @AfterMethod |
| @Override |
| protected void cleanup() throws Exception { |
| super.internalCleanup(); |
| } |
| |
| /** |
| * Verifies: updating zk-thottling node reflects broker-maxConcurrentLookupRequest and updates semaphore. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testThrottlingLookupRequestSemaphore() throws Exception { |
| BrokerService service = pulsar.getBrokerService(); |
| assertNotEquals(service.lookupRequestSemaphore.get().availablePermits(), 0); |
| admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(0)); |
| Thread.sleep(1000); |
| assertEquals(service.lookupRequestSemaphore.get().availablePermits(), 0); |
| } |
| |
| /** |
| * Broker has maxConcurrentLookupRequest = 0 so, it rejects incoming lookup request and it cause consumer creation |
| * failure. |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testLookupThrottlingForClientByBroker0Permit() throws Exception { |
| |
| final String topicName = "persistent://prop/usw/my-ns/newTopic"; |
| |
| com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); |
| clientConf.setStatsInterval(0, TimeUnit.SECONDS); |
| String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); |
| PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); |
| |
| ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); |
| Consumer consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig); |
| consumer.close(); |
| |
| int newPermits = 0; |
| admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits)); |
| // wait config to be updated |
| for (int i = 0; i < 5; i++) { |
| if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) { |
| Thread.sleep(100 + (i * 10)); |
| } else { |
| break; |
| } |
| } |
| |
| try { |
| consumer = pulsarClient.subscribe(topicName, "mysub", consumerConfig); |
| consumer.close(); |
| fail("It should fail as throttling should not receive any request"); |
| } catch (com.yahoo.pulsar.client.api.PulsarClientException.TooManyRequestsException e) { |
| // ok as throttling set to 0 |
| } |
| } |
| |
| /** |
| * Verifies: Broker side throttling: |
| * |
| * <pre> |
| * 1. concurrent_consumer_creation > maxConcurrentLookupRequest at broker |
| * 2. few of the consumer creation must fail with TooManyLookupRequestException. |
| * </pre> |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testLookupThrottlingForClientByBroker() throws Exception { |
| final String topicName = "persistent://prop/usw/my-ns/newTopic"; |
| |
| com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); |
| clientConf.setStatsInterval(0, TimeUnit.SECONDS); |
| clientConf.setIoThreads(20); |
| clientConf.setConnectionsPerBroker(20); |
| String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); |
| PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); |
| |
| ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); |
| consumerConfig.setSubscriptionType(SubscriptionType.Shared); |
| |
| int newPermits = 1; |
| admin.brokers().updateDynamicConfiguration("maxConcurrentLookupRequest", Integer.toString(newPermits)); |
| // wait config to be updated |
| for (int i = 0; i < 5; i++) { |
| if (pulsar.getConfiguration().getMaxConcurrentLookupRequest() != newPermits) { |
| Thread.sleep(100 + (i * 10)); |
| } else { |
| break; |
| } |
| } |
| |
| List<Consumer> successfulConsumers = Lists.newArrayList(); |
| ExecutorService executor = Executors.newFixedThreadPool(10); |
| final int totalConsumers = 20; |
| CountDownLatch latch = new CountDownLatch(totalConsumers); |
| for (int i = 0; i < totalConsumers; i++) { |
| executor.execute(() -> { |
| try { |
| successfulConsumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig)); |
| } catch (PulsarClientException.TooManyRequestsException e) { |
| // ok |
| } catch (Exception e) { |
| fail("it shouldn't failed"); |
| } |
| latch.countDown(); |
| }); |
| } |
| latch.await(); |
| |
| for (int i = 0; i < successfulConsumers.size(); i++) { |
| successfulConsumers.get(i).close(); |
| } |
| pulsarClient.close(); |
| assertNotEquals(successfulConsumers.size(), totalConsumers); |
| } |
| |
| /** |
| * This testcase make sure that once consumer lost connection with broker, it always reconnects with broker by |
| * retrying on throttling-error exception also. |
| * |
| * <pre> |
| * 1. all consumers get connected |
| * 2. broker restarts with maxConcurrentLookupRequest = 1 |
| * 3. consumers reconnect and some get TooManyRequestException and again retries |
| * 4. eventually all consumers will successfully connect to broker |
| * </pre> |
| * |
| * @throws Exception |
| */ |
| @Test |
| public void testLookupThrottlingForClientByBrokerInternalRetry() throws Exception { |
| |
| final String topicName = "persistent://prop/usw/my-ns/newTopic"; |
| |
| com.yahoo.pulsar.client.api.ClientConfiguration clientConf = new com.yahoo.pulsar.client.api.ClientConfiguration(); |
| clientConf.setStatsInterval(0, TimeUnit.SECONDS); |
| clientConf.setIoThreads(20); |
| clientConf.setConnectionsPerBroker(20); |
| String lookupUrl = new URI("pulsar://localhost:" + BROKER_PORT).toString(); |
| PulsarClient pulsarClient = PulsarClient.create(lookupUrl, clientConf); |
| upsertLookupPermits(100); |
| ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); |
| consumerConfig.setSubscriptionType(SubscriptionType.Shared); |
| List<Consumer> consumers = Lists.newArrayList(); |
| ExecutorService executor = Executors.newFixedThreadPool(10); |
| final int totalConsumers = 8; |
| CountDownLatch latch = new CountDownLatch(totalConsumers); |
| for (int i = 0; i < totalConsumers; i++) { |
| executor.execute(() -> { |
| try { |
| consumers.add(pulsarClient.subscribe(topicName, "mysub", consumerConfig)); |
| } catch (PulsarClientException.TooManyRequestsException e) { |
| // ok |
| } catch (Exception e) { |
| fail("it shouldn't failed"); |
| } |
| latch.countDown(); |
| }); |
| } |
| latch.await(); |
| |
| stopBroker(); |
| conf.setMaxConcurrentLookupRequest(1); |
| startBroker(); |
| |
| // wait strategically for all consumers to reconnect |
| for (int i = 0; i < 5; i++) { |
| if (!areAllConsumersConnected(consumers)) { |
| Thread.sleep(1000 + (i * 500)); |
| } else { |
| break; |
| } |
| } |
| |
| int totalConnectedConsumers = 0; |
| for (int i = 0; i < consumers.size(); i++) { |
| if (((ConsumerImpl) consumers.get(i)).isConnected()) { |
| totalConnectedConsumers++; |
| } |
| consumers.get(i).close(); |
| |
| } |
| assertEquals(totalConnectedConsumers, totalConsumers); |
| |
| pulsarClient.close(); |
| } |
| |
| private boolean areAllConsumersConnected(List<Consumer> consumers) { |
| for (int i = 0; i < consumers.size(); i++) { |
| if (!((ConsumerImpl) consumers.get(i)).isConnected()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private void upsertLookupPermits(int permits) throws Exception { |
| Map<String, String> throttlingMap = Maps.newHashMap(); |
| throttlingMap.put("maxConcurrentLookupRequest", Integer.toString(permits)); |
| byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(throttlingMap); |
| if (mockZookKeeper.exists(BROKER_SERVICE_CONFIGURATION_PATH, false) != null) { |
| mockZookKeeper.setData(BROKER_SERVICE_CONFIGURATION_PATH, content, -1); |
| } else { |
| ZkUtils.createFullPathOptimistic(mockZookKeeper, BROKER_SERVICE_CONFIGURATION_PATH, content, |
| ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); |
| } |
| } |
| |
| } |