| /** |
| * Copyright 2016 Yahoo Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.yahoo.pulsar.broker.loadbalance; |
| |
| import static org.mockito.Mockito.atLeastOnce; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.spy; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| import static org.testng.Assert.assertEquals; |
| import static org.testng.Assert.assertNotEquals; |
| import static org.testng.Assert.assertNull; |
| import static org.testng.Assert.assertTrue; |
| import static org.testng.Assert.fail; |
| |
| import org.apache.commons.lang3.SystemUtils; |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.net.URL; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.yahoo.pulsar.broker.loadbalance.impl.*; |
| import org.apache.bookkeeper.test.PortManager; |
| import org.apache.bookkeeper.util.ZkUtils; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.annotations.AfterMethod; |
| import org.testng.annotations.BeforeMethod; |
| import org.testng.annotations.Test; |
| |
| import com.fasterxml.jackson.databind.ObjectMapper; |
| import com.google.common.collect.Sets; |
| import com.yahoo.pulsar.broker.PulsarService; |
| import com.yahoo.pulsar.broker.ServiceConfiguration; |
| import com.yahoo.pulsar.broker.admin.AdminResource; |
| import com.yahoo.pulsar.client.admin.BrokerStats; |
| import com.yahoo.pulsar.client.admin.PulsarAdmin; |
| import com.yahoo.pulsar.client.api.Authentication; |
| import com.yahoo.pulsar.client.api.ClientConfiguration; |
| import com.yahoo.pulsar.client.api.PulsarClient; |
| import com.yahoo.pulsar.common.naming.NamespaceName; |
| import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyData; |
| import com.yahoo.pulsar.common.policies.data.AutoFailoverPolicyType; |
| import com.yahoo.pulsar.common.policies.data.NamespaceIsolationData; |
| import com.yahoo.pulsar.common.policies.data.ResourceQuota; |
| import com.yahoo.pulsar.common.policies.data.loadbalancer.BrokerUsage; |
| import com.yahoo.pulsar.common.policies.data.loadbalancer.JvmUsage; |
| import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; |
| import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUnitRanking; |
| import com.yahoo.pulsar.common.policies.data.loadbalancer.ResourceUsage; |
| import com.yahoo.pulsar.common.policies.data.loadbalancer.SystemResourceUsage; |
| import com.yahoo.pulsar.common.policies.impl.NamespaceIsolationPolicies; |
| import com.yahoo.pulsar.common.util.ObjectMapperFactory; |
| import com.yahoo.pulsar.zookeeper.LocalBookkeeperEnsemble; |
| import com.yahoo.pulsar.zookeeper.LocalZooKeeperCache; |
| import com.yahoo.pulsar.zookeeper.ZooKeeperChildrenCache; |
| |
| import jersey.repackaged.com.google.common.collect.Maps; |
| |
| /** |
| */ |
| public class SimpleLoadManagerImplTest { |
| LocalBookkeeperEnsemble bkEnsemble; |
| |
| URL url1; |
| PulsarService pulsar1; |
| PulsarAdmin admin1; |
| |
| URL url2; |
| PulsarService pulsar2; |
| PulsarAdmin admin2; |
| |
| BrokerStats brokerStatsClient1; |
| BrokerStats brokerStatsClient2; |
| |
| ExecutorService executor = new ThreadPoolExecutor(5, 20, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>()); |
| |
| private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); |
| private final int PRIMARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); |
| private final int SECONDARY_BROKER_WEBSERVICE_PORT = PortManager.nextFreePort(); |
| private final int PRIMARY_BROKER_PORT = PortManager.nextFreePort(); |
| private final int SECONDARY_BROKER_PORT = PortManager.nextFreePort(); |
| private static final Logger log = LoggerFactory.getLogger(SimpleLoadManagerImplTest.class); |
| |
| static { |
| System.setProperty("test.basePort", "16100"); |
| } |
| |
| @BeforeMethod |
| void setup() throws Exception { |
| |
| // Start local bookkeeper ensemble |
| bkEnsemble = new LocalBookkeeperEnsemble(3, ZOOKEEPER_PORT, PortManager.nextFreePort()); |
| bkEnsemble.start(); |
| |
| // Start broker 1 |
| ServiceConfiguration config1 = spy(new ServiceConfiguration()); |
| config1.setClusterName("use"); |
| config1.setWebServicePort(PRIMARY_BROKER_WEBSERVICE_PORT); |
| config1.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); |
| config1.setBrokerServicePort(PRIMARY_BROKER_PORT); |
| pulsar1 = new PulsarService(config1); |
| |
| pulsar1.start(); |
| |
| url1 = new URL("http://127.0.0.1" + ":" + PRIMARY_BROKER_WEBSERVICE_PORT); |
| admin1 = new PulsarAdmin(url1, (Authentication) null); |
| brokerStatsClient1 = admin1.brokerStats(); |
| |
| // Start broker 2 |
| ServiceConfiguration config2 = new ServiceConfiguration(); |
| config2.setClusterName("use"); |
| config2.setWebServicePort(SECONDARY_BROKER_WEBSERVICE_PORT); |
| config2.setZookeeperServers("127.0.0.1" + ":" + ZOOKEEPER_PORT); |
| config2.setBrokerServicePort(SECONDARY_BROKER_PORT); |
| pulsar2 = new PulsarService(config2); |
| |
| pulsar2.start(); |
| |
| url2 = new URL("http://127.0.0.1" + ":" + SECONDARY_BROKER_WEBSERVICE_PORT); |
| admin2 = new PulsarAdmin(url2, (Authentication) null); |
| brokerStatsClient2 = admin2.brokerStats(); |
| |
| createNamespacePolicies(pulsar1); |
| Thread.sleep(100); |
| } |
| |
| @AfterMethod |
| void shutdown() throws Exception { |
| log.info("--- Shutting down ---"); |
| executor.shutdown(); |
| |
| admin1.close(); |
| admin2.close(); |
| |
| pulsar2.close(); |
| pulsar1.close(); |
| |
| bkEnsemble.stop(); |
| } |
| |
| private void createNamespacePolicies(PulsarService pulsar) throws Exception { |
| NamespaceIsolationPolicies policies = new NamespaceIsolationPolicies(); |
| // set up policy that use this broker as primary |
| NamespaceIsolationData policyData = new NamespaceIsolationData(); |
| policyData.namespaces = new ArrayList<String>(); |
| policyData.namespaces.add("pulsar/use/primary-ns.*"); |
| policyData.primary = new ArrayList<String>(); |
| policyData.primary.add(pulsar1.getAdvertisedAddress() + "*"); |
| policyData.secondary = new ArrayList<String>(); |
| policyData.secondary.add("prod2-broker([78]).messaging.usw.example.co.*"); |
| policyData.auto_failover_policy = new AutoFailoverPolicyData(); |
| policyData.auto_failover_policy.policy_type = AutoFailoverPolicyType.min_available; |
| policyData.auto_failover_policy.parameters = new HashMap<String, String>(); |
| policyData.auto_failover_policy.parameters.put("min_limit", "1"); |
| policyData.auto_failover_policy.parameters.put("usage_threshold", "100"); |
| policies.setPolicy("primaryBrokerPolicy", policyData); |
| |
| ObjectMapper jsonMapper = ObjectMapperFactory.create(); |
| ZooKeeper globalZk = pulsar.getGlobalZkCache().getZooKeeper(); |
| ZkUtils.createFullPathOptimistic(globalZk, AdminResource.path("clusters", "use", "namespaceIsolationPolicies"), |
| new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); |
| byte[] content = jsonMapper.writeValueAsBytes(policies.getPolicies()); |
| globalZk.setData(AdminResource.path("clusters", "use", "namespaceIsolationPolicies"), content, -1); |
| |
| } |
| |
| @Test(enabled = true) |
| public void testBasicBrokerSelection() throws Exception { |
| LoadManager loadManager = new SimpleLoadManagerImpl(pulsar1); |
| PulsarResourceDescription rd = new PulsarResourceDescription(); |
| rd.put("memory", new ResourceUsage(1024, 4096)); |
| rd.put("cpu", new ResourceUsage(10, 100)); |
| rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024)); |
| rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); |
| |
| ResourceUnit ru1 = new SimpleResourceUnit("http://prod2-broker7.messaging.usw.example.com:8080", rd); |
| Set<ResourceUnit> rus = new HashSet<ResourceUnit>(); |
| rus.add(ru1); |
| LoadRanker lr = new ResourceAvailabilityRanker(); |
| AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRankingsInstance = new AtomicReference<>(Maps.newTreeMap()); |
| sortedRankingsInstance.get().put(lr.getRank(rd), rus); |
| |
| Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings"); |
| sortedRankings.setAccessible(true); |
| sortedRankings.set(loadManager, sortedRankingsInstance); |
| |
| ResourceUnit found = ((SimpleLoadManagerImpl) loadManager) |
| .getLeastLoaded(new NamespaceName("pulsar/use/primary-ns.10")); |
| // broker is not active so found should be null |
| assertEquals(found, null, "found a broker when expected none to be found"); |
| |
| } |
| |
| private void setObjectField(Class objClass, Object objInstance, String fieldName, Object newValue) |
| throws Exception { |
| Field field = objClass.getDeclaredField(fieldName); |
| field.setAccessible(true); |
| field.set(objInstance, newValue); |
| } |
| |
| @Test(enabled = true) |
| public void testPrimary() throws Exception { |
| LoadManager loadManager = new SimpleLoadManagerImpl(pulsar1); |
| PulsarResourceDescription rd = new PulsarResourceDescription(); |
| rd.put("memory", new ResourceUsage(1024, 4096)); |
| rd.put("cpu", new ResourceUsage(10, 100)); |
| rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024)); |
| rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); |
| |
| ResourceUnit ru1 = new SimpleResourceUnit( |
| "http://" + pulsar1.getAdvertisedAddress() + ":" + pulsar1.getConfiguration().getWebServicePort(), rd); |
| Set<ResourceUnit> rus = new HashSet<ResourceUnit>(); |
| rus.add(ru1); |
| LoadRanker lr = new ResourceAvailabilityRanker(); |
| |
| // inject the load report and rankings |
| Map<ResourceUnit, com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>(); |
| com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport(); |
| loadReport.setSystemResourceUsage(new SystemResourceUsage()); |
| loadReports.put(ru1, loadReport); |
| setObjectField(SimpleLoadManagerImpl.class, loadManager, "currentLoadReports", loadReports); |
| |
| ResourceUnitRanking ranking = new ResourceUnitRanking(loadReport.getSystemResourceUsage(), |
| new HashSet<String>(), new ResourceQuota(), new HashSet<String>(), new ResourceQuota()); |
| Map<ResourceUnit, ResourceUnitRanking> rankings = new HashMap<>(); |
| rankings.put(ru1, ranking); |
| setObjectField(SimpleLoadManagerImpl.class, loadManager, "resourceUnitRankings", rankings); |
| |
| AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRankingsInstance = new AtomicReference<>(Maps.newTreeMap()); |
| sortedRankingsInstance.get().put(lr.getRank(rd), rus); |
| setObjectField(SimpleLoadManagerImpl.class, loadManager, "sortedRankings", sortedRankingsInstance); |
| |
| ResourceUnit found = ((SimpleLoadManagerImpl) loadManager) |
| .getLeastLoaded(new NamespaceName("pulsar/use/primary-ns.10")); |
| // broker is not active so found should be null |
| assertNotEquals(found, null, "did not find a broker when expected one to be found"); |
| |
| } |
| |
| @Test(enabled = false) |
| public void testPrimarySecondary() throws Exception { |
| LocalZooKeeperCache mockCache = mock(LocalZooKeeperCache.class); |
| ZooKeeperChildrenCache zooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class); |
| |
| Set<String> activeBrokers = Sets.newHashSet("prod2-broker7.messaging.use.example.com:8080", |
| "prod2-broker8.messaging.use.example.com:8080", "prod2-broker9.messaging.use.example.com:8080"); |
| when(mockCache.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT)).thenReturn(activeBrokers); |
| when(zooKeeperChildrenCache.get()).thenReturn(activeBrokers); |
| when(zooKeeperChildrenCache.get(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT)).thenReturn(activeBrokers); |
| |
| Field zkCacheField = PulsarService.class.getDeclaredField("localZkCache"); |
| zkCacheField.setAccessible(true); |
| |
| LocalZooKeeperCache originalLZK1 = (LocalZooKeeperCache) zkCacheField.get(pulsar1); |
| LocalZooKeeperCache originalLZK2 = (LocalZooKeeperCache) zkCacheField.get(pulsar2); |
| log.info("lzk are {} 2: {}", originalLZK1.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT), |
| originalLZK2.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT)); |
| zkCacheField.set(pulsar1, mockCache); |
| |
| LocalZooKeeperCache newZk = (LocalZooKeeperCache) pulsar1.getLocalZkCache(); |
| log.info("lzk mocked are {}", newZk.getChildren(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT)); |
| |
| ZooKeeperChildrenCache availableActiveBrokers = new ZooKeeperChildrenCache(pulsar1.getLocalZkCache(), |
| SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT); |
| |
| log.info("lzk mocked active brokers are {}", |
| availableActiveBrokers.get(SimpleLoadManagerImpl.LOADBALANCE_BROKERS_ROOT)); |
| |
| LoadManager loadManager = new SimpleLoadManagerImpl(pulsar1); |
| |
| PulsarResourceDescription rd = new PulsarResourceDescription(); |
| rd.put("memory", new ResourceUsage(1024, 4096)); |
| rd.put("cpu", new ResourceUsage(10, 100)); |
| rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024)); |
| rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); |
| |
| ResourceUnit ru1 = new SimpleResourceUnit("http://prod2-broker7.messaging.usw.example.com:8080", rd); |
| Set<ResourceUnit> rus = new HashSet<ResourceUnit>(); |
| rus.add(ru1); |
| LoadRanker lr = new ResourceAvailabilityRanker(); |
| AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRankingsInstance = new AtomicReference<>(Maps.newTreeMap()); |
| sortedRankingsInstance.get().put(lr.getRank(rd), rus); |
| |
| Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings"); |
| sortedRankings.setAccessible(true); |
| sortedRankings.set(loadManager, sortedRankingsInstance); |
| |
| ResourceUnit found = ((SimpleLoadManagerImpl) loadManager) |
| .getLeastLoaded(new NamespaceName("pulsar/use/primary-ns.10")); |
| assertEquals(found.getResourceId(), ru1.getResourceId()); |
| |
| zkCacheField.set(pulsar1, originalLZK1); |
| } |
| |
| @Test |
| public void testResourceDescription() { |
| |
| PulsarResourceDescription rd = new PulsarResourceDescription(); |
| rd.put("memory", new ResourceUsage(1024, 4096)); |
| rd.put("cpu", new ResourceUsage(10, 100)); |
| rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024)); |
| rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); |
| |
| PulsarResourceDescription rd1 = new PulsarResourceDescription(); |
| rd1.put("memory", new ResourceUsage(2048, 4096)); |
| rd1.put("cpu", new ResourceUsage(50, 100)); |
| rd1.put("bandwidthIn", new ResourceUsage(550 * 1024, 1024 * 1024)); |
| rd1.put("bandwidthOut", new ResourceUsage(850 * 1024, 1024 * 1024)); |
| |
| assertTrue(rd.compareTo(rd1) == 1); |
| assertTrue(rd1.calculateRank() > rd.calculateRank()); |
| |
| SimpleLoadCalculatorImpl calc = new SimpleLoadCalculatorImpl(); |
| calc.recaliberateResourceUsagePerServiceUnit(null); |
| assertNull(calc.getResourceDescription(null)); |
| |
| } |
| |
| @Test |
| public void testLoadReportParsing() throws Exception { |
| |
| ObjectMapper mapper = ObjectMapperFactory.create(); |
| com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport reportData = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport(); |
| reportData.setName("b1"); |
| SystemResourceUsage resource = new SystemResourceUsage(); |
| ResourceUsage resourceUsage = new ResourceUsage(); |
| resource.setBandwidthIn(resourceUsage); |
| resource.setBandwidthOut(resourceUsage); |
| resource.setMemory(resourceUsage); |
| resource.setCpu(resourceUsage); |
| reportData.setSystemResourceUsage(resource); |
| String loadReportJson = mapper.writeValueAsString(reportData); |
| LoadReport loadReport = PulsarLoadReportImpl.parse(loadReportJson); |
| assertEquals( |
| loadReport.getResourceUnitDescription().getResourceUsage().get("bandwidthIn").compareTo(resourceUsage), |
| 0); |
| } |
| |
| @Test(enabled = true) |
| public void testDoLoadShedding() throws Exception { |
| LoadManager loadManager = spy(new SimpleLoadManagerImpl(pulsar1)); |
| PulsarResourceDescription rd = new PulsarResourceDescription(); |
| rd.put("memory", new ResourceUsage(1024, 4096)); |
| rd.put("cpu", new ResourceUsage(10, 100)); |
| rd.put("bandwidthIn", new ResourceUsage(250 * 1024, 1024 * 1024)); |
| rd.put("bandwidthOut", new ResourceUsage(550 * 1024, 1024 * 1024)); |
| |
| ResourceUnit ru1 = new SimpleResourceUnit("http://prod1-broker7.messaging.gq1.yahoo.com:8080", rd); |
| ResourceUnit ru2 = new SimpleResourceUnit("http://prod2-broker7.messaging.gq1.yahoo.com:8080", rd); |
| Set<ResourceUnit> rus = new HashSet<ResourceUnit>(); |
| rus.add(ru1); |
| rus.add(ru2); |
| LoadRanker lr = new ResourceAvailabilityRanker(); |
| AtomicReference<Map<Long, Set<ResourceUnit>>> sortedRankingsInstance = new AtomicReference<>(Maps.newTreeMap()); |
| sortedRankingsInstance.get().put(lr.getRank(rd), rus); |
| |
| Field sortedRankings = SimpleLoadManagerImpl.class.getDeclaredField("sortedRankings"); |
| sortedRankings.setAccessible(true); |
| sortedRankings.set(loadManager, sortedRankingsInstance); |
| |
| // inject the load report and rankings |
| SystemResourceUsage systemResource = new SystemResourceUsage(); |
| systemResource.setBandwidthIn(new ResourceUsage(90, 100)); |
| Map<String, NamespaceBundleStats> stats = Maps.newHashMap(); |
| NamespaceBundleStats nsb1 = new NamespaceBundleStats(); |
| nsb1.msgRateOut = 10000; |
| NamespaceBundleStats nsb2 = new NamespaceBundleStats(); |
| nsb2.msgRateOut = 10000; |
| stats.put("property/cluster/namespace1/0x00000000_0xFFFFFFFF", nsb1); |
| stats.put("property/cluster/namespace2/0x00000000_0xFFFFFFFF", nsb2); |
| |
| Map<ResourceUnit, com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport> loadReports = new HashMap<>(); |
| com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport1 = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport(); |
| loadReport1.setSystemResourceUsage(systemResource); |
| loadReport1.setBundleStats(stats); |
| com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport loadReport2 = new com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport(); |
| loadReport2.setSystemResourceUsage(new SystemResourceUsage()); |
| loadReport2.setBundleStats(stats); |
| loadReports.put(ru1, loadReport1); |
| loadReports.put(ru2, loadReport2); |
| setObjectField(SimpleLoadManagerImpl.class, loadManager, "currentLoadReports", loadReports); |
| |
| ((SimpleLoadManagerImpl) loadManager).doLoadShedding(); |
| verify(loadManager, atLeastOnce()).doLoadShedding(); |
| } |
| |
| @Test |
| public void testNamespaceBundleStats() { |
| NamespaceBundleStats nsb1 = new NamespaceBundleStats(); |
| nsb1.msgRateOut = 10000; |
| nsb1.producerCount = 1; |
| nsb1.consumerCount = 1; |
| nsb1.cacheSize = 4; |
| nsb1.msgRateIn = 500; |
| nsb1.msgThroughputIn = 30; |
| nsb1.msgThroughputOut = 30; |
| NamespaceBundleStats nsb2 = new NamespaceBundleStats(); |
| nsb2.msgRateOut = 20000; |
| nsb2.producerCount = 300; |
| nsb2.consumerCount = 300; |
| nsb2.cacheSize = 110000; |
| nsb2.msgRateIn = 5000; |
| nsb2.msgThroughputIn = 110000.0; |
| nsb2.msgThroughputOut = 110000.0; |
| assertTrue(nsb1.compareTo(nsb2) == -1); |
| assertTrue(nsb1.compareByMsgRate(nsb2) == -1); |
| assertTrue(nsb1.compareByTopicConnections(nsb2) == -1); |
| assertTrue(nsb1.compareByCacheSize(nsb2) == -1); |
| assertTrue(nsb1.compareByBandwidthOut(nsb2) == -1); |
| assertTrue(nsb1.compareByBandwidthIn(nsb2) == -1); |
| } |
| |
| @Test |
| public void testBrokerHostUsage() { |
| BrokerHostUsage brokerUsage; |
| if (SystemUtils.IS_OS_LINUX) { |
| brokerUsage = new LinuxBrokerHostUsageImpl(pulsar1); |
| } else { |
| brokerUsage = new GenericBrokerHostUsageImpl(pulsar1); |
| } |
| brokerUsage.getBrokerHostUsage(); |
| // Ok |
| } |
| |
| @Test |
| public void testTask() throws Exception { |
| LoadManager loadManager = mock(LoadManager.class); |
| AtomicReference<LoadManager> atomicLoadManager = new AtomicReference<>(loadManager); |
| LoadResourceQuotaUpdaterTask task1 = new LoadResourceQuotaUpdaterTask(atomicLoadManager); |
| task1.run(); |
| verify(loadManager, times(1)).writeResourceQuotasToZooKeeper(); |
| |
| |
| LoadSheddingTask task2 = new LoadSheddingTask(atomicLoadManager); |
| task2.run(); |
| verify(loadManager, times(1)).doLoadShedding(); |
| } |
| |
| @Test |
| public void testUsage() { |
| Map<String, Object> metrics = Maps.newHashMap(); |
| metrics.put("brk_conn_cnt", new Long(1)); |
| metrics.put("brk_repl_conn_cnt", new Long(1)); |
| metrics.put("jvm_thread_cnt", new Long(1)); |
| BrokerUsage brokerUsage = BrokerUsage.populateFrom(metrics); |
| assertEquals(brokerUsage.getConnectionCount(), 1); |
| assertEquals(brokerUsage.getReplicationConnectionCount(), 1); |
| JvmUsage jvmUage = JvmUsage.populateFrom(metrics); |
| assertEquals(jvmUage.getThreadCount(), 1); |
| SystemResourceUsage usage = new SystemResourceUsage(); |
| double usageLimit = 10.0; |
| usage.setBandwidthIn(new ResourceUsage(usageLimit, usageLimit)); |
| assertEquals(usage.getBandwidthIn().usage, usageLimit); |
| usage.reset(); |
| assertNotEquals(usage.getBandwidthIn().usage, usageLimit); |
| } |
| |
| } |